Python多线程并发编程 -- concurrent.futures

阅读 76

2021-09-28

在Python中,并发并不是同一时刻有多个操作(thread/task)同时进行。相反,由于全局解释器锁(GIL) 的存在,在某个特定的时刻,它只允许有一个操作发生,只不过线程或任务之间会互相切换,直到完成,如下图所示:


上图中出现了线程(thread) 和任务(task)两种切换顺序的不同方式,分别对应Python中的两种实现并发的方法:threading 和 asyncio. 对于 threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换。而对于 asyncio,主程序想要切换任务时,必须得到此任务可以被切换的通知。

本文内容只涉及基于concurrent.futures的多线程并发,不涉及asyncio。

选择多线程还是多进程?

核心概念:

  • 进程是操作系统分配资源的最小单元,线程是操作系统调度的最小单元;
  • 一个应用程序至少包括一个进程,一个进程至少包括一个线程;
  • 每个进程在执行过程中拥有独立的内存单元,而一个进程的多个线程在执行过程中共享内存;

如果手头的任务是I/O密集型,可以使用标准库的 threading 模块,或者任务是CPU密集型,则可以使用 multiprocessing 模块。这两个模块提供了很多控制权和灵活性,但代价就是必须编写相对低级的冗长代码,在任务核心逻辑的基础上增加额外的并具有复杂性的层,并且当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候往往需要编写自己的线程池/进程池,以空间换时间。

从Python 3.2开始,标准库提供了 concurrent.futures 模块,它在 threading 和 multiprocessing 之上的一个通用抽象层,提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,以便使用线程池/进程池并发/并行地执行任务。

多线程并发编程之concurrent.futures

concurrent.futures 模块对外提供了以下常量、函数或类:

__all__ = (
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'CancelledError',
    'TimeoutError',
    'BrokenExecutor',
    'Future',
    'Executor',
    'wait',
    'as_completed',
    'ProcessPoolExecutor',
    'ThreadPoolExecutor',
)

Executor

Executor 是一个抽象类,不应该直接使用此类,而是使用它提供的两个子类:ThreadPoolExecutor 和 ProcessPoolExecutor,顾名思义两者分别被用来创建线程池和进程池的代码。

Executor的重要方法
ProcessPoolExecutor 和 ThreadPoolExecutor 类中最重要的几个方法如下:

  • submit(fn,*args, **kwargs):提交异步任务(一般是阻塞的可调用函数),并返回表示可调用对象执行的 Future 对象(原文:returns a Future object representing the execution of the callable);此方法不保存与原始任务相关的任何上下文,如果想把结果和原始任务对应起来,需要自己去追踪它们,比如使用字典推导式;

  • map(fn, *iterables, timeout=None, chunksize=1):和标准的map函数功能类似,同样返回一个迭代器,只不过是以异步的方式把函数依次作用在可迭代对象的每个元素上,注意:如果函数调用引发异常,当从迭代器检索其值时将引发异常,并且不会继续执行;

  • shutdown(wait=True):通知执行器,当所有挂起的 Future 对象执行完成时,释放正在使用的任何资源;在抽象基类中实现了上下文管理器协议,__exit__方法中调用了shutdown方法;

future

Futute 类封装可调用对象的异步执行,应由Executor.submit() 方法创建,可以理解为一个在未来完成的操作。比如说在写爬虫代码时会用到 requests.get ,在等待服务器返回结果之前的这段时间会产生I/O阻塞,CPU不能让出来做其他的事情,Future的引入就是帮助我们在等待的这段时间可以完成其他的操作。

Futute 类中最重要的几个方法如下:

  • result(timeout=None):返回可调用对象的实际返回值;
  • cancel():尝试取消future,如果它正在运行或已经完成,则不能取消,返回False,若取消成功则返回True;
  • cancelled():如果future已被取消,则返回True;
  • running():如果future当前正在运行,则返回True;
  • done():如果future已被取消或执行完成,则返回True;
  • add_done_callback(fn):future执行完成后,添加回调;
  • exception(timeout=None):返回future执行时所引发的异常;

wait 和 as_completed

模块下有2个重要函数 waitas_completed

  • wait(fs, timeout=None, return_when=ALL_COMPLETED)

    遍历fs提供的future(可能由不同的Executor实例创建),并等待执行完成(包含已取消),如果未设置timeout参数,则代表不限制等待时间,return_when参数则用于设置次函数应该在何时返回,支持的选项如下:

Constant Description
FIRST_COMPLETED 在当有任何future执行完成(包括已取消)时返回结果
FIRST_EXCEPTION 当有任何future执行引发异常时返回结果,若没有任何future引发异常则等同于ALL_COMPLETED
ALL_COMPLETED 当所有future执行完成(包括已取消)时返回结果

该函数返回一个包含两个元素的namedtuple,定义如下:

DoneAndNotDoneFutures = collections.namedtuple('DoneAndNotDoneFutures', 'done not_done')

  • as_completed(fs, timeout=None)

返回一个迭代器,遍历fs给出的 future 实例(可能由不同的执行器实例创建),在它们执行完成(包含已取消)时 yield future。

经验技巧

正确的使用submit和map

submit 方法返回的是 Future 对象, map方法则返回迭代器,如果没有调用future对象的result方法,即使执行过程中有异常用户也是不知道的,如下所示:

f = lambda x: 100 // x

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(f, 0)
    future2 = executor.submit(f, 10)
    future3 = executor.submit(f, 20)
print(future1, future2, future3, sep='\n')

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = executor.map(f, [0, 10, 20])
print(futures)

>>>
<Future at 0x3d69fb8 state=finished raised ZeroDivisionError>
<Future at 0x3d85460 state=finished returned int>
<Future at 0x3d856d0 state=finished returned int>
<generator object Executor.map.<locals>.result_iterator at 0x03D59A00>

所以通常需要调用其 result 方法并且捕捉异常:

f = lambda x: 100 // x

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(f, 0)
    future2 = executor.submit(f, 0)
    future3 = executor.submit(f, 20)

todos = [future1, future2, future3]
for future in concurrent.futures.as_completed(todos):
    try:
        print(future.result())
    except ZeroDivisionError as e:
        print(e.__repr__())

>>>
ZeroDivisionError('integer division or modulo by zero')
5
ZeroDivisionError('integer division or modulo by zero')

相对于submit,map方法的结果就比较难获取了,这是因为map方法以异步的方式把函数依次作用在可迭代对象的每个元素上,如果在函数调用时引发了一些异常,当从迭代器检索其值时就将引发异常,因此需要使用下面的方法:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
    futures = executor.map(f, [0, 10, 20])

while True:
    try:
        future = futures.__next__()
    except StopIteration:
        break
    except ZeroDivisionError as e:
        print(e.__repr__())
 
>>>
ZeroDivisionError('integer division or modulo by zero')

可以看到,当第一次错误发生后生成器迭代就结束了,所以一批任务中可能会出现异常时是不合适用 map 方法的,最好的方式还是使用 submit+as_completed. 在一些较为简单的场景下,如果不需要关心任务的返回值,则可以考虑使用map方法。

寻找合适的max_worker

使用ThreadPoolExecutor,虽然线程的数量可以自定义,但并不是越多越好,因为线程的创建、维护和删除也会有一定的开销。所以如果设置的很大,反而可能会导致速度变慢,比如下面的例子,把线程数从5改为10,运行程序会发现耗时反而增多了。所以在实际开发过程中,往往需要根据实际的需要去做一些测试,在任务不影响到全局的情况下,寻找最优的线程数量。

...
def download_all(urls: list):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(download_one, urls)
...
if __name__ == "__main__":
    main()
    
>>>
...
Download 30 urls in 0.9526623 seconds

避免死锁

使用ThreadPoolExecutor时可能出现的死锁情况,当与Future 关联的可调用函数等待另一个Future的结果时,它们可能永远不会释放对线程的控制并导致死锁,官网的示例如下:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

在上面的例子中,函数wait_on_b依赖于函数wait_on_a的结果(Future对象的结果),同时后一个函数的结果依赖于前一个函数的结果。因此,上下文管理器中的代码块永远不会执行,因为它具有相互依赖性,这就造成了死锁。

简单使用场景

  1. 使用多线程从url下载和保存文件
import os
import concurrent.futures
from pathlib import Path

import requests

htmls_dir = Path(__file__).parent.joinpath('htmls')
if not htmls_dir.exists():
    os.makedirs(htmls_dir)
else:
    for html in htmls_dir.glob("*.html"):
        os.remove(html)


def download_one(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    return resp.text


def save(source, html, chunk=8 * 1024):
    with open(source, 'w', encoding="utf-8") as fp:
        for text in (html[i:chunk + i] for i in range(0, len(html), chunk)):
            fp.write(text)


def download_all(urls):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(download_one, url): url[-7:] for url in urls}

    for future in concurrent.futures.as_completed(futures):
        source = htmls_dir.joinpath(futures[future]).with_suffix('.html')
        save(source, future.result())


def main():
    urls = [f"https://www.sogou.com/web?query={i}" for i in range(30)]
    start_time = time.perf_counter()
    download_all(urls)
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f'Download {len(urls)} urls in {elapsed_time} seconds')


if __name__ == "__main__":
    main()

参考文档

  1. asyncio --- 异步 I/O 官方文档
  2. 进程与线程的一个简单解释,阮一峰
  3. 一文看懂Python多进程与多线程编程
  4. 使用Python的concurrent.futures轻松实现并发编程
  5. 使用concurrent.futures的一些经验
  6. Python并发编程之线程池/进程池--concurrent.futures模块
  7. 官方文档
  8. [Book] Python Cook

【To Be Continued...】

精彩评论(0)

0 0 举报