flink 1.14版本 mongodb sink
时间: 2023-07-18 08:37:43 浏览: 134
在 Flink 1.14 版本中,MongoDB Sink 的使用方式与之前的版本有些不同。具体来说,需要使用新的 MongoDB Connector for Flink,该 Connector 与 Flink 的版本保持一致,可以在 Flink 官方网站上下载。下载完成后,需要将 Connector 的 JAR 包添加到 Flink 的 classpath 中。
在代码中,可以使用以下方式创建一个 MongoDB Sink:
```java
MongoDBOutputFormatConfig.Builder builder = MongoDBOutputFormatConfig.builder()
.setHosts("localhost:27017")
.setDatabase("mydb")
.setCollection("mycollection")
.setDocumentClass(MyType.class);
MongoDBOutputFormat<MyType> outputFormat = new MongoDBOutputFormat<>(builder.build());
DataStream<MyType> stream = ...
stream.writeUsingOutputFormat(outputFormat);
```
其中,`MyType` 是要写入到 MongoDB 的数据类型。需要注意的是,在 Flink 1.14 中,MongoDB Sink 不再是一个 DataStream Sink,而是一个 OutputFormat。因此,在将 Sink 添加到 DataStream 中时,需要使用 `writeUsingOutputFormat` 方法。
除了上述方式,Flink 1.14 还提供了一种更加简便的创建 MongoDB Sink 的方式,即通过 `MongoDB.sink()` 方法:
```java
DataStream<MyType> stream = ...
stream.sink(new MongoDBSink<>(MongoDB.sink(
"mongodb://localhost:27017/mydb.mycollection",
new MyTypeMapper())
));
```
这种方式可以更加方便地创建 MongoDB Sink,不需要手动创建 OutputFormat。
阅读全文