import cx_Oracle # 连接Oracle数据库 dsn = cx_Oracle.makedsn(host='localhost', port=1521, sid='orcl') connection = cx_Oracle.connect(user='yourusername', password='yourpassword', dsn=dsn) # 插入数据并校验 cursor = connection.cursor() try: for batch in batches: cursor.execute("BEGIN") for row in batch: cursor.execute("INSERT INTO yourtable (col1, col2, col3) VALUES (:1, :2, :3)", row) cursor.execute("COMMIT") # 数据校验 cursor.execute("SELECT count(*) FROM yourtable") count = cursor.fetchone()[0] if count != len(results): raise Exception("数据校验失败") except Exception as e: print(e) connection.rollback() finally: cursor.close() connection.close(),以上代码 BEGIN 和 COMMIT 操作在循环内部,如果其他批次出现异常,会将其他已经提交的事务也回滚吗,帮我再输出一份有注释的代码,使用sqlalchemy实现
时间: 2024-03-23 08:41:49 浏览: 57
以下是使用 `sqlalchemy` 实现的代码,其中每个批次都在自己的事务中执行,如果一个批次出现异常,只会回滚当前批次的操作,而不会影响到其他批次已经提交的操作:
```
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 连接Oracle数据库
dsn = cx_Oracle.makedsn(host='localhost', port=1521, sid='orcl')
engine = create_engine(f"oracle+cx_oracle://yourusername:yourpassword@{dsn}")
# 创建Session
Session = sessionmaker(bind=engine)
# 插入数据并校验
try:
session = Session()
for batch in batches:
tx = session.begin() # 启动事务
try:
for row in batch:
session.execute("INSERT INTO yourtable (col1, col2, col3) VALUES (:1, :2, :3)", row)
session.commit() # 提交事务
except Exception as e:
session.rollback() # 回滚事务
raise e
finally:
tx.close()
# 数据校验
count = session.query(func.count('*')).select_from(YourTable).scalar()
if count != len(results):
raise Exception("数据校验失败")
except Exception as e:
print(e)
finally:
session.close()
engine.dispose()
```
在上面的代码中,每个批次都在自己的事务中执行。在每个批次的循环中,使用 `session.begin()` 启动一个新的事务,并使用 `session.commit()` 提交事务。如果在批次中发生异常,将会执行 `session.rollback()` 回滚当前事务,并抛出异常。在每个批次结束后,使用 `tx.close()` 关闭当前事务。
在循环结束后,使用 `session.query(func.count('*')).select_from(YourTable).scalar()` 查询表中的记录数,并与 `len(results)` 进行比较,以校验插入数据的结果。
最后,使用 `session.close()` 关闭 `Session`,并使用 `engine.dispose()` 释放数据库连接。
阅读全文