Spark Streaming: consuming Kafka data (Java version)
The following is example code, written in Java, that uses Spark Streaming to consume data from Kafka:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class KafkaSparkStreamingExample {
    public static void main(String[] args) throws InterruptedException {
        // Kafka connection settings
        String brokers = "localhost:9092";
        String groupId = "test-group";
        String topics = "test-topic";

        // Spark configuration: local mode with a 5-second batch interval
        SparkConf conf = new SparkConf().setAppName("KafkaSparkStreamingExample").setMaster("local[*]");
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // Subscribe to the Kafka topic(s); createDirectStream returns a DStream of ConsumerRecord
        Collection<String> topicsSet = Arrays.asList(topics.split(","));
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
        );

        // Process the messages: map each record to a (key, value) pair and print every value
        JavaPairDStream<String, String> pairs = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
        pairs.foreachRDD(rdd -> rdd.foreach(record -> System.out.println(record._2)));

        // Start the streaming job and block until it terminates
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
```
In this example, `KafkaUtils.createDirectStream()` subscribes to a Kafka topic, `mapToPair()` converts each `ConsumerRecord` into a key-value pair (producing a `JavaPairDStream`), and `foreachRDD()` iterates over the RDD of each batch and prints the value of every record.
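Because `enable.auto.commit` is set to `false` in the parameters above, offsets are not committed to Kafka automatically. Below is a minimal sketch of committing them manually after each batch, using the `HasOffsetRanges`/`CanCommitOffsets` pattern from the spark-streaming-kafka-0-10 integration; the class and method names (`ManualOffsetCommit`, `processAndCommit`) are only illustrative, and the stream passed in is assumed to be the `messages` stream created above.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class ManualOffsetCommit {

    // Illustrative helper: pass in the `messages` stream returned by createDirectStream.
    static void processAndCommit(JavaInputDStream<ConsumerRecord<String, String>> messages) {
        messages.foreachRDD(rdd -> {
            // RDDs produced by the direct stream expose the Kafka offset ranges of this batch
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // Do the per-batch processing first (here: print every record value)
            rdd.foreach(record -> System.out.println(record.value()));

            // After the output has completed, commit the offsets back to Kafka asynchronously
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        });
    }
}
```
Note that the `foreachRDD` body runs on the driver, so it is safe to reference the stream and commit offsets there; only the inner `rdd.foreach` lambda is shipped to the executors.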