Q: Use Flink to consume the dwd-layer data from Kafka, count the mall's real-time orders, and store the total in Redis under the key totalcount. Retrieve the value with redis-cli via GET totalcount.
Date: 2024-05-15 19:15:37
Solution: reading Kafka data with Flink and saving the result to Redis
First, create a Flink project that uses Kafka as the data source, consumes the dwd-layer data in real time, and counts the orders. The following dependencies are needed (note that the Redis connector is published by Apache Bahir and is versioned independently of Flink):
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.2</version>
</dependency>
<!-- Redis connector from Apache Bahir; its version does not track Flink's -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```
Next, write a Flink program that reads the order data from Kafka, counts the orders, and writes the running total to Redis.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

public class OrderCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");

        // Kafka consumer reading the dwd-layer order topic
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("dwd_order", new SimpleStringSchema(), props);
        // Start consuming from the latest offset
        consumer.setStartFromLatest();

        // Build the order stream
        DataStream<String> orderStream = env.addSource(consumer);

        // Redis connection configuration
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        // Map every order record to ("totalcount", 1), keep a keyed running sum,
        // and write the latest total to Redis
        orderStream
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>("totalcount", 1);
                    }
                })
                .keyBy(0)
                .sum(1)
                .addSink(new RedisSink<>(jedisPoolConfig, new RedisMapper<Tuple2<String, Integer>>() {
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        // SET totalcount <runningCount>
                        return new RedisCommandDescription(RedisCommand.SET);
                    }

                    @Override
                    public String getKeyFromData(Tuple2<String, Integer> data) {
                        return data.f0;
                    }

                    @Override
                    public String getValueFromData(Tuple2<String, Integer> data) {
                        return String.valueOf(data.f1);
                    }
                }));

        env.execute("OrderCount");
    }
}
```
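The Flink operators above boil down to a simple reduction: every record is mapped to ("totalcount", 1) and the keyed sum keeps a running total. A minimal stand-alone sketch of that counting logic (plain Java, no Flink; the class and method names here are illustrative, not part of the job above):

```java
import java.util.List;

public class OrderCountLogic {

    // Mirrors the map -> keyBy -> sum stages: each order record
    // contributes 1 to a single running total keyed by "totalcount".
    static int runningTotal(List<String> orderRecords) {
        int total = 0;
        for (String record : orderRecords) {
            total += 1; // map step: ("totalcount", 1) per record
        }
        return total;
    }

    public static void main(String[] args) {
        // Three incoming order records -> running total of 3
        System.out.println(runningTotal(List.of("o1", "o2", "o3")));
    }
}
```

In the streaming job the same total is maintained incrementally, one record at a time, and each update overwrites the totalcount key in Redis via SET.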
Once the program is running, it continuously reads order data from Kafka, keeps the order count up to date, and writes the result to Redis. You can inspect the value with the redis-cli command-line tool:
```
$ redis-cli
127.0.0.1:6379> get totalcount
```