使用 Flink 消费 Kafka 中 topic 为 topic1 的数据,根据数据中不同的表将数据分别分发至 Kafka 的 DWD 层的 Topic 中。使用 Kafka 自带的消费者消费 Topic 的前 1 条数据。
时间: 2023-06-16 15:03:39 浏览: 204
Flink消费Kafka数据1
首先,你需要编写 Flink 程序来消费 Kafka 中的数据,并根据不同的表将数据分发至不同的 DWD Topic 中。可以使用 Flink 的 Kafka Consumer API 来消费 Kafka 中的数据,然后使用 ProcessFunction 配合 SideOutput(侧输出流,通过 OutputTag 标识)将数据分发至不同的 DWD Topic 中。注意:SplitStream 已被弃用并在较新的 Flink 版本中移除,应使用 SideOutput 代替。
以下是一个简单的示例代码:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction.TimestampedSourceContext;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Flink job that reads string records from the Kafka topic {@code topic1} and routes each
 * record to one of three DWD topics ({@code dwd-table1}, {@code dwd-table2},
 * {@code dwd-table3}) depending on which table name the record text contains.
 *
 * <p>Routing is done with side outputs: a single {@link ProcessFunction} emits every record
 * to the {@link OutputTag} of its table, and each side output is then written to Kafka by its
 * own {@link FlinkKafkaProducer}.
 */
public class KafkaToFlinkToKafka {

    // Side-output tags, one per destination table. They are declared as anonymous subclasses
    // so that the element type survives erasure, and shared between the routing function
    // (which emits to them) and main (which reads them back from the operator).
    private static final OutputTag<String> TABLE1_TAG = new OutputTag<String>("dwd-table1") {};
    private static final OutputTag<String> TABLE2_TAG = new OutputTag<String>("dwd-table2") {};
    private static final OutputTag<String> TABLE3_TAG = new OutputTag<String>("dwd-table3") {};

    /**
     * Builds and runs the routing pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration. No key/value deserializers are configured here:
        // the Flink connector reads raw bytes and decodes them with the schema passed to
        // the FlinkKafkaConsumer constructor.
        Properties consumerProps = new Properties();
        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");

        // Source: every record of topic1 as a String.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), consumerProps);

        DataStream<String> stream = env
                .addSource(consumer)
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(String element) {
                                // NOTE(review): assumes every record starts with a numeric
                                // timestamp followed by a comma; a record that does not will
                                // throw NumberFormatException here — confirm the input format.
                                return Long.parseLong(element.split(",")[0]);
                            }
                        });

        // Route each record to the side output of the table it mentions. The result must be
        // kept as a SingleOutputStreamOperator: side outputs can only be obtained from the
        // operator that emitted them, not from the upstream stream.
        SingleOutputStreamOperator<String> routed = stream
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out)
                            throws Exception {
                        // Substring match on the table name; the first matching branch wins.
                        // Records that mention none of the tables are dropped.
                        if (value.contains("table1")) {
                            ctx.output(TABLE1_TAG, value);
                        } else if (value.contains("table2")) {
                            ctx.output(TABLE2_TAG, value);
                        } else if (value.contains("table3")) {
                            ctx.output(TABLE3_TAG, value);
                        }
                    }
                });

        DataStream<String> table1Stream = routed.getSideOutput(TABLE1_TAG);
        DataStream<String> table2Stream = routed.getSideOutput(TABLE2_TAG);
        DataStream<String> table3Stream = routed.getSideOutput(TABLE3_TAG);

        // Kafka producer configuration. Key/value serializers are intentionally not set:
        // FlinkKafkaProducer turns each element into bytes with the SerializationSchema it
        // is given, so configuring String serializers here would conflict with that.
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // One sink per DWD topic.
        FlinkKafkaProducer<String> producer1 =
                new FlinkKafkaProducer<>("dwd-table1", new SimpleStringSchema(), producerProps);
        FlinkKafkaProducer<String> producer2 =
                new FlinkKafkaProducer<>("dwd-table2", new SimpleStringSchema(), producerProps);
        FlinkKafkaProducer<String> producer3 =
                new FlinkKafkaProducer<>("dwd-table3", new SimpleStringSchema(), producerProps);

        table1Stream.addSink(producer1);
        table2Stream.addSink(producer2);
        table3Stream.addSink(producer3);

        // Submit the job.
        env.execute("KafkaToFlinkToKafka");
    }
}
```
接下来,你可以使用 Kafka 自带的消费者来消费 DWD Topic 的前 1 条数据,示例代码如下:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka client that prints the first record of the {@code dwd-table1} topic and exits.
 */
public class KafkaConsumerExample {

    /**
     * Subscribes to {@code dwd-table1}, waits until at least one record is available, prints
     * the first record returned and then closes the consumer.
     *
     * <p>The method keeps polling until a record arrives, so it blocks for as long as the
     * topic is empty.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-consumer-group");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic when the group has no committed offset;
        // the default ("latest") would skip everything already written to the topic.
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Do not commit offsets, so that running the example again still starts from the
        // beginning instead of resuming after the record printed by the previous run.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only one record is needed, so do not fetch more per poll.
        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

        // try-with-resources closes the consumer even if poll() throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("dwd-table1"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    // Nothing available yet (or partitions not assigned yet); poll again.
                    continue;
                }
                // NOTE(review): with several partitions, "first" means the first record
                // returned by the broker, not necessarily the oldest one across partitions.
                ConsumerRecord<String, String> record = records.iterator().next();
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                return;
            }
        }
    }
}
```
注意,这只是一个简单的示例代码,实际使用时需要根据具体情况进行修改。
阅读全文