#### Q-learning Algorithm (深度强化学习原理与实践, pp. 115-119)
# coding: utf-8
import gym
import numpy as np
import sys
import time
import pandas as pd
import matplotlib
from collections import defaultdict, namedtuple
from matplotlib import pyplot as plt
env = gym.make("CartPole-v0")
class QLearning():
    def __init__(self, env, num_episodes, discount=1.0, alpha=0.5, epsilon=0.1, n_bins=10):
        self.nA = env.action_space.n  # number of actions; read directly from the built-in gym environment CartPole, no need to define actions/states by hand
        print("number of actions", self.nA)
        self.nS = env.observation_space.shape[0]  # number of state dimensions
        print("observation_space\n", env.observation_space.shape, "\n", env.observation_space)
        print("number of state dimensions", self.nS)
        self.env = env
        self.num_episodes = num_episodes  # number of episodes to run
        self.discount = discount
        self.alpha = alpha  # learning rate of the temporal-difference update
        self.epsilon = epsilon  # exploration rate of the epsilon-greedy policy
        # Initialize Q(s, a)
        self.Q = defaultdict(lambda: np.zeros(self.nA))
        print("initial action-value function Q", self.Q)
        # Keeps track of useful statistics
        record = namedtuple("Record", ["episode_lengths", "episode_rewards"])
        self.rec = record(episode_lengths=np.zeros(num_episodes),
                          episode_rewards=np.zeros(num_episodes))
        print("record\n", self.rec)
        # Discretize (bucket) the continuous observations to shrink the state space
        self.cart_position_bins = pd.cut([-2.4, 2.4], bins=n_bins, retbins=True)[1]  # cart position: split the continuous interval [-2.4, 2.4] into 10 equally sized bins
        print("cart_position_bins\n", self.cart_position_bins)
        self.pole_angle_bins = pd.cut([-2, 2], bins=n_bins, retbins=True)[1]  # pole angle
        print("pole_angle_bins\n", self.pole_angle_bins)
        self.cart_velocity_bins = pd.cut([-1, 1], bins=n_bins, retbins=True)[1]  # cart velocity
        print("cart_velocity_bins\n", self.cart_velocity_bins)
        self.angle_rate_bins = pd.cut([-3.5, 3.5], bins=n_bins, retbins=True)[1]  # pole angular velocity
        print("angle_rate_bins\n", self.angle_rate_bins)
    def __get_bins_states(self, state):  # map a raw observation to its discretized (binned) state
        """
        Because the number of possible states is huge, simplify the problem by
        cutting the state space into bins.
        If the bin indices are [1, 3, 6, 4], the return value is the tuple (1, 3, 6, 4).
        """
        # Note: gym's CartPole observation order is (cart position, cart velocity,
        # pole angle, pole angular velocity); the book's code labels the 2nd and 3rd
        # components the other way round, but each dimension still gets its own bins.
        s1_, s2_, s3_, s4_ = state
        cart_position_idx = np.digitize(s1_, self.cart_position_bins)  # np.digitize(x, a) returns the position of x in the sorted array a (counting from 1), e.g. a = [1, 2, 3]; np.digitize(2, a) => 2
        pole_angle_idx = np.digitize(s2_, self.pole_angle_bins)
        cart_velocity_idx = np.digitize(s3_, self.cart_velocity_bins)
        angle_rate_idx = np.digitize(s4_, self.angle_rate_bins)
        state_ = [cart_position_idx, pole_angle_idx,
                  cart_velocity_idx, angle_rate_idx]
        state = map(lambda s: int(s), state_)  # convert every element of the state to int
        return tuple(state)  # return as a tuple
    def __epsilon_greedy_policy(self, epsilon, nA):  # epsilon-greedy policy
        def policy(state):
            A = np.ones(nA, dtype=float) * epsilon / nA
            best_action = np.argmax(self.Q[state])
            A[best_action] += (1.0 - epsilon)
            return A
        return policy
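    # Example (illustrative): with epsilon = 0.1 and nA = 2 (CartPole), every action
    # starts with probability 0.1 / 2 = 0.05; the greedy action then receives the
    # remaining 1 - 0.1 = 0.9, so if Q favors action 1 the policy returns [0.05, 0.95].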
    def __next_action(self, prob):
        return np.random.choice(np.arange(len(prob)), p=prob)  # np.random.choice(a, p=p) draws from array a with the probabilities given by p, e.g.:
        # aa_milne_arr = ['pooh', 'rabbit', 'piglet', 'Christopher']
        # np.random.choice(aa_milne_arr, 5, p=[0.5, 0.1, 0.1, 0.3])
        # array(['pooh', 'pooh', 'pooh', 'Christopher', 'piglet'], dtype='|S11')  -- 'pooh' is clearly drawn far more often than the others
    def qlearning(self):
        """
        Q-learning algorithm
        """
        # Define the behavior policy
        policy = self.__epsilon_greedy_policy(self.epsilon, self.nA)
        sumlist = []
        # Iterate over episodes (experience trajectories)
        for i_episode in range(self.num_episodes):
            # Print out which episode we are on
            if 0 == (i_episode + 1) % 10:
                print("\rEpisode {} in {}".format(i_episode + 1, self.num_episodes))
                # sys.stdout.flush()
            step = 0
            # Initialize S
            state__ = self.env.reset()
            state = self.__get_bins_states(state__)
            print("initial state state__:", state__, "discretized state:", state)
            # Repeat (for each step of the episode)
            while (True):
                # Choose A from S using the policy derived from Q
                prob_actions = policy(state)  # action probabilities from the epsilon-greedy policy for this state
                action = self.__next_action(prob_actions)
                print("epsilon-greedy action probabilities:", prob_actions, "sampled action:", action)
                # Take action A, observe R, S'
                next_state__, reward, done, info = self.env.step(action)
                next_state = self.__get_bins_states(next_state__)
                print("after the step (next state, reward, done, info) ==", next_state__, reward, done, info, " discretized next state:", next_state)
                # Update the bookkeeping record
                self.rec.episode_lengths[i_episode] = step
                self.rec.episode_rewards[i_episode] += reward
                print("steps so far:", self.rec.episode_lengths[i_episode], "accumulated reward:", self.rec.episode_rewards[i_episode])
                # TD update: Q(S, A) <- Q(S, A) + alpha * [R + discount * max_a Q(S', a) - Q(S, A)]
                best_next_action = np.argmax(self.Q[next_state])
                print("Q[next_state]:", self.Q[next_state], "best_next_action:", best_next_action)
                td_target = reward + self.discount * self.Q[next_state][best_next_action]
                td_delta = td_target - self.Q[state][action]
                print("td_target:", td_target, "td_delta:", td_delta)
                self.Q[state][action] += self.alpha * td_delta
                print("state-action:", state, "-", action, " Q[s][a] =", self.Q[state][action])
                print(self.Q)
                if done:
                    # until S is terminal
                    print("Episode finished after {} timesteps".format(step))
                    print("*************************************************************************************************************")
                    sumlist.append(step)
                    break
                else:
                    step += 1
                    # S <- S'
                    state = next_state
        iter_time = sum(sumlist) / len(sumlist)
        print("CartPole average episode length (timesteps): {}".format(iter_time))
        print(self.Q)
        return self.Q
def plot_episode_stats(stats, smoothing_window=10):
    # Episode length vs. episode number
    fig1 = plt.figure(figsize=(10, 5))
    plt.plot(stats.episode_lengths[:200])
    plt.xlabel("Episode")
    plt.ylabel("Episode Length")
    plt.title("Episode Length over Time")
    plt.show()
    # Reward vs. episode number
    fig2 = plt.figure(figsize=(10, 5))
    rewards_smoothed = pd.Series(stats.episode_rewards[:200]).rolling(smoothing_window,
                                                                      min_periods=smoothing_window).mean()
    plt.plot(rewards_smoothed)
    plt.xlabel("Episode")
    plt.ylabel("Episode Reward")
    plt.title("Episode Reward over Time (smoothed over {} episodes)".format(smoothing_window))
    plt.show()
    return fig1, fig2
cls_qlearning = QLearning(env, num_episodes=10)  # small run to inspect the step-by-step details
# cls_qlearning = QLearning(env, num_episodes=200)
Q = cls_qlearning.qlearning()
plot_episode_stats(cls_qlearning.rec)
Algorithm
Update details, walking through the process:
1. Initialization.
2. At the start of training, most Q values are still 0.
3. When the same state is encountered again, its stored Q values are updated further.
4. Results after 200 episodes (a quick check of the learned Q table is sketched below):
(1) Episode length vs. number of episodes: as training progresses, each episode lasts more timesteps, meaning the pole stays balanced longer.
(2) Reward vs. number of episodes: as training progresses, the reward per episode gradually increases.
Reference: DQN原理简介 (an introduction to the principles of DQN)
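As a rough way to verify point 4 above, the learned Q table can be rolled out greedily (no exploration) and the episode length inspected. The sketch below is not part of the book's listing: the function name greedy_rollout is introduced here for illustration, the bin edges are rebuilt inline so the class's private __get_bins_states method is not needed, and the env and Q objects from the listing above are assumed.

# Minimal greedy-rollout check of the learned Q table (illustrative sketch,
# not from the book): rebuild the same bin edges and play one episode
# by always taking argmax_a Q[s][a].
def greedy_rollout(env, Q, n_bins=10):
    bins = [pd.cut([-2.4, 2.4], bins=n_bins, retbins=True)[1],
            pd.cut([-2, 2], bins=n_bins, retbins=True)[1],
            pd.cut([-1, 1], bins=n_bins, retbins=True)[1],
            pd.cut([-3.5, 3.5], bins=n_bins, retbins=True)[1]]
    obs = env.reset()
    steps = 0
    while True:
        # Discretize the observation with the same edges used during training
        state = tuple(int(np.digitize(o, b)) for o, b in zip(obs, bins))
        action = int(np.argmax(Q[state]))  # greedy action, no epsilon exploration
        obs, reward, done, info = env.step(action)
        steps += 1
        if done:
            return steps

print("greedy rollout lasted", greedy_rollout(env, Q), "timesteps")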



