使用 Flink scala 消费 Kafka 中 topic 为 topic2 的数据,将数据分 别分发至 kafka 的 DWD 层的 Topic 中。使用 Kafka 自带的消费者消费 Topic 的前 1 条数据
时间: 2023-06-16 12:03:29 浏览: 136
以下是使用 Flink scala 消费 Kafka 中 topic 为 topic2 的数据,将数据分别分发至 kafka 的 DWD 层的 Topic 中,并使用 Kafka 自带的消费者消费 Topic 的前 1 条数据的示例代码:
```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
object KafkaToFlinkToKafkaExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Kafka consumer properties
val kafkaConsumerProps = new Properties()
kafkaConsumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
kafkaConsumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-kafka-consumer")
kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// Kafka producer properties
val kafkaProducerProps = new Properties()
kafkaProducerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
kafkaProducerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
kafkaProducerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
// Kafka topics
val sourceTopic = "topic2"
val targetTopic1 = "dwd_topic1"
val targetTopic2 = "dwd_topic2"
// Kafka consumer
val kafkaConsumer = new FlinkKafkaConsumer[String](sourceTopic, new SimpleStringSchema(), kafkaConsumerProps)
val kafkaStream = env.addSource(kafkaConsumer)
// Kafka producers
val kafkaProducer1 = new FlinkKafkaProducer[String](targetTopic1, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), kafkaProducerProps)
val kafkaProducer2 = new FlinkKafkaProducer[String](targetTopic2, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), kafkaProducerProps)
// Process stream and write to Kafka
kafkaStream.map(record => {
// Write to targetTopic1
kafkaProducer1.send(record)
// Write to targetTopic2
kafkaProducer2.send(record)
record
})
// Kafka consumer for targetTopic1
val kafkaConsumerTarget1 = new FlinkKafkaConsumer[String](targetTopic1, new SimpleStringSchema(), kafkaConsumerProps)
val kafkaStreamTarget1 = env.addSource(kafkaConsumerTarget1)
// Kafka consumer for targetTopic2
val kafkaConsumerTarget2 = new FlinkKafkaConsumer[String](targetTopic2, new SimpleStringSchema(), kafkaConsumerProps)
val kafkaStreamTarget2 = env.addSource(kafkaConsumerTarget2)
// Print first record from targetTopic1
kafkaStreamTarget1.print().setParallelism(1).first(1)
env.execute("KafkaToFlinkToKafkaExample")
}
}
```
在上述代码中,我们首先定义了 Kafka 的消费者和生产者的相关属性,以及源 topic 和目标 topic。然后,我们使用 Flink 的 `FlinkKafkaConsumer` 和 `FlinkKafkaProducer` 分别创建了 Kafka 的消费者和生产者,并将 Kafka 中的数据流读取到 Flink 中,然后对数据流进行处理,将数据分别写入到两个目标 topic 中。最后,我们通过创建两个 Kafka 消费者来消费目标 topic 中的数据,并使用 `print().setParallelism(1).first(1)` 打印出目标 topic1 的前 1 条数据。
注意:在实际生产环境中,需要根据需求对代码进行修改和优化,例如增加容错机制、设置 Flink 的 Checkpoint 等。
阅读全文