city_info.txt 文件(城市ID 城市名称 所属区域):
1 北京 华北
2 上海 华东
3 深圳 华南
4 广州 华南
5 武汉 华中
6 南京 华东
7 天津 华北
8 成都 西南
9 哈尔滨 东北
10 大连 东北
11 沈阳 东北
12 西安 西北
13 长沙 华中
14 重庆 西南
15 济南 华东
16 石家庄 华北
17 银川 西北
18 杭州 华东
19 保定 华北
20 福州 华南
21 贵阳 西南
22 青岛 华东
23 苏州 华东
24 郑州 华北
25 无锡 华东
26 厦门 华南

product_info.txt 文件(商品ID 商品名称 经营类型):
1 商品_1 自营
2 商品_2 自营
3 商品_3 自营
4 商品_4 自营
5 商品_5 自营
6 商品_6 自营
7 商品_7 自营
8 商品_8 自营
9 商品_9 自营
10 商品_10 自营
11 商品_11 自营
12 商品_12 第三方
13 商品_13 第三方

user_visit_action.txt 文件(示例):
2019-07-17 95 26070e87-1ad7-49a3-8fb3-cc741facaddf 37 2019-07-17 00:00:02 手机 -1 -1 \N \N \N \N 3
2019-07-17 95 26070e87-1ad7-49a3-8fb3-cc741facaddf 48 2019-07-17 00:00:10 \N 16 98 \N \N \N \N 19
2019-07-17 95 26070e87-1ad7-49a3-8fb3-cc741facaddf 6 2019-07-17 00:00:17 \N 19 85 \N \N \N \N 7
2019-07-17 38 6502cdc9-cf95-4b08-8854-f03a25baa917 29 2019-07-17 00:00:19 \N 12 36 \N \N \N \N 5
每行 13 个字段,依次为:日期、用户ID、会话ID、页面ID、动作时间、搜索关键字、点击品类ID、点击商品ID、下单品类ID、下单商品ID、支付品类ID、支付商品ID、城市ID;其中 \N 表示空值,-1 表示无点击。

注意:所有 txt 文件都没有表头!

1. 项目需求
1) 统计最受欢迎的品类:先按点击数排序,再按下单数,最后按支付数;
2) 统计页面跳转率(页面单跳转化率);
3) 统计不同区域内的热门商品 Top3;
4) 自定义需求:哪一时间段访问人数最多(分别按小时、按日期、按四季统计)。

请给出完整的 Scala 代码。
时间: 2024-03-13 13:45:36 浏览: 59
代码如下:
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try
/**
 * Spark RDD analysis of the e-commerce click-stream sample data.
 *
 * Inputs (all header-less text files):
 *  - `city_info.txt`:         city_id  city_name  area
 *  - `product_info.txt`:      product_id  product_name  extend_info
 *  - `user_visit_action.txt`: 13 columns, see [[ECommerceAnalysis.UserVisitAction]];
 *                             `\N` marks a missing value, `-1` marks "no click".
 *
 * Usage: `ECommerceAnalysis [dataDir]`. The files are read from `dataDir` when it is given,
 * otherwise by their bare names (relative to the working directory). The master defaults
 * to `local[*]` unless one is supplied by `spark-submit`.
 */
object ECommerceAnalysis {

  /**
   * One row of `user_visit_action.txt`, columns in file order.
   * Click id columns hold `-1` when the action is not a click; order/pay columns hold `\N`
   * when absent and may hold several comma-separated ids otherwise.
   */
  final case class UserVisitAction(
      date: String,
      userId: String,
      sessionId: String,
      pageId: Long,
      actionTime: String,
      searchKeyword: String,
      clickCategoryId: String,
      clickProductId: String,
      orderCategoryIds: String,
      orderProductIds: String,
      payCategoryIds: String,
      payProductIds: String,
      cityId: String
  )

  /** Number of columns in a `user_visit_action.txt` row. */
  private val ActionFieldCount = 13

  /** How many categories requirement 1 reports. */
  private val TopCategoryCount = 10

  /** How many jump targets requirement 2 reports per source page. */
  private val TopJumpTargetsPerPage = 10

  /** How many products requirement 3 reports per area. */
  private val TopProductsPerArea = 3

  def main(args: Array[String]): Unit = {
    // setIfMissing lets spark-submit's --master take precedence while local runs still work.
    val conf = new SparkConf()
      .setAppName("ECommerceAnalysis")
      .setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)

    // Optional first argument: directory holding the three input files.
    def inputPath(fileName: String): String =
      args.headOption.fold(fileName)(dir => s"$dir/$fileName")

    try {
      // Small dimension tables: collected to the driver and captured by closures.
      val cityToArea: Map[String, String] = sc
        .textFile(inputPath("city_info.txt"))
        .flatMap(line => parseDimension(line).toList)
        .map { case (cityId, _, area) => (cityId, area) }
        .collectAsMap()
        .toMap
      val productNames: Map[String, String] = sc
        .textFile(inputPath("product_info.txt"))
        .flatMap(line => parseDimension(line).toList)
        .map { case (productId, productName, _) => (productId, productName) }
        .collectAsMap()
        .toMap

      // Parsed once and reused by every requirement below; malformed lines are dropped.
      val actions = sc
        .textFile(inputPath("user_visit_action.txt"))
        .flatMap(line => parseAction(line).toList)
        .cache()

      // 1. Most popular categories: sort by clicks, then orders, then payments.
      topCategories(actions, TopCategoryCount).foreach {
        case (categoryId, (clickCount, orderCount, paymentCount)) =>
          println(s"Category $categoryId: ClickCount: $clickCount, OrderCount: $orderCount, PaymentCount: $paymentCount")
      }

      // 2. Page jump rate: jumps A->B within a session divided by all visits of page A.
      pageJumpRates(actions, TopJumpTargetsPerPage).foreach { case (pageId, targets) =>
        val rates = targets.map { case (targetPageId, rate) => f"$pageId->$targetPageId: $rate%.4f" }
        println(s"Page $pageId: ${rates.mkString(", ")}")
      }

      // 3. Top products (by clicks) within each area.
      areaTopProducts(actions, cityToArea, TopProductsPerArea).foreach { case (area, top) =>
        val products = top.map { case (productId, count) =>
          s"${productNames.getOrElse(productId, productId)}:$count"
        }
        println(s"$area: ${products.mkString(", ")}")
      }

      // 4. Distinct visitors per hour of day / per date / per season, plus the peak of each.
      printVisitorCounts("Hour", distinctVisitorsBy(actions)(hourOf))
      printVisitorCounts("Date", distinctVisitorsBy(actions)(dateOf))
      printVisitorCounts("Season", distinctVisitorsBy(actions)(seasonOf))
    } finally {
      sc.stop()
    }
  }

  /** True for the placeholders the data uses for "no value": empty, `\N`, `null`, `-1`. */
  private def isMissing(value: String): Boolean = {
    val v = value.trim
    v.isEmpty || v == "\\N" || v == "null" || v == "-1"
  }

  /** Splits a comma-separated id column, dropping placeholders. */
  private def splitIds(ids: String): List[String] =
    if (isMissing(ids)) Nil
    else ids.split(",").iterator.map(_.trim).filterNot(isMissing).toList

  /**
   * Parses a three-column dimension line (`id name extra`, whitespace separated).
   * Returns None for blank or short lines.
   */
  private def parseDimension(line: String): Option[(String, String, String)] =
    // Tolerate a UTF-8 BOM at the start of the file, which would otherwise corrupt the first id.
    line.stripPrefix("\uFEFF").trim.split("\\s+") match {
      case Array(id, name, extra, _*) => Some((id, name, extra))
      case _                          => None
    }

  /**
   * Splits a `user_visit_action.txt` line into its 13 columns.
   * Tab-separated lines are used as-is. Otherwise the line is split on whitespace and the
   * action_time, which contains a space, is re-joined. This assumes no other column contains
   * whitespace. Returns None when the line does not have enough columns.
   */
  private def splitActionLine(line: String): Option[Array[String]] = {
    val byTab = line.split("\t", -1)
    if (byTab.length >= ActionFieldCount) Some(byTab)
    else {
      val tokens = line.trim.split("\\s+")
      if (tokens.length >= ActionFieldCount + 1)
        Some((tokens.take(4) :+ s"${tokens(4)} ${tokens(5)}") ++ tokens.drop(6))
      else None
    }
  }

  /** Parses one action line; None when it is malformed or its page id is not numeric. */
  private def parseAction(line: String): Option[UserVisitAction] =
    for {
      f      <- splitActionLine(line)
      pageId <- Try(f(3).trim.toLong).toOption
    } yield UserVisitAction(
      date = f(0).trim,
      userId = f(1).trim,
      sessionId = f(2).trim,
      pageId = pageId,
      actionTime = f(4).trim,
      searchKeyword = f(5).trim,
      clickCategoryId = f(6).trim,
      clickProductId = f(7).trim,
      orderCategoryIds = f(8).trim,
      orderProductIds = f(9).trim,
      payCategoryIds = f(10).trim,
      payProductIds = f(11).trim,
      cityId = f(12).trim
    )

  /**
   * Requirement 1: the top `n` categories ordered by (clicks, orders, payments) descending.
   * A row contributes to every action kind it carries.
   */
  private def topCategories(actions: RDD[UserVisitAction], n: Int): Array[(String, (Long, Long, Long))] =
    actions
      .flatMap { a =>
        val clicks =
          if (isMissing(a.clickCategoryId)) Nil else List((a.clickCategoryId, (1L, 0L, 0L)))
        val orders = splitIds(a.orderCategoryIds).map(id => (id, (0L, 1L, 0L)))
        val payments = splitIds(a.payCategoryIds).map(id => (id, (0L, 0L, 1L)))
        clicks ++ orders ++ payments
      }
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3))
      .sortBy(_._2, ascending = false)
      .take(n)

  /**
   * Requirement 2: single-jump rate for each page pair (A, B).
   * The rate is the number of times B directly follows A within a session (ordered by action
   * time) divided by the total number of visits of A. For each source page, returns the `topPerPage`
   * targets with the highest rate, sorted by source page id.
   */
  private def pageJumpRates(actions: RDD[UserVisitAction], topPerPage: Int): Array[(Long, List[(Long, Double)])] = {
    // Denominator: every action on a page counts as one visit of that page.
    val pageVisits = actions.map(a => (a.pageId, 1L)).reduceByKey(_ + _).collectAsMap()

    val jumpCounts = actions
      .map(a => (a.sessionId, (a.actionTime, a.pageId)))
      .groupByKey()
      .flatMap { case (_, visits) =>
        // "yyyy-MM-dd HH:mm:ss" strings sort chronologically.
        val pages = visits.toList.sortBy(_._1).map(_._2)
        pages.zip(pages.drop(1))
      }
      .map(jump => (jump, 1L))
      .reduceByKey(_ + _)
      .collect()

    jumpCounts
      .collect { case ((from, to), count) if pageVisits.contains(from) =>
        (from, (to, count.toDouble / pageVisits(from)))
      }
      .groupBy(_._1)
      .map { case (from, jumps) =>
        (from, jumps.map(_._2).sortBy { case (to, rate) => (-rate, to) }.take(topPerPage).toList)
      }
      .toArray
      .sortBy(_._1)
  }

  /**
   * Requirement 3: the `n` most-clicked products per area, with areas sorted by name.
   * Cities missing from `cityToArea` are reported under "Unknown".
   */
  private def areaTopProducts(
      actions: RDD[UserVisitAction],
      cityToArea: Map[String, String],
      n: Int
  ): Array[(String, List[(String, Long)])] =
    actions
      .filter(a => !isMissing(a.clickProductId))
      .map(a => ((cityToArea.getOrElse(a.cityId, "Unknown"), a.clickProductId), 1L))
      .reduceByKey(_ + _)
      .map { case ((area, productId), count) => (area, (productId, count)) }
      .groupByKey()
      .mapValues(_.toList.sortBy { case (productId, count) => (-count, productId) }.take(n))
      .collect()
      .sortBy(_._1)

  /**
   * Requirement 4: number of distinct users per bucket, sorted by bucket label.
   * Actions for which `bucketOf` returns None are ignored.
   */
  private def distinctVisitorsBy(actions: RDD[UserVisitAction])(
      bucketOf: UserVisitAction => Option[String]
  ): Array[(String, Long)] =
    actions
      .flatMap(a => bucketOf(a).map(bucket => (bucket, a.userId)).toList)
      .distinct()
      .map { case (bucket, _) => (bucket, 1L) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(_._1)

  /** Hour of day ("00".."23") taken from "yyyy-MM-dd HH:mm:ss". */
  private def hourOf(a: UserVisitAction): Option[String] =
    if (a.actionTime.length >= 13) Some(a.actionTime.substring(11, 13)) else None

  /** The action's date column. */
  private def dateOf(a: UserVisitAction): Option[String] =
    if (a.date.isEmpty) None else Some(a.date)

  /** Meteorological season from the month of "yyyy-MM-dd" (Mar-May = Spring, ...). */
  private def seasonOf(a: UserVisitAction): Option[String] =
    Try(a.date.substring(5, 7).toInt).toOption.collect {
      case m if m >= 3 && m <= 5  => "Spring"
      case m if m >= 6 && m <= 8  => "Summer"
      case m if m >= 9 && m <= 11 => "Autumn"
      case 12 | 1 | 2             => "Winter"
    }

  /** Prints every bucket's visitor count and then the bucket with the most visitors. */
  private def printVisitorCounts(label: String, counts: Array[(String, Long)]): Unit = {
    counts.foreach { case (bucket, visitors) => println(s"$label $bucket: $visitors") }
    counts.sortBy { case (bucket, visitors) => (-visitors, bucket) }.headOption.foreach {
      case (bucket, visitors) => println(s"Peak $label: $bucket ($visitors visitors)")
    }
  }
}
```
阅读全文