python操作数据库时,若cursor.execute(sql)用sql语句清空数据后使用 df_upload.to_sql(upload_name, engine, if_exists="append", index=False)追加数据,实现中途出错即rollback
时间: 2024-02-27 17:58:20 浏览: 144
如果在执行 `cursor.execute(sql)` 后,出现了错误,需要进行 rollback 操作,可以使用以下代码实现:
```python
import sqlalchemy
engine = sqlalchemy.create_engine('数据库连接字符串') # 连接数据库
conn = engine.connect()
trans = conn.begin()
try:
# 执行清空数据的 sql 语句
cursor.execute(sql)
# 执行追加数据的操作
df_upload.to_sql(upload_name, engine, if_exists="append", index=False)
# 提交事务
trans.commit()
except:
# 回滚事务
trans.rollback()
finally:
# 关闭数据库连接
conn.close()
```
其中,我们使用了 SQLAlchemy 库连接数据库,并使用 `conn.begin()` 开启事务,如果在事务执行过程中出现错误,就会执行 `trans.rollback()` 进行回滚,否则执行 `trans.commit()` 提交事务。这样可以保证数据的一致性和完整性。
相关问题
sqlalchemy实现python操作数据库时,若cursor.execute(sql)用sql语句清空数据后使用 df_upload.to_sql(upload_name, engine, if_exists="append", index=False)追加数据,实现中途出错即rollback
在使用`sqlalchemy`操作数据库时,可以使用上下文管理器`with`来实现自动提交或回滚事务。在这种情况下,如果在清空数据之后执行`df_upload.to_sql()`方法时出现错误,则会自动回滚并撤销对数据库的更改。
下面是一个示例代码:
```
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
engine = create_engine('mysql+pymysql://username:password@host:port/dbname')
Session = sessionmaker(bind=engine)
# 创建数据库会话
with Session() as session:
# 开始事务
with session.begin():
try:
# 清空数据
session.execute('TRUNCATE TABLE table_name')
# 追加数据
df_upload.to_sql(upload_name, engine, if_exists="append", index=False)
except IntegrityError as e:
# 出现错误时回滚事务
session.rollback()
print(f"Error: {str(e)}")
except Exception as e:
# 其他异常时也回滚事务
session.rollback()
print(f"Error: {str(e)}")
else:
# 没有出现错误时提交事务
session.commit()
print("Data uploaded successfully.")
```
在上面的代码中,我们首先创建了一个`Session`对象,并将其绑定到数据库引擎上。然后使用`with`语句创建了一个数据库会话。在这个会话中,我们使用`with session.begin()`语句开始了一个事务。在事务中,我们首先清空了数据表,然后使用`df_upload.to_sql()`方法追加数据。如果出现了`IntegrityError`异常(比如插入了重复的数据),则会回滚事务并输出错误信息。如果没有出现异常,则会提交事务并输出成功信息。
请问以下代码有何问题? try: db = pymysql.Connect(host='10.32.158.162', port=3306, user='dscuser', passwd='dscuser_123', db='COST') if y == "岗位费用": df_upload = pd.melt(df_upload, id_vars=['project', 'role'], value_vars=["2023", "2024", "2025", "2026", "2027", "2028", "2029", "上一版"], var_name='year', value_name='value') elif y == "年度项目月份工时": df_upload = pd.melt(df_upload, id_vars=['Project', 'Function'], value_vars=temp, var_name='year', value_name='value') df_upload = df_upload.dropna() df_upload = df_upload.drop_duplicates() df_upload = df_upload[(df_upload['value'] != 0) & (df_upload['value'].notna())] else: sql = "delete from COST.{} where project = '{}';".format(upload_name, project_name) cursor = db.cursor() cursor.execute(sql) # db.commit() # db.close() col_names = list(df_upload) for col in col_names: df_upload[col] = df_upload[col].astype('string', copy=False) df_upload.to_sql(upload_name, engine, if_exists="append", index=False) else: # db.commit() # db.close() error = error msg = "" except Exception as e: db.rollback() msg = str(e)[0:100] else: db.commit() finally: db.close()
这段代码存在一些问题:
1. 引入的库`pandas`和`pymysql`没有看到在哪里导入,需要在代码开头进行导入。
2. 在执行SQL语句时,没有使用参数化查询,容易出现SQL注入漏洞。应该使用参数化查询,例如使用`cursor.execute(sql, params)`方法。其中`params`是一个元组或字典类型,用于替换SQL语句中的参数。
3. 在`try`语句块中,如果`y`不等于"岗位费用"和"年度项目月份工时",则会执行`else`语句块,但是该语句块中没有对`df_upload`进行处理,会导致后面的代码出现错误。
4. 在`else`语句块中,如果使用`cursor`对象进行数据库操作,则应该在操作完成后调用`cursor.close()`方法释放资源。
5. 在`except`语句块中,`error = error`这行代码没有实际作用,应该删除。
下面是修改后的代码:
```
import pandas as pd
import pymysql
try:
db = pymysql.Connect(host='10.32.158.162', port=3306, user='dscuser', passwd='dscuser_123', db='COST')
if y == "岗位费用":
df_upload = pd.melt(df_upload, id_vars=['project', 'role'], value_vars=["2023", "2024", "2025", "2026", "2027", "2028", "2029", "上一版"], var_name='year', value_name='value')
elif y == "年度项目月份工时":
df_upload = pd.melt(df_upload, id_vars=['Project', 'Function'], value_vars=temp, var_name='year', value_name='value')
df_upload = df_upload.dropna()
df_upload = df_upload.drop_duplicates()
df_upload = df_upload[(df_upload['value'] != 0) & (df_upload['value'].notna())]
else:
sql = "delete from COST.{} where project = %s;".format(upload_name)
cursor = db.cursor()
cursor.execute(sql, (project_name,))
cursor.close()
col_names = list(df_upload)
for col in col_names:
df_upload[col] = df_upload[col].astype('string', copy=False)
df_upload.to_sql(upload_name, engine, if_exists="append", index=False)
except Exception as e:
db.rollback()
msg = str(e)[0:100]
else:
db.commit()
finally:
db.close()
```
在上面的代码中,我们首先导入了`pandas`和`pymysql`库。在`try`语句块中,我们首先创建了一个数据库连接`db`,然后根据不同的条件对`df_upload`进行处理。如果`y`等于"岗位费用",则使用`pd.melt()`方法进行数据重构。如果`y`等于"年度项目月份工时",则使用`pd.melt()`方法和`temp`变量进行数据重构。然后对数据进行清洗,去除空值和重复值,以及`value`列中的0值和空值。
如果`y`不等于"岗位费用"和"年度项目月份工时",则执行`else`语句块。在该语句块中,我们使用参数化查询的方式执行SQL语句,避免了SQL注入漏洞。然后将`df_upload`中的数据类型转换为字符串类型,并使用`df_upload.to_sql()`方法将数据写入数据库。
在`except`语句块中,如果出现异常,则会回滚事务并输出错误信息。在`else`语句块中,如果没有出现异常,则会提交事务。最后在`finally`语句块中,关闭数据库连接。
阅读全文