all_df = pd.merge(all_df, tmp, on='建造年代', how='left')
时间: 2023-06-17 18:06:53 浏览: 152
这段代码使用 Pandas 库中的 merge 函数,将两个 DataFrame 按照 '建造年代' 这一列进行左连接(left join)操作,并将结果存储在 all_df 变量中。其中,all_df 是左表,tmp 是右表。左连接是指将左表中的所有行都保留,而右表中只保留与左表中相同键值的行,如果右表中没有与左表中相同键值的行,则用 NaN 值填充。具体实现过程中,on 参数指定连接的键值列,how 参数指定连接方式。
相关问题
解释下这段代码 第三方商品 __df1 = __temp_df[__temp_df['show_name'] == '第三方商品'] if not __df1.empty: __df1 = __df1.pivot_table(index=['name', 'show_name'], values=['buy_num'], aggfunc=np.sum).reset_index() else: __df1 = __df1[['name', 'show_name', 'buy_num']] # 非第三方商品 __df2 = __temp_df[__temp_df['show_name'] != '第三方商品'] if not __df2.empty: __df2 = __df2.pivot_table(index=['gt_uuid', 'show_name'] ,values=['buy_num'], aggfunc=np.sum).reset_index() __df2 = pd.merge(left=__df2, right=pd.DataFrame(s_gt_rv, columns=['gt_uuid', 'goods_name']), how='left', on='gt_uuid') __df2 = __df2[['goods_name', 'show_name', 'buy_num']] __df2.rename(columns={'goods_name': 'name'}, inplace=True) else: __df2 = __df2[['name', 'show_name', 'buy_num']] __temp_df = pd.concat([__df1, __df2]) del __df1, __df2 __temp_df.rename(columns={'name': 'goods_name', 'show_name': 'gc_name', 'buy_num': 'num'}, inplace=True) __temp_df.sort_values(by='num', ascending=False, inplace=True) return_data['goods']['goods_list'] = __temp_df.to_dict(orient='records') __temp_df = __temp_df[['gc_name', 'num']] __temp_df = __temp_df.pivot_table(index='gc_name', values='num', aggfunc=np.sum).reset_index() __temp_df.sort_values(by='num', ascending=False, inplace=True) return_data['goods']['gc_list'] = __temp_df.to_dict(orient='records') del __temp_df
这段代码是对一个名为 `__temp_df` 的数据框进行处理,并将处理结果存储在 `return_data` 字典的 `goods` 键下。
首先,根据条件 `__temp_df['show_name'] == '第三方商品'`,筛选出满足条件的行,存储在 `__df1` 中。如果 `__df1` 不为空,则对其进行数据透视操作,按照 `name` 和 `show_name` 分组,计算 `buy_num` 的总和,并重置索引。否则,保留 `__df1` 的 `name`、`show_name` 和 `buy_num` 列。
接着,根据条件 `__temp_df['show_name'] != '第三方商品'`,筛选出满足条件的行,存储在 `__df2` 中。如果 `__df2` 不为空,则对其进行数据透视操作,按照 `gt_uuid` 和 `show_name` 分组,计算 `buy_num` 的总和,并重置索引。然后,将 `__df2` 与一个包含列名为 `['gt_uuid', 'goods_name']` 的 DataFrame 进行左连接,并将结果存储在 `__df2` 中。最后,保留 `__df2` 的 `goods_name`、`show_name` 和 `buy_num` 列,并将列名 `goods_name` 改为 `name`。
接下来,通过合并 `__df1` 和 `__df2` 构成新的数据框 `__temp_df`。之后,删除 `__df1` 和 `__df2` 变量。
然后,对 `__temp_df` 进行列名的重命名,将 `name` 改为 `goods_name`,`show_name` 改为 `gc_name`,`buy_num` 改为 `num`。将 `__temp_df` 按照 `num` 降序排序。
接着,将 `__temp_df` 转换为字典格式,并将结果存储在 `return_data` 字典的 `goods_list` 键下。再次对 `__temp_df` 进行处理,只保留 `gc_name` 和 `num` 两列,并进行数据透视操作,按照 `gc_name` 分组,计算 `num` 的总和,并重置索引。最后,将结果按照 `num` 降序排序。
最后,将处理后的 `__temp_df` 转换为字典格式,并将结果存储在 `return_data` 字典的 `gc_list` 键下。最后,删除 `__temp_df` 变量。
new_df_cols = new_df.columns old_df_cols = older_df.columns total = set(new_df_cols + old_df_cols) new_df = new_df.select(*self.fill_empty_colums(new_df_cols, total)).withColumn("row_priority",F.lit(0)) older_df = older_df.select(*self.fill_empty_colums(old_df_cols, total)).withColumn("row_priority",F.lit(1)) key_column = [F.col(column_name) for column_name in key_columns] merge_spec = Window.partitionBy(key_column).orderBy("row_priority") ranked_df=new_df.unionByName(older_df).withColumn("rank", F.rank().over(merge_spec)) return self.update_audit_created_column(ranked_df,key_column).where(F.col("rank") == 1).drop("rank", "row_priority")
这段代码用 PySpark 实现了对两个 DataFrame 进行合并和去重的操作,并添加了一些元数据信息。下面是使用 SQL 实现的代码:
```sql
-- 选取需要合并的列名
WITH new_cols AS (
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'new_df'
),
old_cols AS (
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'older_df'
),
cols AS (
SELECT DISTINCT COLUMN_NAME
FROM (
SELECT COLUMN_NAME FROM new_cols
UNION ALL
SELECT COLUMN_NAME FROM old_cols
)
),
-- 对 new_df 填充空缺的列,并添加 "row_priority" 列
new_df_filled AS (
SELECT COALESCE(col1, '') AS col1, COALESCE(col2, '') AS col2, ..., COALESCE(colN, '') AS colN, 0 AS row_priority
FROM new_df
),
new_df_selected AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY key_column ORDER BY row_priority) AS rank
FROM (
-- 选取 new_df 中的列,包括填充空缺的列和 "row_priority" 列
SELECT col1, col2, ..., colN, row_priority
FROM new_df_filled
-- 生成 key_column 列,用于分组
CROSS JOIN (SELECT col1 AS key_column FROM new_df_filled) key_columns
)
),
-- 对 older_df 填充空缺的列,并添加 "row_priority" 列
old_df_filled AS (
SELECT COALESCE(col1, '') AS col1, COALESCE(col2, '') AS col2, ..., COALESCE(colN, '') AS colN, 1 AS row_priority
FROM older_df
),
old_df_selected AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY key_column ORDER BY row_priority) AS rank
FROM (
-- 选取 older_df 中的列,包括填充空缺的列和 "row_priority" 列
SELECT col1, col2, ..., colN, row_priority
FROM old_df_filled
-- 生成 key_column 列,用于分组
CROSS JOIN (SELECT col1 AS key_column FROM old_df_filled) key_columns
)
),
-- 合并两个 DataFrame,并去重
merged_df AS (
SELECT * FROM new_df_selected
UNION ALL
SELECT * FROM old_df_selected
),
-- 选取合并后的第一行
final_df AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY key_column ORDER BY rank) AS row_num
FROM merged_df
)
SELECT col1, col2, ..., colN
FROM final_df
WHERE row_num = 1
```
这段 SQL 代码的实现原理与 PySpark 代码相同,主要分为以下几个步骤:
1. 获取需要合并的列名。
2. 对 new_df 和 older_df 分别进行填充空缺列、添加 "row_priority" 列和选取列的操作,生成 new_df_selected 和 old_df_selected 两个数据集。
3. 将 two_df_selected 进行合并,并添加 rank 列,用于去重。
4. 选取合并后的第一行,得到最终的去重结果。
阅读全文