就算没有天分,只要你愿意每天花一点时间,做同样一件事情,不知不觉间,你就会走得很远。
上篇博文介绍了multiprocessing模块的内存共享,下面讲进程池。有些情况下,所要完成的工作可以上篇博文介绍了multiprocessing模块的内存共享,下面讲进程池。有些情况下,所要完成的工作可以分解并独立地分布到多个工作进程,对于这种简单的情况,可以用Pool类来管理固定数目的工作进程。作业的返回值会收集并作为一个列表返回。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
在网上找到了一篇非常好的分析进程池源码的文章,在这里跟大家分享下,篇幅比较长,希望大家能够有耐心的看完它,仔细体会。
进程池使用multiprocessing.pool,pool的构造如下:
multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
- processes表示pool中进程的数目,默认地为当前CPU的核数可以通过multiprocessing.cpu_count()方法参考你机器上cpu数量;
- initializer表示工作进程start时调用的初始化函数;
- initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用;
- maxtasksperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。
主要方法:
- apply(func[, args[, kwds]]) :apply用于传递不定参数,同python中的apply函数一致(不过内置的apply函数从2.3以后就不建议使用了),主进程会阻塞于函数,主进程的执行流程同单进程一致;
- apply_async(func[, args[, kwds[, callback]]]) :与apply用法一致,但它是非阻塞的且支持结果返回后进行回调;
主进程循环运行过程中不等待apply_async的返回结果,在主进程结束后,即使子进程还未返回整个程序也会退出。虽然apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,如使用result.get()会阻塞主进程。
如果我们对返回结果不感兴趣, 那么可以在主进程中使用pool.close与pool.join来防止主进程退出。注意join方法一定要在close或terminate之后调用。
- map(func, iterable[, chunksize]) :map方法与在功能上等价与内置的map(),只不过单个任务会并行运行。它会使进程阻塞直到结果返回。但需注意的是其第二个参数虽然描述的为iterable, 但在实际使用中发现只有在整个队列全部就绪后,程序才会运行子进程;
- map_async(func, iterable[, chunksize[, callback]]) :与map用法一致,但是它是非阻塞的。其有关事项见apply_async;
- imap(func, iterable[, chunksize]) :与map不同的是, imap的返回结果为iter,需要在主进程中主动使用next来驱动子进程的调用。即使子进程没有返回结果,主进程对于gen_list(l)的 iter还是会继续进行, 另外根据python2.6文档的描述,对于大数据量的iterable而言,将chunksize设置大一些比默认的1要好;
- imap_unordered(func, iterable[, chunksize]) :同imap一致,只不过其并不保证返回结果与迭代传入的顺序一致;
- close() :关闭pool,使其不再接受新的任务;
- terminate() :结束工作进程,不再处理未处理的任务;
- join() :主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。
它的源码在multiprocessing包pool.py里,Pool对象的初始化函数如下:
1 | class Pool(object): |
主要数据结构有:
- self._inqueue 接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程;
- self._outqueue 发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程;
- self._taskqueue 同步的任务队列,保存线程池分配给主进程的任务;
- self._cache = {} 任务缓存;
- self._processes worker进程个数;
- self._pool = [] woker进程队列。
进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:
_work_handler
线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。
_worker_handler
线程回调函数为Pool._handler_workers
方法,在进程池state==RUN时,循环调用_maintain_pool
方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。
1 | self._worker_handler = threading.Thread( |
_task_handler
线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe)
1 | self._task_handler = threading.Thread( |
Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时,表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。
_handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,
1 | self._result_handler = threading.Thread( |
_terminate
,这里的_terminate
并不是一个线程,而是一个Finalize对象
1 | self._terminate = Finalize( |
下面接着看下客户端如何对向进程池分配任务,并获取结果的。
我们知道,当进程池中任务队列非空时,才会触发worker进程去工作,那么如何向进程池中的任务队列中添加任务呢,进程池类有两组关键方法来创建任务,分别是apply/apply_async和map/map_async,实际上进程池类的apply和map方法与python内建的两个同名方法类似,apply_async和map_async分别为它们的非阻塞版本。
首先来看apply_async方法,源码如下:
1 | def apply_async(self, func, args=(), kwds={}, callback=None): |
每调用一次apply_result方法,实际上就向_taskqueue
中添加了一条任务,注意这里采用了非阻塞(异步)的调用方式,即apply_async方法中新建的任务只是被添加到任务队列中,还并未执行,不需要等待,直接返回创建的ApplyResult对象,注意在创建ApplyResult对象时,将它放入进程池的缓存_cache
中。
任务队列中有了新创建的任务,那么根据上节分析的处理流程,进程池的_task_handler
线程,将任务从taskqueue中获取出来,放入_inqueue
中,触发worker进程根据args和kwds调用func,运行结束后,将结果放入_outqueue
,再由进程池中的_handle_results
线程,将运行结果从_outqueue
中取出,并找到_cache
缓存中的ApplyResult对象,_set
其运行结果,等待调用端获取。
apply_async方法既然是异步的,那么它如何知道任务结束,并获取结果呢?这里需要了解ApplyResult类中的两个主要方法:
1 | def get(self, timeout=None): |
apply方法是以阻塞的方式运行获取进程结果,它的实现很简单,同样是调用apply_async,只不过不返回ApplyResult,而是直接返回worker进程运行的结果:
1 | def apply(self, func, args=(), kwds={}): |
以上的apply/apply_async方法,每次只能向进程池分配一个任务,那如果想一次分配多个任务到进程池中,可以使用map/map_async方法。首先来看下map_async方法是如何定义的:
1 | def map_async(self, func, iterable, chunksize=None, callback=None): |
从源码可以看出,map_async要比apply_async复杂,首先它会根据chunksize对任务参数序列进行分组,chunksize表示每组中的任务个数,当默认chunksize=None时,根据任务参数序列和进程池中进程数计算分组数:chunk, extra = divmod(len(iterable), len(self._pool) * 4)。假设进程池中进程数为len(self._pool)=4,任务参数序列iterable=range(123),那么chunk=7, extra=11,向下执行,得出chunksize=8,表示将任务参数序列分为8组。任务实际分组:
1 | task_batches = Pool._get_tasks(func, iterable, chunksize) |
分组之后,这里定义了一个MapResult对象:result = MapResult(self._cache, chunksize, len(iterable), callback)它继承自AppyResult类,同样提供get和_set方法接口。将分组后的任务放入任务队列中,然后就返回刚刚创建的result对象。
1 | self._taskqueue.put((((result._job, i, mapstar, (x,), {}) |
注意这里只调用了一次put方法,将16组元组作为一个整体序列放入任务队列,那么这个任务是否_task_handler
线程是否也会像apply_async方法一样,将整个任务序列传递给_inqueue
,这样就会导致进程池中的只有一个worker进程获取到任务序列,而并非起到多进程的处理方式。我们来看下_task_handler
线程是怎样处理的:
1 | def _handle_tasks(taskqueue, put, outqueue, pool, cache): |
注意到语句 for i, task in enumerate(taskseq),原来_task_handler
线程在通过taskqueue获取到任务序列后,并不是直接放入_inqueue
中的,而是将序列中任务按照之前分好的组,依次放入_inqueue
中的,而循环中的task即上述的每个任务元组:(result._job, 0, mapstar, ((func, (0, 1, 2, 3, 4, 5, 6, 7)),), {}, None)。接着触发worker进程。worker进程获取出每组任务,进行任务的处理:
1 | job, i, func, args, kwds = task |
现在来我们来总结下,进程池的map_async方法是如何运行的,我们将range(123)这个任务序列,将它传入map_async方法,假设不指定chunksize,并且cpu为四核,那么方法内部会分为16个组(0~14组每组8个元素,最后一组3个元素)。将分组后的任务放入任务队列,一共16组,那么每个进程需要运行4次来处理,每次通过内建的map方法,顺序将组中8个任务执行,再将结果放入_outqueue
,找到_cache
缓存中的MapResult对象,_set
其运行结果,等待客户端获取。使用map_async方法会调用多个worker进程处理任务,每个worler进程运行结束,会将结果传入_outqueue
,再有_handle_result
线程将结果写入MapResult对象,那如何保证结果序列的顺序与调用map_async时传入的任务参数序列一致呢,我们来看看MapResult的构造函数和_set
方法的实现。
1 | def __init__(self, cache, chunksize, length, callback): |
MapResult类中,_value
保存map_async的运行结果,初始化时为一个元素为None的list,list的长度与任务参数序列的长度相同,_chunksize
表示将任务分组后,每组有多少个任务,_number_left
表示整个任务序列被分为多少个组。_handle_result
线程会通过_set
方法将worker进程的运行结果保存到_value
中,那么如何将worker进程运行的结果填入到_value
中正确的位置呢,还记得在map_async在向task_queue填入任务时,每组中的 i吗,i表示的就是当前任务组的组号,_set
方法会根据当前任务的组号即参数 i,并且递减_number_left
,当_number_left
递减为0时,表示任务参数序列中的所有任务都已被woker进程处理,_value
全部被计算出,唤醒阻塞在get方法上的条件变量,是客户端可以获取运行结果。
map函数为map_async的阻塞版本,它在map_async的基础上,调用get方法,直接阻塞到结果全部返回:
1 | def map(self, func, iterable, chunksize=None): |
我们知道,进程池内部由多个线程互相协作,向客户端提供可靠的服务,那么这些线程之间是怎样做到数据共享与同步的呢?在客户端使用apply/map函数向进程池分配任务时,使用self._taskqueue
来存放任务元素,_taskqueue
定义为Queue.Queue(),这是一个python标准库中的线程安全的同步队列,它保证通知时刻只有一个线程向队列添加或从队列获取元素。这样,主线程向进程池中分配任务(taskqueue.put),进程池中_handle_tasks
线程读取_taskqueue
队列中的元素,两个线程同时操作taskqueue,互不影响。进程池中有N个worker进程在等待任务下发,那么进程池中的_handle_tasks
线程读取出任务后,又如何保证一个任务不被多个worker进程获取到呢?我们来看下_handle_tasks
线程将任务读取出来之后如何交给worker进程的:
1 | for taskseq, set_length in iter(taskqueue.get, None): |
在worker进程运行结束之后,会将执行结果通过管道传回,进程池中有_handle_result
线程来负责接收result,取出之后,通过调用_set
方法将结果写回ApplyResult/MapResult对象,客户端可以通过get方法取出结果,这里通过使用条件变量进行同步,当_set
函数执行之后,通过条件变量唤醒阻塞在get函数的主进程。
进程池终止工作通过调用Pool.terminate()来实现,这里的实现很巧妙,用了一个可调用对象,将终止Pool时的需要执行的回调函数先注册好,等到需要终止时,直接调用对象即可。
1 | self._terminate = Finalize( |
使用map/map_async函数向进程池中批量分配任务时,使用了生成器表达式:
1 | self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None)) |
在Pool中,_worker_handler线程负责监控、创建新的工作进程,在监控工作进程退出时,同时将退出的进程从进程池中删除掉。这类似于,一边遍历一边删除列表。我们来看下下面代码的实现:
1 | 1, 2, 3, 3, 4, 4, 4, 5] l = [ |
我们看到l没有将所有的3和4都删除掉,这是因为remove改变了l的大小。再看下面的实现:
1 | 1, 2, 3, 3, 4, 4, 4, 5] l = [ |
同样因为del l[i]时,l的大小改变,继续访问下去导致访问越界。而标准库中的进程池给出了遍历删除的一个正确示例:
1 | for i in reversed(range(len(self._pool))): |
使用reversed,从后向前删除list中的元素,这样会保证所有符合删除条件的元素被删除掉:
1 | 1, 2, 3, 3, 4, 4, 4, 5] l = [ |