In [1]:
import pandas as pd
import numpy as np
from itertools import product
from copy import copy
import random
import warnings
import matplotlib.pyplot as plt
from matplotlib import patches
from matplotlib.collections import PatchCollection
warnings.filterwarnings('ignore')
Q-Learning
In [2]:
def QLearning(env, num_episodes=None, gamma=0.9, alpha=0.1, theta=1e-6):
    # Build the Q-table: one row per state, one column per action, initialised to zero
    Q = pd.DataFrame(index=env.state_space, columns=env.action_space)
    Q.loc[:, :] = 0
    num = 0
    while True:
        env.reset()
        Q_t = Q.copy()
        while not env.done:
            # Behaviour policy: choose an action uniformly at random (off-policy exploration)
            state = env.state
            action = random.choice(env.action_space)
            # Take the action and observe the next state and reward
            next_state, reward, done = env.step(action)
            # Update the Q-table using the Bellman optimality equation
            Q.loc[state, action] = Q.loc[state, action] + alpha * (
                reward + gamma * np.max(Q.loc[next_state]) - Q.loc[state, action])
        num += 1
        # Stop after num_episodes episodes, or once the Q-table has stopped changing
        if num_episodes is not None and num >= num_episodes:
            break
        elif Q_t.sub(Q).abs().lt(theta).all(axis=None):
            print("Converged!")
            break
    # Extract the greedy target policy from the learned Q-table
    policy = Q.idxmax(axis=1)
    return policy
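The inner update above is the standard tabular Q-learning rule; in LaTeX notation,

Q(s,a) \leftarrow Q(s,a) + \alpha \big[ r + \gamma \max_{a'} Q(s',a') - Q(s,a) \big].

Because the behaviour policy is uniformly random while the update bootstraps from the greedy value \max_{a'} Q(s',a'), the method is off-policy: the greedy policy extracted at the end (Q.idxmax(axis=1)) can be much better than the policy that collected the data.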
Grid World
In [3]:
class GridWorld:
    def __init__(self,
                 env_size=(3, 3),
                 start_state=(0, 2),
                 target_state=(2, 0),
                 grass_states=(),
                 action_space=tuple([(0,0), (0,1), (1,0), (0,-1), (-1,0)]),
                 reward_target=0,
                 reward_step=-1,
                 reward_grass=-2,
                 reward_bound=-2
                 ):
        self.env_size = env_size
        self.state_space = tuple(product(range(env_size[0]), range(env_size[1])))
        self.start_state = start_state
        self.target_state = target_state
        self.grass_states = grass_states
        self.action_space = action_space
        self.reward_target = reward_target
        self.reward_grass = reward_grass
        self.reward_step = reward_step
        self.reward_bound = reward_bound
        self.state = start_state
        self.done = False

    def reset(self):
        self.state = self.start_state
        self.done = False

    def step(self, action):
        assert action in self.action_space, "Invalid action"
        next_state = tuple(np.array(self.state) + np.array(action))
        if next_state == self.target_state:
            reward = self.reward_target
        elif next_state in self.grass_states:
            reward = self.reward_grass
        elif next_state not in self.state_space:
            # Moving off the grid keeps the agent in place and incurs the boundary penalty
            next_state = self.state
            reward = self.reward_bound
        else:
            reward = self.reward_step
        self.state = next_state
        self.done = self.state == self.target_state
        return self.state, reward, self.done

    def render(self, figsize=(5,5), policy=None, trajectory=None):
        xlim, ylim = self.env_size
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, aspect='equal')
        rect = patches.Rectangle(self.start_state, 1, 1, linewidth=1, facecolor='skyblue', alpha=0.5)
        ax.add_patch(rect)
        rect = patches.Rectangle(self.target_state, 1, 1, linewidth=1, facecolor='pink', alpha=0.5)
        ax.add_patch(rect)
        for xy in self.grass_states:
            rect = patches.Rectangle(xy, 1, 1, linewidth=1, facecolor='green', alpha=0.25)
            ax.add_patch(rect)
        if policy is not None:
            for state, action in policy.items():
                x, y = tuple(np.array(state) + 0.5)
                if action == (0, 0):
                    # The "stay" action is drawn as a small circle
                    arrow = patches.Circle((x, y), 0.1, linewidth=0.5, color="skyblue", fill=False)
                    ax.add_patch(arrow)
                else:
                    dx, dy = tuple(np.array(action) * 0.4)
                    arrow = patches.Arrow(x, y, dx, dy, width=0.1)
                    ax.add_patch(arrow)
        if trajectory is not None:
            x, y = zip(*trajectory)
            ax.plot(np.array(x) + 0.5, np.array(y) + 0.5, linestyle=":", color="red")
        ax.set_xbound(0, xlim)
        ax.set_ybound(0, ylim)
        ax.set_xticks(np.arange(xlim + 1), labels=[])
        ax.set_yticks(np.arange(ylim + 1), labels=[])
        plt.grid()
        plt.show()

    def set_grass_states(self, num_state, seed=None):
        random.seed(seed)
        np.random.seed(seed)
        sample_space = list(set(self.state_space) - set([self.target_state]) - set([self.start_state]))
        grass_states = random.sample(sample_space, num_state)
        self.grass_states = tuple(grass_states)
        random.seed(None)
        np.random.seed(None)
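As a quick sanity check of the reward logic in step, the following sketch (illustrative only, not part of the original runs; the single grass cell is made up) probes a default 3x3 grid once per action from the start state (0, 2):

if __name__ == "__main__":
    demo = GridWorld(grass_states=((1, 1),))   # hypothetical grass cell, for illustration
    for a in demo.action_space:
        demo.reset()
        s, r, d = demo.step(a)
        print(a, s, r, d)   # e.g. the (0, 1) move leaves the grid: state unchanged, reward -2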
In [4]:
if __name__ == "__main__":
    env_size = (10, 10)
    start_state = (0, 0)
    target_state = tuple(np.array(env_size) - 1)
    env = GridWorld(env_size=env_size, start_state=start_state, target_state=target_state)
    env.set_grass_states(num_state=40)
    policy = QLearning(env, num_episodes=1e6, theta=1e-6)  # training
    env.reset()
    trajectory = [env.start_state]
    while not env.done:
        action = policy[env.state]
        env.step(action)
        trajectory.append(env.state)
    env.render(policy=policy, trajectory=trajectory)
Converged!
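QLearning explores with a purely uniform random behaviour policy, which is simple but slow to cover larger grids. A common alternative, not used in this notebook, is epsilon-greedy exploration; the sketch below shows a hypothetical helper that could replace the action = random.choice(env.action_space) line inside the episode loop:

# Hypothetical helper (sketch): epsilon-greedy action selection over the current Q-table
def epsilon_greedy_action(Q, env, epsilon=0.1):
    if random.random() < epsilon:
        return random.choice(env.action_space)      # explore
    return Q.idxmax(axis=1)[env.state]              # exploit the current greedy action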
Cliff Walking
In [5]:
class CliffWalking:
    def __init__(self,
                 env_size=(12, 4),
                 start_state=(0, 0),
                 target_state=(11, 0),
                 cliff_states=tuple((i, 0) for i in range(1, 11)),
                 action_space=tuple([(0,0), (0,1), (1,0), (0,-1), (-1,0)]),
                 reward_target=0,
                 reward_step=-1,
                 reward_cliff=-100
                 ):
        self.env_size = env_size
        self.state_space = tuple(product(range(env_size[0]), range(env_size[1])))
        self.start_state = start_state
        self.target_state = target_state
        self.cliff_states = cliff_states
        self.action_space = action_space
        self.reward_target = reward_target
        self.reward_cliff = reward_cliff
        self.reward_step = reward_step
        self.state = start_state
        self.done = False

    def reset(self):
        self.state = self.start_state
        self.done = False

    def step(self, action):
        assert action in self.action_space, "Invalid action"
        next_state = tuple(np.array(self.state) + np.array(action))
        # With probability 0.1 the agent slips and an extra random action is applied
        if np.random.uniform(0, 1) <= 0.1:
            next_state = tuple(np.array(next_state) + np.array(random.choice(self.action_space)))
        if next_state == self.target_state:
            reward = self.reward_target
            self.done = True
        elif next_state in self.cliff_states:
            reward = self.reward_cliff
            self.done = True
        elif next_state not in self.state_space:
            # Moving off the grid keeps the agent in place and costs a normal step
            next_state = self.state
            reward = self.reward_step
        else:
            reward = self.reward_step
        self.state = next_state
        return self.state, reward, self.done

    def render(self, figsize=(8,4), policy=None, trajectory=None):
        xlim, ylim = self.env_size
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, aspect='equal')
        rect = patches.Rectangle(self.start_state, 1, 1, linewidth=1, facecolor='skyblue', alpha=0.5)
        ax.add_patch(rect)
        rect = patches.Rectangle(self.target_state, 1, 1, linewidth=1, facecolor='pink', alpha=0.5)
        ax.add_patch(rect)
        for xy in self.cliff_states:
            rect = patches.Rectangle(xy, 1, 1, linewidth=1, facecolor='grey', alpha=0.25)
            ax.add_patch(rect)
        if policy is not None:
            for state, action in policy.items():
                x, y = tuple(np.array(state) + 0.5)
                if action == (0, 0):
                    arrow = patches.Circle((x, y), 0.1, linewidth=0.5, color="blue", fill=False)
                    ax.add_patch(arrow)
                else:
                    dx, dy = tuple(np.array(action) * 0.4)
                    arrow = patches.Arrow(x, y, dx, dy, width=0.1)
                    ax.add_patch(arrow)
        if trajectory is not None:
            x, y = zip(*trajectory)
            ax.plot(np.array(x) + 0.5, np.array(y) + 0.5, linestyle=":", color="red")
        ax.set_xbound(0, xlim)
        ax.set_ybound(0, ylim)
        ax.set_xticks(np.arange(xlim + 1), labels=[])
        ax.set_yticks(np.arange(ylim + 1), labels=[])
        plt.grid()
        plt.show()
In [6]:
if __name__ == "__main__":
    cliff_states = tuple((i, 0) for i in range(1, 11))
    env = CliffWalking(cliff_states=cliff_states)
    agent = QLearning(env, num_episodes=1e6, theta=1e-6)  # training
    env.reset()
    trajectory = [env.start_state]
    num = 0
    # Cap the rollout at 100 steps because the transitions are stochastic
    while not env.done and num <= 100:
        num += 1
        action = agent[env.state]
        env.step(action)
        trajectory.append(env.state)
    env.render(policy=agent, trajectory=trajectory)
Converged!
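Because CliffWalking.step applies a random slip 10% of the time, a single rendered trajectory says little about the learned policy on its own. A short sketch (assumed evaluation code, not from the original run) averages the return of the greedy policy over repeated episodes:

if __name__ == "__main__":
    returns = []
    for _ in range(100):
        env.reset()
        total, steps = 0, 0
        while not env.done and steps <= 100:
            _, reward, _ = env.step(agent[env.state])
            total += reward
            steps += 1
        returns.append(total)
    print("mean return over 100 episodes:", np.mean(returns))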
DDPG
In [7]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import collections
import random
Replay Buffer
In [8]:
class ReplayBuffer:
    def __init__(self, capacity):  # maximum number of transitions the buffer can hold
        # A deque gives first-in, first-out behaviour: old transitions are dropped when full
        self.buffer = collections.deque(maxlen=capacity)

    # Append a transition to the buffer
    def add(self, state, action, reward, next_state, done):
        # Stored as a single tuple
        self.buffer.append((state, action, reward, next_state, done))

    # Randomly sample batch_size transitions from the buffer
    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        # Split the batch into separate fields
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    # Number of transitions currently stored
    def size(self):
        return len(self.buffer)
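A minimal usage sketch of the buffer API with made-up transitions (illustrative only): add a few entries, then draw a random batch.

# Sketch: exercise the ReplayBuffer with dummy transitions
demo_buffer = ReplayBuffer(capacity=10)
for t in range(5):
    demo_buffer.add(state=(0.0, float(t)), action=np.array([0.1]),
                    reward=-1, next_state=(0.0, float(t + 1)), done=False)
s, a, r, ns, d = demo_buffer.sample(batch_size=3)
print(demo_buffer.size(), s.shape, ns.shape)   # 5 (3, 2) (3, 2)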
Actor-Critic
In [9]:
# ------------------------------------- #
# Policy network (actor)
# ------------------------------------- #

class PolicyNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions, action_bound):
        super(PolicyNet, self).__init__()
        # Largest action magnitude the environment accepts
        self.action_bound = action_bound
        # A single hidden layer
        self.fc1 = nn.Linear(n_states, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_actions)

    # Forward pass
    def forward(self, x):
        x = self.fc1(x)        # [b, n_states] --> [b, n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)        # [b, n_hiddens] --> [b, n_actions]
        x = torch.tanh(x)      # squash the values into [-1, 1]
        x = x * self.action_bound  # scale to [-action_bound, action_bound]
        return x

# ------------------------------------- #
# Value network (critic)
# ------------------------------------- #

class QValueNet(nn.Module):
    def __init__(self, n_states, n_hiddens, n_actions):
        super(QValueNet, self).__init__()
        # The critic takes the concatenated state-action pair as input
        self.fc1 = nn.Linear(n_states + n_actions, n_hiddens)
        self.fc2 = nn.Linear(n_hiddens, n_hiddens)
        self.fc3 = nn.Linear(n_hiddens, 1)

    # Forward pass
    def forward(self, x, a):
        # Concatenate state and action
        cat = torch.cat([x, a], dim=1)  # [b, n_states + n_actions]
        x = self.fc1(cat)  # --> [b, n_hiddens]
        x = F.relu(x)
        x = self.fc2(x)    # --> [b, n_hiddens]
        x = F.relu(x)
        x = self.fc3(x)    # --> [b, 1]
        return x
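A quick shape check with dummy data (the batch and layer sizes here are made up) confirms how tensors flow through the two networks:

# Sketch: verify input/output shapes of the actor and critic
b, ns_, nh_, na_ = 4, 2, 5, 1
actor_demo = PolicyNet(ns_, nh_, na_, action_bound=2*np.pi)
critic_demo = QValueNet(ns_, nh_, na_)
s = torch.rand(b, ns_)          # [b, n_states]
a = actor_demo(s)               # [b, n_actions], values in [-action_bound, action_bound]
q = critic_demo(s, a)           # [b, 1]
print(a.shape, q.shape)         # torch.Size([4, 1]) torch.Size([4, 1])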
DDPG
In [10]:
class DDPG:
    def __init__(self, n_states, n_hiddens, n_actions, action_bound,
                 sigma, actor_lr, critic_lr, tau, gamma, device):
        # Policy network -- online (trained)
        self.actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
        # Value network -- online (trained)
        self.critic = QValueNet(n_states, n_hiddens, n_actions).to(device)
        # Policy network -- target
        self.target_actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
        # Value network -- target
        self.target_critic = QValueNet(n_states, n_hiddens, n_actions).to(device)
        # Initialise the target critic so both value networks start with the same parameters
        self.target_critic.load_state_dict(self.critic.state_dict())
        # Initialise the target actor so both policy networks start with the same parameters
        self.target_actor.load_state_dict(self.actor.state_dict())
        # Optimiser for the policy network
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        # Optimiser for the value network
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        # Store the hyperparameters
        self.gamma = gamma    # discount factor
        self.sigma = sigma    # standard deviation of the zero-mean Gaussian exploration noise
        self.tau = tau        # soft-update coefficient for the target networks
        self.n_actions = n_actions
        self.device = device

    # Action selection
    def take_action(self, state):
        # Reshape: list[n_states] --> tensor[1, n_states] --> device
        state = torch.tensor(state, dtype=torch.float).view(1, -1).to(self.device)
        # The actor outputs the deterministic action for the current state [1, n_states] --> [1, 1] --> scalar
        action = self.actor(state).item()
        # Add Gaussian noise to the action to encourage exploration
        action = action + self.sigma * np.random.randn(self.n_actions)
        return action

    # Soft update: every learning step moves the target parameters a small amount
    def soft_update(self, net, target_net):
        # Walk over the matching parameters of the online and target networks
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            # Blend the target parameters with the online parameters
            param_target.data.copy_(param_target.data * (1 - self.tau) + param.data * self.tau)

    # Training step
    def update(self, transition_dict):
        # Unpack the sampled batch
        transition_dict = {k: np.array(v) for k, v in transition_dict.items()}
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)                # [b, n_states]
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)  # [b, 1]
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)  # [b, 1]
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)      # [b, n_states]
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)      # [b, 1]
        # Target actor picks the action for the next state [b, n_states] --> [b, n_actions]
        next_q_values = self.target_actor(next_states)
        # Target critic scores that next state-action pair [b, n_states + n_actions] --> [b, 1]
        next_q_values = self.target_critic(next_states, next_q_values)
        # TD target for the current action value [b, 1]
        q_targets = rewards + self.gamma * next_q_values * (1 - dones)
        # Predicted action value [b, n_states + n_actions] --> [b, 1]
        q_values = self.critic(states, actions)
        # Mean squared error between prediction and target
        critic_loss = torch.mean(F.mse_loss(q_values, q_targets))
        # Gradient step on the value network
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # Actions the current actor would take in the sampled states [b, n_actions]
        actor_q_values = self.actor(states)
        # The critic's score for those actions [b, 1]
        score = self.critic(states, actor_q_values)
        # Policy loss: maximise the critic's score
        actor_loss = -torch.mean(score)
        # Gradient step on the policy network
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # Soft-update the target policy network
        self.soft_update(self.actor, self.target_actor)
        # Soft-update the target value network
        self.soft_update(self.critic, self.target_critic)
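For reference, update implements the standard DDPG targets. In LaTeX notation, with actor \mu_\theta, critic Q_\phi, and primed symbols for the target networks,

y = r + \gamma (1 - d)\, Q_{\phi'}\big(s', \mu_{\theta'}(s')\big), \qquad \phi' \leftarrow (1-\tau)\phi' + \tau\phi, \quad \theta' \leftarrow (1-\tau)\theta' + \tau\theta.

The critic minimises the mean squared error to y, and the actor is updated by ascending Q_\phi(s, \mu_\theta(s)), which is exactly the -torch.mean(score) loss above.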
Flat World
In [11]:
class FlatWorld:
    def __init__(self,
                 state_space=None,
                 start_point=None,
                 target_space=None,
                 grass_space=None,
                 action_space=(0, np.pi*2),
                 reward_target=0,
                 reward_step=-1,
                 reward_grass=-10,
                 reward_bound=-100,
                 seed=None
                 ):
        self.state_space = copy(state_space)
        self.start_point = start_point
        self.target_space = target_space
        self.grass_space = grass_space
        self.action_space = copy(action_space)
        self.reward_target = reward_target
        self.reward_grass = reward_grass
        self.reward_step = reward_step
        self.reward_bound = reward_bound
        self.state = start_point
        self.done = False
        self.step_count = 0

    def reset(self):
        self.state = self.start_point
        self.done = False
        self.step_count = 0
        return self.state

    def step(self, action, step_size=0.1):
        # The action is a heading angle; move step_size along (sin a, cos a)
        dxy = step_size * np.array([np.sin(action), np.cos(action)])
        next_state = tuple(np.array(self.state) + dxy)
        step_grass = [patch.contains_point(next_state) for patch in self.grass_space]
        if self.target_space.contains_point(next_state):
            reward = self.reward_target
        elif np.any(step_grass):
            reward = self.reward_grass
        elif not self.state_space.contains_point(next_state):
            # Leaving the arena keeps the agent in place and incurs the boundary penalty
            next_state = self.state
            reward = self.reward_bound
        else:
            reward = self.reward_step
        self.state = next_state
        self.done = self.target_space.contains_point(self.state)
        self.step_count += 1
        return self.state, reward, self.done

    def render(self, figsize=(5,5), policy_field=None, step_size=0.2, trajectory=None):
        xlim, ylim = self.state_space.get_width(), self.state_space.get_height()
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, aspect='equal')
        start_point = patches.Circle(self.start_point, 0.2, linewidth=1, facecolor='skyblue', alpha=0.5)
        ax.add_patch(start_point)
        ax.add_patch(copy(self.target_space))
        for patch in self.grass_space:
            ax.add_patch(copy(patch))
        if policy_field is not None:
            for (x, y), action in policy_field:
                dx, dy = step_size * np.sin(action) * 0.8, step_size * np.cos(action) * 0.8
                arrow = patches.Arrow(x, y, dx, dy, width=0.1)
                ax.add_patch(arrow)
        if trajectory is not None:
            x, y = zip(*trajectory)
            ax.plot(np.array(x), np.array(y), linestyle="-", color="red")
        ax.set_xbound(0, xlim)
        ax.set_ybound(0, ylim)
        ax.set_xticks(np.arange(xlim + 1), labels=[])
        ax.set_yticks(np.arange(ylim + 1), labels=[])
        return ax
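Note the action convention: an action is a heading angle, and step moves the agent by step_size * (sin a, cos a), so x and y take the roles usually given to cos and sin respectively. A tiny sketch makes the convention explicit:

# Sketch: heading-angle convention used by FlatWorld.step
for angle in (0.0, np.pi / 2, np.pi):
    dxy = 0.1 * np.array([np.sin(angle), np.cos(angle)])   # same mapping as step()
    print(round(angle, 2), np.round(dxy, 3))
# angle 0 moves in +y, pi/2 in +x, and pi in -y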
In [12]:
# -------------------------------------- #
# Environment setup
# -------------------------------------- #

state_space = patches.Rectangle((0, 0), 6, 6)
target_space = patches.Rectangle((4, 4), 2, 2, linewidth=1, facecolor='pink', alpha=0.5)
grass_space = [
    patches.Circle((3, 3), 1, linewidth=1, facecolor='green', alpha=0.25),
    patches.Rectangle((0, 4), 2, 2, linewidth=1, facecolor='green', alpha=0.25)
]
start_point = (1, 1)
env = FlatWorld(
    state_space=state_space,
    start_point=start_point,
    target_space=target_space,
    grass_space=grass_space
)

# Baseline: a uniformly random walk through the environment
env.reset()
trajectory = [env.start_point]
while not env.done and env.step_count <= 10000:
    action = np.random.uniform(*env.action_space)
    env.step(action, step_size=0.2)
    trajectory.append(env.state)
env.render(trajectory=trajectory)
Out[12]:
<Axes: >
Training
In [13]:
# Hyperparameter configuration
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
n_states = 2              # state dimension (x, y position)
n_actions = 1             # action dimension (heading angle)
action_bound = 2*np.pi    # maximum action magnitude
n_hiddens = 5
buffer_size = 100000
min_size = 1000
sigma = 0.15              # std of Gaussian exploration noise
batch_size = 256          # batch size for both actor and critic
gamma = 0.99              # discount factor
tau = 0.005               # target network update rate
actor_lr = 0.01
critic_lr = 0.01
In [14]:
# -------------------------------------- #
# Model construction
# -------------------------------------- #

# Instantiate the experience replay buffer
replay_buffer = ReplayBuffer(capacity=buffer_size)
# Instantiate the agent
agent = DDPG(n_states=n_states,          # state dimension
             n_hiddens=n_hiddens,        # hidden layer width
             n_actions=n_actions,        # action dimension
             action_bound=action_bound,  # maximum action magnitude
             sigma=sigma,                # Gaussian exploration noise
             actor_lr=actor_lr,          # policy network learning rate
             critic_lr=critic_lr,        # value network learning rate
             tau=tau,                    # soft-update coefficient
             gamma=gamma,                # discount factor
             device=device
             )

# -------------------------------------- #
# Model training
# -------------------------------------- #

return_list = []       # return of each episode
mean_return_list = []  # running mean of the returns
for i in range(10):    # train for 10 episodes
    episode_return = 0       # cumulative reward along the episode
    state = env.reset()      # initial state
    done = False             # episode-termination flag
    while not done and env.step_count <= 1e5:
        # Choose an action for the current state
        action = agent.take_action(state)
        # Step the environment
        next_state, reward, done = env.step(action[0], step_size=0.5)
        # Store the transition in the replay buffer
        replay_buffer.add(state, action, reward, next_state, done)
        # Move to the next state
        state = next_state
        # Accumulate the step reward
        episode_return += reward
        # Start training once the buffer holds more than min_size transitions
        if replay_buffer.size() > min_size:
            # Randomly sample batch_size transitions from the buffer
            s, a, r, ns, d = replay_buffer.sample(batch_size)
            # Assemble the training batch
            transition_dict = {
                'states': s,
                'actions': a,
                'rewards': r,
                'next_states': ns,
                'dones': d,
            }
            # One update of actor and critic
            agent.update(transition_dict)
    # Record the return of this episode
    return_list.append(episode_return)
    mean_return_list.append(np.mean(return_list[-10:]))  # smoothed
    # Print episode summary
    print(f'iter:{i}, return:{episode_return}, mean_return:{np.mean(return_list[-10:])}')
iter:0, return:-204129, mean_return:-204129.0
iter:1, return:-45, mean_return:-102087.0
iter:2, return:-45, mean_return:-68073.0
iter:3, return:-45, mean_return:-51066.0
iter:4, return:-45, mean_return:-40861.8
iter:5, return:-45, mean_return:-34059.0
iter:6, return:-37, mean_return:-29198.714285714286
iter:7, return:-46, mean_return:-25554.625
iter:8, return:-10531, mean_return:-23885.333333333332
iter:9, return:-36, mean_return:-21500.4
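return_list and mean_return_list are collected above but never plotted in this section; a short sketch to visualise the training curve from the variables that are already defined:

# Sketch: plot the per-episode return and its running mean
plt.figure(figsize=(6, 3))
plt.plot(return_list, label='episode return')
plt.plot(mean_return_list, label='running mean (last 10)')
plt.xlabel('episode')
plt.ylabel('return')
plt.legend()
plt.show()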
In [15]:
step_size = 0.2
env.reset()
trajectory = [env.start_point]
while not env.done and env.step_count <= 10000:
    action = agent.take_action(env.state)
    env.step(action[0], step_size)
    trajectory.append(env.state)
policy_field = []
for state in product(np.arange(0, env.state_space.get_width(), step_size),
                     np.arange(0, env.state_space.get_height(), step_size)
                     ):
    action = agent.take_action(state)
    policy_field.append((state, action[0]))
ax = env.render(policy_field=policy_field, step_size=step_size, trajectory=trajectory)
plt.show()
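To reuse the trained agent later without retraining, the actor weights can be saved and restored with the standard PyTorch state-dict API (a sketch; the file name is arbitrary):

# Sketch: persist and reload the trained actor
torch.save(agent.actor.state_dict(), 'ddpg_actor.pt')
restored = PolicyNet(n_states, n_hiddens, n_actions, action_bound)
restored.load_state_dict(torch.load('ddpg_actor.pt', map_location=device))
restored.eval()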