import gym
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os

'''Migrated to GPU for training'''
class Actor(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Actor, self).__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, output_size)

    def forward(self, s):
        x = F.relu(self.linear1(s))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = torch.tanh(self.linear4(x))
        return x
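# A minimal usage sketch (shapes are assumptions: Pendulum-v1 has a 3-dim
# observation and a 1-dim action). The tanh head bounds actions to [-1, 1],
# which sits inside Pendulum's [-2, 2] torque range; the output is never
# rescaled anywhere in this script.
#   actor = Actor(3, 256, 1).cuda()
#   a = actor(torch.zeros(1, 3).cuda())   # shape (1, 1), entries in [-1, 1]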
class Critic(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, output_size)

    def forward(self, s, a):
        x = torch.cat([s, a], 1)
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x
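# Usage sketch (shapes assumed as above): the critic concatenates state and
# action internally, so input_size must equal s_dim + a_dim at construction.
#   critic = Critic(3 + 1, 256, 1).cuda()
#   q = critic(torch.zeros(8, 3).cuda(), torch.zeros(8, 1).cuda())  # shape (8, 1)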
class Agent(object):
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

        s_dim = self.env.observation_space.shape[0]
        a_dim = self.env.action_space.shape[0]

        os.makedirs('nets', exist_ok=True)  # avoid crashing when nets/ does not exist yet
        fileList = os.listdir('nets/')
        if "actor.pkl" in fileList:
            print("Trained nets found!")
            self.actor = torch.load('nets/actor.pkl')
            self.actor_target = torch.load('nets/actor_target.pkl')
            self.critic1 = torch.load('nets/critic1.pkl')
            self.critic_target1 = torch.load('nets/critic_target1.pkl')
            self.critic2 = torch.load('nets/critic2.pkl')
            self.critic_target2 = torch.load('nets/critic_target2.pkl')
        else:
            print("Trained nets not found!")
            self.actor = Actor(s_dim, 256, a_dim).cuda()
            self.actor_target = Actor(s_dim, 256, a_dim).cuda()
            self.critic1 = Critic(s_dim + a_dim, 256, 1).cuda()
            self.critic_target1 = Critic(s_dim + a_dim, 256, 1).cuda()
            self.critic2 = Critic(s_dim + a_dim, 256, 1).cuda()
            self.critic_target2 = Critic(s_dim + a_dim, 256, 1).cuda()
            # start the target networks as exact copies of the online networks
            self.actor_target.load_state_dict(self.actor.state_dict())
            self.critic_target1.load_state_dict(self.critic1.state_dict())
            self.critic_target2.load_state_dict(self.critic2.state_dict())

        self.actor_optim = optim.Adam(self.actor.parameters(), lr=self.actor_lr)
        self.critic_optim1 = optim.Adam(self.critic1.parameters(), lr=self.critic_lr)
        self.critic_optim2 = optim.Adam(self.critic2.parameters(), lr=self.critic_lr)
        self.buffer = []
        self.updateCnt = 0
    def act(self, s0):
        s0 = torch.tensor(s0, dtype=torch.float).unsqueeze(0).cuda()
        a0 = self.actor(s0).squeeze(0).detach().cpu().numpy()
        return a0
    def put(self, *transition):
        if len(self.buffer) == self.capacity:
            self.buffer.pop(0)
        self.buffer.append(transition)
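    # Design note: list.pop(0) is O(n) in the buffer size. A drop-in alternative
    # (a sketch, not part of this script) is a bounded deque, which evicts the
    # oldest transition automatically and in O(1):
    #   from collections import deque
    #   self.buffer = deque(maxlen=self.capacity)   # in __init__
    #   self.buffer.append(transition)              # put() then needs no pop(0)
    # random.sample() still works on a deque, though indexing into one is O(n),
    # so a plain list is often kept for large buffers anyway.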
    def learn(self):
        if len(self.buffer) < self.batch_size:
            return

        samples = random.sample(self.buffer, self.batch_size)
        s0, a0, r1, s1 = zip(*samples)

        s0 = torch.tensor(s0, dtype=torch.float).cuda()
        a0 = torch.tensor(a0, dtype=torch.float).cuda()
        r1 = torch.tensor(r1, dtype=torch.float).view(self.batch_size, -1).cuda()
        s1 = torch.tensor(s1, dtype=torch.float).cuda()
        def critic_learn():
            a1 = self.actor_target(s1).detach()
            # clipped double-Q target: bootstrap from the minimum of the two
            # target critics to curb Q-value overestimation
            y_true = r1 + self.gamma * torch.min(self.critic_target1(s1, a1),
                                                 self.critic_target2(s1, a1)).detach()

            loss_fn = nn.MSELoss()

            y_pred1 = self.critic1(s0, a0)
            loss = loss_fn(y_pred1, y_true)
            self.critic_optim1.zero_grad()
            loss.backward()
            self.critic_optim1.step()

            y_pred2 = self.critic2(s0, a0)
            loss = loss_fn(y_pred2, y_true)
            self.critic_optim2.zero_grad()
            loss.backward()
            self.critic_optim2.step()
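        # The target computed above is TD3's clipped double Q-learning rule:
        #   y = r + gamma * min(Q1'(s', pi'(s')), Q2'(s', pi'(s')))
        # Full TD3 additionally adds clipped noise to pi'(s') (target policy
        # smoothing); this script leaves the target action deterministic.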
        def actor_learn():
            # deterministic policy gradient; this variant scores the actor with
            # the pessimistic min of both critics rather than critic1 alone
            loss = -torch.mean(torch.min(self.critic1(s0, self.actor(s0)),
                                         self.critic2(s0, self.actor(s0))))
            self.actor_optim.zero_grad()
            loss.backward()
            self.actor_optim.step()
        def soft_update(net_target, net, tau):
            # Polyak averaging: target <- (1 - tau) * target + tau * online
            for target_param, param in zip(net_target.parameters(), net.parameters()):
                target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)
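        # With tau = 0.02 (set in params below), each target network is an
        # exponential moving average of its online network with a time constant
        # of roughly 1/tau = 50 updates, keeping the TD targets slowly moving.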
        critic_learn()
        soft_update(self.critic_target1, self.critic1, self.tau)
        soft_update(self.critic_target2, self.critic2, self.tau)

        self.updateCnt += 1
        # delayed policy update: the actor and its target are refreshed only
        # once every update_interval critic updates
        if (self.updateCnt % self.update_interval) == 0:
            actor_learn()
            soft_update(self.actor_target, self.actor, self.tau)
    def save(self):
        torch.save(self.actor, 'nets/actor.pkl')
        torch.save(self.actor_target, 'nets/actor_target.pkl')
        torch.save(self.critic1, 'nets/critic1.pkl')
        torch.save(self.critic_target1, 'nets/critic_target1.pkl')
        torch.save(self.critic2, 'nets/critic2.pkl')
        torch.save(self.critic_target2, 'nets/critic_target2.pkl')
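# Note: torch.save(model) pickles the entire Module, so loading these files
# requires the Actor/Critic class definitions to be importable. To load on a
# CPU-only machine, a map_location sketch:
#   actor = torch.load('nets/actor.pkl', map_location='cpu')
# Saving state_dicts instead of whole modules is the more portable convention.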
# this script assumes the classic gym API (pre-0.26): reset() returns the
# observation directly and step() returns a 4-tuple (s, r, done, info)
env = gym.make('Pendulum-v1')
env.reset()
env.render()
params = {
    'env': env,
    'gamma': 0.99,
    'actor_lr': 0.001,
    'critic_lr': 0.0013,
    'tau': 0.02,
    'capacity': 5000,
    'batch_size': 32,
    'update_interval': 3,
}

EPOCH_NUM = 200
agent = Agent(**params)
rewardList = []
for episode in range(EPOCH_NUM):
    s0 = env.reset()
    episode_reward = 0

    # render only every 20th episode to keep training fast
    flag = (episode % 20 == 0)

    # reward shaping: a running integral of rewards whose weight grows
    # quadratically over training via INTCOEFF
    integral = 0
    INTCOEFF = (episode / EPOCH_NUM) ** 2 * 0.005

    for step in range(500):
        if flag:
            env.render()
        a0 = agent.act(s0)
        s1, r1, done, _ = env.step(a0)  # done is ignored; episodes run a fixed 500 steps
        integral += r1 * INTCOEFF
        agent.put(s0, a0, r1 + integral, s1)

        episode_reward += r1
        s0 = s1

        agent.learn()
    print(episode, ': ', episode_reward)
    rewardList.append(episode_reward)

pltX = [i for i in range(EPOCH_NUM)]
plt.plot(pltX, rewardList)
plt.show()
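# Sketch (assumed call site, not present above): persist the trained nets after
# training so the "Trained nets found!" branch in Agent.__init__ can reload
# them on the next run:
#   agent.save()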