户行为:用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。 行为类型:点击pv、购买buy、加购cart、fav收藏中用sql或者RDD统计一天内的每个时段的用户量、点击量、购买量
时间: 2023-07-10 19:39:49 浏览: 93
使用 Spark RDD 进行统计的代码如下:
```python
from pyspark import SparkContext, SparkConf
import datetime
# 创建 SparkContext
conf = SparkConf().setAppName("UserBehaviorCount")
sc = SparkContext(conf=conf)
# 读取用户行为数据
rdd = sc.textFile("user_behavior.txt")
# 定义时间转换函数
def timestamp2datetime(timestamp):
return datetime.datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H')
# 统计每个时段的用户量、点击量、购买量
result = rdd \
.map(lambda line: line.split(",")) \
.filter(lambda fields: len(fields) == 5) \
.filter(lambda fields: fields[3] in ['pv', 'buy']) \
.map(lambda fields: (timestamp2datetime(fields[4]), (fields[0], fields[3]))) \
.groupByKey() \
.mapValues(lambda values: (len(set([v[0] for v in values])), len([v for v in values if v[1] == 'pv']), len([v for v in values if v[1] == 'buy']))) \
.sortByKey()
# 输出结果
for hour, (user_count, click_count, buy_count) in result.collect():
print("{}\t{}\t{}\t{}".format(hour, user_count, click_count, buy_count))
# 关闭 SparkContext
sc.stop()
```
上述代码中,首先读取用户行为数据文件 `user_behavior.txt`,然后过滤掉行为类型不为 `pv` 或 `buy` 的记录,接着使用 `map` 函数将时间戳转换为具体的小时数,并以此作为键,用户ID和行为类型作为值。然后使用 `groupByKey` 函数对相同键的记录进行分组,并使用 `mapValues` 函数分别统计每个时段的用户量、点击量、购买量。最后使用 `sortByKey` 函数按时间排序,并输出结果。
注意,需要将时间戳转换为具体的小时数才能进行统计,因此需要定义一个时间转换函数 `timestamp2datetime` 进行转换。
阅读全文