冰面滑行 FrozenLake-v0
import numpy as np
np.random.seed(0)
import gym
环境使用
env = gym.make('FrozenLake-v0')
env.seed(0)
print('观察空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('观测空间大小 = {}'.format(env.unwrapped.nS))
print('动作空间大小 = {}'.format(env.unwrapped.nA))
env.unwrapped.P[14][2] # 查看动力
观察空间 = Discrete(16)
动作空间 = Discrete(4)
观测空间大小 = 16
动作空间大小 = 4
[(0.3333333333333333, 14, 0.0, False),
(0.3333333333333333, 15, 1.0, True),
(0.3333333333333333, 10, 0.0, False)]
用随机策略玩
def play_policy(env, policy, render=False):
total_reward = 0.
observation = env.reset()
while True:
if render:
env.render() # 此行可显示
action = np.random.choice(env.action_space.n,
p=policy[observation])
observation, reward, done, _ = env.step(action)
total_reward += reward # 统计回合奖励
if done: # 游戏结束
break
return total_reward
# 随机策略
random_policy =
np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nA
episode_rewards = [play_policy(env, random_policy) for _ in range(100)]
print("随机策略 平均奖励:{}".format(np.mean(episode_rewards)))
随机策略 平均奖励:0.04
策略评估
def v2q(env, v, s=None, gamma=1.): # 根据状态价值函数计算动作价值函数
if s is not None: # 针对单个状态求解
q = np.zeros(env.unwrapped.nA)
for a in range(env.unwrapped.nA):
for prob, next_state, reward, done in env.unwrapped.P[s][a]:
q[a] += prob *
(reward + gamma * v[next_state] * (1. - done))
else: # 针对所有状态求解
q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))
for s in range(env.unwrapped.nS):
q[s] = v2q(env, v, s, gamma)
return q
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):
v = np.zeros(env.unwrapped.nS) # 初始化状态价值函数
while True: # 循环
delta = 0
for s in range(env.unwrapped.nS):
vs = sum(policy[s] * v2q(env, v, s, gamma)) # 更新状态价值函数
delta = max(delta, abs(v[s]-vs)) # 更新最大误差
v[s] = vs # 更新状态价值函数
if delta < tolerant: # 查看是否满足迭代条件
break
return v
评估随机策略的价值函数
print('状态价值函数:')
v_random = evaluate_policy(env, random_policy)
print(v_random.reshape(4, 4))
print('动作价值函数:')
q_random = v2q(env, v_random)
print(q_random)
状态价值函数:
[[0.0139372 0.01162942 0.02095187 0.01047569]
[0.01624741 0. 0.04075119 0. ]
[0.03480561 0.08816967 0.14205297 0. ]
[0. 0.17582021 0.43929104 0. ]]
动作价值函数:
[[0.01470727 0.01393801 0.01393801 0.01316794]
[0.00852221 0.01162969 0.01086043 0.01550616]
[0.02444416 0.0209521 0.02405958 0.01435233]
[0.01047585 0.01047585 0.00698379 0.01396775]
[0.02166341 0.01701767 0.0162476 0.01006154]
[0. 0. 0. 0. ]
[0.05433495 0.04735099 0.05433495 0.00698396]
[0. 0. 0. 0. ]
[0.01701767 0.04099176 0.03480569 0.04640756]
[0.0702086 0.11755959 0.10595772 0.05895286]
[0.18940397 0.17582024 0.16001408 0.04297362]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0.08799662 0.20503708 0.23442697 0.17582024]
[0.25238807 0.53837042 0.52711467 0.43929106]
[0. 0. 0. 0. ]]
策略改进
def improve_policy(env, v, policy, gamma=1.):
optimal = True
for s in range(env.unwrapped.nS):
q = v2q(env, v, s, gamma)
a = np.argmax(q)
if policy[s][a] != 1.:
optimal = False
policy[s] = 0.
policy[s][a] = 1.
return optimal
对随机策略进行改进
policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy)
if optimal:
print('无更新,最优策略为:')
else:
print('有更新,更新后的策略为:')
print(policy)
有更新,更新后的策略为:
[[1. 0. 0. 0.]
[0. 0. 0. 1.]
[1. 0. 0. 0.]
[0. 0. 0. 1.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]
[0. 0. 0. 1.]
[0. 1. 0. 0.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]
[0. 0. 1. 0.]
[0. 1. 0. 0.]
[1. 0. 0. 0.]]
策略迭代
def iterate_policy(env, gamma=1., tolerant=1e-6):
# 初始化为任意一个策略
policy = np.ones((env.unwrapped.nS, env.unwrapped.nA))
/ env.unwrapped.nA
while True:
v = evaluate_policy(env, policy, gamma, tolerant) # 策略评估
if improve_policy(env, v, policy): # 策略改进
break
return policy, v
policy_pi, v_pi = iterate_policy(env)
print('状态价值函数 =')
print(v_pi.reshape(4, 4))
print('最优策略 =')
print(np.argmax(policy_pi, axis=1).reshape(4, 4))
状态价值函数 =
[[0.82351246 0.82350689 0.82350303 0.82350106]
[0.82351416 0. 0.5294002 0. ]
[0.82351683 0.82352026 0.76469786 0. ]
[0. 0.88234658 0.94117323 0. ]]
最优策略 =
[[0 3 3 3]
[0 0 0 0]
[3 1 0 0]
[0 2 1 0]]
测试策略
episode_rewards = [play_policy(env, policy_pi) for _ in range(100)]
print("策略迭代 平均奖励:{}".format(np.mean(episode_rewards)))
策略迭代 平均奖励:0.77
价值迭代
def iterate_value(env, gamma=1, tolerant=1e-6):
v = np.zeros(env.unwrapped.nS) # 初始化
while True:
delta = 0
for s in range(env.unwrapped.nS):
vmax = max(v2q(env, v, s, gamma)) # 更新价值函数
delta = max(delta, abs(v[s]-vmax))
v[s] = vmax
if delta < tolerant: # 满足迭代需求
break
policy = np.zeros((env.unwrapped.nS, env.unwrapped.nA)) # 计算最优策略
for s in range(env.unwrapped.nS):
a = np.argmax(v2q(env, v, s, gamma))
policy[s][a] = 1.
return policy, v
policy_vi, v_vi = iterate_value(env)
print('状态价值函数 =')
print(v_vi.reshape(4, 4))
print('最优策略 =')
print(np.argmax(policy_vi, axis=1).reshape(4, 4))
状态价值函数 =
[[0.82351232 0.82350671 0.82350281 0.82350083]
[0.82351404 0. 0.52940011 0. ]
[0.82351673 0.82352018 0.76469779 0. ]
[0. 0.88234653 0.94117321 0. ]]
最优策略 =
[[0 3 3 3]
[0 0 0 0]
[3 1 0 0]
[0 2 1 0]]
测试策略
episode_rewards = [play_policy(env, policy_vi) for _ in range(100)]
print("价值迭代 平均奖励:{}".format(np.mean(episode_rewards)))
价值迭代 平均奖励:0.7
观察空间 = Discrete(16) 动作空间 = Discrete(4) 观测空间大小 = 16 动作空间大小 = 4
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
随机策略 平均奖励:0.04
状态价值函数: [[0.0139372 0.01162942 0.02095187 0.01047569] [0.01624741 0. 0.04075119 0. ] [0.03480561 0.08816967 0.14205297 0. ] [0. 0.17582021 0.43929104 0. ]] 动作价值函数: [[0.01470727 0.01393801 0.01393801 0.01316794] [0.00852221 0.01162969 0.01086043 0.01550616] [0.02444416 0.0209521 0.02405958 0.01435233] [0.01047585 0.01047585 0.00698379 0.01396775] [0.02166341 0.01701767 0.0162476 0.01006154] [0. 0. 0. 0. ] [0.05433495 0.04735099 0.05433495 0.00698396] [0. 0. 0. 0. ] [0.01701767 0.04099176 0.03480569 0.04640756] [0.0702086 0.11755959 0.10595772 0.05895286] [0.18940397 0.17582024 0.16001408 0.04297362] [0. 0. 0. 0. ] [0. 0. 0. 0. ] [0.08799662 0.20503708 0.23442697 0.17582024] [0.25238807 0.53837042 0.52711467 0.43929106] [0. 0. 0. 0. ]]
有更新,更新后的策略为: [[1. 0. 0. 0.] [0. 0. 0. 1.] [1. 0. 0. 0.] [0. 0. 0. 1.] [1. 0. 0. 0.] [1. 0. 0. 0.] [1. 0. 0. 0.] [1. 0. 0. 0.] [0. 0. 0. 1.] [0. 1. 0. 0.] [1. 0. 0. 0.] [1. 0. 0. 0.] [1. 0. 0. 0.] [0. 0. 1. 0.] [0. 1. 0. 0.] [1. 0. 0. 0.]]
状态价值函数 = [[0.82351246 0.82350689 0.82350303 0.82350106] [0.82351416 0. 0.5294002 0. ] [0.82351683 0.82352026 0.76469786 0. ] [0. 0.88234658 0.94117323 0. ]] 最优策略 = [[0 3 3 3] [0 0 0 0] [3 1 0 0] [0 2 1 0]]
策略迭代 平均奖励:0.77
状态价值函数 = [[0.82351232 0.82350671 0.82350281 0.82350083] [0.82351404 0. 0.52940011 0. ] [0.82351673 0.82352018 0.76469779 0. ] [0. 0.88234653 0.94117321 0. ]] 最优策略 = [[0 3 3 3] [0 0 0 0] [3 1 0 0] [0 2 1 0]]
价值迭代 平均奖励:0.7



