用python实现 pg库有一张表字段为pad_sys_ver, app_version, net_work,vehicle, upload_time, trigger_events, create_time, uuid, brand, platform, delivery_status 使用连接池分批读取数据 拆分其中trigger_events字段,使用copy_from入库,增加异常情况处理 该字段数据如下 "[{\"eventId\":\"insertCarOffOrOnInf\",\"eventTime\":1679499600554,\"eventValue\":\"{\\\"mileage\\\":\\\"\\\",\\\"networkStatus\\\":\\\"0\\\",\\\"offOrOn\\\":\\\"0\\\"}\",\"id\":2770},{\"eventId\":\"insertCarOffOrOnInf\",\"eventTime\":1679500508874,\"eventValue\":\"{\\\"mileage\\\":\\\"\\\",\\\"networkStatus\\\":\\\"0\\\",\\\"offOrOn\\\":\\\"1\\\"}\",\"id\":2771}]"
时间: 2023-05-22 20:06:20 浏览: 82
可以使用Python中的psycopg2库连接到pg数据库,然后使用连接池进行分批读取数据。接着,针对trigger_events字段,可以使用json.loads()将其转换为python字典,然后再使用copy_from将其入库。如果出现异常情况,可以使用try-except语句进行处理。
以下是一份可能的Python代码:
import psycopg2.pool
import json
pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=10,
dbname='dbname', user='user',
password='password',
host='host', port='port')
with pool.getconn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT pad_sys_ver, app_version, net_work, vehicle, upload_time, trigger_events, create_time, uuid, brand, platform, delivery_status FROM mytable")
while True:
rows = cur.fetchmany(100)
if not rows:
break
for row in rows:
trigger_events = json.loads(row[5])
for event in trigger_events:
try:
cur.copy_from(event, 'mytable_trigger_events', sep=',')
except Exception as e:
print(f"Error occurred: {e}")
pool.closeall()
阅读全文