```python
update_df = update_df.select("l.*", "lm.audit_batch_id", "lm.audit_job_id", "lm.audit_src_sys_name",
                             "lm.audit_created_usr", "lm.audit_updated_usr",
                             "lm.audit_created_tmstmp", "lm.audit_updated_tmstmp")
```
What does this line of code mean?
This line selects every column of the left-hand DataFrame of the join (aliased `l`, hence `l.*`) together with the audit columns taken from the DataFrame aliased `lm`: `audit_batch_id`, `audit_job_id`, `audit_src_sys_name`, `audit_created_usr`, `audit_updated_usr`, `audit_created_tmstmp`, and `audit_updated_tmstmp`. The result is a new DataFrame containing all of the original columns plus those audit columns.
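For illustration, here is a minimal, self-contained sketch of the same alias-and-select pattern; the sample rows and the join key `id` are hypothetical and not taken from the original job:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Incoming records (no audit columns) and the existing landing/merge table (with audit columns).
df = spark.createDataFrame([(1, "new value")], ["id", "payload"])
landing_merge_df = spark.createDataFrame(
    [(1, "old value", "B001", "J001")],
    ["id", "payload", "audit_batch_id", "audit_job_id"],
)

# Alias both sides so overlapping column names can be disambiguated after the join.
update_df = df.alias("l").join(landing_merge_df.alias("lm"), on=["id"], how="inner")

# Keep every column from the incoming side plus the audit columns from the landing side.
update_df = update_df.select("l.*", "lm.audit_batch_id", "lm.audit_job_id")
update_df.show()
```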
Related questions
```python
if self.config.load_type == "INC":  # adhoc hist job do not need to join landing merge table
    try:
        landing_merge_df = self.spark.read.format(self.config.destination_file_type). \
            load(self.config.destination_data_path)

        # dataframe for updated records
        df = df.drop("audit_batch_id", "audit_job_id", "audit_src_sys_name",
                     "audit_created_usr", "audit_updated_usr",
                     "audit_created_tmstmp", "audit_updated_tmstmp")

        # dataframe for newly inserted records
        new_insert_df = df.join(landing_merge_df, primary_keys_list, "left_anti")
        self.logger.info(f"new_insert_df count: {new_insert_df.count()}")
        new_insert_df = DataSink_with_audit(self.spark).add_audit_columns(new_insert_df, param_dict)

        update_df = df.alias('l').join(landing_merge_df.alias('lm'), on=primary_keys_list, how="inner")
        update_df = update_df.select("l.*", "lm.audit_batch_id", "lm.audit_job_id", "lm.audit_src_sys_name",
                                     "lm.audit_created_usr", "lm.audit_updated_usr",
                                     "lm.audit_created_tmstmp", "lm.audit_updated_tmstmp")
        self.logger.info(f"update_df count : {update_df.count()}")
        update_df = DataSink_with_audit(self.spark).update_audit_columns(update_df, param_dict)

        # dataframe for unchanged records
        unchanged_df = landing_merge_df.join(df, on=primary_keys_list, how="left_anti")
        self.logger.info(f"unchanged_records_df count : {unchanged_df.count()}")

        final_df = new_insert_df.union(update_df).union(unchanged_df)
        print("final_df count : ", final_df.count())
    except AnalysisException as e:
        if e.desc.startswith('Path does not exist'):
            self.logger.info('landing merge table not exists. will skip join landing merge')
            final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict)
        else:
            self.logger.error(f'unknown error: {e.desc}')
            raise e
else:
    final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict)
return final_df
```
This is a Python snippet from a class method. Depending on the configuration, it loads data from a given path into a Spark DataFrame, processes it, and returns a DataFrame carrying audit columns. When `load_type` is `"INC"` it performs an incremental merge: it splits the input into newly inserted, updated, and unchanged records, adds or refreshes the audit columns on each subset, and unions the three back together; if the landing/merge table does not exist yet, it falls back to simply adding audit columns. For any other `load_type` it only adds the audit columns.
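The split into newly inserted, updated, and unchanged records comes from the join types used above. Below is a minimal sketch of that idea with hypothetical data and a single key column `id` standing in for `primary_keys_list`; the audit helpers (`DataSink_with_audit`) are omitted:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical incoming batch (ids 1 and 3) and existing landing/merge table (ids 1 and 2).
incoming = spark.createDataFrame([(1, "a-new"), (3, "c-new")], ["id", "val"])
existing = spark.createDataFrame([(1, "a-old"), (2, "b-old")], ["id", "val"])

# left_anti keeps incoming rows with no match in existing -> brand-new records (id 3).
new_insert_df = incoming.join(existing, ["id"], "left_anti")

# inner keeps keys present on both sides -> records to update (id 1), values from the incoming side.
update_df = incoming.alias("l").join(existing.alias("lm"), on=["id"], how="inner").select("l.*")

# left_anti the other way keeps existing rows absent from the batch -> unchanged records (id 2).
unchanged_df = existing.join(incoming, ["id"], "left_anti")

final_df = new_insert_df.union(update_df).union(unchanged_df)
final_df.show()
```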
```python
# Build the union of both DataFrames' column sets.
new_df_cols = new_df.columns
old_df_cols = older_df.columns
total = set(new_df_cols + old_df_cols)

# Align each DataFrame on the combined column set and tag its rows with a priority:
# 0 for the new data, 1 for the old data, so new rows win during de-duplication.
new_df = new_df.select(*self.fill_empty_colums(new_df_cols, total)).withColumn("row_priority", F.lit(0))
older_df = older_df.select(*self.fill_empty_colums(old_df_cols, total)).withColumn("row_priority", F.lit(1))

# Rank rows within each key group by priority and keep only the winning row per key.
key_column = [F.col(column_name) for column_name in key_columns]
merge_spec = Window.partitionBy(key_column).orderBy("row_priority")
ranked_df = new_df.unionByName(older_df).withColumn("rank", F.rank().over(merge_spec))
return self.update_audit_created_column(ranked_df, key_column).where(F.col("rank") == 1).drop("rank", "row_priority")
```
This PySpark code merges two DataFrames, de-duplicates the result on the key columns, and maintains some audit metadata. An equivalent implementation in SQL looks like this:
```sql
-- Align both tables on the union of their column sets; columns that exist in only
-- one table are filled with NULL. col1 ... colN stand for the combined column list
-- and key_column for the merge key.
WITH new_df_prepared AS (
    SELECT col1, col2, ..., colN, 0 AS row_priority
    FROM new_df
),
older_df_prepared AS (
    SELECT col1, col2, ..., colN, 1 AS row_priority
    FROM older_df
),
-- Stack the two sets; rows from new_df carry the lower row_priority and therefore win.
merged_df AS (
    SELECT * FROM new_df_prepared
    UNION ALL
    SELECT * FROM older_df_prepared
),
-- Rank rows within each key group so the new_df version of a key sorts first.
ranked_df AS (
    SELECT *,
           RANK() OVER (PARTITION BY key_column ORDER BY row_priority) AS rnk
    FROM merged_df
)
-- Keep only the winning row per key (the audit-column update performed by
-- update_audit_created_column has no direct equivalent here).
SELECT col1, col2, ..., colN
FROM ranked_df
WHERE rnk = 1
```
The SQL follows the same logic as the PySpark code, in these steps:
1. Determine the union of the two tables' column lists (written out explicitly as col1 ... colN above, since static SQL cannot build the column list dynamically) and fill columns missing from either side with NULL.
2. Tag each source with a `row_priority` column: 0 for new_df and 1 for older_df.
3. Stack the two prepared sets with UNION ALL and rank the rows within each key group by `row_priority`, so the new_df version of a key ranks first.
4. Keep only the rank-1 row per key, which yields the de-duplicated result. A minimal, runnable PySpark sketch of the same pattern is shown after this list.
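For reference, here is a minimal, self-contained PySpark sketch of the rank-based de-duplication pattern; the column names and sample rows are hypothetical, and the audit-column update from the original method is omitted:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical new and old data sharing the key column "id".
new_df = spark.createDataFrame([(1, "a-new"), (3, "c-new")], ["id", "val"])
older_df = spark.createDataFrame([(1, "a-old"), (2, "b-old")], ["id", "val"])

# Tag each source: rows from new_df (priority 0) should win over older_df (priority 1).
new_df = new_df.withColumn("row_priority", F.lit(0))
older_df = older_df.withColumn("row_priority", F.lit(1))

# Rank within each key group and keep only the highest-priority row.
merge_spec = Window.partitionBy("id").orderBy("row_priority")
merged = (
    new_df.unionByName(older_df)
    .withColumn("rank", F.rank().over(merge_spec))
    .where(F.col("rank") == 1)
    .drop("rank", "row_priority")
)
merged.show()  # ids 1 and 3 come from new_df, id 2 from older_df
```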