Using Flink (Scala), consume the data in Kafka topic topic1 and, based on which table each record belongs to, dispatch the records to the corresponding DWD-layer Kafka topics. Then use Kafka's built-in consumer to read the first record of the topic.
Below is example code that uses Flink (Scala) to consume the data in topic topic1, route each record to the appropriate DWD-layer topic based on the table it belongs to, and then read back the first record of one of the DWD topics. The job writes to topic1-dwd1 and topic1-dwd2, so those topics should already exist unless the broker auto-creates them.
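If the topics need to be created first, a minimal sketch with Kafka's own kafka-topics.sh script follows; the broker address, partition count, and replication factor are assumptions for a local test setup:
```bash
# create the two DWD-layer topics on a local broker (adjust as needed)
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic topic1-dwd1 --partitions 1 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic topic1-dwd2 --partitions 1 --replication-factor 1
```
With the topics in place, the Flink job looks like this: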
```scala
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaDataPipeline {

  // side-output tags, one per DWD-layer topic
  val dwd1Tag: OutputTag[String] = OutputTag[String]("dwd1")
  val dwd2Tag: OutputTag[String] = OutputTag[String]("dwd2")

  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection properties shared by the consumers and producers
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProps.setProperty("group.id", "test-group")

    // consumer that reads the raw JSON records from topic1
    val consumer = new FlinkKafkaConsumer(
      "topic1", new JSONKeyValueDeserializationSchema(false), kafkaProps)

    // producers for the two DWD-layer topics
    val topic1DWD1Producer = new FlinkKafkaProducer[String](
      "topic1-dwd1", new MyKafkaSerializationSchema("topic1-dwd1"),
      kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
    val topic1DWD2Producer = new FlinkKafkaProducer[String](
      "topic1-dwd2", new MyKafkaSerializationSchema("topic1-dwd2"),
      kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

    // create the Kafka stream; each element is an ObjectNode with "key" and "value" fields
    val kafkaStream = env.addSource(consumer)

    // extract (table_name, payload-as-JSON-string) pairs; assumes every record carries a table_name field
    val tableAndPayload: DataStream[(String, String)] = kafkaStream.map { node =>
      val value = node.get("value")
      (value.get("table_name").asText(), value.toString)
    }

    // route each record to a side output according to its table name
    // (split/select from the original snippet has been removed in newer Flink versions)
    val routed = tableAndPayload.process(new ProcessFunction[(String, String), String] {
      override def processElement(record: (String, String),
                                  ctx: ProcessFunction[(String, String), String]#Context,
                                  out: Collector[String]): Unit =
        record._1 match {
          case "table1" => ctx.output(dwd1Tag, record._2)
          case "table2" => ctx.output(dwd2Tag, record._2)
          case _        => // records from unknown tables are dropped
        }
    })

    // send the data to the appropriate DWD-layer topic
    routed.getSideOutput(dwd1Tag).addSink(topic1DWD1Producer)
    routed.getSideOutput(dwd2Tag).addSink(topic1DWD2Producer)

    // a second consumer that reads topic1-dwd1 from the beginning and prints what it sees
    val consumer2 = new FlinkKafkaConsumer("topic1-dwd1", new SimpleStringSchema, kafkaProps)
    consumer2.setStartFromEarliest()
    env.addSource(consumer2).print()

    // execute the Flink program
    env.execute("Kafka Data Pipeline")
  }
}

// serializes each JSON string into a ProducerRecord for a fixed target topic
class MyKafkaSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}
```
In the code above, we first set the Kafka connection properties and create a consumer for topic topic1. We then create two producers, one for each DWD-layer topic. Each record is routed, based on the table_name field in its payload, to the side output for the matching table, and each side output is written to the corresponding DWD topic. Finally, a second consumer reads topic1-dwd1 from the earliest offset and prints the messages it sees, so the first routed record can be checked.
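The task also asks to read the first record with Kafka's own consumer rather than a Flink job. A minimal sketch using the kafka-console-consumer.sh script that ships with Kafka, assuming the broker runs at localhost:9092 and the script is on the PATH:
```bash
# read exactly one record from the DWD topic, starting from the earliest offset
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topic1-dwd1 --from-beginning --max-messages 1
```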