new_insert_df = DataSink_with_audit(self.spark).add_audit_columns(new_insert_df, param_dict)
时间: 2023-10-05 19:08:21 浏览: 75
param_dict是一个字典,包含了一些参数,用于指定审计列的名称、值等信息。具体来说,param_dict中可能包含以下键值对:
- "audit_user": 表示执行操作的用户;
- "audit_date": 表示执行操作的时间;
- "audit_application": 表示执行操作的应用程序;
- "audit_comment": 表示执行操作的注释或说明。
这些参数可以根据实际需要进行设置,用于记录操作的元数据信息。在add_audit_columns()方法中,会根据这些参数添加对应的审计列。
相关问题
if self.config.load_type == "INC": # adhoc hist job do not need to join landing merge table try: landing_merge_df = self.spark.read.format(self.config.destination_file_type). \ load(self.config.destination_data_path) # dataframe for updated records df = df.drop("audit_batch_id", "audit_job_id", "audit_src_sys_name", "audit_created_usr", "audit_updated_usr", "audit_created_tmstmp", "audit_updated_tmstmp") # dataframe for newly inserted records new_insert_df = df.join(landing_merge_df, primary_keys_list, "left_anti") self.logger.info(f"new_insert_df count: {new_insert_df.count()}") new_insert_df = DataSink_with_audit(self.spark).add_audit_columns(new_insert_df, param_dict) update_df = df.alias('l').join(landing_merge_df.alias('lm'), on=primary_keys_list, how="inner") update_df = update_df.select("l.*", "lm.audit_batch_id", "lm.audit_job_id", "lm.audit_src_sys_name", "lm.audit_created_usr", "lm.audit_updated_usr", "lm.audit_created_tmstmp", "lm.audit_updated_tmstmp") self.logger.info(f"update_df count : {update_df.count()}") update_df = DataSink_with_audit(self.spark).update_audit_columns(update_df, param_dict) # dataframe for unchanged records unchanged_df = landing_merge_df.join(df, on=primary_keys_list, how="left_anti") self.logger.info(f"unchanged_records_df count : {unchanged_df.count()}") final_df = new_insert_df.union(update_df).union(unchanged_df) print("final_df count : ", final_df.count()) except AnalysisException as e: if e.desc.startswith('Path does not exist'): self.logger.info('landing merge table not exists. will skip join landing merge') final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) else: self.logger.error(f'unknown error: {e.desc}') raise e else: final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) return final_df
这是一段Python代码,其中包含一个类方法的实现。该方法根据配置参数的不同,从一个特定的数据路径中将数据加载到一个Spark DataFrame中,并对该数据进行一些操作,最终返回一个具有审计列的DataFrame。如果配置参数是"INC",则会执行一些数据合并的操作,包括添加、更新和未更改的记录,并对这些记录添加审计列。如果配置参数是其他值,则只会添加审计列。
def transform(self, df: DataFrame) -> DataFrame: """ add audit col to dataframe :param df: :return: """ param_dict = { "job_id": self.config.job_id, "batch_id": self.config.batch_id, "data_source_name": self.config.data_source_name, "table_full_name": self.config.table_full_name, "audit_created_usr": "airflow_user", "audit_updated_usr": "airflow_user", "audit_src_sys_name": "SAP_X79_EDWS" } if "audit_created_usr" in df.columns: df = DataSink_with_audit(self.spark).update_audit_columns(df, param_dict) else: df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) return df
这段代码实现了一个 DataFrame 数据的转换方法,主要是在 DataFrame 中添加了一列用于审计的信息。
具体来说,代码首先通过 `self.config` 中的信息生成了一个参数字典 `param_dict`,用于存储审计相关的信息,比如 job_id、batch_id、data_source_name 等。然后,代码判断 DataFrame 中是否已经存在了审计相关的列,如果存在,则调用 `DataSink_with_audit` 类中的 `update_audit_columns` 方法,将参数字典 `param_dict` 中的信息更新到 DataFrame 中的审计列中。如果不存在,则调用 `DataSink_with_audit` 类中的 `add_audit_columns` 方法,为 DataFrame 添加一个审计列,并将参数字典 `param_dict` 中的信息填充到该列中。
最后,代码返回经过转换后的 DataFrame。
阅读全文