Complete code for reinforcement learning on stock data fetched with akshare
Below is a complete code example that fetches stock data with akshare and trains a simple DQN-style reinforcement-learning agent on it:
```python
import akshare as ak
import pandas as pd
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
# Fetch daily bars for one A-share stock (sh600000) with back-adjusted prices ("hfq")
df = ak.stock_zh_a_daily(symbol='sh600000', adjust="hfq")  # already returns a DataFrame
df = df[['open', 'high', 'low', 'close', 'volume']]
df['H-L'] = df['high'] - df['low']    # daily high-low range
df['O-C'] = df['open'] - df['close']  # daily open-close change
df = df[['close', 'H-L', 'O-C', 'volume']]
df = df.iloc[-1000:].reset_index(drop=True)  # keep the most recent 1000 trading days
print(df.head())
# Define the reinforcement-learning environment
class Environment:
    def __init__(self, data, initial_investment=20000):
        self.actions = ["buy", "sell", "hold"]
        self.data = data
        self.n_step = len(self.data) - 1
        self.initial_investment = initial_investment
        self.current_step = None
        self.stock_owned = None
        self.cash_in_hand = None
        self.action_history = []
        self.state_history = []
        self.reward_history = []
        self.value_history = []

    def reset(self):
        self.current_step = 0
        self.stock_owned = 0
        self.cash_in_hand = self.initial_investment
        self.action_history = []
        self.state_history = []
        self.reward_history = []
        self.value_history = []
        return self._get_observation()

    def step(self, action):
        assert action in self.actions
        # Record the portfolio value before acting, then advance one trading day
        prev_val = self._get_val()
        self.action_history.append(action)
        self.current_step += 1
        self.state_history.append(self._get_observation())
        if action == "sell":
            # Note: no position check, so short positions are allowed
            self.stock_owned -= 1
            self.cash_in_hand += self.data.iloc[self.current_step]['close']
            self.reward_history.append(self.data.iloc[self.current_step]['close'] - prev_val)
        elif action == "buy":
            self.stock_owned += 1
            self.cash_in_hand -= self.data.iloc[self.current_step]['close']
            self.reward_history.append(prev_val - self.data.iloc[self.current_step]['close'])
        else:
            self.reward_history.append(0)
        self.value_history.append(self._get_val())
        done = (self.current_step == self.n_step)
        info = {"stock_owned": self.stock_owned, "cash_in_hand": self.cash_in_hand}
        # The reward returned is the cumulative reward up to the current step
        return self._get_observation(), sum(self.reward_history), done, info

    def _get_observation(self):
        obs = np.array([self.data.iloc[self.current_step]['close'],
                        self.data.iloc[self.current_step]['H-L'],
                        self.data.iloc[self.current_step]['O-C'],
                        self.data.iloc[self.current_step]['volume']])
        return obs

    def _get_val(self):
        # Total portfolio value: holdings at the current close plus cash
        return self.stock_owned * self.data.iloc[self.current_step]['close'] + self.cash_in_hand
# Experience replay buffer
class ReplayBuffer:
    def __init__(self, obs_dim, act_dim, size=int(1e6)):
        self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
        self.act_buf = np.zeros(size, dtype=np.int32)  # store the action index, not a one-hot vector
        self.rew_buf = np.zeros(size, dtype=np.float32)
        self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
        self.done_buf = np.zeros(size, dtype=np.float32)
        self.ptr, self.size, self.max_size = 0, 0, size

    def store(self, obs, act, rew, next_obs, done):
        self.obs_buf[self.ptr] = obs
        self.act_buf[self.ptr] = act
        self.rew_buf[self.ptr] = rew
        self.next_obs_buf[self.ptr] = next_obs
        self.done_buf[self.ptr] = done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self, batch_size=32):
        idxs = np.random.randint(0, self.size, size=batch_size)
        return dict(obs=self.obs_buf[idxs],
                    act=self.act_buf[idxs],
                    rew=self.rew_buf[idxs],
                    next_obs=self.next_obs_buf[idxs],
                    done=self.done_buf[idxs])

# Build the Q-network: maps an observation to one Q-value per action
def get_model(obs_dim, act_dim):
    model = Sequential([
        Dense(256, input_shape=(obs_dim,), activation='relu'),
        Dense(256, activation='relu'),
        Dense(256, activation='relu'),
        Dense(act_dim)
    ])
    return model
# Train the model with a basic DQN loop
def train():
    env = Environment(df)
    obs_dim = env.reset().shape[0]
    act_dim = len(env.actions)
    model = get_model(obs_dim, act_dim)
    target_model = get_model(obs_dim, act_dim)
    target_model.set_weights(model.get_weights())
    replay_buffer = ReplayBuffer(obs_dim=obs_dim, act_dim=act_dim)

    # Hyperparameters
    lr = 1e-3                  # learning rate (typical default)
    gamma = 0.99               # discount factor
    epsilon = 1.0              # initial exploration rate
    batch_size = 32
    updates_per_step = 10
    update_target_every = 2000
    replay_start_size = 10000
    total_timesteps = 200000
    start_steps = 10000
    optimizer = Adam(learning_rate=lr)

    def get_action(state, epsilon):
        # Epsilon-greedy action selection
        if random.random() < epsilon:
            return random.choice(env.actions)
        state = np.expand_dims(state, axis=0).astype(np.float32)
        q_values = model.predict(state, verbose=0)
        return env.actions[int(np.argmax(q_values[0]))]

    def train_step(batch):
        obs, act, rew = batch['obs'], batch['act'], batch['rew']
        next_obs, done = batch['next_obs'], batch['done']
        # One-step TD target from the (frozen) target network
        target_q = tf.reduce_max(target_model(next_obs), axis=1)
        target_q = rew + (1 - done) * gamma * target_q
        with tf.GradientTape() as tape:
            q = model(obs)
            q = tf.reduce_sum(q * tf.one_hot(act, act_dim), axis=1)
            loss = tf.reduce_mean((q - target_q) ** 2)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    step_count = 0
    episode_reward = 0
    episode_timesteps = 0
    state = env.reset()
    for i in range(total_timesteps):
        # Act randomly for the first start_steps steps, then epsilon-greedily
        if i < start_steps:
            action = random.choice(env.actions)
        else:
            action = get_action(state, epsilon)
            epsilon = max(0.05, epsilon * 0.9995)  # slowly anneal exploration
        next_state, reward, done, info = env.step(action)
        replay_buffer.store(state, env.actions.index(action), reward, next_state, done)
        state = next_state
        episode_reward += reward
        episode_timesteps += 1
        step_count += 1
        # Reset the environment at the end of an episode
        if done or (episode_timesteps == env.n_step):
            print('Step: {}, episode reward: {}, episode timesteps: {}'.format(i, episode_reward, episode_timesteps))
            state = env.reset()
            episode_reward, episode_timesteps = 0, 0
        # Only start learning once the buffer holds enough transitions
        if replay_buffer.size < replay_start_size:
            continue
        if step_count % updates_per_step == 0:
            for j in range(updates_per_step):
                batch = replay_buffer.sample_batch(batch_size=batch_size)
                train_step(batch)
        if step_count % update_target_every == 0:
            target_model.set_weights(model.get_weights())

train()
```
In this code, we first use akshare to download daily data for a single A-share stock (sh600000) and preprocess it into a small feature set. We then define a reinforcement-learning environment class, `Environment`, in which the agent observes the current market state and chooses to buy, sell, or hold. A neural network approximates the action values for this environment, and an experience replay buffer (`ReplayBuffer`) is used to stabilize and speed up training. Finally, the `train()` function runs the training loop and prints the reward and step count for each episode.
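The `train()` function above does not return anything, so the learned policy cannot be inspected afterwards. As a rough illustration (not part of the original code), the sketch below assumes `train()` is modified to end with `return model`; the hypothetical `evaluate` helper then replays the greedy (no-exploration) policy over the same DataFrame and plots `env.value_history` with matplotlib. Note that evaluating on the training data only shows in-sample behavior.

```python
import matplotlib.pyplot as plt

def evaluate(model, data):
    """Run the greedy policy once over `data` and plot the portfolio value curve."""
    env = Environment(data)
    state = env.reset()
    done = False
    while not done:
        q_values = model.predict(np.expand_dims(state, axis=0).astype(np.float32), verbose=0)
        action = env.actions[int(np.argmax(q_values[0]))]
        state, _, done, info = env.step(action)
    print("Final portfolio value:", env.value_history[-1],
          "| initial investment:", env.initial_investment)
    plt.plot(env.value_history)
    plt.xlabel("trading day")
    plt.ylabel("portfolio value")
    plt.title("Greedy policy, in-sample")
    plt.show()

# Example usage (assumes train() was changed to return the trained model):
# model = train()
# evaluate(model, df)
```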