import tensorflow as tf import tensorlayer as tl def __init__(self, env): #cnt用于使target network隔一段时间更新一次 self.cnt 0 self.env env self.state_dim self.env.observation_space.shape[0] #只有离散动作空间才有self.env.action_space.n属性 self.action_dim self.env.action_space.n def create_model(input_state_shape): input_layer tl.layers.Input(input_state_shape) #第一层有n_units个神经结点 激活函数是Relu layer_1 tl.layers.Dense(n_units 64, act tf.nn.relu)(input_layer) layer_2 tl.layers.Dense(n_units 32, act tf.nn.relu)(layer_1) output_layer tl.layers.Dense(n_units self.action_dim)(layer_2) return tl.models.Model(inputs input_layer, outputs output_layer) self.model create_model([None, self.state_dim]) self.target_model create_model([None, self.state_dim]) #设置模型为训练模式 self.model.train() #设置模型为评估模式 self.target_model.eval() self.model_optim self.target_model_optim tf.optimizers.Adam(lr args.lr) self.epsilon args.eps self.buffer ReplayBuffer()train模式 启用BatchNormalization和 Dropout 在训练时启用train模式。eval模式 不启用BatchNormalization和 Dropout 在评估 或测试 时启用eval模式。input_state_shape 此处将input_state_shape参数设定为[None, self.state_dim] 等价于(1,self.state_dim)。
import numpy as np a np.array([1,2,3,4]) b a.reshape([1,4]) print(b) #输出 [[1 2 3 4]]
而env提供的state形状是一个一维数组 如果放到一个矩阵中 将是矩阵的一列 即一个向量 而model读数据时 一般是读batch_size个 model读入的states的形状应该是(batch_size,self.state_dim)。
import gym env gym.make( LunarLander-v2 ) state env.reset() print(state) #输出 [ 0.00677748 1.4212346 0.6864661 0.45840305 -0.00784658 -0.15549478 0. 0. ]3.2. target_update
def target_update(self): Copy q network to target q network for weights, target_weights in zip( self.model.trainable_weights, self.target_model.trainable_weights): target_weights.assign(weights)3.3. choose_action
def choose_action(self, state): if np.random.uniform() self.epsilon: return np.random.choice(self.action_dim) else: q_value self.model(state[np.newaxis, :])[0] return np.argmax(q_value)np.random.uniform(low 0,high 1.0) 生成随机数 默认范围是[0,1]choose_action函数首先产生一个范围为[0,1]的随机数 如果随机数小于ε 则进行探索 否则使用价值函数对当前状态进行评估 选择q值最大的动作。[np.newaxis, :]的作用是在np.newaxis的位置添加新的维度 在这里state是形状为(,state.dim)的向量 添加维度0后 就变成了(1,state.dim)维的向量。model后面加[0]是因为此时只输入了一个state 因此结果也只返回一组动作的q_value值。np.argmax的作用是找到数组中最大的数 并返回下标。 3.4. replay 在replay函数中 主要完成价值网络参数的更新 也是本代码中主要使用 Cuda 计算的地方。
def replay(self): for _ in range(10): # sample an experience tuple from the dataset(buffer) states, actions, rewards, next_states, done self.buffer.sample() # compute the target value for the sample tuple # targets [batch_size, action_dim] target self.model(states).numpy() # next_q_values [batch_size, action_dim] next_target self.target_model(next_states) next_q_value tf.reduce_max(next_target, axis 1) target[range(args.batch_size), actions] rewards (1 - done) * args.gamma * next_q_value # use sgd to update the network weight with tf.GradientTape() as tape: q_pred self.model(states) loss tf.losses.mean_squared_error(target, q_pred) grads tape.gradient(loss, self.model.trainable_weights) self.model_optim.apply_gradients(zip(grads, self.model.trainable_weights))tf.reduce_max的作用是求最大值 这个函数有点奇怪 axis 0指的是计算矩阵每列的最大值 axis 1计算行最大值。next_q_value是一个一维数组 每一项对应每一个next_state时刻能获得的最大Q值。target应该使用model计算而不是target network 参考的代码中是使用target network 因为后面计算loss函数要将target和q_pred相减 那些与actions actions是一个一维数组 保存的是每个transition采取的动作 无关的项应该为0。TD target已经根据actions保存到了target数组中了 方便到时候和q_pred比较。self.model(states)不能进行numpy化 因为numpy化之后就变成了常数数组。 3.5. test_episode 在test_episode函数中 对模型进行测试数次 并将每次运行的结果保存为gif文件。
def test_episode(self, test_episodes):
for episode in range(test_episodes):
state self.env.reset().astype(np.float32)
total_reward, done 0, False
frames []
while not done:
# action self.model(np.array([state], dtype np.float32))[0]
# action np.argmax(action)
action self.choose_action(state)
next_state, reward, done, _ self.env.step(action)
next_state next_state.astype(np.float32)
total_reward reward
state next_state
# self.env.render()
frames.append(self.env.render(mode rgb_array ))
print( Test {} | episode rewards is {} .format(episode, total_reward))
#将本场游戏保存为gif
dir_path os.path.join( testVideo , _ .join([ALG_NAME, ENV_ID]))
if not os.path.exists(dir_path):
os.makedirs(dir_path)
display_frames_as_gif(frames, dir_path \ str(episode) .gif )
如何将gym运行过程保存为gif文件
from matplotlib import animation import matplotlib.pyplot as plt #第一步 定义帧画面转化为gif的函数 def display_frames_as_gif(frames, path): patch plt.imshow(frames[0]) plt.axis( off ) def animate(i): patch.set_data(frames[i]) anim animation.FuncAnimation(plt.gcf(), animate, frames len(frames), interval 5) anim.save(path, writer pillow , fps 30) #第二步 定义一个frames 用于收集游戏过程中的画面 frames [] #第三步 在游戏运行过程中 收集画面 frames.append(self.env.render(mode rgb_array )) #第四部 游戏运行完毕后 将frames中的内容保存为gif dir_path os.path.join( testVideo , _ .join([ALG_NAME, ENV_ID])) if not os.path.exists(dir_path): os.makedirs(dir_path) display_frames_as_gif(frames, dir_path \ str(episode) .gif )3.6. train
def train(self, train_episodes 200):
if args.train:
self.loadModel()
for episode in range(train_episodes):
total_reward, done 0, False
state self.env.reset().astype(np.float32)
print( 开始玩游戏 )
while not done:
action self.choose_action(state)
next_state, reward, done, _ self.env.step(action)
next_state next_state.astype(np.float32)
#训练了几千盘后 agent总是变得非常谨慎 不肯着陆 因此要增大悬浮在空中的代价
reward - 0.1
self.buffer.push(state, action, reward, next_state, done)
total_reward reward
state next_state
# self.env.render()
print( 游戏结束 )
#如果replay buffer里的transition达到最低限度
if len(self.buffer.buffer) args.batch_size:
#使用梯度下降更新价值函数
self.replay()
#每更新10次价值函数就更新一次target
if self.cnt%10 0:
self.target_update()
self.cnt (self.cnt 1) % 10
print( EP{} EpisodeReward {} .format(episode, total_reward))
# if episode % 10 0:
# self.test_episode()
if episode%100 0:
self.saveModel()
# self.saveModel()
if args.test:
self.loadModel()
self.test_episode(test_episodes args.test_episodes)
原版代码中 每次更新价值函数后 都会更新target network 这样target network就和价值函数太过相似了 因此这里设置了每更新10次价值函数就更新一次target 。
3.7. saveModel
import os def saveModel(self): path os.path.join( model , _ .join([ALG_NAME, ENV_ID])) if not os.path.exists(path): os.makedirs(path) tl.files.save_weights_to_hdf5(os.path.join(path, model.hdf5 ), self.model) tl.files.save_weights_to_hdf5(os.path.join(path, target_model.hdf5 ), self.target_model) print( Saved weights. )3.8. loadModel
def loadModel(self): path os.path.join( model , _ .join([ALG_NAME, ENV_ID])) if os.path.exists(path): print( Load DQN Network parametets ... ) tl.files.load_hdf5_to_weights_in_order(os.path.join(path, model.hdf5 ), self.model) tl.files.load_hdf5_to_weights_in_order(os.path.join(path, target_model.hdf5 ), self.target_model) print( Load weights! ) else: print( No model file find, please train model first... )4.主程序
import gym #算法名称 ALG_NAME DQN #游戏名称 # ENV_ID CartPole-v1 ENV_ID LunarLander-v2 if __name__ __main__ : env gym.make(ENV_ID) agent Agent(env) agent.train(train_episodes args.train_episodes) env.close()训练结果 训练2000盘后 训练2000盘后 基本能够安全着地 但不一定能在目标点着地。
想法 train函数中 玩游戏和更新价值函数是串行的 导致大部分时间都在玩游戏 训练过程中“CUDA”利用率仅4% 是否可以使用多线程让玩游戏的过程和更新网络的过程分开 最初使用的ε为0.1 训练大概5000盘游戏后 飞船能很好地在空中保持平稳状态 但过于谨慎 下降十分缓慢。将ε改为0.2后 增大了agent的探索欲望 再在train函数中增大停留在空中的代价后 agent过于谨慎的态度得以改善。



