写一个flink写入es的sink
时间: 2023-03-31 17:01:59 浏览: 148
flink采集文本数据写入ES,flink消费kafka数据写入ES,以及一些flink相关的demo
可以使用 Flink 提供的 Elasticsearch Sink 将数据写入 Elasticsearch 中。具体步骤如下:
1. 引入 Elasticsearch Sink 的依赖:
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.12.</version>
</dependency>
```
2. 创建 Elasticsearch Sink:
```
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSinkBuilder;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MyElasticsearchSink {
public static ElasticsearchSink<Map<String, Object>> createSink(String indexName, String typeName, String clusterName, String[] hosts) {
List<HttpHost> httpHosts = new ArrayList<>();
for (String host : hosts) {
httpHosts.add(new HttpHost(host, 920, "http"));
}
ElasticsearchSink.Builder<Map<String, Object>> builder = new ElasticsearchSinkBuilder<>(httpHosts, new ElasticsearchSinkFunction<Map<String, Object>>() {
@Override
public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element, indexName, typeName));
}
private IndexRequest createIndexRequest(Map<String, Object> element, String indexName, String typeName) {
return Requests.indexRequest()
.index(indexName)
.type(typeName)
.source(JsonXContent.contentBuilder().map(element), XContentType.JSON);
}
});
builder.setBulkFlushMaxActions(100);
builder.setBulkFlushInterval(100);
builder.setRestClientFactory(restClientBuilder -> {
restClientBuilder.setMaxRetryTimeoutMillis(60000);
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(200);
httpClientBuilder.setMaxConnPerRoute(100);
return httpClientBuilder;
});
return new RestHighLevelClient(restClientBuilder);
});
return builder.build();
}
}
```
3. 使用 Elasticsearch Sink 将数据写入 Elasticsearch:
```
DataStream<Map<String, Object>> dataStream = ...;
String indexName = "my_index";
String typeName = "my_type";
String clusterName = "my_cluster";
String[] hosts = {"localhost"};
dataStream.addSink(MyElasticsearchSink.createSink(indexName, typeName, clusterName, hosts));
```
阅读全文