new SourceRecord 使用案例
时间: 2023-07-11 07:29:08 浏览: 55
在 Apache Kafka 中,SourceRecord 是一个用于表示单个消息的类。它包含了来自 Kafka 源头的消息的所有元数据,包括 topic、partition、offset、key 和 value 等信息。下面是一个使用 SourceRecord 的示例:
```java
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.source.SourceTaskContext;
public class MySourceTask extends SourceTask {
private SourceTaskContext context;
public void start(Map<String, String> props) {
context = this.context;
}
public void stop() {
// Perform any necessary cleanup operations here
}
public void poll() throws InterruptedException {
// Poll for new records from the source system
// Construct a new SourceRecord for each record
String myTopic = "my-topic";
int myPartition = 0;
long myOffset = 12345L;
String myKey = "my-key";
String myValue = "my-value";
Schema valueSchema = Schema.STRING_SCHEMA;
Struct value = new Struct(valueSchema);
value.put("field1", "value1");
value.put("field2", "value2");
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("partition", String.valueOf(myPartition));
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("offset", myOffset);
SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, myTopic, myPartition, Schema.STRING_SCHEMA, myKey, valueSchema, value);
context.sourceTask().commit();
}
public String version() {
return "1.0";
}
}
```
在这个示例中,我们创建了一个名为 `MySourceTask` 的子类,它实现了 `SourceTask` 接口。在 `poll()` 方法中,我们模拟从源系统中拉取了一条新的记录,并为其构建一个新的 `SourceRecord` 对象。然后,我们使用 `context.sourceTask().commit()` 方法将偏移量提交给 Kafka Connect 进行跟踪和管理。