如何在线程间进行事件通知?

需求:
在上节课,我们从Intrinio.com下载多支股票的csv数据,并将其转换为xml文件
额外需求:
实现一个线程TarThread,将转换出的xml文件打包。比如转换线程每生产出100个xml文件,就通知打包线程将它们打包成一个xxx.tgz文件(下面的示例代码为了便于演示,每转换3个xml文件就打包一次),
并删除xml文件,打包完成后,打包线程反过来通知转换线程,转换线程继续转换

思路:
线程间的事件通知,可以使用标准库中的threading.Event
1、等待事件一端调用wait,等待事件
2、通知事件一端调用set,通知事件
3、若要重复使用同一个Event,需要在wait返回后调用clear将其复位,否则后续的wait会立即返回

代码:

import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement
from threading import Thread
import tarfile
import os

# API key for the Intrinio price service.
# NOTE(review): nothing in the visible code reads this constant;
# DownloadThread.download_csv embeds the same key literal directly in its URL.
apikey = 'OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk'

class DownloadThread(Thread):
    """Download one page of price data as CSV and hand it to a queue.

    The thread keeps retrying until a page is downloaded successfully, then
    puts ``(page_number, csv_file)`` on ``queue``, where ``csv_file`` is a
    ``StringIO`` holding the response text.
    """

    # Seconds to wait on a single HTTP request before treating it as failed;
    # without a timeout a stalled connection would block the thread forever.
    request_timeout = 10
    # Seconds to pause between failed attempts so a failing server is not
    # hit in a tight loop.
    retry_delay = 1

    def __init__(self, page_number, queue):
        """Store the page to fetch and the queue that receives the result."""
        super().__init__()
        self.page_number = page_number
        self.queue = queue

    def run(self):
        """Fetch the page (retrying on failure) and enqueue the result."""
        import time  # local import: the module header does not import time

        csv_file = self.download_csv(self.page_number)
        while csv_file is None:
            time.sleep(self.retry_delay)
            csv_file = self.download_csv(self.page_number)
        self.queue.put((self.page_number, csv_file))

    def download_csv(self, page_number):
        """Request one page of CSV data.

        Returns a ``StringIO`` with the response body when the request
        succeeds, or ``None`` when the request fails (network error, timeout
        or a non-OK HTTP status) so that the caller can retry.
        """
        print('download csv data [page=%s]' % page_number)
        # The API key is passed in the query string of the request URL.
        url = "https://api.intrinio.com/prices.csv?api_key=OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk&identifier=AAPL&page_size=20&page_number=%s&start_date=2017-09-28&end_date=2020-09-28" % page_number
        try:
            response = requests.get(url, timeout=self.request_timeout)
        except requests.RequestException as exc:
            # Report and signal failure instead of letting the thread die,
            # which would leave this page missing from the queue.
            print('download failed [page=%s]: %s' % (page_number, exc))
            return None

        if response.ok:
            return StringIO(response.text)
        return None

class ConvertThread(Thread):
    """Convert queued CSV pages to XML files and hand batches to a packer.

    Items taken from ``queue`` are ``(page_number, csv_file)`` tuples; each is
    written to ``data<page_number>.xml``. After every ``batch_size`` files the
    thread sets ``c_event`` ("conversion done") and blocks on ``t_event``
    ("packing done") before it continues. A ``page_number`` of ``-1`` is the
    stop marker: the thread requests one final packing run for any leftover
    files, waits for it, and exits.
    """

    def __init__(self, queue, c_event, t_event, batch_size=3):
        """Store the work queue, the two events and the batch size.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        super().__init__()
        if batch_size < 1:
            raise ValueError('batch_size must be at least 1')
        self.queue = queue
        self.c_event = c_event
        self.t_event = t_event
        self.batch_size = batch_size

    def run(self):
        """Consume the queue until the stop marker arrives."""
        count = 0
        while True:
            page_number, csv_file = self.queue.get()
            if page_number == -1:
                # Leftover files that did not fill a whole batch still need
                # packing: request it and wait for it to finish.
                self.c_event.set()
                self.t_event.wait()
                break
            count += 1
            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

            if count == self.batch_size:
                count = 0
                # Signal that a full batch has been converted.
                self.c_event.set()

                # Wait until packing is done, then reset the event so the
                # next wait() blocks again.
                self.t_event.wait()
                self.t_event.clear()

    def csv_to_xml(self, csv_file, xml_path):
        """Write the rows of ``csv_file`` to ``xml_path`` as indented XML.

        The first CSV row supplies the element names; every following
        non-empty row becomes a ``<Row>`` element with one child per column.
        An empty CSV produces a ``<Data>`` element without rows.
        """
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        headers = next(reader, [])  # tolerate a completely empty file

        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'

        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; nothing to emit.
                continue
            book = SubElement(root, 'Row')
            book.text = '\n\t\t'
            book.tail = '\n\t'

            last_cell = None
            for tag, text in zip(headers, row):
                last_cell = SubElement(book, tag)
                last_cell.text = text
                last_cell.tail = '\n\t\t'
            if last_cell is not None:
                # The closing </Row> tag sits one indent level shallower.
                last_cell.tail = '\n\t'

        ElementTree(root).write(xml_path, encoding='utf8')

class TarThread(Thread):
    """Daemon thread that packs converted XML files into ``.tgz`` archives.

    Each time ``c_event`` is set, the thread packs every ``*.xml`` file in the
    current working directory into ``data<N>.tgz`` (N counts packing runs),
    deletes the packed files and then sets ``t_event`` to report completion.
    """

    def __init__(self, c_event, t_event):
        """Store the two events and reset the archive counter."""
        # Daemon thread: it does not keep the interpreter alive once the
        # non-daemon threads have finished.
        super().__init__(daemon=True)
        self.count = 0
        self.c_event = c_event
        self.t_event = t_event

    def run(self):
        """Serve packing requests forever."""
        while True:
            # Wait for a packing request, then reset the event so it can be
            # signalled again for the next batch.
            self.c_event.wait()
            self.c_event.clear()

            try:
                self.tar_xml()
            except (OSError, tarfile.TarError) as exc:
                # Report the failure but keep serving; the files that could
                # not be packed stay on disk for the next run.
                print('tar failed: %s' % exc)
            finally:
                # Always report completion, otherwise the thread waiting on
                # t_event would block forever.
                self.t_event.set()

    def tar_xml(self):
        """Pack all ``*.xml`` files of the working directory into one archive.

        The run counter is incremented on every call. When there are no XML
        files, no archive is left behind.
        """
        self.count += 1
        tfname = 'data%s.tgz' % self.count
        print('tar %s...' % tfname)

        xml_names = sorted(
            fname for fname in os.listdir('.') if fname.endswith('.xml')
        )
        if not xml_names:
            return

        # The context manager closes the archive even if adding a file fails.
        with tarfile.open(tfname, 'w:gz') as tf:
            for fname in xml_names:
                tf.add(fname)

        # Delete the sources only after the archive has been written and
        # closed, so a failed run never loses data.
        for fname in xml_names:
            os.remove(fname)

# def download_and_save(page_number, xml_path):
    # # IO
    # csv_file = None
    # while not csv_file:
        # csv_file = download_csv(page_number)
    # # CPU
    # csv_to_xml(csv_file, 'data%s.xml' % page_number)

# class MyThread(Thread):
    # def __init__(self, page_number, xml_path):
        # super().__init__()
        # self.page_number = page_number
        # self.xml_path = xml_path
# 
    # def run(self):
        # download_and_save(self.page_number, self.xml_path)

from queue import Queue
from threading import  Event

if __name__ == '__main__':
    # Script entry point: start the downloaders, the converter and the packer,
    # wait for all downloads, then send the stop marker to the converter.
    import time

    queue = Queue()
    c_event, t_event = Event(), Event()
    started_at = time.time()

    # One download thread per page, pages 1 through 14.
    downloaders = []
    for page in range(1, 15):
        worker = DownloadThread(page, queue)
        worker.start()
        downloaders.append(worker)

    convert_thread = ConvertThread(queue, c_event, t_event)
    convert_thread.start()

    tar_thread = TarThread(c_event, t_event)
    tar_thread.start()

    # Block until every page has been downloaded and queued.
    for worker in downloaders:
        worker.join()

    elapsed = time.time() - started_at
    print(elapsed)
    print('main thread end.')
    # (-1, None) tells the converter that no more pages will arrive.
    queue.put((-1, None))

=========================================

>>> from threading import Thread,Event
>>> def f(event):
...     print('wait event...')
...     event.wait()
...     print('f end...')
... 
>>> e = Event()
>>> thread = Thread(target=f, args=(e,))
>>> thread.start()
wait event...
>>> e.set()
f end...
>>> thread = Thread(target=f, args=(e,))
>>> thread.start()
wait event...
f end...
>>> e.clear()
>>> thread = Thread(target=f, args=(e,))
>>> thread.start()
wait event...
>>> 
原文地址:https://www.cnblogs.com/Richardo-M-Q/p/13948981.html