栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

22.dqn--study代码详解

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

22.dqn--study代码详解

'''
Description: dqn--study
Autor: 365JHWZGo
Date: 2021-11-10 09:32:28
LastEditors: 365JHWZGo
LastEditTime: 2021-11-10 22:24:22
'''
import torch
import torch.nn.functional as F
import numpy as np
import gym

# hyper parameters
BATCH_SIZE = 32
LR = 0.01
EPSILON = 0.9
GAMMA = 0.9
TARGET_REPLACE_ITER = 100  # 更新频率
MEMORY_CAPACITY = 2000
env = gym.make('CartPole-v0')
env = env.unwrapped
N_ACTIONS = env.action_space.n
N_STATES = env.observation_space.shape[0]

# create network


class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # 输入观测值,输出十个奖励值
        self.fc1 = torch.nn.Linear(N_STATES, 10)
        # 随机生成初始参数的值,二次分布使其具有更好的效果
        self.fc1.weight.data.normal_(0, 0.1)
        # 输入十个奖励值,输出最大奖励所对应的动作值
        self.out = torch.nn.Linear(10, N_ACTIONS)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)
        return actions_value

# create dqn

# 与环境互动


class DQN(object):
    def __init__(self):
        self.eval_net, self.target_net = Net(), Net()
        # 当前学习到多少步了
        self.learn_step_counter = 0
        # 记忆库里存储里多少了
        self.memory_counter = 0
        # 初始化记忆库大小
        # MEMORY_CAPACITY 存储的行数
        # N_STATES*2+2    存储的列数【两个state+action+reward】
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES*2+2))
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        self.loss_func = torch.nn.MSELoss()

    # 接收环境中的状态值
    def choose_action(self, x):
        # 观测值用Variable包裹
        x = torch.unsqueeze(torch.FloatTensor(x), 0)
        # EPSOLON 随机选取动作的概率
        if np.random.uniform() < EPSILON:
            # 输出动作中最大的价值
            actions_value = self.eval_net.forward(x)
            # 选取最大价值
            action = torch.max(actions_value, 1)[1].data.numpy()[0]
        else:  # 随机选取
            action = np.random.randint(0, N_ACTIONS)
        return action

    # 记忆库,存储状态
    def store_transition(self, s, a, r, s_):
        # 将所有的值均捆绑到一起存入
        transition = np.hstack((s, [a, r], s_))
        # 当当前的memory_counter超过MEMORTY_CAPACITY时采取重复覆盖
        index = self.memory_counter % MEMORY_CAPACITY
        # 将transition插入到合适的位置
        self.memory[index, :] = transition
        # memory_counter数量+1
        self.memory_counter += 1
    # 从记忆库中学习强化知识

    def learn(self):
        # 判断是否要更新target_network
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            # 将target_network中的所有参数从eval_net中复制过来
            self.target_net.load_state_dict((self.eval_net.state_dict()))
        # 学习步数+1
        self.learn_step_counter += 1

        # 实现每一步学习,eval_net都在更新
        # 从记忆库中随机抽取一些记忆
        sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
        # 将抽取到的记忆放入b_memory
        b_memory = self.memory[sample_index, :]
        # 将抽取到的所有记忆按照类别打包
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES+1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, N_STATES+1:N_STATES+2])
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])
        # 输出所有动作的价值并根据它当初施加在动作上的价值
        q_eval = self.eval_net(b_s).gather(1, b_a)
        # detach 禁止反向传递,因为target_net自己在上面会更新
        q_next = self.target_net(b_s_).detach()
        # 下一步的q值=当初获得的奖励b_r+下一步q的最大价值的值,GAMMA对未来价值的递减
        q_target = b_r + GAMMA*q_next.max(1)[0].view(BATCH_SIZE, 1)
        loss = self.loss_func(q_eval, q_target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


dqn = DQN()

if __name__ == '__main__':
    # 强化学习的过程
    for i_episode in range(400):
        # 将当前的环境清空
        s = env.reset()
        while True:
            # 渲染环境
            env.render()
            # 根据当前的环境选择一个动作
            a = dqn.choose_action(s)
            # 环境根据我采取的行为给我的一个反馈
            s_, r, done, info = env.step(a)
            #修改奖励,越偏向两边分越小,越靠近两边越小
            x, x_dot, theta, theta_dot = s_
            r1 = (env.x_threshold-abs(x))/env.x_threshold - 0.8
            r2 = (env.theta_threshold_radians-abs(theta)) / 
                env.theta_threshold_radians-0.5
            r = r1+r2
            # s 当前的状态 a 动作  r 奖励 s_ 下一个状态
            # 根据奖励的引导去指挥我做出动作
            dqn.store_transition(s, a, r, s_)
            if dqn.memory_counter > MEMORY_CAPACITY:
                dqn.learn()
                if done:
                    print('EP:', i_episode,
                          "| EP_action:", a)
            # 这个回合结束后就跳到下一个回合去
            if done:
                break
            s = s_

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487300.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号