网站转化率统计spark
时间: 2024-12-30 22:21:58 浏览: 6
### 使用 Spark 实现网站转化率统计
#### 数据准备
为了实现网站转化率统计,数据通常来自用户的浏览记录。这些记录可能存储在一个日志文件或其他形式的数据集中。每条记录应至少包含以下字段:
- `userId`:用户唯一标识符。
- `sessionId`:会话唯一标识符。
- `pageId`:页面编号或名称。
- `timestamp`:访问时间戳。
假设有一个名为 `user_visits` 的 DataFrame 表示上述结构化后的用户访问数据集[^1]。
```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 初始化Spark Session
spark = SparkSession.builder.appName("ConversionRate").getOrCreate()
# 假设已经加载了一个DataFrame user_visits
# user_visits.show()
```
#### 单跳转换率计算
对于单跳转换率而言,关注的是两个连续页面之间的转移概率。这可以通过识别同一 session 中相邻的 page 访问来完成,并据此构建一对多的关系表用于后续分析[^2]。
```python
# 添加一个辅助列 'next_page' 来表示下一页
df_with_next_pages = (
user_visits.withColumnRenamed('pageId', 'current_page')
.withColumn('rank', F.row_number().over(Window.partitionBy('sessionId').orderBy('timestamp')))
)
single_hop_pairs = df_with_next_pages.alias('a')\
.join(
df_with_next_pages.alias('b'),
(F.col('a.sessionId') == F.col('b.sessionId')) & \
((F.col('a.rank')+1) == F.col('b.rank')),
how='inner'
).select(F.col('a.current_page'), F.col('b.pageId').alias('next_page'))
# 统计各对组合的数量以及总的起始页次数
pair_counts = single_hop_pairs.groupBy(['current_page', 'next_page']).count()\
.withColumnRenamed('count', 'transition_count')
total_starts_per_page = pair_counts.groupBy('current_page').agg(F.sum('transition_count')).withColumnRenamed('sum(transition_count)', 'total_starts')
conversion_rates_single_hop = pair_counts.join(total_starts_per_page,
on=['current_page'],
how="left")\
.withColumn('conversion_rate',
F.round((F.col('transition_count') / F.col('total_starts')), 4))
```
#### 多级(A_B)转化率计算
当涉及到多个页面组成的流程时,则需按照指定顺序跟踪整个过程中的变化情况。这里定义了一种方法来处理这种类型的序列模式匹配问题[^3]。
```python
def calculate_multi_level_conversion(target_flow, spark_df):
"""
:param target_flow: list of strings representing the expected flow e.g., ['home','product_list','checkout']
:param spark_df: input dataframe containing columns like userId, sessionId, timestamp and pageId.
"""
from functools import reduce
# 创建窗口函数以便按session分组并排序
window_spec = Window.partitionBy(spark_df['sessionId']).orderBy(spark_df['timestamp'])
# 对于目标流中的每一个页面,在原始DF基础上创建新的指示器列
indicator_cols = []
for i, p in enumerate(target_flow[:-1]):
col_name = f"is_{p}_to_{target_flow[i+1]}"
next_page_col = F.lead('pageId', offset=1).over(window_spec)
condition = (F.col('pageId')==p)&(next_page_col==target_flow[i+1])
new_column = when(condition, True)\
.otherwise(False)\
.alias(col_name)
indicator_cols.append(new_column)
extended_df = spark_df.select('*', *indicator_cols)
# 进一步过滤只保留符合条件的情况
filtered_flows = extended_df.filter(reduce(lambda a,b:a|b, [extended_df[c]==True for c in map(lambda x:x.name(), indicator_cols)]))
# 转换成宽格式以方便聚合操作
pivoted = filtered_flows.groupby('sessionId').pivot([c.name() for c in indicator_cols]).count()
# 计算最终的结果
result = {}
prev_step_total = None
for step in range(len(target_flow)-1):
current_step_key = f"{target_flow[step]}_to_{target_flow[step+1]}"
if not prev_step_total:
prev_step_total = pivoted[current_step_key].sum()
conversion_ratio = round(pivoted[current_step_key].sum()/prev_step_total, 4)*100
result[f"From {target_flow[step]} To {target_flow[step+1]}"] = {
"Total Sessions": int(prev_step_total),
"Converted Sessions": int(pivoted[current_step_key].sum()),
"% Conversion Rate": str(conversion_ratio)+"%"
}
prev_step_total = pivoted[current_step_key].sum()
return pd.DataFrame(result).T.reset_index(names=["Transition"])
```
通过以上两种方式可以分别得到不同层次上的转化率指标,帮助更好地理解用户行为特征及其背后潜在的影响因素。
阅读全文