python 保存df to.sql时候,df的trade_date列格式改为时间格式
时间: 2024-02-09 14:08:55 浏览: 104
你可以使用 pandas.to_datetime() 方法将 trade_date 列的格式转换为时间格式,然后再保存到 SQL 数据库中。示例代码如下:
```python
import pandas as pd
from sqlalchemy import create_engine
# 读取数据到 DataFrame
df = pd.read_csv('your_data.csv')
# 将 trade_date 列转换为时间格式
df['trade_date'] = pd.to_datetime(df['trade_date'])
# 保存到 SQL 数据库中
engine = create_engine('your_sql_connection_string')
df.to_sql('your_table_name', con=engine, if_exists='replace', index=False)
```
其中,your_data.csv 是你要保存的数据文件,your_sql_connection_string 是你的 SQL 数据库连接字符串,your_table_name 是你要保存到的表名。如果你的表已经存在,可以使用 if_exists 参数指定覆盖原表。如果你的数据中没有索引列,可以将 index 参数设置为 False。
相关问题
for ts_code in ths_index_list: #行业分类到个股行业表格 industry = pd.read_sql_table(table_name= ts_code, con=con2) industry_member_list = industry['code'].tolist() for code in industry_member_list:#各个行业到行业内部所有股票 try:#解决member未更新退市股数据,在moneyflow查找无表格报错----------------------OK stock_daily = pd.read_sql('SELECT ts_code, trade_date, net_mf_amount FROM `{}`'.format(code), con=con6) #############需要解决 不同股票交易时间数不一致,筛选的时间早但表格无这个时间怎么解决 以及 交易时间存在,但停牌无数据null处理问题 # df_sum = stock_daily.groupby('trade_date')['net_mf_amount'].sum().reset_index() except : pass 最内层的for循环,对每个股票的net列 按照 trade_date列的时间顺序 累加 输出结果为 trade_date列 和 总和,总和指的是相同时间不同股票net值相加
根据你的描述,你想在最内层的循环中,对每个股票的"net_mf_amount"列按照"trade_date"列的时间顺序进行累加,并输出结果包含"trade_date"列和对应时间点的总和。以下是一个示例代码来实现这个逻辑:
```python
import pandas as pd
result = pd.DataFrame() # 创建一个空的DataFrame用于保存结果
for ts_code in ths_index_list:
industry = pd.read_sql_table(table_name=ts_code, con=con2)
industry_member_list = industry['code'].tolist()
for code in industry_member_list:
try:
stock_daily = pd.read_sql('SELECT ts_code, trade_date, net_mf_amount FROM `{}`'.format(code), con=con6)
# 按照时间顺序对net_mf_amount进行累加
df_sum = stock_daily.groupby('trade_date')['net_mf_amount'].sum().reset_index()
# 合并结果到总的DataFrame中
result = pd.concat([result, df_sum], ignore_index=True)
except:
pass
# 打印结果
print(result)
```
这段代码将在最内层的循环中,使用`groupby`方法按照"trade_date"列对"net_mf_amount"进行分组,并计算每个日期对应的总和。然后,将每个股票的结果合并到总的DataFrame中。最后,打印出结果。
请注意,这只是一个示例代码,具体实现可能需要根据你的数据结构和需求进行调整。如果你有具体的问题或需要更多的帮助,请告诉我。
import pandas as pd from sqlalchemy import create_engine # 连接到数据库 engine = create_engine('mysql+pymysql://user:password@localhost/database') # 获取所有表格的名称 with engine.connect() as conn, conn.begin(): tables = conn.execute("SHOW TABLES").fetchall() # 遍历所有表格 for table in tables: table_name = table[0] table_name_quoted = '' + table_name + '' # 检查是否存在名为'a'的列,如果不存在则添加'a'和'b'列 with engine.connect() as conn, conn.begin(): a_column = conn.execute("SHOW COLUMNS FROM " + table_name_quoted + " LIKE 'a'").fetchone() if a_column is None: conn.execute("ALTER TABLE " + table_name_quoted + " ADD COLUMN a DECIMAL(10,2)") conn.execute("ALTER TABLE " + table_name_quoted + " ADD COLUMN b DECIMAL(10,2)") # 查询net_mf_amount列的数据 query = "SELECT trade_date, net_mf_amount FROM " + table_name_quoted + " ORDER BY trade_date DESC" df = pd.read_sql_query(query, engine) # 计算a和b列 a_column = [] b_column = [] for i in range(len(df)): if i == 0: a_column.append(None) b_column.append(None) else: if pd.notnull(df.iloc[i]['net_mf_amount']) and pd.notnull(df.iloc[i-1]['net_mf_amount']): if i-2 >= 0: if pd.notnull(df.iloc[i-2]['net_mf_amount']): a = df.iloc[i]['net_mf_amount'] - df.iloc[i-1]['net_mf_amount'] b = df.iloc[i]['net_mf_amount'] - df.iloc[i-2]['net_mf_amount'] a_column.append(a) b_column.append(b) else: j = i-3 while j >= 0: if pd.notnull(df.iloc[j]['net_mf_amount']): a = df.iloc[i]['net_mf_amount'] - df.iloc[i-1]['net_mf_amount'] b = df.iloc[i]['net_mf_amount'] - df.iloc[j]['net_mf_amount'] a_column.append(a) b_column.append(b) break j -= 1 else: a = df.iloc[i]['net_mf_amount'] - df.iloc[i-1]['net_mf_amount'] b = None a_column.append(a) b_column.append(b) else: a_column.append(None) b_column.append(None) # 将结果保存到数据库 with engine.connect() as conn, conn.begin(): for i in range(len(df)): conn.execute("UPDATE " + table_name_quoted + " SET a=%s, b=%s WHERE trade_date=%s", (a_column[i], b_column[i], df.iloc[i]['trade_date'])) # 关闭连接 engine.dispose() 有5000个表格,使用多线程,线程池
可以使用Python内置的`concurrent.futures`模块实现多线程处理。具体实现可以参考以下代码:
```
import concurrent.futures
def process_table(table_name):
table_name_quoted = '' + table_name + ''
with engine.connect() as conn, conn.begin():
a_column = conn.execute("SHOW COLUMNS FROM " + table_name_quoted + " LIKE 'a'").fetchone()
if a_column is None:
conn.execute("ALTER TABLE " + table_name_quoted + " ADD COLUMN a DECIMAL(10,2)")
conn.execute("ALTER TABLE " + table_name_quoted + " ADD COLUMN b DECIMAL(10,2)")
query = "SELECT trade_date, net_mf_amount FROM " + table_name_quoted + " ORDER BY trade_date DESC"
df = pd.read_sql_query(query, engine)
a_column = []
b_column = []
for i in range(len(df)):
if i == 0:
a_column.append(None)
b_column.append(None)
else:
if pd.notnull(df.iloc[i]['net_mf_amount']) and pd.notnull(df.iloc[i-1]['net_mf_amount']):
if i-2 >= 0:
if pd.notnull(df.iloc[i-2]['net_mf_amount']):
a = df.iloc[i]['net_mf_amount'] - df.iloc[i-1]['net_mf_amount']
b = df.iloc[i]['net_mf_amount'] - df.iloc[i-2]['net_mf_amount']
a_column.append(a)
b_column.append(b)
else:
j = i-3
while j >= 0:
if pd.notnull(df.iloc[j]['net_mf_amount']):
a = df.iloc[i]['net_mf_amount'] - df.iloc[i-1]['net_mf_amount']
b = df.iloc[i]['net_mf_amount'] - df.iloc[j]['net_mf_amount']
a_column.append(a)
b_column.append(b)
break
j -= 1
else:
a = df.iloc[i]['net_mf_amount'] - df.iloc[i-1]['net_mf_amount']
b = None
a_column.append(a)
b_column.append(b)
else:
a_column.append(None)
b_column.append(None)
with engine.connect() as conn, conn.begin():
for i in range(len(df)):
conn.execute("UPDATE " + table_name_quoted + " SET a=%s, b=%s WHERE trade_date=%s", (a_column[i], b_column[i], df.iloc[i]['trade_date']))
print("Processed table:", table_name)
if __name__ == '__main__':
engine = create_engine('mysql+pymysql://user:password@localhost/database')
with engine.connect() as conn, conn.begin():
tables = conn.execute("SHOW TABLES").fetchall()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
executor.map(process_table, [table[0] for table in tables])
engine.dispose()
```
在`main`函数中,首先获取所有表格的名称,然后使用`ThreadPoolExecutor`创建一个最大线程数为10的线程池。使用`map`方法将`process_table`函数和表格名称列表传入,线程池会自动分配任务并执行。每个线程都会执行`process_table`函数,对一张表格进行处理。
在`process_table`函数中,首先检查是否存在名为'a'的列,如果不存在则添加'a'和'b'列。然后从数据库中查询数据到`DataFrame`中,并对每一行数据计算'a'和'b'列的值。最后将结果更新到数据库中。每个线程在处理完一张表格后,会输出一条信息,表示该表格处理完成。
阅读全文