栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Q-learning算法

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Q-learning算法

####Q-learnig算法  深度强化学习原理与实践   P115-119
# coding: utf-8

import gym
import numpy as np
import sys
import time
import pandas as pd
import matplotlib
from collections import defaultdict, namedtuple
from matplotlib import pyplot as plt

env = gym.make ("CartPole-v0")

class QLearning ():
    def __init__(self, env, num_episodes, discount=1.0, alpha=0.5, epsilon=0.1, n_bins=10):
        self.nA = env.action_space.n    #动作数,直接传入gym自带的游戏环境CartPole,不用自定义动作和状态,直接获取
        print("动作数",self.nA)
        self.nS = env.observation_space.shape[0]  #状态数
        print("observation_spacen",env.observation_space.shape,"n",env.observation_space)
        print ("状态数", self.nS)
        self.env = env
        self.num_episodes = num_episodes    #迭代次数
        self.discount = discount
        self.alpha = alpha           #时间差分误差系数
        self.epsilon = epsilon     #贪婪策略系数
        # Initialize Q(s; a)
        self.Q = defaultdict (lambda: np.zeros (self.nA))
        print ("初始化动作值函数Q", self.Q)

        # Keeps track of useful statistics
        record = namedtuple ("Record", ["episode_lengths", "episode_rewards"])
        self.rec = record (episode_lengths=np.zeros (num_episodes),
                           episode_rewards=np.zeros (num_episodes))
        print ("记录recordn", self.rec)
        #分桶简化状态空间数量
        self.cart_position_bins = pd.cut ([-2.4, 2.4], bins=n_bins, retbins=True)[1]  #位置  将连续区间[-2.4,2.4]划分为10个大小相同的小区间
        print ("cart_position_binsn", self.cart_position_bins)
        self.pole_angle_bins = pd.cut ([-2, 2], bins=n_bins, retbins=True)[1]            #杆子的角度
        print("pole_angle_binsn",self.pole_angle_bins)
        self.cart_velocity_bins = pd.cut ([-1, 1], bins=n_bins, retbins=True)[1]          #车的速度
        print ("pole_angle_binsn", self.cart_velocity_bins)

        self.angle_rate_bins = pd.cut ([-3.5, 3.5], bins=n_bins, retbins=True)[1]           #杆子角度变化率
        print ("angle_rate_binsn", self.angle_rate_bins)

    def __get_bins_states(self, state):  #返回简化后的动作空间
        """
        Case number of the sate is huge so in order to simplify the situation
        cut the state sapece in to bins.

        if the state_idx is [1,3,6,4] than the return will be 1364
        """
        s1_, s2_, s3_, s4_ = state
        cart_position_idx = np.digitize (s1_, self.cart_position_bins)    #np.digitize (x,a) 返回x在有序数组a中的位置(从1算起)    如a=[1,2,3]  print(np.digitize(2,a))  => 2
        pole_angle_idx = np.digitize (s2_, self.pole_angle_bins)
        cart_velocity_idx = np.digitize (s3_, self.cart_velocity_bins)
        angle_rate_idx = np.digitize (s4_, self.angle_rate_bins)

        state_ = [cart_position_idx, pole_angle_idx,
                  cart_velocity_idx, angle_rate_idx]

        state = map (lambda s: int (s), state_)    #将state中所有元素转换为int型
        return tuple (state)     #转换为元组

    def __epislon_greedy_policy(self, epsilon, nA):  #贪婪策略

        def policy(state):
            A = np.ones (nA, dtype=float) * epsilon / nA
            best_action = np.argmax (self.Q[state])
            A[best_action] += (1.0 - epsilon)
            return A

        return policy

    def __next_action(self, prob):
        return np.random.choice (np.arange (len (prob)), p=prob)   #np.random.choice(a,p)从数组a中随机抽取元素,p用来指定概率  例如:
                                                                   #aa_milne_arr = ['pooh', 'rabbit', 'piglet', 'Christopher']
                                                                   #np.random.choice(aa_milne_arr, 5, p=[0.5, 0.1, 0.1, 0.3])
    #array(['pooh', 'pooh', 'pooh', 'Christopher', 'piglet'], dtype='|S11')    可以看到,‘pooh’被选取的概率明显比其他几个高很多

    def qlearning(self):
        """
        q-learning algo
        """
        #定义策略
        policy = self.__epislon_greedy_policy (self.epsilon, self.nA)
        sumlist = []


        #迭代经验轨迹
        for i_episode in range (self.num_episodes):
            # Print out which episode we are on
            if 0 == (i_episode + 1) % 10:
                print ("r Episode {} in {}".format (i_episode + 1, self.num_episodes))
                # sys.stdout.flush()

            step = 0
            # Initialize S
            state__ = self.env.reset ()

            state = self.__get_bins_states (state__)
            print ("初始化状态state__:",state__,"简化状态state:", state)

            # Repeat (for each step of episode)
            while (True):
                # Choose A from S using policy derived from Q
                prob_actions = policy (state)    #根据策略(贪婪)和状态选择动作
                action = self.__next_action (prob_actions)
                print("贪婪算法得出的动作概率:",prob_actions,"根据概率选择的动作:",action)

                # Take action A, observe R, S'
                next_state__, reward, done, info = env.step (action)
                next_state = self.__get_bins_states (next_state__)
                print("执行动作后(下一个状态, 奖励, 是否完成, 其他)==", next_state__, reward, done, info,"   简化后的下一个状态:",next_state)

                # update history record
                self.rec.episode_lengths[i_episode] += reward
                self.rec.episode_rewards[i_episode] = step
                print("累计步长:",self.rec.episode_rewards[i_episode],"累计奖励:", self.rec.episode_lengths[i_episode])


                # TD update: Q(S; A)<-Q(S; A) + aplha*[R + discount * max Q(S'; a) − Q(S; A)]
                best_next_action = np.argmax (self.Q[next_state])
                print("Q[next_state]:",self.Q[next_state],"best_next_action:",best_next_action)
                td_target = reward + self.discount * self.Q[next_state][best_next_action]
                td_delta = td_target - self.Q[state][action]
                print("td_target:",td_target,"td_delta:",td_delta)
                self.Q[state][action] += self.alpha * td_delta
                print("状态-动作:",state,"-",action,"   Q[s][a]=",self.Q[state][action])
                print (self.Q)

                if done:
                    # until S is terminal
                    print ("Episode finished after {} timesteps".format (step))

                    print ("*************************************************************************************************************")
                    sumlist.append (step)
                    break
                else:
                    step += 1
                    # S<-S'
                    state = next_state

        iter_time = sum (sumlist) / len (sumlist)
        print ("CartPole game iter average time is: {}".format (iter_time))
        print(self.Q)
        return self.Q


def plot_episode_stats(stats, smoothing_window=10):
    # 步长-迭代次数曲线
    fig1 = plt.figure (figsize=(10, 5))
    plt.plot (stats.episode_lengths[:200])
    plt.xlabel ("Episode")
    plt.ylabel ("Episode Length")
    plt.title ("Episode Length over Time")
    plt.show()

    # 奖励-迭代次数曲线
    fig2 = plt.figure (figsize=(10, 5))
    rewards_smoothed = pd.Series (stats.episode_rewards[:200]).rolling (smoothing_window,
                                                                        min_periods=smoothing_window).mean ()
    plt.plot (rewards_smoothed)
    plt.xlabel ("Episode")
    plt.ylabel ("Episode Reward")
    plt.title ("Episode Reward over Time".format (smoothing_window))
    plt.show()

    return fig1, fig2

cls_qlearning = QLearning (env, num_episodes=10)   #看细节
#cls_qlearning = QLearning (env, num_episodes=200)
Q = cls_qlearning.qlearning ()
plot_episode_stats (cls_qlearning.rec)

算法

更新细节,过程模拟

1、初始化

2、刚开始迭代,大部分Q值为0


3、再次遇到相同状态

4、迭代200次的结果
(1) 经验轨迹长度随迭代次数的关系:随着迭代次数的增加,时间步越来越长,表明游戏存活实践越久。

(2) 奖励随迭代次数的关系:随着迭代次数的增加,所获奖励逐渐增大。

参考:DQN原理简介

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/325799.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号