- 一、队列与线程
- 1.队列
- 2.队列管理器
- 3.线程协调器
在训练样本的时候,希望读入的训练样本时有序的
tf.FIFOQueue 先进先出队列,按顺序出队列 tf.RandomShuffleQueue 随机出队列
(1)tf.FIFOQueue:
FIFOQueue(capacity, dtypes, name='fifo_queue') 创建一个以先进先出的顺序对元素进行排队的队列 capacity:整数。可能存储在此队列中的元素数量的上限 dtypes:DType对象列表。长度dtypes必须等于每个队列元 素中的张量数,dtype的类型形状,决定了后面进队列元素形状 method方法: dequeue(name=None) enqueue(vals, name=None): enqueue_many(vals, name=None):vals列表或者元组 返回一个进队列操作 size(name=None)
(2)举例使用tf.FIFOQueue:
完成一个出队列、+1、入队列操作(同步操作):
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import tensorflow as tf
# 模拟一下同步先处理数据,然后才能读取数据训练
# tensorflow当中,运行操作具有依赖性
# 1.定义队列
Q = tf.FIFOQueue(3,tf.float32) # (队列大小,队列内数据类型)
# 放入一些数据
enq_mqny = Q.enqueue_many([[0.1,0.2,0.3],])
# 2.定义一些读取数据,取数据的过程: 取数据,+1,入队列
# 取数据
out_q = Q.dequeue()
data = out_q+1
en_q = Q.enqueue(data)
with tf.Session() as sess:
# 初始化数据
sess.run(enq_mqny)
# 处理数据
for i in range(100):
sess.run(en_q)
# 训练数据
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue()))
运行结果:
注意:
分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,
主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个
线程,实现异步读取。
(1)tf.RandomShuffleQueue
tf.train.QueueRunner(queue, enqueue_ops=None)
创建一个QueueRunner
queue:A Queue
enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
create_threads(sess, coord=None,start=False)
创建线程来运行给定会话的入队操作
start:布尔值,如果True启动线程;如果为False调用者
必须调用start()启动线程
coord:线程协调器,后面线程管理需要用到
return:
(2)通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):
# 模拟异步子线程 存入样本, 主线程 读取样本
# 1、定义一个队列,1000
Q = tf.FIFOQueue(1000, tf.float32)
# 2、定义要做的事情 循环 值,+1, 放入队列当中
var = tf.Variable(0.0)
# 实现一个自增 tf.assign_add
data = tf.assign_add(var, tf.constant(1.0))
en_q = Q.enqueue(data)
# 3、定义队列管理器op, 指定多少个子线程,子线程该干什么事情
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)
# 初始化变量的OP
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化变量
sess.run(init_op)
# 真正开启子线程
threads = qr.create_threads(sess, coord=coord, start=True)
# 主线程,不断读取数据训练
for i in range(300):
print(sess.run(Q.dequeue()))
3.线程协调器
(1)
tf.train.Coordinator() 线程协调员,实现一个简单的机制来协调一 组线程的终止 request_stop() should_stop() 检查是否要求停止 join(threads=None, stop_grace_period_secs=120) 等待线程终止 return:线程协调员实例
(2)通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):
# 模拟异步子线程 存入样本, 主线程 读取样本
# 1、定义一个队列,1000
Q = tf.FIFOQueue(1000, tf.float32)
# 2、定义要做的事情 循环 值,+1, 放入队列当中
var = tf.Variable(0.0)
# 实现一个自增 tf.assign_add
data = tf.assign_add(var, tf.constant(1.0))
en_q = Q.enqueue(data)
# 3、定义队列管理器op, 指定多少个子线程,子线程该干什么事情
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)
# 初始化变量的OP
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化变量
sess.run(init_op)
# 开启线程管理器
coord = tf.train.Coordinator()
# 真正开启子线程
threads = qr.create_threads(sess, coord=coord, start=True)
# 主线程,不断读取数据训练
for i in range(300):
print(sess.run(Q.dequeue()))
# 回收你
coord.request_stop()
coord.join(threads)
(3)分析:
这时候有一个问题就是,入队自顾自的去执行,在需要的出
队操作完成之后,程序没法结束。需要一个实现线程间的同步,终
止其他线程。



