es异步批量增添数据java示例
时间: 2023-07-04 22:16:46 浏览: 302
以下是一个使用 Elasticsearch Java API 进行异步批量增添数据的示例:
```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
public class ElasticSearchAsyncBatchInsert {
private final RestHighLevelClient client;
private final String indexName;
public ElasticSearchAsyncBatchInsert(RestHighLevelClient client, String indexName) {
this.client = client;
this.indexName = indexName;
}
public CompletableFuture<BulkResponse> insert(List<String> jsonDocuments) {
BulkRequest bulkRequest = new BulkRequest();
for (String json : jsonDocuments) {
IndexRequest request = new IndexRequest(indexName);
request.source(json, XContentType.JSON);
bulkRequest.add(request);
}
CompletableFuture<BulkResponse> future = new CompletableFuture<>();
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
RestStatus status = bulkResponse.status();
if (status != RestStatus.OK) {
future.completeExceptionally(new RuntimeException("Failed to insert data: " + status));
} else {
future.complete(bulkResponse);
}
}
@Override
public void onFailure(Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
public static void main(String[] args) throws IOException, InterruptedException {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200)));
ElasticSearchAsyncBatchInsert batchInsert = new ElasticSearchAsyncBatchInsert(client, "my_index");
List<String> jsonDocuments = new ArrayList<>();
jsonDocuments.add("{\"name\": \"John\", \"age\": 30}");
jsonDocuments.add("{\"name\": \"Mary\", \"age\": 25}");
jsonDocuments.add("{\"name\": \"Mike\", \"age\": 35}");
CompletableFuture<BulkResponse> future = batchInsert.insert(jsonDocuments);
future.whenComplete(new BiConsumer<BulkResponse, Throwable>() {
@Override
public void accept(BulkResponse bulkResponse, Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
} else {
System.out.println("Number of inserted documents: " + bulkResponse.getItems().length);
}
}
});
client.close();
}
}
```
以上代码中,`ElasticSearchAsyncBatchInsert` 类封装了异步插入数据的操作。在 `insert` 方法中,我们首先创建一个 `BulkRequest` 对象,将所有需要插入的数据添加到该对象中。然后使用 `client.bulkAsync` 方法进行异步的批量插入操作。
在 `main` 方法中,我们创建了一个 `ElasticSearchAsyncBatchInsert` 对象,并向其传递了需要插入的数据。然后我们调用 `insert` 方法进行异步插入操作,同时使用 `whenComplete` 方法添加一个回调函数,在插入操作完成后输出插入的结果。
需要注意的是,在异步插入操作中,我们使用了 Java 8 的 `CompletableFuture` 类来处理异步结果。使用 `CompletableFuture` 可以让我们更加方便地进行异步编程。
阅读全文