用flink读取kafka代码
时间: 2023-05-28 11:03:32 浏览: 48
下面是使用Flink读取Kafka的Java代码示例:
```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumer {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
stream.print();
env.execute("Kafka Consumer");
}
}
```
在此示例中,我们使用FlinkKafkaConsumer从Kafka读取“my-topic”中的字符串消息,并将其打印到控制台。我们可以通过设置属性来配置Kafka连接,例如设置Kafka服务器的地址和端口,以及消费者组的ID。最后,我们使用env.execute()方法来启动Flink作业。