用java写一段Flink消费kafka的代码
时间: 2023-06-30 21:25:48 浏览: 139
好的,以下是使用Java编写的Flink消费Kafka的示例代码:
```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置Kafka连接信息
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-group");
// 创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
new SimpleStringSchema(), properties);
// 从最早的消息开始消费
consumer.setStartFromEarliest();
// 添加Kafka消费者到执行环境
DataStream<String> stream = env.addSource(consumer);
// 打印消费的消息
stream.print();
// 执行程序
env.execute("Kafka Consumer Example");
}
}
```
在上面的示例代码中,我们使用`FlinkKafkaConsumer`来创建Kafka消费者,并将其添加到Flink的执行环境中。我们还设置了消费者的属性,例如Kafka的连接信息和消费组ID。最后,我们将从Kafka中消费的消息打印出来,并使用`execute`方法来启动Flink程序的执行。
阅读全文