Using Flink 1.14.6 to index data into Elasticsearch 7.17 with a custom schema and configurable shard and replica counts: a complete example program
Read data from a file and save it to Elasticsearch, using the Flink framework
First, add the Flink Elasticsearch connector dependency (the `_2.12` suffix is the Scala version the Flink distribution is built against):
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```
Then, the following program first creates the index with the custom schema (mapping) and the desired shard/replica counts through the Elasticsearch REST high-level client (pulled in transitively by the connector), and then streams data into it with an `ElasticsearchSink`:
```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexToElasticsearchWithSchema {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String indexName = params.get("indexName", "my_index");
        final String esHost = params.get("esHost", "localhost");
        final int esPort = params.getInt("esPort", 9200);
        final int numberOfShards = params.getInt("numberOfShards", 3);
        final int numberOfReplicas = params.getInt("numberOfReplicas", 1);
        final int parallelism = params.getInt("parallelism", 1);

        // Step 1: create the index up front with a custom schema (mapping)
        // and explicit shard/replica counts.
        createIndexWithSchema(indexName, esHost, esPort, numberOfShards, numberOfReplicas);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);

        DataStream<Map<String, Object>> dataStream = env.fromElements(
                document("Alice"), document("Bob"), document("Charlie"));

        // Step 2: build the Elasticsearch sink.
        List<HttpHost> httpHosts = Collections.singletonList(new HttpHost(esHost, esPort, "http"));
        ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<Map<String, Object>>() {
                    @Override
                    public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(indexName, element));
                    }
                });

        // Flush a bulk request after 100 actions, 10 MB, or 1 second,
        // whichever comes first; back off exponentially when requests are rejected.
        esSinkBuilder.setBulkFlushMaxActions(100);
        esSinkBuilder.setBulkFlushMaxSizeMb(10);
        esSinkBuilder.setBulkFlushInterval(1000L);
        esSinkBuilder.setBulkFlushBackoff(true);
        esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        esSinkBuilder.setBulkFlushBackoffRetries(3);
        esSinkBuilder.setBulkFlushBackoffDelay(1000L);

        dataStream.addSink(esSinkBuilder.build());
        env.execute("Index to Elasticsearch with Schema");
    }

    // Builds one example document with the current timestamp attached.
    private static Map<String, Object> document(String name) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("name", name);
        doc.put("timestamp", System.currentTimeMillis());
        return doc;
    }

    private static IndexRequest createIndexRequest(String indexName, Map<String, Object> document) {
        // Elasticsearch 7 uses the fixed "_doc" type, so only the index name
        // and the document source are needed.
        return new IndexRequest(indexName).source(document);
    }

    // Creates the index with the desired shard/replica counts and mapping,
    // unless it already exists.
    private static void createIndexWithSchema(String indexName, String esHost, int esPort,
                                              int numberOfShards, int numberOfReplicas) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(esHost, esPort, "http")))) {
            if (client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
                return; // keep the existing settings
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName)
                    .settings(Settings.builder()
                            .put("index.number_of_shards", numberOfShards)
                            .put("index.number_of_replicas", numberOfReplicas));
            // Custom schema: "name" as a keyword, "timestamp" as an epoch-millis date.
            Map<String, Object> timestampField = new HashMap<>();
            timestampField.put("type", "date");
            timestampField.put("format", "epoch_millis");
            Map<String, Object> properties = new HashMap<>();
            properties.put("name", Collections.singletonMap("type", "keyword"));
            properties.put("timestamp", timestampField);
            request.mapping(Collections.singletonMap("properties", properties));
            client.indices().create(request, RequestOptions.DEFAULT);
        }
    }
}
```
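The question summary above mentions reading the data from a file rather than hard-coding it. As a minimal sketch under assumptions not in the original answer (the path `file:///tmp/names.txt` and a one-name-per-line layout are illustrative), the `fromElements` source can be swapped for a text-file source; everything else, including `esSinkBuilder`, stays as in the program above, and `org.apache.flink.api.common.functions.MapFunction` is additionally imported:
```java
// Sketch: feed the same sink from a text file instead of fromElements.
// The path and the one-name-per-line format are illustrative assumptions.
DataStream<Map<String, Object>> fileStream = env
        .readTextFile("file:///tmp/names.txt")
        .map(new MapFunction<String, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(String line) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("name", line.trim());
                doc.put("timestamp", System.currentTimeMillis());
                return doc;
            }
        });
fileStream.addSink(esSinkBuilder.build());
```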
In this program the index name, shard count, replica count, and parallelism are all read from the command line (defaulting to `my_index`, 3 shards, 1 replica, and parallelism 1), so they can be changed without recompiling; for example, pass `--numberOfShards 5 --numberOfReplicas 2 --parallelism 3` to write with three parallel sink tasks. The custom schema is applied as the index mapping when the index is first created, and documents are indexed with the fixed `_doc` type that Elasticsearch 7 uses.
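To double-check that the shard and replica settings were actually applied, the values can be read back with the same REST high-level client. A minimal sketch, assuming the job's default host, port, and index name, and additionally importing `GetSettingsRequest`/`GetSettingsResponse` from `org.elasticsearch.action.admin.indices.settings.get`:
```java
// Read the live settings of the index the job created.
try (RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
    GetSettingsRequest request = new GetSettingsRequest().indices("my_index");
    GetSettingsResponse response = client.indices().getSettings(request, RequestOptions.DEFAULT);
    System.out.println("shards   = " + response.getSetting("my_index", "index.number_of_shards"));
    System.out.println("replicas = " + response.getSetting("my_index", "index.number_of_replicas"));
}
```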