栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

【RL】Q-learning走迷宫案例及改进

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【RL】Q-learning走迷宫案例及改进

创建迷宫环境
import numpy as np
import time
import sys
import tkinter as tk  
import random

UNIT = 40   # pixels 像素
MAZE_H = 4  # grid height 网格高度
MAZE_W = 4 # grid width  网格宽度
class Maze(tk.Tk, object):  # 继承
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)  # 4
        self.title('maze')
        self._build_maze()

    def _build_maze(self):
        # 初始化画布
        self.canvas = tk.Canvas(self, bg='white', height=MAZE_H * UNIT, width=MAZE_W * UNIT)
        # 画网格
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT  # 先描两侧横线上点
            self.canvas.create_line(x0, y0, x1, y1)  # 将上述的点连成线
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # 创建一个原点
        origin = np.array([20, 20])

        # 画一个黑色矩形表示陷阱
        hell1_center = origin + np.array([UNIT * 2, UNIT])  # 陷阱的中心点
        self.hell1 = self.canvas.create_rectangle(  # 根据矩形的两个对角的坐标画矩形
            hell1_center[0] - 15, hell1_center[1] - 15,
            hell1_center[0] + 15, hell1_center[1] + 15,
            fill='black')
        # 同上
        hell2_center = origin + np.array([UNIT, UNIT * 2])
        self.hell2 = self.canvas.create_rectangle(
            hell2_center[0] - 15, hell2_center[1] - 15,
            hell2_center[0] + 15, hell2_center[1] + 15,
            fill='black')

        # 画一个圆表示终点
        oval_center = origin + UNIT * 2
        self.oval = self.canvas.create_oval(  # 先根据两个对角点定好矩形,再在矩形里画一个内切圆
            oval_center[0] - 15, oval_center[1] - 15,
            oval_center[0] + 15, oval_center[1] + 15,
            fill='yellow')

        # 画一个红色矩阵表示agent
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')

        # 打包
        self.canvas.pack()

    def reset(self):  # 复位
        self.update()
        time.sleep(0.5)  # 0.5后复位
        self.canvas.delete(self.rect)
        origin = np.array([20, 20])
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')
        # 返回agent的坐标(即所处的state)
        #print(self.rect)
        return self.canvas.coords(self.rect)  # 返回的是一个list

    def step(self, action):
        s = self.canvas.coords(self.rect)  # 获取当前agent的位置
        base_action = np.array([0, 0])
        if action == 0:   # up
            if s[1] > UNIT:  # 判断这时候是否在上边界,因为要向上移动
                base_action[1] -= UNIT
        elif action == 1:   # down
            if s[1] < (MAZE_H - 1) * UNIT:  # 同上
                base_action[1] += UNIT
        elif action == 2:   # right
            if s[0] < (MAZE_W - 1) * UNIT:  # 同上
                base_action[0] += UNIT
        elif action == 3:   # left
            if s[0] > UNIT:  # 同上
                base_action[0] -= UNIT
        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent
        s_ = self.canvas.coords(self.rect)  # agent执行完动作后所处的位置

        # 奖励
        if s_ == self.canvas.coords(self.oval):  # 如果当前所处位置在圆上
            reward = 1
            done = True
            s_ = 'terminal'
        elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]:
            reward = -1
            done = True
            s_ = 'terminal'
        else:
            reward = 0
            done = False
        return s_, reward, done

    def render(self):
        time.sleep(0.1)  # 时间步长
        self.update()

def update():
    for t in range(10):
        s = env.reset()
        while True:
            env.render()
            a = random.randint(0, 4)
            s, r, done = env.step(a)
            if done:
                break

if __name__ == '__main__':
    env = Maze()
    env.after(100, update)  # 每100毫秒调用一次函数
    env.mainloop()  # 该方法最后执行,将标签显示在屏幕,进入等待状态(若组件为打包,则不会再窗口中显示)
                    # 准备响应用户发起的GUI事件(图形用户界面)
 
 
 
智能体
```python
#!/usr/bin/python3
# -*- coding:utf-8 -*-
 
"""
This part of code is the Q learning brain, which is a brain of the agent.
All decisions are made in here.
View more on my tutorial page: https://morvanzhou.github.io/tutorials/
"""
 
from tkinter.constants import S
import numpy as np
import pandas as pd


 
class QLearningTable:
    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.95,e_greedy_max = 0.9):
        '''
        :param actions:    行为
        :param learning_rate: 学习率, 来决定这次的误差有多少是要被学习的
        :param reward_decay: 是折扣因子,表示时间的远近对回报的影响程度,为0表示之看当前状态采取行动的reward。 
        :param e_greedy: 是用在决策上的一种策略, 比如 epsilon = 0.9 时, 就说明有90% 的情况我会按照 Q 表的最优值选择行为, 10% 的时间使用随机选行为
        '''
        self.actions = actions  # a list
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon = e_greedy
        self.epsilon_max = e_greedy_max
        self.q_table = pd.Dataframe(columns=self.actions, dtype=np.float64)    #一般行是状态 列是行为
 
    def choose_action(self, observation, episode):
        self.check_state_exist(observation)  #首先检验传入的数据有没有在Q_table当中,如果没有则加入。当作新的索引值
        # action selection
        self.epsilon**=episode
        if self.epsilon < self.epsilon_max:
            pass
        else:
            self.epsilon = self.epsilon_max
        if np.random.uniform() > self.epsilon:   #如果随机数<0.9 
            # choose best action  根据最优结果选择action 
            state_action = self.q_table.loc[observation, :]  #索引  选出这个observation的action值   
            # some actions may have the same value, randomly choose on in these actions
            action = np.random.choice(state_action[state_action == np.max(state_action)].index)   #这句话的意思是打乱
            #action = state_action.idxmax()
        else:
            # choose random action   随机选择(10%的可能)
            action = np.random.choice(self.actions)
        return action
 
    def learn(self, s, a, r, s_):  #从这些状态 提升学习q_table 
        self.check_state_exist(s_)
        q_predict = self.q_table.loc[s, a]
        if s_ != 'terminal':
            q_target = r + self.gamma * self.q_table.loc[s_, :].max()  # next state is not terminal
        else:
            q_target = r  # next state is terminal  已经到达了最终状态
        self.q_table.loc[s, a] += self.lr * (q_target - q_predict)  # update   
 
    def check_state_exist(self, state):  #检查state
        if state not in self.q_table.index:
            # append new state to q table
            self.q_table = self.q_table.append(
                pd.Series(
                    [0]*len(self.actions),
                    index=self.q_table.columns,
                    name=state,
                )
            )
主程序流程
#!/usr/bin/python3
# -*- coding:utf-8 -*-
   
from maze_env import Maze  #调用环境
from RL_brain import QLearningTable  #调用主方法
import matplotlib.pyplot as plt
import pandas as pd
 
def update():
    reward_list = []  #记录每一个episode的奖励总值
    count_list= []
    for episode in range(50):   #运行100个回合
        # initial observation #环境的观测值
        observation = env.reset()  #初始值的信息 (1,1) (1,2) 观测到的信息
        # observation = env.reset()
        reward_sum = 0
        i = 0
        while True:
            # fresh env  
            i+=1
            env.render()  #刷新环境
            # RL choose action based on observation
            action = RL.choose_action(str(observation), episode)  #先挑选一个动作
            # RL take action and get next observation and reward
            observation_, reward, done = env.step(action)  #observation_下一个状态  #done 判断是否拿到奖励还是跳到坑
            # RL learn from this transition  学习
            RL.learn(str(observation), action, reward, str(observation_))  
            # swap observation
            observation = observation_  #循环进入下一个状态
            # break while loop when end of this episode
            reward_sum += reward
            if done:
                count_list.append(i)
                break
        reward_list.append(reward_sum)
        print(reward_list)
        print("第%d回合"%episode)
        print(RL.q_table)
        RL.q_table.to_csv("./1.csv")
    avg_list = []
    for i in range(len(reward_list)):
        avg_list.append(reward_list[i]/count_list[i])
    plt.plot(avg_list)
    plt.show()
    #print(reward_list)
    # end of game
    print('game over')
    env.destroy()
 
if __name__ == "__main__":
    env = Maze()  #Maze()环境
    RL = QLearningTable(actions=list(range(env.n_actions)))   #选择的学习方式
    #print(RL.q_table)
 
    env.after(100, update)
    #print("hahah")
    #print(RL.q_table)
    env.mainloop()  # 进入消息循环


#找出收敛的方法:

"""
设置了一个reward_list去记录每一episode的值,
画图可视化出来
根据图中-1 和1的分布情况
如果大部分是1的情况,则代表收敛了
"""

思考与改进

可视化结果:

迭代50次:

迭代100次:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/587746.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号