0
点赞
收藏
分享

微信扫一扫

深度学习基础—语言模型和序列生成

罗蓁蓁 2024-11-06 阅读 11

目录

一、任务目标

二、细节分析

三、代码演示


一、任务目标

1、初始的策略已给出

efc20420ef5f4dc4898c8ceb7fec02a4.png

2、使用MC Basic算法去更新策略

3、出回馈为-1,禁止区域回馈为-10,目标区域回馈为1,折扣累计奖赏参数为0.9

6701c933c1844397bd1bd56bb95af129.png

二、细节分析

1、为达到步数动态调整的效果,使用递归函数get_expected_reward()来完成。

2、迭代的终止条件为,每一次调整的策略和未调整前的策略一样,即策略未更新。

三、代码演示

import numpy as np

class Behrman:
    # 属性
    n, m = None, None  # 棋盘的行数和列数
    xx, yy = [-1, 0, 1, 0, 0], [0, 1, 0, -1, 0]  # 五种行动(上、右、下、左、静止)

    s = "↑→↓←O"  # 用于显示的行动符号
    _lambda = 0.9  # 折扣因子

    # 矩阵
    chessboard = [
        ["***"],
        ["**#"],
        ["#*x"]
    ]
    policy = [
        [1, 3, 2],
        [2, 3, 3],
        [2, 2, 0]
    ]
    Punishment = None

    def __init__(self):
        self.n, self.m = len(self.chessboard), len(self.chessboard[0][0])
        self.init_matrix()

    def init_matrix(self):
        """初始化棋盘矩阵、奖惩值矩阵、状态和策略矩阵。"""
        self.chessboard = [list(row[0]) for row in self.chessboard]
        self.states = [[0] * self.m for _ in range(self.n)]
        self.Punishment = [[0 if cell == "*" else (-10 if cell == "#" else 1)
                            for cell in row] for row in self.chessboard]

    def next_point(self, x, y, i):
        """计算下一个位置的坐标,如果越界返回-1。"""
        wall = 0
        nx, ny = x + self.xx[i], y + self.yy[i]
        if 0 <= nx < self.n and 0 <= ny < self.m:
            return nx, ny, wall
        else:
            wall = 1
            return x, y, wall

    def action(self, x, y, i):
        """返回给定行动的奖惩值。"""
        next_pos = self.next_point(x, y, i)
        if next_pos[2] == 1:
            return -1
        else:
            return self.Punishment[next_pos[0]][next_pos[1]]

    def get_expected_reward(self, x, y, action, step_count):
        """递归计算前 `step_count` 步的累计奖励值。"""
        if step_count == 0:
            return 0
        next_pos = self.next_point(x, y, action)
        immediate_reward = self.action(x, y, action)
        future_reward = self._lambda * self.get_expected_reward(
            next_pos[0], next_pos[1], self.policy[next_pos[0]][next_pos[1]], step_count - 1
        )
        return immediate_reward + future_reward

    def get_MCpolicy(self, step_count=3):
        """动态步数策略更新函数。"""
        new_policy = [[0 for _ in range(self.m)] for _ in range(self.n)]
        for x in range(self.n):
            for y in range(self.m):
                best_action = max(range(5), key=lambda i:
                    self.get_expected_reward(x, y, i, step_count)
                )
                new_policy[x][y] = best_action
        return new_policy

    def show_policy(self):
        """显示最终策略矩阵。"""
        policy_display = "\n".join("\t".join(self.s[self.policy[x][y]] for y in range(self.m)) for x in range(self.n))
        print(policy_display)

    def run_policy_iteration(self, step_count=2):
        """蒙特卡洛算法,带有动态步数的策略迭代。"""
        count = 0
        flag = 1
        while flag:
            count += 1
            new_policy = self.get_MCpolicy(step_count)
            if np.array_equal(new_policy, self.policy):
                flag = 0
            else:
                for x in range(self.n):
                    for y in range(self.m):
                        self.policy[x][y] = new_policy[x][y]
        print(f"策略迭代算法收敛,共迭代{count}次")
        self.show_policy()

if __name__ == '__main__':
    behrman = Behrman()
    behrman.run_policy_iteration(step_count=3)  # 可以调整step_count以选择不同步数

 

 

举报

相关推荐

0 条评论