栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

DQN with Target代码实现

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

DQN with Target代码实现

[(s1​,s2​,...,st​),(a2​,a2​,...,at​),(r1​,r2​,...,rt​),(s2​,s3​,...,st 1​)] zip函数返回的对象不是列表 而是一个可迭代对象 此处是为了好看。 map函数使得np.stack作用在zip返回的可迭代对象的每个元素上 在0维将数组一个个串起来 最后再用state, action, reward, next_state, done去接 将右边的map返回的可迭代对象强制类型转换成列表。 3.Agent类的实现 Agent类主要实现8个方法。 _init_ 初始化agent。target_update 用于更新target network。choose_action 选择动作。replay 使用梯度下降更新价值函数。test_episode 用于测试模型。train 用于采集训练模型所需要的参数。saveModel 保存模型。loadModel 加载模型。 3.1. _init_ 初始化agent。
import tensorflow as tf 
import tensorlayer as tl
def __init__(self, env):
 #cnt用于使target network隔一段时间更新一次
 self.cnt 0
 self.env env 
 self.state_dim self.env.observation_space.shape[0]
 #只有离散动作空间才有self.env.action_space.n属性
 self.action_dim self.env.action_space.n 
 def create_model(input_state_shape): 
 input_layer tl.layers.Input(input_state_shape)
 #第一层有n_units个神经结点 激活函数是Relu
 layer_1 tl.layers.Dense(n_units 64, act tf.nn.relu)(input_layer) 
 layer_2 tl.layers.Dense(n_units 32, act tf.nn.relu)(layer_1) 
 output_layer tl.layers.Dense(n_units self.action_dim)(layer_2)
 return tl.models.Model(inputs input_layer, outputs output_layer) 
 self.model create_model([None, self.state_dim]) 
 self.target_model create_model([None, self.state_dim])
 #设置模型为训练模式
 self.model.train() 
 #设置模型为评估模式
 self.target_model.eval() 
 self.model_optim self.target_model_optim tf.optimizers.Adam(lr args.lr) 
 self.epsilon args.eps 
 self.buffer ReplayBuffer()
train模式 启用BatchNormalization和 Dropout 在训练时启用train模式。eval模式 不启用BatchNormalization和 Dropout 在评估 或测试 时启用eval模式。input_state_shape 此处将input_state_shape参数设定为[None, self.state_dim] 等价于(1,self.state_dim)。
import numpy as np 
a np.array([1,2,3,4]) 
b a.reshape([1,4]) 
print(b)
#输出 [[1 2 3 4]]

而env提供的state形状是一个一维数组 如果放到一个矩阵中 将是矩阵的一列 即一个向量 而model读数据时 一般是读batch_size个 model读入的states的形状应该是(batch_size,self.state_dim)。

import gym 
env gym.make( LunarLander-v2 ) 
state env.reset() 
print(state)
#输出 [ 0.00677748 1.4212346 0.6864661 0.45840305 -0.00784658 -0.15549478
 0. 0. ]
3.2. target_update
def target_update(self): 
 Copy q network to target q network 
 for weights, target_weights in zip( 
 self.model.trainable_weights, self.target_model.trainable_weights): 
 target_weights.assign(weights)
3.3. choose_action
def choose_action(self, state): 
 if np.random.uniform() self.epsilon: 
 return np.random.choice(self.action_dim) 
 else: 
 q_value self.model(state[np.newaxis, :])[0] 
 return np.argmax(q_value)
np.random.uniform(low 0,high 1.0) 生成随机数 默认范围是[0,1]choose_action函数首先产生一个范围为[0,1]的随机数 如果随机数小于ε 则进行探索 否则使用价值函数对当前状态进行评估 选择q值最大的动作。[np.newaxis, :]的作用是在np.newaxis的位置添加新的维度 在这里state是形状为(,state.dim)的向量 添加维度0后 就变成了(1,state.dim)维的向量。model后面加[0]是因为此时只输入了一个state 因此结果也只返回一组动作的q_value值。np.argmax的作用是找到数组中最大的数 并返回下标。 3.4. replay 在replay函数中 主要完成价值网络参数的更新 也是本代码中主要使用 Cuda 计算的地方。
def replay(self): 
 for _ in range(10): 
 # sample an experience tuple from the dataset(buffer) 
 states, actions, rewards, next_states, done self.buffer.sample() 
 # compute the target value for the sample tuple 
 # targets [batch_size, action_dim] 
 target self.model(states).numpy()
 # next_q_values [batch_size, action_dim] 
 next_target self.target_model(next_states) 
 next_q_value tf.reduce_max(next_target, axis 1) 
 target[range(args.batch_size), actions] rewards (1 - done) * args.gamma * next_q_value 
 # use sgd to update the network weight 
 with tf.GradientTape() as tape: 
 q_pred self.model(states) 
 loss tf.losses.mean_squared_error(target, q_pred) 
 grads tape.gradient(loss, self.model.trainable_weights) 
 self.model_optim.apply_gradients(zip(grads, self.model.trainable_weights))
tf.reduce_max的作用是求最大值 这个函数有点奇怪 axis 0指的是计算矩阵每列的最大值 axis 1计算行最大值。next_q_value是一个一维数组 每一项对应每一个next_state时刻能获得的最大Q值。target应该使用model计算而不是target network 参考的代码中是使用target network 因为后面计算loss函数要将target和q_pred相减 那些与actions actions是一个一维数组 保存的是每个transition采取的动作 无关的项应该为0。TD target已经根据actions保存到了target数组中了 方便到时候和q_pred比较。self.model(states)不能进行numpy化 因为numpy化之后就变成了常数数组。 3.5. test_episode 在test_episode函数中 对模型进行测试数次 并将每次运行的结果保存为gif文件。
def test_episode(self, test_episodes): 
 for episode in range(test_episodes): 
 state self.env.reset().astype(np.float32) 
 total_reward, done 0, False 
 frames [] 
 while not done: 
 # action self.model(np.array([state], dtype np.float32))[0] 
 # action np.argmax(action) 
 action self.choose_action(state) 
 next_state, reward, done, _ self.env.step(action) 
 next_state next_state.astype(np.float32) 
 total_reward reward 
 state next_state 
 # self.env.render() 
 frames.append(self.env.render(mode rgb_array )) 
 print( Test {} | episode rewards is {} .format(episode, total_reward)) 
 #将本场游戏保存为gif 
 dir_path os.path.join( testVideo , _ .join([ALG_NAME, ENV_ID])) 
 if not os.path.exists(dir_path): 
 os.makedirs(dir_path) 
 display_frames_as_gif(frames, dir_path \ str(episode) .gif )
如何将gym运行过程保存为gif文件
from matplotlib import animation 
import matplotlib.pyplot as plt
#第一步 定义帧画面转化为gif的函数
def display_frames_as_gif(frames, path): 
 patch plt.imshow(frames[0]) 
 plt.axis( off ) 
 def animate(i): 
 patch.set_data(frames[i]) 
 anim animation.FuncAnimation(plt.gcf(), animate, frames len(frames), interval 5) 
 anim.save(path, writer pillow , fps 30)
#第二步 定义一个frames 用于收集游戏过程中的画面
frames [] 
#第三步 在游戏运行过程中 收集画面
frames.append(self.env.render(mode rgb_array )) 
#第四部 游戏运行完毕后 将frames中的内容保存为gif
dir_path os.path.join( testVideo , _ .join([ALG_NAME, ENV_ID])) 
if not os.path.exists(dir_path): 
 os.makedirs(dir_path) 
display_frames_as_gif(frames, dir_path \ str(episode) .gif )
3.6. train
def train(self, train_episodes 200): 
 if args.train: 
 self.loadModel() 
 for episode in range(train_episodes): 
 total_reward, done 0, False 
 state self.env.reset().astype(np.float32) 
 print( 开始玩游戏 ) 
 while not done: 
 action self.choose_action(state) 
 next_state, reward, done, _ self.env.step(action) 
 next_state next_state.astype(np.float32) 
 #训练了几千盘后 agent总是变得非常谨慎 不肯着陆 因此要增大悬浮在空中的代价
 reward - 0.1
 self.buffer.push(state, action, reward, next_state, done) 
 total_reward reward 
 state next_state 
 # self.env.render() 
 print( 游戏结束 )
 #如果replay buffer里的transition达到最低限度
 if len(self.buffer.buffer) args.batch_size: 
 #使用梯度下降更新价值函数
 self.replay() 
 #每更新10次价值函数就更新一次target 
 if self.cnt%10 0: 
 self.target_update() 
 self.cnt (self.cnt 1) % 10 
 print( EP{} EpisodeReward {} .format(episode, total_reward)) 
 # if episode % 10 0: 
 # self.test_episode() 
 if episode%100 0: 
 self.saveModel() 
 # self.saveModel() 
 if args.test: 
 self.loadModel() 
 self.test_episode(test_episodes args.test_episodes)
原版代码中 每次更新价值函数后 都会更新target network 这样target network就和价值函数太过相似了 因此这里设置了每更新10次价值函数就更新一次target 。 3.7. saveModel
import os
def saveModel(self): 
 path os.path.join( model , _ .join([ALG_NAME, ENV_ID])) 
 if not os.path.exists(path): 
 os.makedirs(path) 
 tl.files.save_weights_to_hdf5(os.path.join(path, model.hdf5 ), self.model) 
 tl.files.save_weights_to_hdf5(os.path.join(path, target_model.hdf5 ), self.target_model) 
 print( Saved weights. )
3.8. loadModel
def loadModel(self): 
 path os.path.join( model , _ .join([ALG_NAME, ENV_ID])) 
 if os.path.exists(path): 
 print( Load DQN Network parametets ... ) 
 tl.files.load_hdf5_to_weights_in_order(os.path.join(path, model.hdf5 ), self.model) 
 tl.files.load_hdf5_to_weights_in_order(os.path.join(path, target_model.hdf5 ), self.target_model) 
 print( Load weights! ) 
 else: print( No model file find, please train model first... )
4.主程序
import gym
#算法名称
ALG_NAME DQN 
#游戏名称
# ENV_ID CartPole-v1 
ENV_ID LunarLander-v2 
if __name__ __main__ : 
 env gym.make(ENV_ID) 
 agent Agent(env) 
 agent.train(train_episodes args.train_episodes) 
 env.close()
训练结果 训练2000盘后 训练2000盘后 基本能够安全着地 但不一定能在目标点着地。


想法 train函数中 玩游戏和更新价值函数是串行的 导致大部分时间都在玩游戏 训练过程中“CUDA”利用率仅4% 是否可以使用多线程让玩游戏的过程和更新网络的过程分开 最初使用的ε为0.1 训练大概5000盘游戏后 飞船能很好地在空中保持平稳状态 但过于谨慎 下降十分缓慢。将ε改为0.2后 增大了agent的探索欲望 再在train函数中增大停留在空中的代价后 agent过于谨慎的态度得以改善。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/267953.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号