python——queue模块

python——queue

简介

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递

基本FIFO队列

queue.Queue(maxsize=0)

import queue

q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

输出:

1
2
3

LIFO (后入先出)

queue.LifoQueue

import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

输出:

3
2
1

PriorityQueue (数据可设置优先级)

queue.PriorityQueue
同优先级的按照 ASCII 排序

import queue

q = queue.PriorityQueue()
q.put((2, '2'))
q.put((1, '1'))
q.put((3, '3'))
q.put((1, 'a'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())

输出

(1, '1')
(1, 'a')
(2, '2')
(3, '3')

一些常用方法

task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

put(item[, block[, timeout]])

将item放入队列中。

  1. 如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
  2. 如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
  3. 如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常

其非阻塞版本为put_nowait等同于put(item, False)

get([block[, timeout]])

从队列中移除并返回一个数据。block跟timeout参数同put方法

其非阻塞方法为get_nowait()相当与get(False)

full() & empty()

Queue.empty()/Queue.full() 用于判断队列是否为空、满

qsize()

Queue.qsize() 用于获取队列中大致的数据量
注意:在多线程的情况下不可靠
因为在获取 qsize 时,其他线程可能又对队列进行操作了

生产者消费者模型(主要用于解耦)

在多线程开发当中,如果生产线程处理速度很快,而消费线程处理速度很慢,那么生产线程就必须等待消费线程处理完,才能继续生产数据。同样的道理,如果消费线程的处理能力大于生产线程,那么消费线程就必须等待生产线程。为了解决这个问题于是引入了生产者和消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

import threading
import time
import queue


def producer():
    count = 1
    while 1:
        q.put('No.%i' % count)
        print('Producer put No.%i' % count)
        time.sleep(1)
        count += 1


def customer(name):
    while 1:
        print('%s get %s' % (name, q.get()))
        time.sleep(1.5)


q = queue.Queue(maxsize=5)
p = threading.Thread(target=producer, )
c = threading.Thread(target=customer, args=('jack', ))
p.start()
c.start()

生产者消费者爬虫

# !usr/bin/env python 3.6
# -*- coding: utf-8 -*-
# Author: fcj
# Time: 2019-05-09
# Description: python 多线程-普通多线程-生产者消费者模型

import re
import time
import requests
import threading
from lxml import etree
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread


def producer(in_q):  # 生产者
    ready_list = []
    while in_q.full() is False:
        for i in range(1, 1001):
            url = 'http://www.g.com/?page='+str(i)
            if url not in ready_list:
                ready_list.append(url)
                in_q.put(url)
            else:
                continue


def consumer(in_q, out_q):  # 消费者
    headers = {
        'Accept': ‘',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Cookie': ',
        'DNT': '1',
        'Host': 'www..com',
        'Referer': 'http://www.g.com',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
    }
    while True:
        data = requests.get(url=in_q.get(), headers=headers)
        r = data.content
        content = str(r, encoding='utf-8', errors='ignore')
        soup = BeautifulSoup(content, 'html5lib')
        fixed_html = soup.prettify()
        html = etree.HTML(fixed_html)
        nums = html.xpath('//div[@class="col-md-1"]//text()')
        for num in nums:
            num = re.findall('[0-9]', ''.join(num))
            real_num = int(''.join(num))
            out_q.put(str(threading.current_thread().getName()) + '-' + str(real_num))
        in_q.task_done()  # 通知生产者,队列已消化完


if __name__ == '__main__':
    start = time.time()
    queue = Queue(maxsize=10)  # 设置队列最大空间为10
    result_queue = Queue()
    print('queue 开始大小 %d' % queue.qsize())

    producer_thread = Thread(target=producer, args=(queue,))
    producer_thread.daemon = True
    producer_thread.start()

    for index in range(10):
        consumer_thread = Thread(target=consumer, args=(queue, result_queue, ))
        consumer_thread.daemon = True
        consumer_thread.start()

    queue.join()
    end = time.time()
    print('总耗时:%s' % (end - start))
    print('queue 结束大小 %d' % queue.qsize())
    print('result_queue 结束大小 %d' % result_queue.qsize())



原文地址:https://www.cnblogs.com/tomyyyyy/p/14198081.html