Please write a demo in Java that uses Flink to consume data from Kafka and write it into a Hive cluster.
Below is a simple Java implementation that uses Flink to consume from Kafka and write the data into a Hive cluster. It assumes the external Hive table already exists and writes the records as text files into that table's HDFS location. Adapt it to your environment and add proper error handling.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
public class FlinkKafkaHiveDemo {

    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String KAFKA_TOPIC = "test";
    private static final String HIVE_DATABASE = "default";
    private static final String HIVE_TABLE = "test";
    // HDFS directory backing the external Hive table (fs.defaultFS = hdfs://localhost:9000).
    private static final String HIVE_TABLE_LOCATION =
            "hdfs://localhost:9000/user/hive/warehouse/" + HIVE_DATABASE + ".db/" + HIVE_TABLE;
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required: the file sink below only finalizes part files on checkpoints.
        env.enableCheckpointing(60_000);

        // Kafka source
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromEarliest();

        DataStream<String> input = env.addSource(kafkaConsumer);

        // Transform the raw Kafka records as needed (pass-through here).
        DataStream<String> transformed = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        });
        // The external Hive table is assumed to exist already, e.g. created once in Hive with:
        //   CREATE EXTERNAL TABLE default.test (col1 STRING)
        //   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
        //   LOCATION '/user/hive/warehouse/default.db/test';
        //
        // Instead of a hand-rolled Hadoop OutputFormat, write the records as text files
        // directly into the table's HDFS location with a StreamingFileSink. Hive picks up
        // the finished part files on the next query.
        StreamingFileSink<String> hiveSink = StreamingFileSink
                .forRowFormat(new Path(HIVE_TABLE_LOCATION), new SimpleStringEncoder<String>("UTF-8"))
                // Keep files directly under the table location instead of date-time subfolders.
                .withBucketAssigner(new BasePathBucketAssigner<>())
                .build();
        transformed.addSink(hiveSink);

        env.execute("Flink Kafka Hive Demo");
    }
}
```
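Alternatively, the officially supported route is Flink's Hive connector: register a `HiveCatalog` and stream into the Hive table with SQL. The sketch below is only an outline, assuming Flink 1.12+ with `flink-connector-hive`, `flink-connector-kafka`, and the Hive client jars on the classpath; the catalog name `myhive`, the conf directory `/opt/hive/conf` (whose `hive-site.xml` points `hive.metastore.uris` at `thrift://localhost:9083`), and the `raw` format are illustrative choices, not requirements.
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkKafkaHiveCatalogSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hive streaming writes are committed on checkpoints.
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assumed conf dir: /opt/hive/conf must contain a hive-site.xml for the metastore.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive/conf");
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        // Expose the Kafka topic as a table with a single string column.
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE kafka_source (col1 STRING) WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'flink_consumer'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'raw')");

        // Continuously insert into the existing Hive table default.test.
        tableEnv.executeSql("INSERT INTO test SELECT col1 FROM kafka_source");
    }
}
```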
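To test the demo end to end, a few records can be pushed into the topic with the standard kafka-clients producer. A minimal sketch, assuming the same `localhost:9092` broker and `test` topic as above (the class name `TestProducer` and the sample payloads are arbitrary):
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // One line per record; each line becomes one row (col1) of the Hive table.
                producer.send(new ProducerRecord<>("test", "value-" + i));
            }
            producer.flush();
        }
    }
}
```
After the Flink job completes a checkpoint, `SELECT * FROM test` in Hive should return the produced rows.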