python之深度学习-模拟异步操作(队列)

队列管理器的创建:

tf.train.QueueRunner(queue,enqueue_ops = None):

参数解释:

queue:一个队列

enqueue_ops :添加线程的队列操作的列表,[]*2就是指定两个线程

create_threads(sess,coord=None,start = False);创建线程来运行给定会话的入队操作,返回一个线程的实例

参数的解释:

start:布尔值,如果是True就启动线程,如果是False的话,调用者要是想启动线程就必须调用start()

coord:线程协调器,后面的线程管理器需要用到

#模拟异步操作:子线程存入数据,主线程从子线程的队列中读取数据
#1.定义一个队列:大小是1000
Q = tf.FIFOQueue(1000,tf.float32)
#2.定义子线程需要怎么处理数据,这里我们把数据每次都+1
var = tf.Variable(0.0) #初始值是0.0
data = tf.assign_add(var,1.0) #每次都把var这个变量的值+1,为什么不直接var+1呢,因为这是两个不同的op,
en_q = Q.enqueue(data) #进队列
#3.定义一个队列的任务管理器,可以指定多个子线程,接下来我们指定两个
qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
#不要忘记之前定义过的变量要初始化
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
    #初始化变量
    sess.run(init_op)
    #正式开启子线程;这一步必须在会话中完成,在这里设置参数start=True就表示在此时已经开启子线程
    threads = qr.create_threads(sess,start=True)
    #主线程不断地读取数据
    for i in range(300):
        print(sess.run(Q.dequeue()))

如果你这时候去运行他会报错:

CancelledError: Enqueue operation was cancelled

这是因为主线程运行300次后就关闭了,这时候会话也关闭了,但是子线程需要运行1000次后才会关闭,所以这时候在会话关闭的情况下去运行子线程,(会话是进行资源管理的)子线程就得不到运行的资源了,这时候程序就会报错,提示会话已经关闭了,你被强制停止了

那该怎么办呢?

这里我们会用到一个叫线程协调器的函数:

tf.train.Coordinator():实现一个简单的机制来协调一组线程的终止

参数可以有如下:

request_stop() ===>请求线程停止:这是针对子线程运行的内容很重要的情况下调用的参数

should_stop() ===>强制停止

join(threadName) ===>回收线程

#模拟异步操作:子线程存入数据,主线程从子线程的队列中读取数据
#1.定义一个队列:大小是1000
Q = tf.FIFOQueue(1000,dtypes=tf.float32)
#2.定义子线程需要怎么处理数据,这里我们把数据每次都+1
var = tf.Variable(0.0) #初始值是0.0
data = tf.assign_add(var,1.0) #每次都把var这个变量的值+1,为什么不直接var+1呢,因为这是两个不同的op,
en_q = Q.enqueue(data) #进队列
#3.定义一个队列的任务管理器,可以指定多个子线程,接下来我们指定两个
qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
#不要忘记之前定义过的变量要初始化
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
    #初始化变量
    sess.run(init_op)
    #开启线程管理器
    coord = tf.train.Coordinator()
    #正式开启子线程;这一步必须在会话中完成,在这里设置参数start=True就表示在此时已经开启子线程,指定了一个线程管理器coord
    threads = qr.create_threads(sess,coord=coord,start=True)
    #主线程不断地读取数据
    for i in range(300):
        print(sess.run(Q.dequeue()))
    #开始回收子线程
    coord.request_stop()
    coord.join(threads) #回收的线程是threads
原文地址:https://www.cnblogs.com/boost/p/13516816.html