Use if and for instead of while: `while ~done; g = f > T; Tnext = 0.5*(mean(f(g)) + mean(f(~g))); done = abs(T - Tnext) < 0.5; T = Tnext; end`

You can replace the while loop with if and for; the implementation is as follows:

```matlab
% Original while loop
done = false;
while ~done
    g = f > T;
    Tnext = 0.5 * (mean(f(g)) + mean(f(~g)));
    done = abs(T - Tnext) < 0.5;
    T = Tnext;
end

% Replacement using if and for
done = false;
for i = 1:1000                 % cap the number of iterations to avoid an endless loop
    g = f > T;
    Tnext = 0.5 * (mean(f(g)) + mean(f(~g)));
    if abs(T - Tnext) < 0.5
        done = true;
        break;                 % threshold has converged, exit the loop
    else
        T = Tnext;
    end
end
```

Bounding the iteration count this way avoids a possible infinite loop.
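This loop is the classic iterative threshold-selection scheme (split the data at T, average the two class means, repeat). For readers working in NumPy rather than MATLAB, here is a minimal sketch of the same capped-iteration idea, assuming `f` is a numeric array and `T` a starting threshold:

```python
import numpy as np

def iterative_threshold(f, T, tol=0.5, max_iter=1000):
    """Iteratively update T until it changes by less than tol (sketch, assumes f is a numeric array)."""
    f = np.asarray(f, dtype=float).ravel()
    for _ in range(max_iter):          # bounded loop instead of an open-ended while
        g = f > T                      # split values into two classes at the current threshold
        T_next = 0.5 * (f[g].mean() + f[~g].mean())
        if abs(T - T_next) < tol:      # converged: the threshold is stable
            return T_next
        T = T_next
    return T                           # fall back to the last estimate if max_iter is hit

# Example usage with synthetic bimodal data
f = np.concatenate([np.random.normal(50, 10, 500), np.random.normal(150, 10, 500)])
print(iterative_threshold(f, T=f.mean()))
```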
Related questions

Write stock reinforcement-learning code using akshare and plot the buy/sell points and returns

Here is a stock reinforcement-learning program written with the akshare library, together with a plot of the positions and cumulative profit:

```python
import akshare as ak
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import defaultdict

# Fetch stock data (stock_zh_a_hist takes the plain 6-digit code)
stock_code = '600519'
stock_data = ak.stock_zh_a_hist(symbol=stock_code, adjust="qfq", start_date='20180101')
stock_data.index = pd.to_datetime(stock_data['日期'])
stock_data = stock_data.drop(['日期'], axis=1)

# Compute the RSI technical indicator
def calc_RSI(data, time_period=14):
    delta = data.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(time_period).mean()
    avg_loss = loss.rolling(time_period).mean()
    RS = avg_gain / avg_loss
    RSI = 100 - (100 / (1 + RS))
    return RSI

stock_data['RSI'] = calc_RSI(stock_data['收盘'])

# State and action definitions
def get_state(stock_data, t):
    # State: the last five closing prices and RSI values, as a hashable tuple
    state = []
    for i in range(t - 4, t + 1):
        state.append(stock_data.iloc[i]['收盘'])
        state.append(stock_data.iloc[i]['RSI'])
    return tuple(state)

def get_action(q_values, state, eps=0.1):
    # Epsilon-greedy selection over 3 actions (0 = flat, 1 = long, 2 = short)
    if np.random.uniform() < eps:
        return np.random.randint(0, 3)
    else:
        return np.argmax(q_values[state])

# Q-learning
alpha = 0.1       # learning rate
gamma = 0.99      # discount factor
epsilon = 0.1     # exploration rate
q_values = defaultdict(lambda: np.zeros(3))   # tabular Q-values keyed by the state tuple
rewards = []

for episode in range(500):
    total_reward = 0
    for t in range(15, len(stock_data) - 1):   # start after the RSI warm-up period
        state = get_state(stock_data, t)
        action = get_action(q_values, state, epsilon)
        # Simple fixed reward scheme for the three actions
        if action == 0:
            reward = -0.1
        elif action == 1:
            reward = 0.1
        else:
            reward = 0
        next_state = get_state(stock_data, t + 1)
        # Q-learning update
        q_values[state][action] += alpha * (
            reward + gamma * np.max(q_values[next_state]) - q_values[state][action])
        total_reward += reward
    rewards.append(total_reward)

# Derive positions and per-step profits from the learned policy
def calc_profit(stock_data, q_values):
    positions = []
    profits = []
    for i in range(15, len(stock_data) - 1):
        state = get_state(stock_data, i)
        action = np.argmax(q_values[state])
        if action == 0:        # flat
            positions.append(0)
            profits.append(0)
        elif action == 1:      # long: earn the next day's price change
            positions.append(1)
            profits.append(stock_data.iloc[i + 1]['收盘'] - stock_data.iloc[i]['收盘'])
        else:                  # short: earn the negative of the next day's price change
            positions.append(-1)
            profits.append(stock_data.iloc[i]['收盘'] - stock_data.iloc[i + 1]['收盘'])
    return positions, profits

positions, profits = calc_profit(stock_data, q_values)

# Plot price, positions, and cumulative profit
fig, ax1 = plt.subplots()
ax1.plot(stock_data['收盘'].values, color='black')
ax1.set_ylabel('Price')
ax2 = ax1.twinx()
ax2.plot(positions, color='blue')
ax2.set_ylabel('Position')
ax3 = ax1.twinx()
ax3.spines["right"].set_position(("axes", 1.2))
ax3.plot(np.cumsum(profits), color='red')
ax3.set_ylabel('Profit')
plt.show()
```

The program fetches the stock data with akshare and computes the RSI indicator to define the state. It then runs a Q-learning loop that updates the Q-values and records the total reward per episode, and finally derives the buy/sell positions and profits from the learned policy and plots them.
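Note that the state above is a raw tuple of closing prices and RSI values, so the Q-table only ever matches exact repeats of historical states. If you want the table to generalize, one common option is to bin the features into a small number of discrete levels before using them as keys. A minimal sketch under that assumption (the `discretize_state` helper and its bin ranges are illustrative and not part of the original answer):

```python
import numpy as np

def discretize_state(closes, rsis, n_ret_bins=10, n_rsi_bins=10):
    """Map raw closes/RSI values to small integer codes so Q-table keys repeat more often."""
    closes = np.asarray(closes, dtype=float)
    rsis = np.asarray(rsis, dtype=float)
    # Use daily returns rather than absolute price levels
    returns = np.diff(closes) / closes[:-1]
    ret_bins = np.linspace(-0.05, 0.05, n_ret_bins - 1)   # assumed +/-5% daily-return range
    rsi_bins = np.linspace(0, 100, n_rsi_bins - 1)
    ret_codes = np.digitize(returns, ret_bins)
    rsi_codes = np.digitize(rsis, rsi_bins)
    return tuple(ret_codes.tolist() + rsi_codes.tolist())

# Example: five closing prices and RSI values -> a compact hashable key
print(discretize_state([100, 101, 99, 102, 103], [45, 50, 40, 60, 65]))
```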

TensorFlow code for MADDPG combined with prioritized experience replay and the MPE environment

Below is code that uses TensorFlow to implement the MADDPG algorithm combined with prioritized experience replay and an MPE environment. Comments are included in the code to help with understanding.

```python
import numpy as np
import tensorflow as tf
import random
from collections import deque
from mlagents.envs import UnityEnvironment
from mlagents.envs import BrainInfo

# Hyperparameters
BUFFER_SIZE = int(1e6)   # replay buffer size
BATCH_SIZE = 128         # batch size
GAMMA = 0.99             # discount factor
TAU = 1e-3               # target-network update rate
LR_ACTOR = 1e-3          # actor learning rate
LR_CRITIC = 1e-3         # critic learning rate
UPDATE_EVERY = 2         # time steps between network updates
NUM_UPDATES = 10         # number of updates per learning step

# Network models
class Actor(tf.keras.Model):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.fc1 = tf.keras.layers.Dense(256, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.fc3 = tf.keras.layers.Dense(action_size, activation='tanh')

    def call(self, state):
        x = self.fc1(state)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

class Critic(tf.keras.Model):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.fc1 = tf.keras.layers.Dense(256, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.fc3 = tf.keras.layers.Dense(1, activation=None)
        self.fc4 = tf.keras.layers.Dense(256, activation='relu')
        self.fc5 = tf.keras.layers.Dense(128, activation='relu')
        self.fc6 = tf.keras.layers.Dense(1, activation=None)

    def call(self, state, action):
        xs = tf.concat([state, action], axis=1)
        x1 = self.fc1(xs)
        x1 = self.fc2(x1)
        x1 = self.fc3(x1)
        x2 = self.fc4(xs)
        x2 = self.fc5(x2)
        x2 = self.fc6(x2)
        return x1, x2

# Prioritized experience replay
class PrioritizedReplay:
    def __init__(self, buffer_size, batch_size):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.buffer = []
        self.priorities = np.zeros((buffer_size,), dtype=np.float32)
        self.pos = 0
        self.alpha = 0.5
        self.beta = 0.5
        self.beta_increment_per_sampling = 0.001

    def add(self, state, action, reward, next_state, done):
        max_priority = np.max(self.priorities) if self.buffer else 1.0
        experience = (state, action, reward, next_state, done)
        if len(self.buffer) < self.buffer_size:
            self.buffer.append(experience)
        else:
            self.buffer[self.pos] = experience
        self.priorities[self.pos] = max_priority
        self.pos = (self.pos + 1) % self.buffer_size

    def sample(self):
        if len(self.buffer) == self.buffer_size:
            priorities = self.priorities
        else:
            priorities = self.priorities[:self.pos]
        probs = priorities ** self.alpha
        probs /= probs.sum()
        indices = np.random.choice(len(self.buffer), self.batch_size, p=probs)
        samples = [self.buffer[idx] for idx in indices]
        total = len(self.buffer)
        weights = (total * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        self.beta = np.min([1., self.beta + self.beta_increment_per_sampling])
        return indices, samples, weights

    def update_priorities(self, batch_indices, batch_priorities):
        for idx, priority in zip(batch_indices, batch_priorities):
            self.priorities[idx] = priority

# MADDPG agent
class MADDPG:
    def __init__(self, state_size, action_size, num_agents):
        self.state_size = state_size
        self.action_size = action_size
        self.num_agents = num_agents
        self.actors = [Actor(state_size, action_size) for _ in range(num_agents)]
        self.critics = [Critic((state_size + action_size) * num_agents, 1) for _ in range(num_agents)]
        self.target_actors = [Actor(state_size, action_size) for _ in range(num_agents)]
        self.target_critics = [Critic((state_size + action_size) * num_agents, 1) for _ in range(num_agents)]
        for i in range(num_agents):
            self.target_actors[i].set_weights(self.actors[i].get_weights())
            self.target_critics[i].set_weights(self.critics[i].get_weights())
        self.buffer = PrioritizedReplay(BUFFER_SIZE, BATCH_SIZE)
        self.actor_optimizer = [tf.keras.optimizers.Adam(LR_ACTOR) for _ in range(num_agents)]
        self.critic_optimizer = [tf.keras.optimizers.Adam(LR_CRITIC) for _ in range(num_agents)]
        self.t_step = 0

    def act(self, obs):
        obs = np.array(obs)
        actions = []
        for i in range(self.num_agents):
            action = self.actors[i](obs[i][np.newaxis, :], training=False)
            actions.append(action.numpy())
        actions = np.concatenate(actions, axis=0)
        return actions

    def step(self, state, action, reward, next_state, done):
        self.buffer.add(state, action, reward, next_state, done)
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0 and len(self.buffer.buffer) > BATCH_SIZE:
            for _ in range(NUM_UPDATES):
                indices, samples, weights = self.buffer.sample()
                self.learn(samples, weights)
                self.update_targets()
                self.buffer.update_priorities(indices, weights)

    def learn(self, samples, weights):
        states = np.array([sample[0] for sample in samples])
        actions = np.array([sample[1] for sample in samples])
        rewards = np.array([sample[2] for sample in samples])
        next_states = np.array([sample[3] for sample in samples])
        dones = np.array([sample[4] for sample in samples])
        for i in range(self.num_agents):
            # Critic update: compute Q targets and the weighted TD loss
            with tf.GradientTape(persistent=True) as tape:
                target_actions = [self.target_actors[j](next_states[j][np.newaxis, :], training=False)
                                  for j in range(self.num_agents)]
                target_actions = np.concatenate(target_actions, axis=0)
                target_qs = self.target_critics[i]((next_states.reshape(-1, self.state_size * self.num_agents),
                                                    target_actions))
                target_qs = target_qs.numpy().reshape(-1, self.num_agents)
                q_targets = rewards[:, i][:, np.newaxis] + (GAMMA * target_qs * (1 - dones[:, i][:, np.newaxis]))
                critic_qs = self.critics[i]((states.reshape(-1, self.state_size * self.num_agents),
                                             actions.reshape(-1, self.action_size * self.num_agents)))
                critic_loss = tf.reduce_mean(weights * (q_targets - critic_qs) ** 2)
            critic_grads = tape.gradient(critic_loss, self.critics[i].trainable_variables)
            self.critic_optimizer[i].apply_gradients(zip(critic_grads, self.critics[i].trainable_variables))
            # Actor update: policy gradient through the centralized critic
            with tf.GradientTape() as tape:
                actor_actions = [self.actors[j](states[:, j, :], training=False) if j == i
                                 else self.actors[j](states[:, j, :], training=True)
                                 for j in range(self.num_agents)]
                actor_actions = np.concatenate(actor_actions, axis=0)
                actor_loss = -tf.reduce_mean(self.critics[i]((states.reshape(-1, self.state_size * self.num_agents),
                                                              actor_actions)))
            actor_grads = tape.gradient(actor_loss, self.actors[i].trainable_variables)
            self.actor_optimizer[i].apply_gradients(zip(actor_grads, self.actors[i].trainable_variables))

    def update_targets(self):
        # Soft update: target = TAU * local + (1 - TAU) * target, applied layer by layer
        for i in range(self.num_agents):
            self.target_actors[i].set_weights(
                [TAU * w + (1 - TAU) * tw
                 for w, tw in zip(self.actors[i].get_weights(), self.target_actors[i].get_weights())])
            self.target_critics[i].set_weights(
                [TAU * w + (1 - TAU) * tw
                 for w, tw in zip(self.critics[i].get_weights(), self.target_critics[i].get_weights())])

# Environment setup and training loop
env_name = "MPE/3DBall"
env = UnityEnvironment(file_name=env_name)
brain_name = env.brain_names[0]
brain = env.brains[brain_name]
env_info = env.reset()[brain_name]
state_size = env_info.vector_observations.shape[1]
action_size = brain.vector_action_space_size
num_agents = len(env_info.agents)
maddpg = MADDPG(state_size, action_size, num_agents)
scores = []
scores_window = deque(maxlen=100)
for i_episode in range(10000):
    env_info = env.reset()[brain_name]
    obs = env_info.vector_observations
    score = np.zeros(num_agents)
    while True:
        actions = maddpg.act(obs)
        env_info = env.step(actions)[brain_name]
        next_obs = env_info.vector_observations
        rewards = env_info.rewards
        dones = env_info.local_done
        maddpg.step(obs, actions, rewards, next_obs, dones)
        obs = next_obs
        score += rewards
        if np.any(dones):
            break
    scores_window.append(np.max(score))
    scores.append(np.max(score))
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")
    if i_episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
    if np.mean(scores_window) >= 0.5:
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
        break
env.close()
```
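For reference, the `PrioritizedReplay` class above follows the standard proportional prioritization scheme: with stored priorities $p_i$ and the exponents $\alpha$ and $\beta$ used in the code, a transition is sampled with probability

$$P(i) = \frac{p_i^{\alpha}}{\sum_k p_k^{\alpha}},$$

and its importance-sampling weight is

$$w_i = \frac{\bigl(N \cdot P(i)\bigr)^{-\beta}}{\max_j \bigl(N \cdot P(j)\bigr)^{-\beta}},$$

where $N$ is the current buffer size and $\beta$ is annealed toward 1. After a learning step, the sampled transitions' priorities are normally refreshed from their absolute TD errors, $p_i \leftarrow |\delta_i| + \varepsilon$, so that transitions with larger errors are replayed more often.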

Related recommendations

Latest recommendations


Microsoft internal material - SQL Performance Optimization 5

Each index row in node pages contains an index key (or set of keys for a composite index) and a pointer to a page at the next level for which the first key value is the same as the key value in the ...

Microsoft internal material - SQL Performance Optimization 2

System space can only be accessed while in kernel mode, while user space is accessible in user mode. This protects system space from being tampered with by user mode code. Shared System Space ...

Microsoft internal material - SQL Performance Optimization 3

Intent Lock is the term used to mean placing a marker in a higher-level lock queue. The type of intent lock can also be called the multigranular lock mode. An intent lock indicates that SQL Server ...

"Chicken Kingdom Adventure" runnable program, ready to use as soon as you open it


Design and implementation of a privacy-protection system based on secure multi-party computation with Python + Flask, with complete materials and deployment documentation (.zip)

[Resource description] Design and implementation of a privacy-protection system based on secure multi-party computation with Python + Flask, with complete materials and deployment documentation (.zip)
[Notes]
1. This is a high-scoring personal project; the source code was reviewed and approved by the author's advisor, with a defense score of 95.
2. The project code has been tested and runs successfully; it was uploaded only after all functions were confirmed working, so it can be downloaded and used with confidence.
3. It is suitable for students, teachers, or industry staff in computer-related majors (artificial intelligence, communications engineering, automation, electronic information, IoT, etc.), and can be used for graduation projects, course projects, assignments, or early-stage project demos; it is also suitable for beginners who want to learn from it.
4. With a reasonable foundation you can modify the code to implement other features, or use it directly for graduation projects, course projects, or assignments.

zigbee-cluster-library-specification

The latest zigbee-cluster-library-specification documentation.

Management modeling and simulation document

Cite this version: Boualem Benatallah. Management modeling and simulation. Université Joseph Fourier - Grenoble I, 1996. In French. HAL Id: tel-00345357, https://theses.hal.science/tel-00345357, submitted 9 December 2008. HAL is a multidisciplinary open-access archive for the deposit and dissemination of scientific research documents, whether or not they have been published. Documents may come from teaching and research institutions in France or abroad, or from public or private research centers.

Implementing a real-time data lake architecture: Kafka and Hive integration

![Implementing a real-time data lake architecture: Kafka and Hive integration](https://img-blog.csdnimg.cn/img_convert/10eb2e6972b3b6086286fc64c0b3ee41.jpeg)

# 1. Overview of real-time data lake architecture

A real-time data lake is a modern data-management architecture that lets an enterprise collect, store, and process large volumes of data with low latency. Unlike a traditional data warehouse, a real-time data lake does not rely on predefined schemas; instead it uses a flexible architecture that can handle a wide range of data types and formats. This architecture gives the enterprise the following advantages:

- **Real-time insight:** a real-time data lake gives access to the latest data, enabling better-informed decisions.
- **Data democratization:** a real-time data lake lets a wide range of stakeholders ...
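As a rough illustration of the Kafka side of such an integration, here is a minimal consumer sketch; it assumes the kafka-python client, a broker on localhost:9092, a hypothetical `orders` topic, and a landing directory that a Hive external table could be pointed at (none of these names come from the original article):

```python
import json
from datetime import datetime, timezone
from pathlib import Path

from kafka import KafkaConsumer  # assumed: kafka-python package and a reachable broker

# Consume JSON events from a hypothetical "orders" topic and land them as
# date-partitioned files, a layout a Hive external table could read.
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
)

lake_root = Path("/data/lake/orders")  # assumed landing directory for the external table

for message in consumer:
    event = message.value
    partition_dir = lake_root / f"dt={datetime.now(timezone.utc):%Y-%m-%d}"
    partition_dir.mkdir(parents=True, exist_ok=True)
    # One file per Kafka offset keeps the sketch simple; a real pipeline would batch writes.
    (partition_dir / f"{message.offset}.json").write_text(json.dumps(event))
```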

Explain the minorization-maximization (MM) algorithm and give an example written in MATLAB code

The minorization-maximization (MM) algorithm is a widely used optimization technique for non-convex or constrained problems. Its basic idea is to construct, at each iterate, a surrogate function that bounds the objective and touches it at the current point, and then to optimize that surrogate instead of the original objective. For maximizing $f(\theta)$ the steps are: 1. Initialize the parameter $\theta_0$ and set $k=0$; 2. Construct a minorizing surrogate $Q(\theta|\theta_k)$ satisfying $Q(\theta_k|\theta_k)=f(\theta_k)$ and $Q(\theta|\theta_k)\le f(\theta)$ for all $\theta$; 3. Maximize the surrogate, $\theta_{k+1}=\arg\max_\theta Q(\theta|\theta_k)$, increment $k$, and repeat until convergence.
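The two conditions in step 2 are what guarantee monotone ascent of the objective; a short derivation:

$$f(\theta_{k+1}) \;\ge\; Q(\theta_{k+1}\mid\theta_k) \;\ge\; Q(\theta_k\mid\theta_k) \;=\; f(\theta_k),$$

where the first inequality holds because $Q(\cdot\mid\theta_k)$ minorizes $f$, the second because $\theta_{k+1}$ maximizes $Q(\cdot\mid\theta_k)$, and the equality is the tangency condition, so each MM iteration can never decrease $f$.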

JSBSim Reference Manual

The JSBSim reference manual, covering an introduction to JSBSim, the syntax for writing JSBSim XML configuration files, a programming guide, and some application examples. Parts of it are still unfinished and a complete version is unlikely to ever appear, but the content is still a valuable reference.