Write a Spark demo in Java that consumes from Kafka
Here is sample Java code that consumes Kafka messages with Spark Streaming:
```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkKafkaConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "localhost:9092";
        String groupId = "group1";
        String topics = "test_topic";

        // Local Spark context with a 1-second micro-batch interval.
        SparkConf sparkConf = new SparkConf()
                .setAppName("KafkaSparkStreamingDemo")
                .setMaster("local[*]");
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(sparkConf, Durations.seconds(1));

        // Comma-separated topic string -> collection of topic names.
        Collection<String> topicSet = Arrays.asList(topics.split(","));

        // Kafka consumer configuration.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Direct stream: executors consume from Kafka without a receiver;
        // PreferConsistent spreads partitions evenly across executors.
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams)
                );

        // Extract each record's value and print it to the console.
        stream.map(record -> record.value()).print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
```
In this example, we use Spark Streaming to consume messages from Kafka. We set the relevant Kafka parameters, including the broker address, the consumer group ID, and the key/value deserializer classes. We then create a direct stream that subscribes to the specified topics with those parameters. Finally, we extract the value of each record and print it to the console.
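The demo leaves offset management to Kafka's periodic auto-commit. If you want offsets committed only after your output has succeeded (at-least-once semantics), a common pattern from the Spark + Kafka 0.10 integration guide is to read the offset ranges off each batch's RDD and commit them yourself. A minimal sketch, reusing the `stream` variable from the demo above (the imports shown are additional):

```java
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Replaces the map(...).print() step in main():
stream.foreachRDD(rdd -> {
    // A direct stream's RDDs carry the Kafka offset ranges they were read from.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // Do the actual work for this micro-batch.
    rdd.foreach(record -> System.out.println(record.value()));

    // Commit asynchronously only after the batch's output has completed.
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
```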
Note that this is only a simple example; adapt it to your actual environment. To compile and run it, you also need the matching Spark artifacts on the classpath, in particular spark-streaming and the spark-streaming-kafka-0-10 integration module for your Spark/Scala version.
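Two consumer parameters such demos often set explicitly in `kafkaParams` are shown below; the values here are illustrative choices, not requirements:

```java
// Where to start when the group has no committed offset: "earliest" or "latest".
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Turn off Kafka's periodic auto-commit if you commit offsets manually,
// as in the foreachRDD sketch above.
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
```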