A2C TensorFlow Implementation
Below is a TensorFlow implementation of A2C:
```python
import gym
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp


# Actor network: maps a state to a probability distribution over actions
class Actor(tf.keras.Model):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(Actor, self).__init__()
        self.fc1 = tf.keras.layers.Dense(hidden_dim, activation='relu')
        self.action_head = tf.keras.layers.Dense(action_dim, activation='softmax')

    def call(self, x):
        x = self.fc1(x)
        action_probs = self.action_head(x)
        return action_probs


# Critic network: maps a state to a scalar state-value estimate
class Critic(tf.keras.Model):
    def __init__(self, state_dim, hidden_dim):
        super(Critic, self).__init__()
        self.fc1 = tf.keras.layers.Dense(hidden_dim, activation='relu')
        self.value_head = tf.keras.layers.Dense(1)

    def call(self, x):
        x = self.fc1(x)
        state_value = self.value_head(x)
        return state_value


# A2C algorithm: one actor update and one critic update per episode
class A2C:
    def __init__(self, state_dim, action_dim, hidden_dim, lr, gamma):
        self.actor = Actor(state_dim, action_dim, hidden_dim)
        self.critic = Critic(state_dim, hidden_dim)
        self.optimizer_actor = tf.keras.optimizers.Adam(lr)
        self.optimizer_critic = tf.keras.optimizers.Adam(lr)
        self.gamma = gamma

    def train(self, env, episodes):
        for i in range(episodes):
            state = env.reset()
            rewards = []
            log_probs = []
            values = []
            with tf.GradientTape(persistent=True) as tape:
                while True:
                    # Sample an action from the current policy
                    state = tf.constant(state.reshape(1, -1), dtype=tf.float32)
                    action_probs = self.actor(state)
                    dist = tfp.distributions.Categorical(probs=action_probs)
                    action = dist.sample()
                    log_prob = dist.log_prob(action)
                    # Step the environment (old Gym API: step returns a 4-tuple)
                    next_state, reward, done, _ = env.step(int(action.numpy()[0]))
                    rewards.append(reward)
                    log_probs.append(log_prob)
                    values.append(self.critic(state))
                    state = next_state
                    if done:
                        # Discounted returns, computed backwards: G_t = r_t + gamma * G_{t+1}
                        returns = []
                        discounted = 0.0
                        for r in reversed(rewards):
                            discounted = r + self.gamma * discounted
                            returns.insert(0, discounted)
                        # Advantage A_t = G_t - V(s_t); using .numpy() detaches the
                        # critic value so the actor loss does not update the critic
                        advantages = [ret - float(value.numpy()[0][0])
                                      for ret, value in zip(returns, values)]
                        # Accumulate the policy-gradient (actor) and value-regression (critic) losses
                        actor_loss = 0.0
                        critic_loss = 0.0
                        huber = tf.keras.losses.Huber()
                        for log_prob, advantage, value, ret in zip(log_probs, advantages, values, returns):
                            actor_loss += -log_prob * advantage
                            critic_loss += huber(tf.constant([[ret]], dtype=tf.float32), value)
                        # Update both networks
                        actor_grads = tape.gradient(actor_loss, self.actor.trainable_variables)
                        self.optimizer_actor.apply_gradients(zip(actor_grads, self.actor.trainable_variables))
                        critic_grads = tape.gradient(critic_loss, self.critic.trainable_variables)
                        self.optimizer_critic.apply_gradients(zip(critic_grads, self.critic.trainable_variables))
                        break
            del tape
            print(f'Episode {i}: return = {sum(rewards)}')


# Train A2C on Gym's CartPole-v0
env = gym.make('CartPole-v0')
a2c = A2C(state_dim=4, action_dim=2, hidden_dim=128, lr=0.001, gamma=0.99)
a2c.train(env, episodes=1000)
```
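The heart of the update is the discounted return computed backwards from the end of the episode, G_t = r_t + gamma * G_{t+1}, which is then compared with the critic's estimate to form the advantage A_t = G_t - V(s_t). A quick standalone check of that recursion (the reward values here are made up purely for illustration):
```python
gamma = 0.99
rewards = [1.0, 1.0, 1.0]          # e.g. three steps of CartPole reward
returns, discounted = [], 0.0
for r in reversed(rewards):        # walk the episode backwards
    discounted = r + gamma * discounted
    returns.insert(0, discounted)
print(returns)                     # [2.9701, 1.99, 1.0]
```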
This is a simple A2C implementation that trains on Gym's CartPole-v0 environment. During training, the total return of each episode is printed so learning progress can be tracked.
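After training, a minimal way to check the learned policy is to run the actor greedily (taking the argmax over the action probabilities) for a few episodes. The sketch below is one way to do this; it reuses the `a2c` and `env` objects from the script above and assumes the same old-style Gym step API:
```python
# Evaluate the trained actor greedily for a few episodes (sketch; assumes the
# old Gym API used above: reset() -> obs, step() -> 4-tuple)
for ep in range(5):
    state, total_reward, done = env.reset(), 0.0, False
    while not done:
        probs = a2c.actor(tf.constant(state.reshape(1, -1), dtype=tf.float32))
        action = int(tf.argmax(probs[0]).numpy())   # greedy action
        state, reward, done, _ = env.step(action)
        total_reward += reward
    print(f'Eval episode {ep}: return = {total_reward}')
```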