强化学习
In [1]:
import pandas as pd
import numpy as np
from itertools import product
from copy import copy
import random
import warnings
import matplotlib.pyplot as plt 
from matplotlib import patches
from matplotlib.collections import PatchCollection

warnings.filterwarnings('ignore')

Q-Learning¶

Q-Learning¶

In [2]:
def QLearning(env, num_episodes=None, gamma=0.9, alpha=0.1, theta=1e-6):
    # Define the q-table and the learning rate  
    Q = pd.DataFrame(index=env.state_space, columns=env.action_space)
    Q.loc[:, :] = 0
    num = 0
    while True: 
        env.reset()
        Q_t = Q.copy()
        while not env.done:
            # Choose an action using the uniform policy
            state = env.state 
            action = random.choice(env.action_space)

            # Take the action and observe the next state and reward
            next_state, reward, done = env.step(action)
          
            # Update the Q-table using the Bellman optimality equation 
            Q.loc[state, action] = Q.loc[state, action] + alpha * (  
                reward + gamma * np.max(Q.loc[next_state]) - Q.loc[state, action]) 
        
        num +=1
        if num_episodes is not None and num >= num_episodes:
            break
        elif Q_t.sub(Q).abs().lt(theta).all(axis=None):
            print("Converged!")
            break
    # Update target policy 
    policy = Q.idxmax(axis=1)
    return policy 

Grid World¶

In [3]:
class GridWorld:
    def __init__(self, 
                 env_size=(3, 3), 
                 start_state=(0, 2), 
                 target_state=(2, 0), 
                 grass_states=None,
                 action_space=tuple([(0,0), (0,1), (1,0), (0,-1), (-1,0)]),
                 reward_target=0,
                 reward_step=-1,
                 reward_grass=-2,
                 reward_bound=-2
                ):
        
        self.env_size = env_size
        self.state_space = tuple(product(range(env_size[0]), range(env_size[1])))
        self.start_state = start_state
        self.target_state = target_state
        self.grass_states = grass_states
        
        self.action_space = action_space          
        self.reward_target = reward_target
        self.reward_grass = reward_grass
        self.reward_step = reward_step
        self.reward_bound = reward_bound

        self.state = start_state
        self.done = False

    def reset(self):
        self.state = self.start_state
        self.done = False
    
    def step(self, action):
        assert action in self.action_space, "Invalid action"

        next_state = tuple(np.array(self.state) + np.array(action))
        if next_state == self.target_state:
            reward = self.reward_target
        elif next_state in self.grass_states:
            reward = self.reward_grass
        elif next_state not in self.state_space:
            next_state = self.state
            reward = self.reward_bound
        else:
            reward = self.reward_step

        self.state = next_state
        self.done = self.state == self.target_state
        
        return self.state, reward, self.done
    
    def render(self, figsize=(5,5), policy=None, trajectory=None):
        xlim, ylim = self.env_size
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, aspect='equal')
        
        rect = patches.Rectangle(self.start_state, 1, 1, linewidth=1, facecolor='skyblue', alpha=0.5)
        ax.add_patch(rect)
        rect = patches.Rectangle(self.target_state, 1, 1, linewidth=1, facecolor='pink', alpha=0.5)
        ax.add_patch(rect)
        for xy in self.grass_states:
            rect = patches.Rectangle(xy, 1, 1, linewidth=1, facecolor='green', alpha=0.25)
            ax.add_patch(rect)

        if policy is not None:
            for state, action in policy.items():
                x, y = tuple(np.array(state)+0.5)
                if action == (0, 0):
                    arrow = patches.Circle((x, y), 0.1, linewidth=0.5, color="skyblue", fill=False)
                    ax.add_patch(arrow)
                else:
                    dx, dy = tuple(np.array(action)*0.4)
                    arrow = patches.Arrow(x, y, dx, dy, width=0.1)
                    ax.add_patch(arrow)

        if trajectory is not None:
            x, y = zip(*trajectory)
            ax.plot(np.array(x)+0.5, np.array(y)+0.5, linestyle=":", color="red")
        
        ax.set_xbound(0, xlim)
        ax.set_ybound(0, ylim)
        ax.set_xticks(np.arange(xlim+1), labels=[])
        ax.set_yticks(np.arange(ylim+1), labels=[])
        plt.grid()
        plt.show()

    def set_grass_states(self, num_state, seed=None):
        random.seed(seed)
        np.random.seed(seed)
        sample_space = list(set(self.state_space) - set([self.target_state]) - set([self.start_state]))
        grass_states = random.sample(sample_space, num_state)
        self.grass_states = tuple(grass_states)
        random.seed(None)
        np.random.seed(None)
In [4]:
if __name__ == "__main__":
    env_size = (10, 10)
    start_state = (0, 0)
    target_state = tuple(np.array(env_size) - 1)

    env = GridWorld(env_size=env_size, start_state=start_state, target_state=target_state)
    env.set_grass_states(num_state=40)
    policy = QLearning(env, num_episodes=1e6, theta=1e-6) # training

    env.reset()
    trajectory = [env.start_state]
    while not env.done:
        action = policy[env.state]
        env.step(action)
        trajectory.append(env.state)
    
    env.render(policy=policy, trajectory=trajectory)
Converged!
No description has been provided for this image

Cliff Walking¶

In [5]:
class CliffWalking:
    def __init__(self, 
                 env_size=(12, 4), 
                 start_state=(0, 0), 
                 target_state=(11, 0), 
                 cliff_states=tuple((i, 0) for i in range(1, 11)),
                 action_space=tuple([(0,0), (0,1), (1,0), (0,-1), (-1,0)]),
                 reward_target=0,
                 reward_step=-1,
                 reward_cliff=-100
                ):
        
        self.env_size = env_size
        self.state_space = tuple(product(range(env_size[0]), range(env_size[1])))
        self.start_state = start_state
        self.target_state = target_state
        self.cliff_states = cliff_states
        
        self.action_space = action_space          
        self.reward_target = reward_target
        self.reward_cliff = reward_cliff
        self.reward_step = reward_step
        
        self.state = start_state
        self.done = False

    def reset(self):
        self.state = self.start_state
        self.done = False
    
    def step(self, action):
        assert action in self.action_space, "Invalid action"

        next_state = tuple(np.array(self.state) + np.array(action)) 
        if np.random.uniform(0, 1) <= 0.1:
            next_state = tuple(np.array(next_state) + np.array(random.choice(self.action_space))) 
        
        if next_state == self.target_state:
            reward = self.reward_target
            self.done = True
        elif next_state in self.cliff_states:
            reward = self.reward_cliff
            self.done = True
        elif next_state not in self.state_space:
            next_state = self.state
            reward = self.reward_step
        else:
            reward = self.reward_step

        self.state = next_state
        
        return self.state, reward, self.done
    
    def render(self, figsize=(8,4), policy=None, trajectory=None):
        xlim, ylim = self.env_size
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, aspect='equal')
        
        rect = patches.Rectangle(self.start_state, 1, 1, linewidth=1, facecolor='skyblue', alpha=0.5)
        ax.add_patch(rect)
        rect = patches.Rectangle(self.target_state, 1, 1, linewidth=1, facecolor='pink', alpha=0.5)
        ax.add_patch(rect)
        for xy in self.cliff_states:
            rect = patches.Rectangle(xy, 1, 1, linewidth=1, facecolor='grey', alpha=0.25)
            ax.add_patch(rect)

        if policy is not None:
            for state, action in policy.items():
                x, y = tuple(np.array(state)+0.5)
                if action == (0, 0):
                    arrow = patches.Circle((x, y), 0.1, linewidth=0.5, color="blue", fill=False)
                    ax.add_patch(arrow)
                else:
                    dx, dy = tuple(np.array(action)*0.4)
                    arrow = patches.Arrow(x, y, dx, dy, width=0.1)
                    ax.add_patch(arrow)

        if trajectory is not None:
            x, y = zip(*trajectory)
            ax.plot(np.array(x)+0.5, np.array(y)+0.5, linestyle=":", color="red")
        
        ax.set_xbound(0, xlim)
        ax.set_ybound(0, ylim)
        ax.set_xticks(np.arange(xlim+1), labels=[])
        ax.set_yticks(np.arange(ylim+1), labels=[])
        plt.grid()
        plt.show()
In [6]:
if __name__ == "__main__":
    cliff_states = tuple((i, 0) for i in range(1, 11))
    env = CliffWalking(cliff_states=cliff_states)
    agent = QLearning(env, num_episodes=1e6, theta=1e-6) # training
    
    env.reset()
    trajectory = [env.start_state]
    num = 0
    while not env.done and num <=100:
        num += 1
        action = agent[env.state]
        env.step(action)
        trajectory.append(env.state)
    
    env.render(policy=agent, trajectory=trajectory)
Converged!
No description has been provided for this image

DDPG¶

In [7]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import collections
import random

Replay Buffer¶

In [8]:
class ReplayBuffer:
    def __init__(self, capacity):  # 经验池的最大容量
        # 创建一个队列,先进先出
        self.buffer = collections.deque(maxlen=capacity)
    # 在队列中添加数据
    def add(self, state, action, reward, next_state, done):
        # 以list类型保存
        self.buffer.append((state, action, reward, next_state, done))
    # 在队列中随机取样batch_size组数据
    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        # 将数据集拆分开来
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done
    # 测量当前时刻的队列长度
    def size(self):
        return len(self.buffer)

Actor-Critic¶

In [9]:
# ------------------------------------- #
# 策略网络
# ------------------------------------- #

class PolicyNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions, action_bound):
        super(PolicyNet, self).__init__()
        # 环境可以接受的动作最大值
        self.action_bound = action_bound
        # 只包含一个隐含层
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_actions)
    # 前向传播
    def forward(self, x):
        x = self.fc1(x)  # [b,n_states]-->[b,n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # [b,n_hiddens]-->[b,n_actions]
        x= torch.tanh(x)  # 将数值调整到 [-1,1]
        x = x * self.action_bound  # 缩放到 [-action_bound, action_bound]
        return x
 
# ------------------------------------- #
# 价值网络
# ------------------------------------- #
 
class QValueNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions):
        super(QValueNet, self).__init__()
        # 
        self.fc1 = nn.Linear(n_states + n_actions, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_hiddens)
        self.fc3 = nn.Linear(n_hiddens, 1)
    # 前向传播
    def forward(self, x, a):
        # 拼接状态和动作
        cat = torch.cat([x, a], dim=1)  # [b, n_states + n_actions]
        x = self.fc1(cat)  # -->[b, n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)  # -->[b, n_hiddens]
        x = F.relu(x)
        x = self.fc3(x)  # -->[b, 1]
        return x

DDPG¶

In [10]:
class DDPG:
    def __init__(self, n_states, n_hiddens, n_actions, action_bound,
                 sigma, actor_lr, critic_lr, tau, gamma, device):
 
        # 策略网络--训练
        self.actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
        # 价值网络--训练
        self.critic = QValueNet(n_states, n_hiddens, n_actions).to(device)
        # 策略网络--目标
        self.target_actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
        # 价值网络--目标
        self.target_critic = QValueNet(n_states, n_hiddens, n_actions).to(device
                                                                          )
        # 初始化价值网络的参数,两个价值网络的参数相同
        self.target_critic.load_state_dict(self.critic.state_dict())
        # 初始化策略网络的参数,两个策略网络的参数相同
        self.target_actor.load_state_dict(self.actor.state_dict())
 
        # 策略网络的优化器
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        # 价值网络的优化器
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
 
        # 属性分配
        self.gamma = gamma  # 折扣因子
        self.sigma = sigma  # 高斯噪声的标准差,均值设为0
        self.tau = tau  # 目标网络的软更新参数
        self.n_actions = n_actions
        self.device = device
 
    # 动作选择
    def take_action(self, state):
        # 维度变换 list[n_states]-->tensor[1,n_states]-->gpu
        state = torch.tensor(state, dtype=torch.float).view(1,-1).to(self.device)
        # 策略网络计算出当前状态下的动作价值 [1,n_states]-->[1,1]-->int
        action = self.actor(state).item()
        # 给动作添加噪声,增加搜索
        action = action + self.sigma * np.random.randn(self.n_actions)
        return action
    
    # 软更新, 意思是每次learn的时候更新部分参数
    def soft_update(self, net, target_net):
        # 获取训练网络和目标网络需要更新的参数
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            # 训练网络的参数更新要综合考虑目标网络和训练网络
            param_target.data.copy_(param_target.data*(1-self.tau) + param.data*self.tau)
 
    # 训练
    def update(self, transition_dict):
        # 从训练集中取出数据
        transition_dict = {k: np.array(v) for k, v in transition_dict.items()}
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)  # [b,n_states]
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)  # [b,next_states]
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1).to(self.device)  # [b,1]
        
        # 价值目标网络获取下一时刻的动作[b,n_states]-->[b,n_actors]
        next_q_values = self.target_actor(next_states)
        # 策略目标网络获取下一时刻状态选出的动作价值 [b,n_states+n_actions]-->[b,1]
        next_q_values = self.target_critic(next_states, next_q_values)
        # 当前时刻的动作价值的目标值 [b,1]
        q_targets = rewards + self.gamma * next_q_values * (1-dones)
        
        # 当前时刻动作价值的预测值 [b,n_states+n_actions]-->[b,1]
        q_values = self.critic(states, actions)
 
        # 预测值和目标值之间的均方差损失
        critic_loss = torch.mean(F.mse_loss(q_values, q_targets))
        # 价值网络梯度
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
 
        # 当前状态的每个动作的价值 [b, n_actions]
        actor_q_values = self.actor(states)
        # 当前状态选出的动作价值 [b,1]
        score = self.critic(states, actor_q_values)
        # 计算损失
        actor_loss = -torch.mean(score)
        # 策略网络梯度
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
 
        # 软更新策略网络的参数  
        self.soft_update(self.actor, self.target_actor)
        # 软更新价值网络的参数
        self.soft_update(self.critic, self.target_critic)

Flat World¶

In [11]:
class FlatWorld:
    def __init__(self, 
                 state_space=None, 
                 start_point=None, 
                 target_space=None, 
                 grass_space=None,
                 action_space=(0, np.pi*2),
                 reward_target=0,
                 reward_step=-1,
                 reward_grass=-10,
                 reward_bound=-100,
                 seed=None
                ):
        self.state_space = copy(state_space)
        self.start_point = start_point
        self.target_space = target_space
        self.grass_space = grass_space
        
        self.action_space = copy(action_space)         
        self.reward_target = reward_target
        self.reward_grass = reward_grass
        self.reward_step = reward_step
        self.reward_bound = reward_bound
        
        self.state = start_point
        self.done = False
        self.step_count = 0

    def reset(self):
        self.state = self.start_point
        self.done = False
        self.step_count = 0
        return self.state
    
    def step(self, action, step_size=0.1):
        dxy = step_size * np.array([np.sin(action), np.cos(action)])
        next_state = tuple(np.array(self.state) + dxy)

        step_grass = [patch.contains_point(next_state) for patch in self.grass_space]
        if self.target_space.contains_point(next_state):
            reward = self.reward_target
        elif np.any(step_grass):
            reward = self.reward_grass
        elif not self.state_space.contains_point(next_state):
            next_state = self.state
            reward = self.reward_bound
        else:
            reward = self.reward_step
        
        self.state = next_state
        self.done = self.target_space.contains_point(self.state)
        self.step_count += 1
        
        return self.state, reward, self.done        

    def render(self, figsize=(5,5), policy_field=None, step_size=0.2, trajectory=None):
        xlim, ylim = self.state_space.get_width(), self.state_space.get_height()
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, aspect='equal')
        
        start_point = patches.Circle(self.start_point, 0.2, linewidth=1, facecolor='skyblue', alpha=0.5)
        ax.add_patch(start_point)
        ax.add_patch(copy(self.target_space))
        for patch in self.grass_space:
            ax.add_patch(copy(patch))

        if policy_field is not None:
            for (x, y), action in policy_field:
                dx, dy = step_size * np.sin(action) * 0.8, step_size * np.cos(action) * 0.8
                arrow = patches.Arrow(x, y, dx, dy, width=0.1)
                ax.add_patch(arrow)
        
        if trajectory is not None:
            x, y = zip(*trajectory)
            ax.plot(np.array(x), np.array(y), linestyle="-", color="red")
        
        ax.set_xbound(0, xlim)
        ax.set_ybound(0, ylim)
        ax.set_xticks(np.arange(xlim+1), labels=[])
        ax.set_yticks(np.arange(ylim+1), labels=[])
        return ax
In [12]:
# -------------------------------------- #
# 环境加载
# -------------------------------------- #
state_space = patches.Rectangle((0, 0), 6, 6)
target_space = patches.Rectangle((4, 4), 2, 2, linewidth=1, facecolor='pink', alpha=0.5)
grass_space = [
    patches.Circle((3, 3), 1, linewidth=1, facecolor='green', alpha=0.25),
    patches.Rectangle((0, 4), 2, 2, linewidth=1, facecolor='green', alpha=0.25)
]
start_point = (1, 1)
env = FlatWorld(
    state_space=state_space, 
    start_point = start_point, 
    target_space=target_space, 
    grass_space=grass_space
)

env.reset()
trajectory = [env.start_point]
while not env.done and env.step_count <= 10000:
    action = np.random.uniform(*env.action_space)
    env.step(action, step_size=0.2)
    trajectory.append(env.state)

env.render(trajectory=trajectory)
Out[12]:
<Axes: >
No description has been provided for this image

Training¶

In [13]:
# 参数配置

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

n_states = 2  # 状态数 2
n_actions = 1  # 动作数 1
action_bound = 2*np.pi  # 动作的最大值 
n_hiddens = 5

buffer_size = 100000
min_size = 1000
sigma = 0.15  # Std of Gaussian exploration noise
batch_size = 256  # Batch size for both actor and critic
gamma = 0.99  # Discount factor
tau = 0.005  # Target network update rate
actor_lr = 0.01
critic_lr = 0.01
In [14]:
# -------------------------------------- #
# 模型构建
# -------------------------------------- #
 
# 经验回放池实例化
replay_buffer = ReplayBuffer(capacity=buffer_size)
# 模型实例化
agent = DDPG(n_states = n_states,  # 状态数
             n_hiddens = n_hiddens,  # 隐含层数
             n_actions = n_actions,  # 动作数
             action_bound = action_bound,  # 动作最大值
             sigma = sigma,  # 高斯噪声
             actor_lr = actor_lr,  # 策略网络学习率
             critic_lr = critic_lr,  # 价值网络学习率
             tau = tau,  # 软更新系数
             gamma = gamma,  # 折扣因子
             device = device
            )
 
# -------------------------------------- #
# 模型训练
# -------------------------------------- #
 
return_list = []  # 记录每个回合的return
mean_return_list = []  # 记录每个回合的return均值
 
for i in range(10):  # 迭代10回合
    episode_return = 0  # 累计每条链上的reward
    state = env.reset()  # 初始时的状态
    done = False  # 回合结束标记
    
    while not done and env.step_count<=1e5:
        # 获取当前状态对应的动作
        action = agent.take_action(state)
        # 环境更新
        next_state, reward, done = env.step(action[0], step_size=0.5)
        # 更新经验回放池
        replay_buffer.add(state, action, reward, next_state, done)
        # 状态更新
        state = next_state
        # 累计每一步的reward
        episode_return += reward
 
        # 如果经验池超过容量,开始训练
        if replay_buffer.size() > min_size:
            # 经验池随机采样batch_size组
            s, a, r, ns, d = replay_buffer.sample(batch_size)
            # 构造数据集
            transition_dict = {
                'states': s,
                'actions': a,
                'rewards': r,
                'next_states': ns,
                'dones': d,
            }
            # 模型训练
            agent.update(transition_dict)
    
    # 保存每一个回合的回报
    return_list.append(episode_return)
    mean_return_list.append(np.mean(return_list[-10:]))  # 平滑
 
    # 打印回合信息
    print(f'iter:{i}, return:{episode_return}, mean_return:{np.mean(return_list[-10:])}')
iter:0, return:-204129, mean_return:-204129.0
iter:1, return:-45, mean_return:-102087.0
iter:2, return:-45, mean_return:-68073.0
iter:3, return:-45, mean_return:-51066.0
iter:4, return:-45, mean_return:-40861.8
iter:5, return:-45, mean_return:-34059.0
iter:6, return:-37, mean_return:-29198.714285714286
iter:7, return:-46, mean_return:-25554.625
iter:8, return:-10531, mean_return:-23885.333333333332
iter:9, return:-36, mean_return:-21500.4
In [15]:
step_size = 0.2

env.reset()
trajectory = [env.start_point]
while not env.done and env.step_count <= 10000:
    action = agent.take_action(env.state)
    env.step(action[0], step_size)
    trajectory.append(env.state)

policy_field = []
for state in product(np.arange(0, env.state_space.get_width(), step_size), 
                     np.arange(0, env.state_space.get_height(), step_size)
                    ):
    action = agent.take_action(state)
    policy_field.append((state, action[0]))
            
ax = env.render(policy_field=policy_field, step_size=step_size, trajectory=trajectory)
plt.show()
No description has been provided for this image