使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每5分钟生产产品总数,将结果存入Redis中,key值为“totalproduce”,value值为“设备id,最近五分钟生产总数”。使用redis cli以HGETALL key方式获取totalproduce值的Scala代码为
时间: 2024-02-09 07:09:43 浏览: 43
以下是你要求的Scala代码:
```
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import java.util.Properties
import org.apache.flink.streaming.connectors.redis.RedisSinkConfiguration
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase
import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig
import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig
import redis.clients.jedis.JedisCommands
case class Record(schema: String, payload: String)
object RedisExample extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer("ProduceRecord", new JSONKeyValueDeserializationSchema(false), properties))
val recordStream = stream.map(kv => {
val schema = kv.getKey
val payload = kv.getValue
Record(schema, payload)
})
val processedStream = recordStream
.keyBy(record => record.payload.get("deviceid").asText())
.timeWindow(Time.minutes(5))
.apply((key, window, records, out: Collector[String]) => {
val count = records.size.toString
val deviceId = key
val outputValue = s"$deviceId,$count"
out.collect(outputValue)
})
val redisSinkConf = new FlinkJedisConfigBase.JedisPoolConfigBuilder()
.setHost("localhost")
.setPort(6379)
.setPassword("password")
.build()
val redisSink = new RedisSink[Record](redisSinkConf, new RedisMapper[Record] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "totalproduce")
}
override def getKeyFromData(data: Record): String = {
// Useless for this code
""
}
override def getValueFromData(data: Record): String = {
data.payload.get("productid").asText
}
})
processedStream.addSink(redisSink)
env.execute("Flink Redis Example")
}
```
注:该代码仅为演示使用。在实际生产环境中,应该根据实际需求修改调整。