Flink Kafka word-count code
Below is example code for doing stream counting with Flink and Kafka.
First, add the required Maven dependencies:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
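The `${flink.version}` and `${scala.binary.version}` placeholders must be defined in the POM's `<properties>` section. A minimal sketch; the versions below are assumptions (pick a release in which the legacy `FlinkKafkaConsumer`/`FlinkKafkaProducer` classes still exist, e.g. the 1.13 line):
```xml
<properties>
    <!-- Example versions only; match these to your cluster. -->
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
```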
Next, create a Flink StreamExecutionEnvironment:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
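Because the sink at the end of this pipeline uses EXACTLY_ONCE semantics, checkpointing has to be enabled on the environment; a minimal sketch (the 5-second interval is an arbitrary example value):
```java
// Exactly-once Kafka writes are tied to Flink checkpoints;
// 5000 ms is an arbitrary example interval.
env.enableCheckpointing(5000);
```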
Then, create a Kafka consumer stream:
```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
```
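By default the consumer resumes from the offsets committed for the `test` group. When re-running the job during development you may want to reprocess the topic from the beginning; the legacy consumer exposes this directly:
```java
// Optional: ignore committed offsets and read the topic from the start.
consumer.setStartFromEarliest();
```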
Next, process and transform the stream, for example counting how many times each word appears:
```java
DataStream<Tuple2<String, Integer>> counts = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
        for (String word : line.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    })
    // Java lambdas erase generic types, so Flink needs an explicit type hint here.
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(t -> t.f0)
    .sum(1);
```
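Note that on an unbounded stream `sum` emits an updated running count for every incoming word rather than a single final total. To sanity-check the logic without a Kafka broker, you can feed a few in-memory elements and print the results; a hypothetical local test, not part of the original pipeline:
```java
// Hypothetical smoke test: same pipeline, but with an in-memory source
// and stdout sink, so no Kafka broker is needed.
StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment();
testEnv.fromElements("hello world", "hello flink")
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0)
        .sum(1)
        .print(); // prints an updated count each time a word arrives
testEnv.execute("Local word count test");
```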
Finally, write the results to a new Kafka topic:
```java
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// EXACTLY_ONCE uses Kafka transactions; the producer's transaction timeout
// must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
producerProps.setProperty("transaction.timeout.ms", "900000");
FlinkKafkaProducer<Tuple2<String, Integer>> producer = new FlinkKafkaProducer<>(
        "output-topic",
        new Tuple2SerializationSchema(),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
counts.addSink(producer);
```
The complete code example:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for the EXACTLY_ONCE Kafka sink below.
        env.enableCheckpointing(5000);

        // Kafka consumer reading raw lines from input-topic.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
        DataStream<String> stream = env.addSource(consumer);

        // Split lines into words and keep a running count per word.
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                // Java lambdas erase generic types, so an explicit hint is needed.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);

        // Kafka producer writing the counts to output-topic transactionally.
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");
        // Must not exceed the broker's transaction.max.timeout.ms (default 15 min).
        producerProps.setProperty("transaction.timeout.ms", "900000");
        FlinkKafkaProducer<Tuple2<String, Integer>> producer = new FlinkKafkaProducer<>(
                "output-topic",
                new Tuple2SerializationSchema(),
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        counts.addSink(producer);

        env.execute("Word Count");
    }

    // Serializes each (word, count) pair as a Kafka record: key = word, value = count.
    private static class Tuple2SerializationSchema
            implements KafkaSerializationSchema<Tuple2<String, Integer>> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element,
                                                        @Nullable Long timestamp) {
            return new ProducerRecord<>("output-topic",
                    element.f0.getBytes(), element.f1.toString().getBytes());
        }
    }
}
```
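To try it end to end, write a few lines to `input-topic` with `kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input-topic` and read the counts back with `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --isolation-level read_committed` (the broker address matches the code above; `read_committed` is needed because the sink writes transactionally).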