def deduplicate(self, df: DataFrame) -> DataFrame: key_columns = [F.col(column_name) for column_name in self.config.deduplication_key_columns] order_by_columns = [F.col(column_name).desc() for column_name in self.config.deduplication_order_columns] if self.config.deduplicate_order_by_type == "asc": order_by_columns = [F.col(column_name) for column_name in self.config.deduplication_order_columns] ranking_column = "duplicates_rank" is_deleted_column = "IsDeleted" if (self.config.filter_deleted_flag) & ( is_deleted_column in df.columns): # if True removes records that marked deleted filter = (F.col(ranking_column) == 1) & (F.col(is_deleted_column) == "False") else: self.logger.warning(f"Records marked as deleted will be loaded to {self.config.table_full_name} table!") filter = F.col(ranking_column) == 1 return df.withColumn(ranking_column, F.row_number().over(Window.partitionBy(key_columns).orderBy(order_by_columns))).filter( filter).drop(ranking_column)怎么用临时表和cte的形式写成sql
时间: 2024-04-05 19:29:55 浏览: 87
可以使用以下SQL语句将Python代码转换为临时表和CTE的形式:
WITH cte AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY <key_columns>
ORDER BY <order_by_columns> DESC/ASC
) AS duplicates_rank
FROM tmp_table
),
filtered_cte AS (
SELECT *
FROM cte
WHERE duplicates_rank = 1
AND (<is_deleted_column> = 'False' OR <is_deleted_column> IS NULL)
)
SELECT *
FROM filtered_cte;
其中,`tmp_table`是源数据的临时表,`<key_columns>`是`self.config.deduplication_key_columns`中的列名,`<order_by_columns>`是`self.config.deduplication_order_columns`中的列名,`<is_deleted_column>`是`self.config.filter_deleted_flag`和`is_deleted_column in df.columns`的结果中的`is_deleted_column`的列名。
相关问题
def deduplicate(self, df: DataFrame) -> DataFrame: key_columns = [F.col(column_name) for column_name in self.config.deduplication_key_columns] order_by_columns = [F.col(column_name).desc() for column_name in self.config.deduplication_order_columns] if self.config.deduplicate_order_by_type == "asc": order_by_columns = [F.col(column_name) for column_name in self.config.deduplication_order_columns] ranking_column = "duplicates_rank" is_deleted_column = "IsDeleted" if (self.config.filter_deleted_flag) & ( is_deleted_column in df.columns): # if True removes records that marked deleted filter = (F.col(ranking_column) == 1) & (F.col(is_deleted_column) == "False") else: self.logger.warning(f"Records marked as deleted will be loaded to {self.config.table_full_name} table!") filter = F.col(ranking_column) == 1 return df.withColumn(ranking_column, F.row_number().over(Window.partitionBy(key_columns).orderBy(order_by_columns))).filter( filter).drop(ranking_column)
这段代码是用来去重的,输入是一个DataFrame,输出也是一个DataFrame。该函数首先根据配置文件中的列名获取需要去重的列和排序列,然后按照排序列进行降序或升序排列。接着,该函数给每个分组内的记录打上重复排名,最后根据重复排名过滤出排名为1的记录并删除重复排名列。如果配置文件中开启了筛选标记为删除的记录的功能,并且DataFrame中有IsDeleted列,则会将标记为删除的记录过滤掉。如果没有开启该功能,则会将标记为删除的记录加载到目标表中。
if (self.config.filter_deleted_flag) & ( is_deleted_column in df.columns): # if True removes records that marked deleted filter = (F.col(ranking_column) == 1) & (F.col(is_deleted_column) == "False") else: self.logger.warning(f"Records marked as deleted will be loaded to {self.config.table_full_name} table!") filter = F.col(ranking_column) == 1 return df.withColumn(ranking_column, F.row_number().over(Window.partitionBy(key_columns).orderBy(order_by_columns))).filter( filter).drop(ranking_column)
这段代码是一个Python函数,主要功能是对DataFrame进行过滤、排序、计算排名,并返回结果。具体来说,函数根据一些参数(如是否过滤已删除的记录、排序方式等)对DataFrame进行过滤和排序,然后计算每个分组中每行的排名,最后返回过滤后的结果。具体注释如下:
```
def filter_sort_rank(df, key_columns, order_by_columns, ranking_column, is_deleted_column='is_deleted'):
# 如果需要过滤已删除的记录,则将is_deleted_column列为"False"的记录过滤掉
if (self.config.filter_deleted_flag) & (is_deleted_column in df.columns):
filter = (F.col(ranking_column) == 1) & (F.col(is_deleted_column) == "False")
else:
# 如果不需要过滤已删除的记录,则保留所有记录,并发出警告
self.logger.warning(f"Records marked as deleted will be loaded to {self.config.table_full_name} table!")
filter = F.col(ranking_column) == 1
# 对DataFrame进行排序、计算排名,并过滤掉不符合条件的记录
return df.withColumn(ranking_column, F.row_number().over(Window.partitionBy(key_columns).orderBy(order_by_columns))).filter(filter).drop(ranking_column)
```
其中,`df`是输入的DataFrame,`key_columns`指定了分组的列,`order_by_columns`指定了排序的列,`ranking_column`指定了计算排名时添加的新列名,`is_deleted_column`指定了标记是否删除的列名。
阅读全文