springboot集成pulsar集群
时间: 2023-05-31 20:03:55 浏览: 546
1. 环境准备
- Pulsar集群
- Java 8
- Spring Boot 2.x
- Pulsar Java Client
2. 添加依赖
在`pom.xml`中添加以下依赖:
```xml
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spring-boot-starter</artifactId>
<version>${pulsar.version}</version>
</dependency>
```
其中`${pulsar.version}`为Pulsar的版本号。
3. 配置Pulsar连接信息
在`application.yml`中添加以下配置:
```yaml
pulsar:
serviceUrl: pulsar://localhost:6650
authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams: token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
```
其中`serviceUrl`为Pulsar集群连接地址,`authPluginClassName`和`authParams`为认证信息,可以根据实际情况进行修改。
4. 发送消息
在Spring Boot中可以使用`PulsarTemplate`来发送消息,示例代码如下:
```java
import org.apache.pulsar.client.api.MessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.util.concurrent.ListenableFutureTaskAdapter;
@Service
public class PulsarProducerService {
@Autowired
private PulsarTemplate<String> pulsarTemplate;
public void sendMessage(String topic, String message) {
ListenableFutureTask<MessageId> future = new ListenableFutureTaskAdapter<>(() -> pulsarTemplate.send(topic, message));
future.addCallback(new ListenableFutureCallback<MessageId>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("Send message failed: " + ex.getMessage());
}
@Override
public void onSuccess(MessageId result) {
System.out.println("Send message success: " + result);
}
});
future.run();
}
}
```
在上述代码中,`PulsarTemplate`使用泛型`<String>`,表示发送的消息为字符串类型。`sendMessage`方法接收两个参数,分别为消息的主题和内容。发送消息的过程中,使用`ListenableFuture`来处理异步回调。在回调函数中,可以根据发送结果进行相应的处理。
5. 接收消息
在Spring Boot中可以使用`PulsarConsumerFactory`来创建消费者,示例代码如下:
```java
import org.apache.pulsar.client.api.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class PulsarConsumerService {
@Autowired
private PulsarConsumerFactory pulsarConsumerFactory;
public void receiveMessage(String topic, ConsumerMessageHandler<String> handler) throws Exception {
Consumer<String> consumer = pulsarConsumerFactory.createConsumer(topic, "my-subscription", String.class);
consumer.subscribe();
while (true) {
String message = consumer.receive().getValue();
handler.handle(message);
consumer.acknowledgeAsync(consumer.getLastMessageId());
}
}
}
```
在上述代码中,`PulsarConsumerFactory`使用泛型`<String>`,表示接收消息的类型为字符串。`receiveMessage`方法接收两个参数,分别为消息的主题和消息处理器。在接收消息的过程中,使用`while`循环不断接收消息,并交给消息处理器进行处理。处理完成后,使用`acknowledgeAsync`方法对消息进行确认。
6. 运行测试
在完成上述步骤后,可以编写测试代码来测试消息的发送和接收。示例代码如下:
```java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class PulsarApplicationTests {
@Autowired
private PulsarProducerService pulsarProducerService;
@Autowired
private PulsarConsumerService pulsarConsumerService;
@Test
void sendMessage() {
pulsarProducerService.sendMessage("my-topic", "Hello, Pulsar!");
}
@Test
void receiveMessage() throws Exception {
pulsarConsumerService.receiveMessage("my-topic", message -> {
System.out.println("Received message: " + message);
});
}
}
```
在运行测试代码后,可以在控制台中看到消息发送和接收的相关信息。
阅读全文