栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

tensorflow与深度学习之队列与线程

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

tensorflow与深度学习之队列与线程

文章目录
    • 一、队列与线程
      • 1.队列
      • 2.队列管理器
      • 3.线程协调器

一、队列与线程 1.队列

在训练样本的时候,希望读入的训练样本时有序的

tf.FIFOQueue   先进先出队列,按顺序出队列

tf.RandomShuffleQueue   随机出队列

(1)tf.FIFOQueue:

FIFOQueue(capacity, dtypes, name='fifo_queue')
创建一个以先进先出的顺序对元素进行排队的队列

capacity:整数。可能存储在此队列中的元素数量的上限
dtypes:DType对象列表。长度dtypes必须等于每个队列元
素中的张量数,dtype的类型形状,决定了后面进队列元素形状


method方法:
dequeue(name=None)
enqueue(vals, name=None):
enqueue_many(vals, name=None):vals列表或者元组
返回一个进队列操作

size(name=None)

(2)举例使用tf.FIFOQueue:

完成一个出队列、+1、入队列操作(同步操作):

import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import tensorflow as tf

# 模拟一下同步先处理数据,然后才能读取数据训练
# tensorflow当中,运行操作具有依赖性
# 1.定义队列
Q = tf.FIFOQueue(3,tf.float32) # (队列大小,队列内数据类型)
# 放入一些数据
enq_mqny = Q.enqueue_many([[0.1,0.2,0.3],])

# 2.定义一些读取数据,取数据的过程:  取数据,+1,入队列
# 取数据
out_q = Q.dequeue()
data = out_q+1
en_q = Q.enqueue(data)

with tf.Session() as sess:
    # 初始化数据
    sess.run(enq_mqny)
    # 处理数据
    for i in range(100):
        sess.run(en_q)
    # 训练数据
    for i in range(Q.size().eval()):
        print(sess.run(Q.dequeue()))
        

运行结果:

注意:

分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,
主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个
线程,实现异步读取。

2.队列管理器

(1)tf.RandomShuffleQueue

tf.train.QueueRunner(queue, enqueue_ops=None)
创建一个QueueRunner

queue:A Queue
enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
create_threads(sess, coord=None,start=False)
            创建线程来运行给定会话的入队操作
start:布尔值,如果True启动线程;如果为False调用者
必须调用start()启动线程 
coord:线程协调器,后面线程管理需要用到
return:

(2)通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):

# 模拟异步子线程 存入样本, 主线程 读取样本

# 1、定义一个队列,1000
Q = tf.FIFOQueue(1000, tf.float32)

# 2、定义要做的事情 循环 值,+1, 放入队列当中
var = tf.Variable(0.0)

# 实现一个自增  tf.assign_add
data = tf.assign_add(var, tf.constant(1.0))

en_q = Q.enqueue(data)
# 3、定义队列管理器op, 指定多少个子线程,子线程该干什么事情
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)

# 初始化变量的OP
init_op = tf.global_variables_initializer()

with tf.Session() as sess:
  # 初始化变量
    sess.run(init_op)
  # 真正开启子线程
   threads = qr.create_threads(sess, coord=coord, start=True)

   # 主线程,不断读取数据训练
   for i in range(300):
      print(sess.run(Q.dequeue()))
3.线程协调器

(1)

tf.train.Coordinator()
	线程协调员,实现一个简单的机制来协调一
组线程的终止

request_stop() 
should_stop() 检查是否要求停止
join(threads=None, stop_grace_period_secs=120)  
等待线程终止

return:线程协调员实例

(2)通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):

# 模拟异步子线程 存入样本, 主线程 读取样本

# 1、定义一个队列,1000
Q = tf.FIFOQueue(1000, tf.float32)

# 2、定义要做的事情 循环 值,+1, 放入队列当中
var = tf.Variable(0.0)

# 实现一个自增  tf.assign_add
data = tf.assign_add(var, tf.constant(1.0))

en_q = Q.enqueue(data)
# 3、定义队列管理器op, 指定多少个子线程,子线程该干什么事情
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)

# 初始化变量的OP
init_op = tf.global_variables_initializer()

with tf.Session() as sess:
  # 初始化变量
    sess.run(init_op)

   # 开启线程管理器
   coord = tf.train.Coordinator()

   # 真正开启子线程
   threads = qr.create_threads(sess, coord=coord, start=True)

   # 主线程,不断读取数据训练
   for i in range(300):
      print(sess.run(Q.dequeue()))
      # 回收你
   	  coord.request_stop()
   	  coord.join(threads)


(3)分析:
这时候有一个问题就是,入队自顾自的去执行,在需要的出
队操作完成之后,程序没法结束。需要一个实现线程间的同步,终
止其他线程。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/498899.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号