基于Flink获取kafka数据并打印
时间: 2024-01-13 13:05:13 浏览: 33
可以通过Flink提供的Kafka Consumer来获取Kafka数据,并通过Flink提供的Print Sink将数据打印出来。以下是一个示例代码:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
public class KafkaToFlink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka相关属性
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// 创建Kafka Consumer并添加到Flink环境中
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
consumer.setStartFromEarliest();
env.addSource(consumer)
.print(); // 将数据打印出来
env.execute("Kafka To Flink");
}
}
```
其中,`bootstrap.servers`指定Kafka的地址和端口,`group.id`指定消费者组的名称,`my-topic`指定要消费的Kafka主题。`setStartFromEarliest()`表示从最早的数据开始消费,如果要从最新的数据开始消费可以使用`setStartFromLatest()`方法。最后通过`print()`将数据打印出来。