What does `df_selected = df[cols]` do?
Time: 2023-12-31 09:04:11 · Views: 26
This line selects the specified columns from a DataFrame and creates a new DataFrame object. Here, `df` is the original DataFrame and `cols` is a list of the desired column names. (A single column name as a plain string also works, but note that `df["A"]` returns a Series, while `df[["A"]]` returns a one-column DataFrame.)
For example, if `df` has five columns A, B, C, D, and E, and `cols` contains the three names A, C, and E, then after `df_selected = df[cols]`, `df_selected` is a new DataFrame containing only columns A, C, and E.
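A minimal sketch of this behavior (the column names mirror the example above):

```python
import pandas as pd

# A small DataFrame with five columns A..E
df = pd.DataFrame({
    "A": [1, 2], "B": [3, 4], "C": [5, 6], "D": [7, 8], "E": [9, 10]
})

cols = ["A", "C", "E"]            # list of column names -> new DataFrame
df_selected = df[cols]
print(list(df_selected.columns))  # ['A', 'C', 'E']

single = df["A"]                  # a plain string -> Series, not DataFrame
print(type(single).__name__)      # 'Series'
```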
Related questions
```python
new_df_cols = new_df.columns
old_df_cols = older_df.columns
total = set(new_df_cols + old_df_cols)
new_df = new_df.select(*self.fill_empty_colums(new_df_cols, total)).withColumn("row_priority", F.lit(0))
older_df = older_df.select(*self.fill_empty_colums(old_df_cols, total)).withColumn("row_priority", F.lit(1))
key_column = [F.col(column_name) for column_name in key_columns]
merge_spec = Window.partitionBy(key_column).orderBy("row_priority")
ranked_df = new_df.unionByName(older_df).withColumn("rank", F.rank().over(merge_spec))
return self.update_audit_created_column(ranked_df, key_column).where(F.col("rank") == 1).drop("rank", "row_priority")
```
This code uses PySpark to merge and deduplicate two DataFrames, preferring rows from the newer one and adding some audit metadata. Below is an equivalent written in SQL:
```sql
-- Computing the union of the two tables' column names (what the PySpark
-- fill_empty_colums helper does) cannot be expressed in a static query;
-- it would require dynamic SQL. Here we assume the shared column list
-- col1, col2, ..., colN is already known.

-- Tag each table with a row_priority: 0 for new rows, 1 for old rows
WITH new_df_filled AS (
    SELECT col1, col2, ..., colN, 0 AS row_priority
    FROM new_df
),
older_df_filled AS (
    SELECT col1, col2, ..., colN, 1 AS row_priority
    FROM older_df
),
-- Union the two tables and rank rows within each key group,
-- so rows from new_df (row_priority = 0) come first
ranked AS (
    SELECT *,
           RANK() OVER (PARTITION BY key_column ORDER BY row_priority) AS rnk
    FROM (
        SELECT * FROM new_df_filled
        UNION ALL
        SELECT * FROM older_df_filled
    ) merged
)
-- Keep only the highest-priority row for each key
SELECT col1, col2, ..., colN
FROM ranked
WHERE rnk = 1
```
This SQL follows the same logic as the PySpark code, in four steps:
1. Bring both tables to a common column list (in SQL this column-name union must be resolved up front, e.g. via dynamic SQL).
2. Add a "row_priority" column: 0 for `new_df`, 1 for `older_df`.
3. Union the two result sets and compute a rank within each key group, ordered by row_priority.
4. Keep only the rows with rank 1, i.e. prefer the row from `new_df` whenever a key exists in both tables.
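The merge-with-priority pattern is easy to see in miniature. Below is a hedged pandas sketch (the frames, key column, and values are made up for illustration) that keeps the new row whenever a key appears in both frames, mirroring the rank-then-filter logic above:

```python
import pandas as pd

# Hypothetical data: "new" rows get priority 0, "old" rows priority 1
new_df = pd.DataFrame({"key": [1, 2], "value": ["new1", "new2"]})
older_df = pd.DataFrame({"key": [2, 3], "value": ["old2", "old3"]})

new_df["row_priority"] = 0
older_df["row_priority"] = 1

merged = pd.concat([new_df, older_df], ignore_index=True)
# Within each key, keep the row with the smallest priority (the new one)
deduped = (merged.sort_values("row_priority")
                 .drop_duplicates(subset="key", keep="first")
                 .drop(columns="row_priority")
                 .sort_values("key")
                 .reset_index(drop=True))
print(deduped)  # key 2 keeps "new2"; keys 1 and 3 come from whichever frame has them
```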
My data is a multi-feature time series in which each row belongs to a single time step. I want to use three features (air temperature, water level, water temperature) from the current step and the two preceding steps (three steps in total) to predict the water temperature at the next step. How should I adapt the following template?
```python
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    n_vars = 1 if type(data) is list else data.shape[1]
    df = pd.DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
    # forecast sequence (t, t+1, ... t+n)
    for i in range(0, n_out):
        cols.append(df.shift(-i))
    # put it all together
    agg = pd.concat(cols, axis=1)
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg.values
```
You can modify the `series_to_supervised` function to do this. The revised code is:
```python
import pandas as pd

def series_to_supervised(data, n_in=3, n_out=1, dropnan=True):
    # data: [[air_temp, water_level, water_temp], ...] — one row per time step
    n_vars = 1 if isinstance(data, list) else data.shape[1]
    df = pd.DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ..., t-1): the three known time steps
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ..., t+n-1)
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j + 1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
    # put it all together
    agg = pd.concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    # keep the three input steps for all features, plus only the
    # water-temperature column (var3) as the prediction target
    selected_cols = ['var%d(t-%d)' % (j + 1, i) for i in range(n_in, 0, -1) for j in range(n_vars)]
    selected_cols += ['var3(t)']
    agg = agg[selected_cols]
    return agg.values
```
Here `data` is a 2-D array in which each row holds one time step's air temperature, water level, and water temperature. Called with `n_in=3, n_out=1`, the function returns a new 2-D array whose every row contains the three features at three consecutive known steps (labelled t-3, t-2, t-1, which is just your "previous two steps plus the current one" relabelled) followed by the water temperature one step later (`var3(t)`) as the prediction target. To predict two steps ahead instead, set `n_out=2` and select `var3(t+1)` as the target.
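To make the windowing concrete, here is a small self-contained sketch (the six rows are toy numbers, not real measurements) that builds the same lag features with pandas `shift` and checks the resulting shape — three usable samples, each with 9 inputs (3 features × 3 steps) and 1 target:

```python
import numpy as np
import pandas as pd

# Toy series: 6 time steps of [air_temp, water_level, water_temp]
data = np.array([
    [10.0, 1.0, 5.0],
    [11.0, 1.1, 5.2],
    [12.0, 1.2, 5.4],
    [13.0, 1.3, 5.6],
    [14.0, 1.4, 5.8],
    [15.0, 1.5, 6.0],
])
df = pd.DataFrame(data, columns=["air_temp", "water_level", "water_temp"])

# Inputs: all three features at lags 3, 2, 1 (three consecutive known steps)
parts = [df.shift(i).add_suffix(f"(t-{i})") for i in (3, 2, 1)]
# Target: water temperature at the next step (relabelled t)
parts.append(df[["water_temp"]].add_suffix("(t)"))

supervised = pd.concat(parts, axis=1).dropna()
print(supervised.shape)  # (3, 10): 3 samples, 9 inputs + 1 target
```

The first sample uses rows 0–2 as inputs and row 3's water temperature (5.6) as the target, exactly the "three known steps predict the next water temperature" framing.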