多智能体强化学习的原理简介及基于Sarsa的多智能体强化学习python代码
时间: 2023-05-27 17:04:37 浏览: 161
多智能体强化学习 (Multi-agent Reinforcement Learning, MARL) 是指一个由多个个体组成的环境中,每个个体都有自己的决策空间,目标是通过与环境的交互,获取最大的累积奖励。MARL 的特点是不同个体之间相互影响,一个个体的决策将会影响其他个体的决策,因此 MARL 的复杂度比单智能体强化学习要高。其主要应用于博弈论、自动驾驶、机器人、智能交通等领域。
基于Sarsa的多智能体强化学习算法可以通过如下步骤实现:
1. 初始化每个智能体的策略,价值函数以及环境模型。
2. 每个智能体与环境交互进行学习,按照如下步骤进行:
a. 根据当前状态,每个智能体选择一个行为。这里使用 $\epsilon$-贪心策略,即以一定概率随机选择行为,以一定概率选择当前最优行为。
b. 执行行为,更新环境状态。
c. 获取奖励,用于更新价值函数。
d. 根据新状态和价值函数更新智能体的策略。这里使用Sarsa(state-action-reward-state-action)算法,即使用当前策略选择一个行为,然后观察下一个状态及奖励,利用下一个状态和奖励更新当前价值函数,再根据新的价值函数更新策略。
e. 将状态更新为新状态,继续执行下一个动作。
3. 迭代多次执行以上步骤,直到收敛。
下面是基于Sarsa的多智能体强化学习的Python代码:
```python
import numpy as np
import random
#定义环境
class Gridworld:
def __init__(self, size):
self.size = size
self.state = np.zeros(2, dtype=np.int32)
self.actions = np.array([[0,1],[0,-1],[1,0],[-1,0]])
self.rewards = np.array([[0,-10],[-10,0],[0,-10],[0,-10]])
#判断当前状态是否终止状态
def is_terminal(self, state):
if ((state == [0,0]).all() or (state == [self.size-1,self.size-1]).all()):
return True
else:
return False
#获取当前状态的所有可选行为
def get_actions(self):
return self.actions
#更新状态
def update_state(self, action):
new_state = self.state + action
if new_state[0] < 0 or new_state[0] >= self.size or new_state[1] < 0 or new_state[1] >= self.size:
return False
else:
self.state = new_state
return True
#获取当前状态的奖励
def get_reward(self):
return self.rewards[np.where(np.all(self.actions == self.action, axis=1))[0][0]]
#定义智能体
class Agent:
def __init__(self, id, grid):
self.id = id
self.grid = grid
self.q_table = np.zeros((grid.size, grid.size, 4)) #价值函数
self.epsilion = 0.1 #探索概率
self.alpha = 0.5 #学习率
self.gamma = 0.9 #衰减系数
#根据当前状态选择一个行为
def choose_action(self, state):
if random.uniform(0,1) < self.epsilion:
action = random.choice(self.grid.get_actions())
else:
action = self.greedy_policy(state)
return action
#根据epsilon-greedy策略选择一个行为
def greedy_policy(self, state):
values = self.q_table[state[0], state[1], :]
max_value = np.max(values)
actions = self.grid.get_actions()
candidate_actions = [a for a in actions if values[np.where(np.all(self.grid.actions == a, axis=1))[0][0]] == max_value]
return random.choice(candidate_actions)
#执行一个周期,包括选择行为、执行行为、更新价值函数和策略
def run_cycle(self, state):
self.action = self.choose_action(state)
self.grid.update_state(self.action)
reward = self.grid.get_reward()
next_state = self.grid.state
next_action = self.choose_action(next_state)
value = self.q_table[state[0], state[1], np.where(np.all(self.grid.actions == self.action, axis=1))[0][0]]
next_value = self.q_table[next_state[0], next_state[1], np.where(np.all(self.grid.actions == next_action, axis=1))[0][0]]
td_error = reward + self.gamma * next_value - value
self.q_table[state[0], state[1], np.where(np.all(self.grid.actions == self.action, axis=1))[0][0]] += self.alpha * td_error
self.epsilion *= 0.99 #探索概率指数衰减
#执行多个周期
def run_cycles(self, num_cycles):
for i in range(num_cycles):
if self.grid.is_terminal(self.grid.state):
self.grid.state = np.zeros(2, dtype=np.int32)
state = self.grid.state
self.run_cycle(state)
#定义多智能体
class MultiAgent:
def __init__(self, num_agents, grid):
self.grid = grid
self.agents = [Agent(i, grid) for i in range(num_agents)]
#执行一个周期,让每个智能体分别执行一个周期
def run_cycle(self):
for agent in self.agents:
if self.grid.is_terminal(self.grid.state):
self.grid.state = np.zeros(2, dtype=np.int32)
state = self.grid.state
agent.run_cycle(state)
#执行多个周期
def run_cycles(self, num_cycles):
for i in range(num_cycles):
self.run_cycle()
#设定环境大小和智能体数量
size = 4
num_agents = 2
#初始化环境和多智能体
grid = Gridworld(size)
multi_agent = MultiAgent(num_agents, grid)
#执行多个周期
multi_agent.run_cycles(1000)
#输出每个智能体的价值函数
for agent in multi_agent.agents:
print('agent', agent.id)
print(agent.q_table)
```