1.1 将数据文件传到HDFS中的一个目录中,使用spark on yarn 运行spark程序 1.2 统计IP 访问网站的数据(PV)(注意ip 有重复访问网站需要去重),降序排序(以下为参考结果)
时间: 2023-06-14 08:02:49 浏览: 49
1.1 首先需要将数据文件上传到HDFS中的一个目录中,可以使用以下命令:
```
hdfs dfs -put <本地文件路径> <HDFS目录路径>
```
例如,将本地的文件`/user/data/access.log`上传到HDFS的`/user/data/`目录中,可以使用以下命令:
```
hdfs dfs -put /user/data/access.log /user/data/
```
然后使用以下命令在 YARN 上运行 Spark 程序:
```
spark-submit --class <main-class> --master yarn --deploy-mode client <jar-file> <program-arguments>
```
其中,`<main-class>` 是 Spark 程序的入口类,`<jar-file>` 是打包好的 Spark 程序的 JAR 文件,`<program-arguments>` 是 Spark 程序的命令行参数。
例如,假设 Spark 程序的入口类为`com.example.MyApp`,打包好的 JAR 文件为`myapp.jar`,命令行参数为`/user/data/access.log /user/output/pv`,可以使用以下命令在 YARN 上运行 Spark 程序:
```
spark-submit --class com.example.MyApp --master yarn --deploy-mode client myapp.jar /user/data/access.log /user/output/pv
```
1.2 统计 IP 访问网站的数据(PV)(注意 IP 有重复访问网站需要去重),降序排序
可以使用以下 Spark 程序实现:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object PVCounter {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("PVCounter")
val sc = new SparkContext(conf)
val inputPath = args(0)
val outputPath = args(1)
// 读取日志文件并解析出 IP 地址
val ipRDD = sc.textFile(inputPath)
.map(line => line.split(" ")(0))
// 对 IP 地址进行去重并计数
val pvRDD = ipRDD.distinct().map(ip => (ip, 1)).reduceByKey(_ + _)
// 按 PV 降序排序
val sortedRDD = pvRDD.sortBy(_._2, false)
// 将结果保存到 HDFS 中的一个文件中
sortedRDD.saveAsTextFile(outputPath)
sc.stop()
}
}
```
其中,`inputPath` 是日志文件在 HDFS 中的路径,`outputPath` 是结果文件在 HDFS 中的路径。可以使用以下命令运行 Spark 程序:
```
spark-submit --class PVCounter --master yarn --deploy-mode client <jar-file> <input-path> <output-path>
```
例如,假设打包好的 JAR 文件为`pvcounter.jar`,日志文件在`/user/data/access.log`,结果文件应保存在`/user/output/pv`,可以使用以下命令运行 Spark 程序:
```
spark-submit --class PVCounter --master yarn --deploy-mode client pvcounter.jar /user/data/access.log /user/output/pv
```
运行结果保存在`/user/output/pv`目录下。