0%

TD3网络

TD3网络

  • TD3是Twin Delayed Deep Deterministic policy gradient algorithm的简称,双延迟深度确定性策略梯度

    传统的DDPG:

    image-20211208213332139

关注上图中,我们通过Critic网络估算动作的A值。一个Critic的评估可能会较高。所以我们加一个。

image-20211208213403185

这就相当于我们把途中的Critic的框框,一个变为两个。

在目标网络中,我们估算出来的Q值会用min()函数求出较少值。以这个值作为更新的目标。

这个目标会更新两个网络 Critic网络_1 和 Critic网络_2。

你可以理解为这两个网络是完全独立,他们只是都用同一个目标进行更新。

剩余的就和DDPG一样了。过一段时间,把学习好的网络赋值给目标网络。

我们再仔细分别看Critic部分和Actor部分的学习。

Critic部分的学习

只有我们在计算Critic的更新目标时,我们才用target network。其中就包括了一个Policy network,用于计算A’;两个Q network ,用于计算两个Q值:Q1(A’) 和Q2(A’)。

Q1(A’) 和Q2(A’) 取最小值 min(Q1,Q2) 将代替DDPG的 Q(a’) 计算更新目标,也就是说: target = min(Q1,Q2) * gamma + r

target 将会是 Q_network_1 和 Q_network_2 两个网络的更新目标。

这里可能会有同学问,既然更新目标是一样的,那么为什么还需要两个网络呢?

虽然更新目标一样,两个网络会越来越趋近与和实际q值相同。但由于网络参数的初始值不一样,会导致计算出来的值有所不同。所以我们可以有空间选择较小的值去估算q值,避免q值被高估。

Actor部分的学习

我们在DDPG中说过,DDPG网络图像上就可以想象成一张布,覆盖在qtable上。当我们输入某个状态的时候,相当于这块布上的一个截面,我们我们能够看到在这个状态下的一条曲线。

而actor的任务,就是用梯度上升的方法,寻着这条线的最高点。

对于actor来说,其实并不在乎Q值是否会被高估,他的任务只是不断做梯度上升,寻找这条最大的Q值。随着更新的进行Q1和Q2两个网络,将会变得越来越像。所以用Q1还是Q2,还是两者都用,对于actor的问题不大。

Delayed - 延迟

这里说的Dalayed ,是actor更新的delay。也就是说相对于critic可以更新多次后,actor再进行更新。

为什么要这样做呢?

还是回到我们qnet拟合出来的那块”布”上。

qnet在学习过程中,我们的q值是不断变化的,也就是说这块布是不断变形的。所以要寻着最高点的任务有时候就挺难为为的actor了。

可以想象,本来是最高点的,当actor好不容易去到最高点;q值更新了,这并不是最高点。这时候actor只能转头再继续寻找新的最高点。更坏的情况可能是actor被困在次高点,没有找到正确的最高点。

所以我们可以把Critic的更新频率,调的比Actor要高一点。让critic更加确定,actor再行动。

target policy smoothing regularization

TD3中,价值函数的更新目标每次都在action上加一个小扰动,这个操作就是target policy smoothing regularization

为什么要这样呢?

我们可以再次回到我们关于“布”的想象。

在DDPG中,计算target的时候,我们输入时s_和a_,获得q,也就是这块布上的一点A。通过估算target估算另外一点s,a,也就是布上的另外一点B的Q值。
image-20211208213605697

在TD3中,计算target时候,输入s_到actor输出a后,给a加上噪音,让a在一定范围内随机。这又什么好处呢。

好处就是,当更新多次的时候,就相当于用A点附近的一小部分范围(准确来说是在s_这条线上的一定范围)的去估算B,这样可以让B点的估计更准确,更健壮。

image-20211208213632464

  • 这注意区分三个地方:

​ 在跑游戏的时候,我们同样加上了了noise。这个时候的noise是为了更充分地开发整个游戏空间。
​ 计算target的时候,actor加上noise,是为了预估更准确,网络更有健壮性。
​ 更新actor的时候,我们不需要加上noise,这里是希望actor能够寻着最大值。加上noise并没有任何意义。

下面附上源代码

import gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
'''迁移到了GPU上进行训练'''

class Actor(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(Actor, self).__init__()
self.linear1 = nn.Linear(input_size, hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.linear3 = nn.Linear(hidden_size, hidden_size)
self.linear4 = nn.Linear(hidden_size, output_size)
def forward(self, s):
x = F.relu(self.linear1(s))
x = F.relu(self.linear2(x))
x = F.relu(self.linear3(x))
x = torch.tanh(self.linear4(x))

return x


class Critic(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super().__init__()
self.linear1 = nn.Linear(input_size, hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.linear3 = nn.Linear(hidden_size, hidden_size)
self.linear4 = nn.Linear(hidden_size, output_size)

def forward(self, s, a):
x = torch.cat([s, a], 1)
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = F.relu(self.linear3(x))
x = self.linear4(x)

return x


class Agent(object):
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)

s_dim = self.env.observation_space.shape[0]
a_dim = self.env.action_space.shape[0]
fileList = os.listdir('nets/')
if "actor.pkl" in fileList :
print("Trained nets found!")

self.actor = torch.load('nets/actor.pkl')
self.actor_target = torch.load('nets/actor_target.pkl')

self.critic1 = torch.load('nets/critic1.pkl')
self.critic_target1 = torch.load('nets/critic_target1.pkl')
self.critic2 = torch.load('nets/critic2.pkl')
self.critic_target2 = torch.load('nets/critic_target2.pkl')
else:
print("Trained nets not found!")

self.actor = Actor(s_dim, 256, a_dim).cuda()
self.actor_target = Actor(s_dim, 256, a_dim).cuda()

self.critic1 = Critic(s_dim + a_dim, 256, 1).cuda() # 此处修改了critic的输出维度恒为1
self.critic_target1 = Critic(s_dim + a_dim, 256, 1).cuda() # 此处修改了critic的输出维度恒为1
self.critic2 = Critic(s_dim + a_dim, 256, 1).cuda() # 此处修改了critic的输出维度恒为1
self.critic_target2 = Critic(s_dim + a_dim, 256, 1).cuda() # 此处修改了critic的输出维度恒为1
# 假如没找到存在的网络的话,初始化target网络
self.actor_target.load_state_dict(self.actor.state_dict())

self.critic_target1.load_state_dict(self.critic1.state_dict())
self.critic_target2.load_state_dict(self.critic2.state_dict())

self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.actor_lr)
self.critic_optim1 = optim.Adam(self.critic1.parameters(), lr=self.critic_lr)
self.critic_optim2 = optim.Adam(self.critic2.parameters(), lr=self.critic_lr)
self.buffer = []
self.updateCnt = 0



def act(self, s0):
s0 = torch.tensor(s0, dtype=torch.float).cuda().unsqueeze(0).cuda()
a0 = self.actor(s0).squeeze(0).detach().cpu().numpy()
return a0

def put(self, *transition):
if len(self.buffer) == self.capacity:
self.buffer.pop(0)
self.buffer.append(transition)

def learn(self):
if len(self.buffer) < self.batch_size:
return

samples = random.sample(self.buffer, self.batch_size)


s0, a0, r1, s1 = zip(*samples)

s0 = torch.tensor(s0, dtype=torch.float).cuda()
a0 = torch.tensor(a0, dtype=torch.float).cuda()
r1 = torch.tensor(r1, dtype=torch.float).view(self.batch_size, -1).cuda()
s1 = torch.tensor(s1, dtype=torch.float).cuda()

def critic_learn():
a1 = self.actor_target(s1).detach()
y_true = r1 + self.gamma * torch.min(self.critic_target1(s1, a1), self.critic_target1(s1, a1)).detach()
# 更新网咯1
y_pred1 = self.critic1(s0, a0)
loss_fn = nn.MSELoss()
loss = loss_fn(y_pred1, y_true)
self.critic_optim1.zero_grad()
loss.backward()
self.critic_optim1.step()
# 更新网络2
y_pred2 = self.critic2(s0, a0)
loss_fn = nn.MSELoss()
loss = loss_fn(y_pred2, y_true)
self.critic_optim2.zero_grad()
loss.backward()
self.critic_optim2.step()

def actor_learn():
# 此处update actor网络同样从两个critic网络中选择一个较小的
loss = -torch.mean(torch.min(self.critic1(s0, self.actor(s0)), self.critic2(s0, self.actor(s0))))
self.actor_optim.zero_grad()
loss.backward()
self.actor_optim.step()

def soft_update(net_target, net, tau):
for target_param, param in zip(net_target.parameters(), net.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)

critic_learn()
soft_update(self.critic_target1, self.critic1, self.tau)
soft_update(self.critic_target2, self.critic2, self.tau)
self.updateCnt += 1
# 到达更新频率的时候才更新actor
if((self.updateCnt % self.update_interval) == 0):
actor_learn()
soft_update(self.actor_target, self.actor, self.tau)


def save(self):
torch.save(self.actor, 'nets/actor.pkl')
torch.save(self.actor_target, 'nets/actor_target.pkl')
torch.save(self.critic1, 'nets/critic1.pkl')
torch.save(self.critic_target1, 'nets/critic_target1.pkl')
torch.save(self.critic2, 'nets/critic2.pkl')
torch.save(self.critic_target2, 'nets/critic_target2.pkl')

env = gym.make('Pendulum-v1')
env.reset()
env.render()

params = {
'env': env,
'gamma': 0.99,
'actor_lr': 0.001,
'critic_lr': 0.0013,
'tau': 0.02,
'capacity': 5000,
'batch_size': 32,
'update_interval': 3,
}
EPOCH_NUM = 200
agent = Agent(**params)
FLAG = False
rewardList = []
# INTCOEFF = 0.001
integral = 0
# INTCOEFF = 0.0
for episode in range(EPOCH_NUM):
s0 = env.reset()
episode_reward = 0
if(episode%20 == 0):
flag = True
else:
flag = False
integral = 0
INTCOEFF = (episode/EPOCH_NUM)**2*0.005
for step in range(500):
if(flag):
env.render()
a0 = agent.act(s0)
s1, r1, done, _ = env.step(a0)
integral += r1*INTCOEFF
agent.put(s0, a0, r1+integral, s1)

episode_reward += r1
s0 = s1

agent.learn()

print(episode, ': ', episode_reward)
rewardList.append(episode_reward)
pltX = [i for i in range(EPOCH_NUM)]
plt.plot(pltX, rewardList)
plt.show()
# agent.save()

详见DDPG_Try: DDPG尝试集 (gitee.com)