就算孤独,也不能一个人哭泣,无论走在哪,都要高傲地挺立;就算是挫败,也不能窝囊地依靠,无论在何时,都要倔强地站起。前面的路过于平坦,惬意久了难以找到方向;唯有脚下布满了荆棘,前方或许才是生机。
简介
Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式。该模块提供了三种队列:
- Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列
- Queue.LifoQueue(maxsize):后进先出,相当于栈
- Queue.PriorityQueue(maxsize):优先级队列。
其中LifoQueue,PriorityQueue是Queue的子类。三者拥有以下共同的方法:
qsize()
:返回近似的队列大小。为什么要加“近似”二字呢?因为当该值大于0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。empty()
:返回布尔值,队列为空时,返回True,反之返回False。full()
:当设定了队列大小的时候,如果队列满了,则返回True,否则返回False。put(item[,block[,timeout]])
:向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。put_nowwait(item)
:等价与put(item,False)。block设置为False的时候,如果队列为空,则抛出Empty异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Empty异常。get([block[,timeout]])
:从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。get_nowwait()
:等价于get(False)task_done()
:发送信号表明入列任务已完成,经常在消费者线程中用到。join()
:阻塞直至队列所有元素处理完毕,然后再处理其它操作。
源码分析
Queue模块用起来很简单很简单,但我觉得有必要把该模块的相关源代码贴出来分析下,会学到不少东西,看看大神们写的代码多么美观,多么结构化模块化,再想想自己写的代码,都是泪呀,来学习学习。为了缩减篇幅,源码的注释部分被删减掉。
1 | from time import time as _time |
通过后面的几个函数分析知道,Queue对象是在collections模块的queue基础上(关于collections模块参考 Python:使用Counter进行计数统计及collections模块),加上threading模块互斥锁和条件变量封装的。
deque是一个双端队列,很适用于队列和栈。上面的Queue对象就是一个先进先出的队列,所以首先_init()
函数定义了一个双端队列,然后它的定义了_put()和_get()
函数,它们分别是从双端队列右边添加元素、左边删除元素,这就构成了一个先进先出队列,同理很容易想到LifoQueue(后进先出队列)的实现了,保证队列右边添加右边删除就可以。可以贴出源代码看看。
1 | class LifoQueue(Queue): |
虽然它的”queue”没有用queue(),用列表也是一样的,因为列表append()和pop()操作是在最右边添加元素和删除最右边元素。
再来看看PriorityQueue,他是个优先级队列,这里用到了heapq模块的heappush()和heappop()两个函数。heapq模块对堆这种数据结构进行了模块化,可以建立这种数据结构,同时heapq模块也提供了相应的方法来对堆做操作。其中_init()函数里self.queue=[]可以看作是建立了一个空堆。heappush() 往堆中插入一条新的值 ,heappop() 从堆中弹出最小值 ,这就可以实现优先级(关于heapq模块这里这是简单的介绍)。源代码如下:
1 | class PriorityQueue(Queue): |
基本的数据结构分析完了,接着分析其它的部分。
- mutex 是个threading.Lock()对象,是互斥锁;not_empty、 not_full 、all_tasks_done这三个都是threading.Condition()对象,条件变量,而且维护的是同一把锁对象mutex(关于threading模块中Lock对象和Condition对象可参考上篇博文 Python:线程、进程与协程(2)——threading模块)。
- 其中:
- self.mutex互斥锁:任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。acquire()获取锁,release()释放锁。同时该互斥锁被三个条件变量共同维护。
- self.not_empty条件变量:线程添加数据到队列中后,会调用self.not_empty.notify()通知其它线程,然后唤醒一个移除元素的线程。
- self.not_full条件变量:当一个元素被移除出队列时,会唤醒一个添加元素的线程。
- self.all_tasks_done条件变量 :在未完成任务的数量被删除至0时,通知所有任务完成
- self.unfinished_tasks : 定义未完成任务数量
再来看看主要方法:
put()
源代码如下:
1 | def put(self, item, block=True, timeout=None): |
默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。
如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。
get()
源码如下:
1 | def get(self, block=True, timeout=None): |
逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。
不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。
task_done()
源码如下:
1 | def task_done(self): |
这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。
join()
源码如下:
1 | def join(self): |
阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。
其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。
示例
示例一
实现一个线程不断生成一个随机数到一个队列中
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
1 | import random,threading,time |
这个例子跟上篇博文 Python:线程、进程与协程(2)——threading模块中介绍Condition的例子很像,就是构造了一个长度为20的队列,当队列1元素个数小于8时就忘队列中添加元素,当队列满后,就不再添加,当队列元素大于7个时,才会取元素,否则不取元素。有兴趣的可以动手试试,仔细体会下。
示例二
线程池
在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,让过来的任务立刻能够使用,就形成了线程池。在python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。
1 | #coding=utf-8 |