Explain this code

for iter=1:n_iter
    for tour_i=1:n_param
        prob(:,tour_i)= (T(:,tour_i).^alpha) .* ((1./Nodes(:,tour_i)).^beta);
        prob(:,tour_i)=prob(:,tour_i)./sum(prob(:,tour_i));
    end
    for A=1:NA
        for tour_i=1:n_param
            node_sel=rand;
            node_ind=1;
            prob_sum=0;
            for j=1:n_node
                prob_sum=prob_sum+prob(j,tour_i);
                if prob_sum>=node_sel
                    node_ind=j;
                    break
                end
            end
            ant(A,tour_i)=node_ind;
            tour_selected_param(tour_i) = Nodes(node_ind, tour_i);
        end
        cost(A)=cost_func(tour_selected_param,0);
        clc
        disp(['Ant number: ' num2str(A)])
        disp(['Ant Cost: ' num2str(cost(A))])
        disp(['Ant Paramters: ' num2str(tour_selected_param)])
        if iter~=1
            disp(['iteration: ' num2str(iter)])
            disp('_________________')
            disp(['Best cost: ' num2str(cost_best)])
            for i=1:n_param
                tour_selected_param(i) = Nodes(ant(cost_best_ind,i), i);
            end
            disp(['Best paramters: ' num2str(tour_selected_param)])
        end
This code implements the path-selection step of an ant colony optimization (ACO) algorithm. Here n_iter is the number of iterations, n_param the number of parameters to optimize, and NA the number of ants.
First, for each parameter, a selection probability is computed for every candidate node and then normalized. This is the standard ACO transition rule: T holds the pheromone values, alpha and beta weight the pheromone and heuristic terms respectively, and Nodes holds the candidate parameter value at each node. The matrix prob stores the probability of each node being selected.
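For readers more comfortable with Python, here is a minimal NumPy sketch of the same probability computation, assuming T and Nodes are n_node-by-n_param arrays as in the MATLAB snippet (the alpha and beta defaults are illustrative, not from the original):

import numpy as np

def transition_probabilities(T, Nodes, alpha=1.0, beta=2.0):
    # Pheromone term weighted by alpha, heuristic term (1/Nodes) weighted by beta
    weights = (T ** alpha) * ((1.0 / Nodes) ** beta)
    # Normalize each column so each parameter's probabilities sum to 1
    return weights / weights.sum(axis=0)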
Then each ant chooses its next node by roulette-wheel selection: a random number node_sel is drawn, the accumulator prob_sum is initialized to 0, and the node probabilities are summed one by one; as soon as prob_sum >= node_sel, the current node is chosen as the next step.
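This roulette-wheel step can be sketched in Python as follows, where prob_column is one normalized column of prob (the final return is a guard against floating-point round-off, not part of the original):

import numpy as np

def roulette_select(prob_column):
    node_sel = np.random.rand()
    prob_sum = 0.0
    for j, p in enumerate(prob_column):
        prob_sum += p
        if prob_sum >= node_sel:
            return j
    return len(prob_column) - 1  # fall back to the last node on round-off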
Finally, the cost of each ant's selected parameters is evaluated with cost_func and printed. From the second iteration onward, the best cost and best parameters found so far are also printed. The purpose of this section is to report the state of the colony at every iteration: each ant's selected path and its corresponding cost.
PyTorch implementation of PPO
Below is a simple PyTorch implementation of the PPO algorithm, largely modeled on the implementation in OpenAI Baselines, using CartPole-v1 as the example:
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Categorical
from torch.utils.data import BatchSampler, SubsetRandomSampler
import gym

class ActorCritic(nn.Module):
    def __init__(self, obs_shape, action_space):
        super(ActorCritic, self).__init__()
        # Actor head: observation -> action logits
        self.actor_fc1 = nn.Linear(obs_shape[0], 64)
        self.actor_fc2 = nn.Linear(64, action_space.n)
        # Critic head: observation -> scalar state value
        self.critic_fc1 = nn.Linear(obs_shape[0], 64)
        self.critic_fc2 = nn.Linear(64, 1)

    def forward(self, obs):
        actor_x = torch.tanh(self.actor_fc1(obs))
        dist = Categorical(logits=self.actor_fc2(actor_x))
        critic_x = torch.tanh(self.critic_fc1(obs))
        value = self.critic_fc2(critic_x).squeeze(-1)
        return dist, value

    def act(self, obs):
        # Sample an action during rollout collection (no gradients needed)
        with torch.no_grad():
            dist, value = self.forward(obs)
            action = dist.sample()
            log_prob = dist.log_prob(action)
        return action.item(), log_prob.item(), value.item()

class PPO:
    def __init__(self, env_name, batch_size=64, gamma=0.99, clip_param=0.2,
                 ppo_epoch=10, lr=3e-4, eps=1e-5, lambda_=0.95):
        self.env = gym.make(env_name)
        self.clip_param = clip_param
        self.ppo_epoch = ppo_epoch
        self.batch_size = batch_size
        self.gamma = gamma
        self.lambda_ = lambda_
        self.net = ActorCritic(self.env.observation_space.shape, self.env.action_space)
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=lr, eps=eps)
        self.net.train()

    def compute_returns(self, rewards, values, masks, last_value):
        # Generalized Advantage Estimation (GAE-lambda); masks zero out
        # bootstrapping across episode boundaries
        values = values + [last_value]
        returns = []
        gae = 0.0
        for i in reversed(range(len(rewards))):
            delta = rewards[i] + self.gamma * masks[i] * values[i + 1] - values[i]
            gae = delta + self.gamma * self.lambda_ * masks[i] * gae
            returns.insert(0, gae + values[i])
        return returns

    def learn(self, obs, actions, old_log_probs, returns, advantages):
        for _ in range(self.ppo_epoch):
            for ind in BatchSampler(SubsetRandomSampler(range(len(obs))),
                                    self.batch_size, False):
                dist, value = self.net(obs[ind])
                log_prob = dist.log_prob(actions[ind])
                # Probability ratio between the new and old policies
                ratio = torch.exp(log_prob - old_log_probs[ind])
                adv = advantages[ind]
                surr1 = ratio * adv
                surr2 = torch.clamp(ratio, 1 - self.clip_param,
                                    1 + self.clip_param) * adv
                actor_loss = -torch.min(surr1, surr2).mean()
                critic_loss = (returns[ind] - value).pow(2).mean()
                loss = actor_loss + 0.5 * critic_loss
                # optimize
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

    def run(self, max_iter=10000, rollout_len=128):
        obs = self.env.reset()  # classic gym (< 0.26) API
        episode_reward = 0.0
        obs_buf, act_buf, logp_buf, rew_buf, val_buf, mask_buf = [], [], [], [], [], []
        for i in range(1, max_iter + 1):
            action, log_prob, value = self.net.act(
                torch.tensor(obs, dtype=torch.float32))
            next_obs, reward, done, _ = self.env.step(action)
            # Store the transition in the rollout buffer
            obs_buf.append(obs)
            act_buf.append(action)
            logp_buf.append(log_prob)
            rew_buf.append(reward)
            val_buf.append(value)
            mask_buf.append(1.0 - float(done))
            episode_reward += reward
            obs = next_obs
            if done:
                obs = self.env.reset()
            if i % rollout_len == 0:
                # Bootstrap the value of the last observed state
                with torch.no_grad():
                    _, last_value = self.net(torch.tensor(obs, dtype=torch.float32))
                returns = self.compute_returns(rew_buf, val_buf, mask_buf,
                                               last_value.item())
                obs_t = torch.tensor(np.asarray(obs_buf), dtype=torch.float32)
                act_t = torch.tensor(act_buf, dtype=torch.int64)
                logp_t = torch.tensor(logp_buf, dtype=torch.float32)
                ret_t = torch.tensor(returns, dtype=torch.float32)
                adv_t = ret_t - torch.tensor(val_buf, dtype=torch.float32)
                # Normalizing advantages stabilizes training
                adv_t = (adv_t - adv_t.mean()) / (adv_t.std() + 1e-8)
                self.learn(obs_t, act_t, logp_t, ret_t, adv_t)
                obs_buf, act_buf, logp_buf, rew_buf, val_buf, mask_buf = [], [], [], [], [], []
            if i % 100 == 0:
                print('Step {}, Reward: {:.2f}'.format(i, episode_reward / 100))
                episode_reward = 0.0
In the code above, we first define an ActorCritic class containing an actor and a critic. The actor maps the current state to a probability distribution over actions and samples from it; the critic outputs the value of the current state. PPO updates both at the same time. The core of PPO is the advantage estimate, computed here with GAE (see the explanation in the first answer), and a BatchSampler is used to draw mini-batches during the update. In the run function, we sample an action for the current state, step the environment, and store the transition in a rollout buffer; when an episode ends, the environment is reset and the stored masks mark the episode boundary. Every rollout_len steps, we bootstrap the value of the last state, compute the advantages, and perform the clipped PPO update.
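A minimal usage sketch for the class above (the hyperparameters are illustrative defaults, not tuned values):

agent = PPO('CartPole-v1')
agent.run(max_iter=10000)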
Python code for a TextRank keyword-extraction algorithm combining an LDA topic model and a Word2Vec word-vector model
Below is Python code for a TextRank keyword-extraction algorithm that combines an LDA topic model with Word2Vec word vectors:
import jieba
import gensim
from gensim import corpora, models
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
def load_stopwords(path):
    """
    Load stopwords.
    :param path: path to the stopword file
    :return: list of stopwords
    """
    stopwords = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f.readlines():
            stopwords.append(line.strip())
    return stopwords
def get_sentences(text):
    """
    Split the text into sentences on the Chinese full stop.
    :param text: raw text
    :return: list of sentences
    """
    sentences = []
    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue
        for s in line.split('。'):
            s = s.strip()
            if not s:
                continue
            sentences.append(s)
    return sentences
def segment(sentence, stopwords):
    """
    Tokenize with jieba and remove stopwords.
    :param sentence: sentence
    :param stopwords: list of stopwords
    :return: list of tokens
    """
    words = []
    for word in jieba.cut(sentence):
        word = word.strip()
        if not word:
            continue
        if word not in stopwords:
            words.append(word)
    return words
def get_word2vec_model(text, size=100, window=5, min_count=5, workers=4):
    """
    Train a Word2Vec model.
    :param text: raw text
    :param size: word-vector dimensionality
    :param window: context window size
    :param min_count: minimum word frequency
    :param workers: number of worker threads
    :return: Word2Vec model
    """
    sentences = []
    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue
        sentences.append(segment(line, stopwords))  # relies on the module-level stopwords
    # gensim < 4.0 API; in gensim >= 4.0 the `size` argument was renamed `vector_size`
    model = gensim.models.Word2Vec(sentences, size=size, window=window,
                                   min_count=min_count, workers=workers)
    return model
def get_lda_model(text, num_topics=8, passes=10):
    """
    Train an LDA topic model.
    :param text: raw text
    :param num_topics: number of topics
    :param passes: number of training passes
    :return: LDA model and corpus
    """
    sentences = []
    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue
        sentences.append(segment(line, stopwords))
    dictionary = corpora.Dictionary(sentences)
    corpus = [dictionary.doc2bow(sentence) for sentence in sentences]
    lda_model = models.ldamodel.LdaModel(corpus=corpus, num_topics=num_topics,
                                         id2word=dictionary, passes=passes)
    return lda_model, corpus
def get_topic_word_matrix(lda_model, num_topics, num_words):
    """
    Build the topic-word matrix; each row holds the dictionary IDs of a topic's top words.
    :param lda_model: LDA model
    :param num_topics: number of topics
    :param num_words: number of top words kept per topic
    :return: topic-word matrix
    """
    topic_word_matrix = np.zeros((num_topics, num_words))
    for i in range(num_topics):
        topic_words = lda_model.get_topic_terms(i, topn=num_words)
        for j in range(num_words):
            topic_word_matrix[i][j] = topic_words[j][0]  # store the word ID
    return topic_word_matrix
def get_sentence_topic_vector(sentence, lda_model, dictionary, num_topics):
    """
    Compute the topic distribution vector of a sentence.
    :param sentence: sentence
    :param lda_model: LDA model
    :param dictionary: gensim dictionary
    :param num_topics: number of topics
    :return: topic vector of the sentence
    """
    sentence_bow = dictionary.doc2bow(segment(sentence, stopwords))
    topic_vector = np.zeros(num_topics)
    for topic, prob in lda_model[sentence_bow]:
        topic_vector[topic] = prob
    return topic_vector
def get_similarity_matrix(sentences, word2vec_model):
    """
    Compute the pairwise sentence-similarity matrix.
    :param sentences: list of sentences
    :param word2vec_model: Word2Vec model
    :return: similarity matrix
    """
    similarity_matrix = np.zeros((len(sentences), len(sentences)))
    for i in range(len(sentences)):
        for j in range(i + 1, len(sentences)):
            # Represent each sentence by the mean of its in-vocabulary word vectors
            sim = cosine_similarity(
                [np.mean([word2vec_model[word] for word in segment(sentences[i], stopwords)
                          if word in word2vec_model], axis=0)],
                [np.mean([word2vec_model[word] for word in segment(sentences[j], stopwords)
                          if word in word2vec_model], axis=0)]).item()
            similarity_matrix[i][j] = sim
            similarity_matrix[j][i] = sim
    return similarity_matrix
def get_textrank_score(sentences, num_topics, lda_model, word2vec_model):
    """
    Compute the TextRank scores.
    :param sentences: list of sentences
    :param num_topics: number of topics
    :param lda_model: LDA model
    :param word2vec_model: Word2Vec model
    :return: list of sentence scores
    """
    dictionary = lda_model.id2word
    num_words = 20
    topic_word_matrix = get_topic_word_matrix(lda_model, num_topics, num_words)
    sentence_topic_vectors = np.zeros((len(sentences), num_topics))
    for i in range(len(sentences)):
        sentence_topic_vectors[i] = get_sentence_topic_vector(sentences[i], lda_model,
                                                              dictionary, num_topics)
    similarity_matrix = get_similarity_matrix(sentences, word2vec_model)
    # TextRank iteration
    max_iter = 100
    d = 0.85
    scores = np.ones(len(sentences))
    for i in range(max_iter):
        tmp_scores = np.zeros(len(sentences))
        for j in range(len(sentences)):
            tmp_scores[j] = (1 - d) + d * np.sum(
                [similarity_matrix[j][k] * scores[k] for k in range(len(sentences))])
        scores = tmp_scores
    # Combine the TextRank scores with the topic-model scores
    final_scores = np.zeros(len(sentences))
    for i in range(len(sentences)):
        for j in range(num_topics):
            final_scores[i] += topic_word_matrix[j].tolist().count(i) * sentence_topic_vectors[i][j]
    final_scores = d * final_scores + (1 - d) * scores
    return final_scores
# Load stopwords
stopwords = load_stopwords('stopwords.txt')
# Load the text
with open('text.txt', 'r', encoding='utf-8') as f:
    text = f.read()
# Split into sentences
sentences = get_sentences(text)
# Train the Word2Vec model
word2vec_model = get_word2vec_model(text)
# Train the LDA topic model
lda_model, corpus = get_lda_model(text)
# Compute the TextRank scores
num_topics = 8
scores = get_textrank_score(sentences, num_topics, lda_model, word2vec_model)
# Sort by score and take the top-ranked sentences as keywords
num_keywords = 10
keywords = []
idx = np.argsort(scores)[::-1][:num_keywords]
for i in idx:
    keywords.append(sentences[i])
print(keywords)
Here, text.txt is the text to be processed and stopwords.txt is the stopword file; both must be supplied by the user. Parameters such as num_topics, num_words, and num_keywords should be tuned for the data at hand.
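Note that the scores above rank whole sentences. One hypothetical post-processing step to obtain word-level keywords is to count the most frequent non-stopword tokens in the top-ranked sentences; a minimal sketch using only the helpers and variables already defined in the script:

from collections import Counter

# Count tokens across the top-ranked sentences and keep the most common ones
word_counts = Counter()
for i in idx:
    word_counts.update(segment(sentences[i], stopwords))
print(word_counts.most_common(num_keywords))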