I'm using the new version of gym. Please modify this code so that it runs.
To make your code compatible with the new version of Gym (i.e. Gymnasium), a few API calls need adjusting. The main changes are:
1. **Environment reset**: `reset()` now returns a tuple `(observation, info)` instead of just the observation, and seeding is done by passing a `seed` argument to `reset()` (the old `env.seed()` method is gone).
2. **Step method**: `step()` now returns five elements instead of four: `(observation, reward, terminated, truncated, info)` rather than `(observation, reward, done, info)`. The old `done` flag corresponds to `terminated or truncated`.
3. **Action and observation spaces**: the `action_space` and `observation_space` attributes work much as before, so they usually need no changes. A minimal interaction loop under the new API is sketched right after this list.
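For reference, here is a minimal sketch of the new-style reset/step loop (`CartPole-v1` is used purely as an illustrative environment, not one from your code):

```python
import gymnasium as gym

env = gym.make("CartPole-v1")
# reset() accepts an optional seed and returns (observation, info)
observation, info = env.reset(seed=0)
for _ in range(200):
    action = env.action_space.sample()
    # step() returns five values; the old `done` is `terminated or truncated`
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        observation, info = env.reset()
env.close()
```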
Here is the modified code:
```python
import numpy as np
from scipy.stats import truncnorm
import gymnasium as gym
import itertools
import torch
import torch.nn as nn
import torch.nn.functional as F
import collections
import matplotlib.pyplot as plt
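# Cross-entropy method (CEM) planner: repeatedly samples action sequences
# from a truncated normal, evaluates them with the learned model, and
# refits the sampling distribution to the elite (highest-return) samples.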
class CEM:
def __init__(self, n_sequence, elite_ratio, fake_env, upper_bound, lower_bound):
self.n_sequence = n_sequence
self.elite_ratio = elite_ratio
self.upper_bound = upper_bound
self.lower_bound = lower_bound
self.fake_env = fake_env
def optimize(self, state, init_mean, init_var):
mean, var = init_mean, init_var
        # standard truncated normal on [-2, 2]; rescaled to (mean, var) each iteration
        X = truncnorm(-2, 2, loc=np.zeros_like(mean), scale=np.ones_like(var))
        state = np.tile(state, (self.n_sequence, 1))  # one copy of the state per candidate sequence
for _ in range(5):
lb_dist, ub_dist = mean - self.lower_bound, self.upper_bound - mean
constrained_var = np.minimum(
np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)),
var
)
            # sample candidate action sequences, then rescale and shift to (mean, constrained_var)
            action_sequences = np.array([X.rvs() for _ in range(self.n_sequence)]) * np.sqrt(constrained_var) + mean
            # predicted return of each candidate sequence under the learned model
            returns = self.fake_env.propagate(state, action_sequences)[:, 0]
            # keep the top elite_ratio fraction of sequences by return
            elites = action_sequences[np.argsort(returns)][-int(self.elite_ratio * self.n_sequence):]
new_mean = np.mean(elites, axis=0)
new_var = np.var(elites, axis=0)
mean = 0.1 * mean + 0.9 * new_mean
var = 0.1 * var + 0.9 * new_var
return mean
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
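# Swish activation (x * sigmoid(x)), used for all hidden layers of the ensemble.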
class Swish(nn.Module):
def __init__(self):
super(Swish, self).__init__()
def forward(self, x):
return x * torch.sigmoid(x)
def init_weights(m):
def truncated_normal_init(t, mean=0.0, std=0.01):
torch.nn.init.normal_(t, mean=mean, std=std)
while True:
cond = (t < mean - 2 * std) | (t > mean + 2 * std)
if not torch.sum(cond):
break
t = torch.where(
cond,
torch.nn.init.normal_(torch.ones(t.shape, device=device), mean=mean, std=std),
t
)
return t
    if isinstance(m, (nn.Linear, FCLayer)):
        # nn.Linear stores its fan-in as in_features; FCLayer as _input_dim
        input_dim = m.in_features if isinstance(m, nn.Linear) else m._input_dim
        truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(input_dim)))
        m.bias.data.fill_(0.0)
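# FCLayer: a batched linear layer holding one independent weight matrix per
# ensemble member; torch.bmm applies each member's weights to its own slice.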
class FCLayer(nn.Module):
def __init__(self, input_dim, output_dim, ensemble_size, activation):
super(FCLayer, self).__init__()
self._input_dim, self._output_dim = input_dim, output_dim
self.weight = nn.Parameter(torch.Tensor(ensemble_size, input_dim, output_dim).to(device))
self._activation = activation
self.bias = nn.Parameter(torch.Tensor(ensemble_size, output_dim).to(device))
def forward(self, x):
return self._activation(torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))
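# Probabilistic ensemble: each member predicts the mean and log-variance of the
# model targets, with learnable soft bounds on the log-variance.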
class EnsembleModel(nn.Module):
def __init__(self, state_dim, action_dim, ensemble_size=5, learning_rate=1e-3):
super(EnsembleModel, self).__init__()
self._output_dim = (state_dim + 1) * 2
self._max_logvar = nn.Parameter((torch.ones((1, self._output_dim // 2)).float() / 2).to(device), requires_grad=False)
self._min_logvar = nn.Parameter((-torch.ones((1, self._output_dim // 2)).float() * 10).to(device), requires_grad=False)
self.layer1 = FCLayer(state_dim + action_dim, 200, ensemble_size, Swish())
self.layer2 = FCLayer(200, 200, ensemble_size, Swish())
self.layer3 = FCLayer(200, 200, ensemble_size, Swish())
self.layer4 = FCLayer(200, 200, ensemble_size, Swish())
self.layer5 = FCLayer(200, self._output_dim, ensemble_size, nn.Identity())
self.apply(init_weights)
self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
def forward(self, x, return_log_var=False):
ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))
mean = ret[:, :, :self._output_dim // 2]
        # soft-clamp the predicted log-variance between the learned min/max bounds
        logvar = self._max_logvar - F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2:])
        logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)
        return mean, (logvar if return_log_var else torch.exp(logvar))
def loss(self, mean, logvar, labels, use_var_loss=True):
inverse_var = torch.exp(-logvar)
if use_var_loss:
mse_loss = torch.mean(torch.mean(torch.pow(mean - labels, 2) * inverse_var, dim=-1), dim=-1)
var_loss = torch.mean(torch.mean(logvar, dim=-1), dim=-1)
total_loss = torch.sum(mse_loss) + torch.sum(var_loss)
else:
mse_loss = torch.mean(torch.pow(mean - labels, 2), dim=(1, 2))
total_loss = torch.sum(mse_loss)
return total_loss, mse_loss
def train(self, loss):
self.optimizer.zero_grad()
        # regularization term on the learned log-variance bounds
        loss += 0.01 * torch.sum(self._max_logvar) - 0.01 * torch.sum(self._min_logvar)
loss.backward()
self.optimizer.step()
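# Convenience wrapper that shuffles the data, holds out a validation split,
# and trains the ensemble for up to max_iter passes.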
class EnsembleDynamicsModel:
def __init__(self, state_dim, action_dim, num_network=5):
self._num_network = num_network
self._state_dim, self._action_dim = state_dim, action_dim
self.model = EnsembleModel(state_dim, action_dim, ensemble_size=num_network)
self._epoch_since_last_update = 0
    def train(self, inputs, labels, batch_size=64, holdout_ratio=0.1, max_iter=20):
        # shuffle, then split off a holdout set used to monitor progress
        permutation = np.random.permutation(inputs.shape[0])
inputs, labels = inputs[permutation], labels[permutation]
num_holdout = int(inputs.shape[0] * holdout_ratio)
train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:]
holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout]
        # ... (the remainder of this method is truncated in the original post)
```
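The original post is cut off before the part that actually interacts with the environment. If your remaining code contains a rollout loop, it will also need the new-style `reset()`/`step()` handling; below is a minimal, hypothetical sketch (the environment name and the commented-out agent/buffer calls are placeholders, not names from your code):

```python
import gymnasium as gym

# Hypothetical rollout under the Gymnasium API
env = gym.make("Pendulum-v1")
obs, info = env.reset(seed=0)
episode_return, done = 0.0, False
while not done:
    action = env.action_space.sample()  # stand-in for your planner/agent
    next_obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated      # reconstruct the old `done` flag
    # replay_buffer.add(obs, action, reward, next_obs, done)
    obs = next_obs
    episode_return += reward
env.close()
```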