def add_audit_columns(self, dataframe, write_params):
    """Add audit information to ``dataframe`` and return the result.

    The source-system name is read from ``write_params`` by trying the keys
    ``audit_src_sys_name``, ``table_full_name`` and ``data_source_name`` in
    that order; the first truthy value wins and an empty string is used when
    none of them yields one.

    If ``dataframe`` already has an ``audit_src_sys_name`` column (column
    names are compared case-insensitively) the work is delegated to
    ``DataSink_with_audit.add_audit_col_append``; otherwise it is delegated
    to ``DataSink_with_audit.add_audit_col_fill``. Both helpers are defined
    outside this snippet, so exactly which columns they add is not visible
    here.

    Args:
        dataframe: Object exposing ``.columns`` as an iterable of column-name
            strings (presumably a Spark DataFrame -- confirm with callers).
        write_params: Mapping supporting ``.get``; see the keys above. It is
            also passed through unchanged to the helper.

    Returns:
        Whatever the selected ``DataSink_with_audit`` helper returns.

    Raises:
        Exception: Wraps any failure as ``"job failed with error ..."``; the
            original exception is attached as ``__cause__``.
    """
    try:
        # NOTE(review): this prints the full parameter mapping; confirm it
        # never carries credentials before relying on it in shared logs.
        print(write_params)

        # A key that is present but holds None would otherwise leak None into
        # the string concatenation below, so fall back to "" at the end.
        audit_source_sys_name = (
            write_params.get("audit_src_sys_name", "")
            or write_params.get("table_full_name")
            or write_params.get("data_source_name", "")
            or ""
        )
        print("audit_source_sys_name - " + audit_source_sys_name)

        audit_sink = DataSink_with_audit(self.spark)
        existing_columns = (name.upper() for name in dataframe.columns)

        if "audit_src_sys_name".upper() in existing_columns:
            print("audit_src_sys_name column is present. Adding present layer details....")
            audit_dataframe = audit_sink.add_audit_col_append(
                dataframe, audit_source_sys_name, write_params
            )
            print("Tables is updated.")
        else:
            print("Adding column - audit_src_sys_name.")
            audit_dataframe = audit_sink.add_audit_col_fill(
                dataframe, audit_source_sys_name, write_params
            )
            print("audit_src_sys_name added.")

        print("Added Audit Cols")
        return audit_dataframe
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise Exception("job failed with error {}".format(e)) from e
时间: 2024-02-14 07:16:52 浏览: 26
根据您提供的Python函数,我为您编写了一个MySQL存储过程的示例,但是需要根据您的具体需求进行修改和调整。这个存储过程的功能是向数据表中添加一个名为audit_src_sys_name的列,同时向这个列中填充相应的值。
```
-- add_audit_columns: stamp every row of a table with an audit source-system
-- name, creating the audit_src_sys_name column first when it is missing.
--
-- Parameters
--   df_column_names  Despite its name, this is used as the TARGET TABLE name
--                    (unqualified; resolved in the routine's own schema).
--   df_values        Name of an existing column; a newly created
--                    audit_src_sys_name column is placed AFTER it. When NULL
--                    or empty the column is simply appended at the end.
--   write_params     JSON object text. The source-system name is the first
--                    non-empty value among the keys audit_src_sys_name,
--                    table_full_name, data_source_name ('' if none).
--   audit_dataframe  OUT: plain-text progress log (not JSON).
--
-- Behaviour
--   * Column already present -> the name is appended to each row's existing
--     value, separated by ', '.
--   * Column absent          -> the column is added, then filled with the name.
--
-- Notes
--   * ALTER TABLE causes an implicit commit, so this cannot be rolled back as
--     a unit.
--   * When creating this routine from the mysql command-line client, wrap the
--     statement in a DELIMITER change so the inner ';' do not end it early.
CREATE PROCEDURE add_audit_columns(
    IN df_column_names VARCHAR(255),
    IN df_values VARCHAR(255),
    IN write_params TEXT,
    OUT audit_dataframe TEXT
)
BEGIN
    DECLARE audit_source_sys_name VARCHAR(255);
    DECLARE column_present BOOLEAN DEFAULT FALSE;
    DECLARE quoted_table_name VARCHAR(514);
    DECLARE after_clause VARCHAR(530) DEFAULT '';

    -- Fail fast with a readable error instead of a NULL dynamic statement.
    IF df_column_names IS NULL OR df_column_names = '' THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'add_audit_columns: df_column_names (table name) must not be empty';
    END IF;

    -- Identifiers cannot be bound as parameters, so quote them defensively:
    -- wrap in backticks and double any embedded backtick.
    SET quoted_table_name = CONCAT('`', REPLACE(df_column_names, '`', '``'), '`');

    -- Resolve the source-system name. JSON_EXTRACT returns a JSON value (a
    -- string keeps its double quotes), so unquote it; JSON null and '' count
    -- as missing so the next key is tried.
    SET audit_source_sys_name = COALESCE(
        NULLIF(
            CASE
                WHEN JSON_TYPE(JSON_EXTRACT(write_params, '$.audit_src_sys_name')) = 'NULL' THEN NULL
                ELSE JSON_UNQUOTE(JSON_EXTRACT(write_params, '$.audit_src_sys_name'))
            END,
            ''
        ),
        NULLIF(
            CASE
                WHEN JSON_TYPE(JSON_EXTRACT(write_params, '$.table_full_name')) = 'NULL' THEN NULL
                ELSE JSON_UNQUOTE(JSON_EXTRACT(write_params, '$.table_full_name'))
            END,
            ''
        ),
        NULLIF(
            CASE
                WHEN JSON_TYPE(JSON_EXTRACT(write_params, '$.data_source_name')) = 'NULL' THEN NULL
                ELSE JSON_UNQUOTE(JSON_EXTRACT(write_params, '$.data_source_name'))
            END,
            ''
        ),
        ''
    );

    -- Static lookup: no dynamic SQL needed, and restricted to the schema in
    -- which the unqualified table name below will be resolved.
    SET column_present = EXISTS (
        SELECT 1
        FROM information_schema.columns AS c
        WHERE c.table_schema = DATABASE()
          AND c.table_name = df_column_names
          AND c.column_name = 'audit_src_sys_name'
    );

    -- PREPARE only accepts a string literal or a user variable (@name), never
    -- a DECLAREd local, hence the @-variables below.
    IF column_present THEN
        SET audit_dataframe = CONCAT('Audit source system name column is present. Adding present layer details....\n');
        -- CONCAT_WS skips NULLs, so rows whose current value is NULL or ''
        -- receive just the new name instead of NULL or a leading ', '.
        -- Intentionally no WHERE clause: every row is stamped.
        SET @add_audit_columns_sql = CONCAT(
            'UPDATE ', quoted_table_name,
            ' SET audit_src_sys_name = CONCAT_WS('', '', NULLIF(audit_src_sys_name, ''''), NULLIF(?, ''''))'
        );
    ELSE
        IF df_values IS NOT NULL AND df_values <> '' THEN
            SET after_clause = CONCAT(' AFTER `', REPLACE(df_values, '`', '``'), '`');
        END IF;

        SET @add_audit_columns_sql = CONCAT(
            'ALTER TABLE ', quoted_table_name,
            ' ADD COLUMN audit_src_sys_name VARCHAR(255) DEFAULT NULL',
            after_clause
        );
        PREPARE stmt FROM @add_audit_columns_sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        SET audit_dataframe = CONCAT('Adding column - audit_src_sys_name.\n');
        -- Intentionally no WHERE clause: every row is stamped.
        SET @add_audit_columns_sql = CONCAT(
            'UPDATE ', quoted_table_name,
            ' SET audit_src_sys_name = ?'
        );
    END IF;

    -- The value is bound, not concatenated, so quotes in it cannot alter the
    -- statement.
    SET @add_audit_columns_value = audit_source_sys_name;
    PREPARE stmt FROM @add_audit_columns_sql;
    EXECUTE stmt USING @add_audit_columns_value;
    DEALLOCATE PREPARE stmt;

    -- User variables are session-scoped; do not leave ours behind.
    SET @add_audit_columns_sql = NULL;
    SET @add_audit_columns_value = NULL;

    SET audit_dataframe = CONCAT(audit_dataframe, 'Tables is updated.\n');
END
```
这个存储过程的输入参数包括:df_column_names(尽管名字如此,它在SQL中实际被用作目标数据表的表名)、df_values(新增列时用于AFTER子句,即新列会被放在该列之后),以及一个JSON格式的字符串write_params,其中包含各种写入参数信息。输出参数audit_dataframe是一个普通文本字符串(并非JSON),记录存储过程的执行过程信息。请注意,这只是一个示例,您需要根据实际情况进行修改和调整。