update_df = df.alias('l').join(landing_merge_df.alias('lm'), on=primary_keys_list, how="inner")
时间: 2024-02-14 21:22:18 浏览: 63
这段代码的意思是什么?
这段代码是将两个数据框(df和landing_merge_df)按照主键列表(primary_keys_list)进行内连接(inner join),返回的是两个数据框中主键相同的行。其中,'l'和'lm'分别是df和landing_merge_df的别名,on表示连接的条件,how表示连接方式为内连接(inner join)。最终返回的是一个新的数据框(update_df),其中包含了两个数据框中主键相同的行和所有列。
相关问题
update_df = df.alias('l').join(landing_merge_df.alias('lm'), on=primary_keys_list, how="inner")转为sql
下面是将上述代码转化为 SQL 语句的方法:
假设 df 表和 landing_merge_df 表已经在数据库中创建,并且它们的表名分别为 df_table 和 landing_merge_table。
首先,我们需要将 df 表和 landing_merge_df 表进行别名处理:
df 表别名为 l,landing_merge_df 表别名为 lm。
接下来,我们需要指定连接条件和连接方式。根据代码中的参数,连接条件为 primary_keys_list,连接方式为 inner join。
在 SQL 中,连接条件使用 ON 关键字指定,连接方式使用 INNER JOIN 关键字指定。
最终的 SQL 语句如下:
SELECT *
FROM df_table AS l
INNER JOIN landing_merge_table AS lm
ON l.primary_key_1 = lm.primary_key_1
AND l.primary_key_2 = lm.primary_key_2
AND ...
等等
if self.config.load_type == "INC": # adhoc hist job do not need to join landing merge table try: landing_merge_df = self.spark.read.format(self.config.destination_file_type). \ load(self.config.destination_data_path) # dataframe for updated records df = df.drop("audit_batch_id", "audit_job_id", "audit_src_sys_name", "audit_created_usr", "audit_updated_usr", "audit_created_tmstmp", "audit_updated_tmstmp") # dataframe for newly inserted records new_insert_df = df.join(landing_merge_df, primary_keys_list, "left_anti") self.logger.info(f"new_insert_df count: {new_insert_df.count()}") new_insert_df = DataSink_with_audit(self.spark).add_audit_columns(new_insert_df, param_dict) update_df = df.alias('l').join(landing_merge_df.alias('lm'), on=primary_keys_list, how="inner") update_df = update_df.select("l.*", "lm.audit_batch_id", "lm.audit_job_id", "lm.audit_src_sys_name", "lm.audit_created_usr", "lm.audit_updated_usr", "lm.audit_created_tmstmp", "lm.audit_updated_tmstmp") self.logger.info(f"update_df count : {update_df.count()}") update_df = DataSink_with_audit(self.spark).update_audit_columns(update_df, param_dict) # dataframe for unchanged records unchanged_df = landing_merge_df.join(df, on=primary_keys_list, how="left_anti") self.logger.info(f"unchanged_records_df count : {unchanged_df.count()}") final_df = new_insert_df.union(update_df).union(unchanged_df) print("final_df count : ", final_df.count()) except AnalysisException as e: if e.desc.startswith('Path does not exist'): self.logger.info('landing merge table not exists. will skip join landing merge') final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) else: self.logger.error(f'unknown error: {e.desc}') raise e else: final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) return final_df
这是一段Python代码,其中包含一个类方法的实现。该方法根据配置参数的不同,从一个特定的数据路径中将数据加载到一个Spark DataFrame中,并对该数据进行一些操作,最终返回一个具有审计列的DataFrame。如果配置参数是"INC",则会执行一些数据合并的操作,包括添加、更新和未更改的记录,并对这些记录添加审计列。如果配置参数是其他值,则只会添加审计列。
阅读全文