scala2.12不能使用.reduce((a, b) => (a._1, a._2 + b._2))
时间: 2024-05-02 18:18:58 浏览: 106
这是因为.reduce((a, b) => (a._1, a._2 + b._2))返回的是一个元组类型,而Scala 2.12不支持元组类型的相加操作。可以使用以下两种方式解决这个问题:
1. 使用case class代替元组类型
```
case class MyTuple(a: Int, b: Int)
val result = list.map(x => MyTuple(x._1, x._2)).reduce((a, b) => MyTuple(a.a, a.b + b.b))
```
2. 使用.tupled将元组类型转换为函数形式
```
val result = list.map(x => (x._1, x._2)).reduce(Function.tupled((a, b) => (a._1, a._2 + b._2)))
```
相关问题
scala2.12版本编写Flink1.14消费kafka中的数据,统计商城案例每分钟的GMV,将结果存入redis中
好的,这个问题可以回答。首先,您需要在代码中引入Flink和Kafka的依赖,然后创建一个Kafka数据源,使用Flink的窗口函数来统计每分钟的GMV,并将结果存入Redis中。具体实现可以参考以下代码:
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.MapFunction
import redis.clients.jedis.Jedis
object Main {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Kafka Consumer Config
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
kafkaProps.setProperty("group.id", "test")
// Kafka Consumer
val kafkaConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaProps)
// Create Data Stream
val dataStream = env.addSource(kafkaConsumer)
// Map to Case Class
val caseClassDataStream = dataStream.map(new MapFunction[String, CaseClass] {
override def map(value: String): CaseClass = {
// Parse JSON to Case Class
}
})
// Create Windowed Stream
val windowedStream = caseClassDataStream
.keyBy(_.timestamp)
.timeWindow(Time.minutes(1))
// Calculate GMV
val gmvStream = windowedStream
.reduce((a, b) => CaseClass(a.timestamp, a.gmv + b.gmv))
// Store GMV in Redis
gmvStream.map(new MapFunction[CaseClass, Unit] {
override def map(value: CaseClass): Unit = {
val jedis = new Jedis("localhost", 6379)
jedis.set("gmv", value.gmv.toString)
}
})
// Execute Job
env.execute("GMV Calculation")
}
case class CaseClass(timestamp: Long, gmv: Double)
}
```
这段代码中,我们使用了Flink和Kafka的依赖,并创建了一个Kafka数据源。然后,我们将从Kafka中消费的数据映射到一个Case Class中,并创建了一个每分钟的窗口。在窗口中,我们使用reduce函数来计算每分钟的GMV,并将结果存入Redis中。最后,我们执行了这个Flink Job。
编写 Scala 独立应用程序: Pad 1 999 Workstation 2 25300 Workstation 2 25300 Workstation 38 13130 Workstation 38 13130 SSD 1 1060 TPC 40 2865 Workstation 38 13130 Computer 2 9900 HD 1 535 PC 3 3600 PC 2 2800 Computer 2 9900 HD 1 535 Workstation 38 13130 输出文件 1 格式: Computer 2 9900 … 输出文件 2 格式: TPC,114600 … 1) 将上面两张表的内容分别写入两个本地文件,三列分别为设备名、设备数量和单价 2) 从本地文件读入数据,实现去重合并,输出结果的行数,并将结果写入本地文件, 目录命名为“q3_out1_学号” 3) 计算合并后的表数据各项总金额(Num × Price),将结果写入本地文件,目录 “q3_out2_学号”; 4) 使用 sbt 打包上述程序; 5) 通过 spark-submit 执行生成的 jar,查看运行结果。
好的,我明白了您的要求。下面是针对您的问题的解答:
1. 将数据写入本地文件
您可以使用以下代码将数据写入两个本地文件:
```scala
import java.io._
val data1 = List("Pad 1 999", "Workstation 2 25300", "Workstation 2 25300", "Workstation 38 13130", "Workstation 38 13130", "SSD 1 1060", "TPC 40 2865", "Workstation 38 13130", "Computer 2 9900", "HD 1 535", "PC 3 3600", "PC 2 2800", "Computer 2 9900", "HD 1 535", "Workstation 38 13130")
val data2 = List("TPC 114600", "HD 4825", "Computer 19800", "Pad 999", "Workstation 50600")
val file1 = new File("path/to/file1.txt")
val file2 = new File("path/to/file2.txt")
val bw1 = new BufferedWriter(new FileWriter(file1))
val bw2 = new BufferedWriter(new FileWriter(file2))
for (line <- data1) {
bw1.write(line)
bw1.newLine()
}
for (line <- data2) {
bw2.write(line)
bw2.newLine()
}
bw1.close()
bw2.close()
```
请将 `path/to/file1.txt` 和 `path/to/file2.txt` 替换为您想要保存文件的路径。
2. 去重合并并写入本地文件
您可以使用以下代码实现去重合并并将结果写入本地文件:
```scala
import scala.io.Source
val file1 = Source.fromFile("path/to/file1.txt")
val file2 = Source.fromFile("path/to/file2.txt")
val data1 = file1.getLines().toList
val data2 = file2.getLines().toList
file1.close()
file2.close()
val mergedData = (data1 ++ data2)
.map(line => {
val fields = line.split(' ')
(fields(0), fields(1).toInt, fields(2).toInt)
})
.groupBy(_._1)
.mapValues(_.map(row => (row._2, row._3)).reduce((a, b) => (a._1 + b._1, a._2)))
.toList
.sortBy(_._1)
val out1File = new File("q3_out1_学号")
val bw = new BufferedWriter(new FileWriter(out1File))
for (line <- mergedData) {
bw.write(s"${line._1} ${line._2._1} ${line._2._2}")
bw.newLine()
}
bw.close()
val result1 = mergedData.length
```
请将 `q3_out1_学号` 替换为您想要保存结果的文件名,并将 `学号` 替换为您的学号。
3. 计算各项总金额并写入本地文件
您可以使用以下代码计算各项总金额并将结果写入本地文件:
```scala
val result2 = mergedData.map(line => line._2._1 * line._2._2).sum
val out2File = new File("q3_out2_学号")
val bw = new BufferedWriter(new FileWriter(out2File))
bw.write(result2.toString)
bw.close()
```
请将 `q3_out2_学号` 替换为您想要保存结果的文件名,并将 `学号` 替换为您的学号。
4. 使用 sbt 打包
您可以使用以下命令使用 sbt 打包您的程序:
```
sbt package
```
生成的 jar 文件位于 `target/scala-2.12/` 目录下。
5. 使用 spark-submit 执行程序
您可以使用以下命令使用 spark-submit 执行您的程序:
```
spark-submit --class Main --master local[2] path/to/jar/file path/to/file1.txt path/to/file2.txt
```
请将 `Main` 替换为您打包后程序的主类名,将 `path/to/jar/file` 替换为您的 jar 文件路径,将 `path/to/file1.txt` 和 `path/to/file2.txt` 替换为您之前保存数据的文件路径。
阅读全文