python 使用连接池、分批处理mysql遍历每个表格,表格要加反引号根据trade_date降序排列,对net_mf_amount列的前一天的数据减去今天的数据放在a列,如果之前计算过则跳过,并且保存
时间: 2024-02-09 07:10:53 浏览: 79
好的,关于你的问题,我可以给你一些思路和代码实现。
首先需要连接池,可以使用第三方库`pymysql`和`DBUtils`来实现:
```python
import pymysql
from dbutils.pooled_db import PooledDB
pool = PooledDB(
creator=pymysql, # 使用pymysql作为连接库
maxconnections=5, # 连接池最大连接数
mincached=2, # 初始化时,连接池中至少创建的空闲连接,0表示不创建
maxcached=5, # 连接池中最多闲置的连接,0表示不限制
maxshared=0, # 连接池中最多共享的连接数量,0表示全共享
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待
maxusage=None, # 一个连接最多被重复使用多少次,None表示不限制
setsession=[], # 连接建立后需要执行的命令列表
ping=0, # ping MySQL服务器检查是否服务可用,0表示不检查,1表示检查
host='127.0.0.1',
port=3306,
user='root',
password='root',
database='test'
)
conn = pool.connection() # 从连接池中获取一个连接
cursor = conn.cursor()
```
然后需要分批处理每个表格,可以使用`show tables`命令获取所有表格,再使用`desc tablename`命令获取表格结构,最后使用`select`命令遍历每个表格的数据:
```python
cursor.execute("show tables;")
tables = cursor.fetchall()
for table in tables:
table = table[0]
cursor.execute(f"desc {table};")
columns = [column[0] for column in cursor.fetchall()]
columns_str = ",".join([f"`{column}`" for column in columns])
cursor.execute(f"select {columns_str} from `{table}` order by `trade_date` desc;")
results = cursor.fetchall()
for i in range(0, len(results), batch_size):
batch_results = results[i:i+batch_size]
# 处理每个批次的数据
```
最后需要对`net_mf_amount`列的前一天的数据减去今天的数据放在`a`列,如果之前计算过则跳过,并且保存,可以使用`pandas`库来处理数据:
```python
import pandas as pd
df = pd.DataFrame(batch_results, columns=columns)
df['trade_date'] = pd.to_datetime(df['trade_date'])
df = df.sort_values('trade_date', ascending=False).reset_index(drop=True)
if 'a' not in df.columns:
df['a'] = 0
else:
df = df[df['a'] == 0].reset_index(drop=True)
for i in range(1, len(df)):
if df.loc[i, 'a'] != 0:
continue
df.loc[i, 'a'] = df.loc[i-1, 'net_mf_amount'] - df.loc[i, 'net_mf_amount']
df = df[['trade_date', 'net_mf_amount', 'a']]
df.to_sql(name=table, con=engine, if_exists='replace')
```
完整的代码如下:
```python
import pymysql
from dbutils.pooled_db import PooledDB
import pandas as pd
pool = PooledDB(
creator=pymysql, # 使用pymysql作为连接库
maxconnections=5, # 连接池最大连接数
mincached=2, # 初始化时,连接池中至少创建的空闲连接,0表示不创建
maxcached=5, # 连接池中最多闲置的连接,0表示不限制
maxshared=0, # 连接池中最多共享的连接数量,0表示全共享
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待
maxusage=None, # 一个连接最多被重复使用多少次,None表示不限制
setsession=[], # 连接建立后需要执行的命令列表
ping=0, # ping MySQL服务器检查是否服务可用,0表示不检查,1表示检查
host='127.0.0.1',
port=3306,
user='root',
password='root',
database='test'
)
batch_size = 1000
conn = pool.connection() # 从连接池中获取一个连接
cursor = conn.cursor()
cursor.execute("show tables;")
tables = cursor.fetchall()
for table in tables:
table = table[0]
cursor.execute(f"desc {table};")
columns = [column[0] for column in cursor.fetchall()]
columns_str = ",".join([f"`{column}`" for column in columns])
cursor.execute(f"select {columns_str} from `{table}` order by `trade_date` desc;")
results = cursor.fetchall()
for i in range(0, len(results), batch_size):
batch_results = results[i:i+batch_size]
df = pd.DataFrame(batch_results, columns=columns)
df['trade_date'] = pd.to_datetime(df['trade_date'])
df = df.sort_values('trade_date', ascending=False).reset_index(drop=True)
if 'a' not in df.columns:
df['a'] = 0
else:
df = df[df['a'] == 0].reset_index(drop=True)
for i in range(1, len(df)):
if df.loc[i, 'a'] != 0:
continue
df.loc[i, 'a'] = df.loc[i-1, 'net_mf_amount'] - df.loc[i, 'net_mf_amount']
df = df[['trade_date', 'net_mf_amount', 'a']]
df.to_sql(name=table, con=engine, if_exists='replace')
cursor.close()
conn.close()
```
希望能对你有所帮助!
阅读全文