sofa mq生产者和消费者代码
时间: 2024-12-16 17:31:08 浏览: 4
Sofa MQ(阿里巴巴开源的消息中间件)是一个分布式消息系统,用于在分布式应用之间解耦通信。在Sofa MQ中,生产者(Producer)负责发布消息到指定的主题(Topic),而消费者(Consumer)则订阅并消费这些消息。
下面是一些基本的生产者和消费者的示例代码(使用Java语言):
### 生产者代码
```java
import com.alipay.sofa.mq.api.Message;
import com.alipay.sofa.mq.api.SendResult;
public class SofaMQProducerExample {
private final String instanceId; // 消息队列实例ID
private final String topic; // 主题名
public void sendMessage(String message) {
try (Message msg = Message.createTextMessage(message)) {
SendResult result = SofaMQProducer.send(instanceId, topic, msg);
if (!result.isSuccess()) {
System.out.println("发送消息失败:" + result.getErrorMessage());
} else {
System.out.println("消息已成功发送到主题 " + topic);
}
} catch (Exception e) {
System.err.println("发送消息异常:" + e.getMessage());
}
}
}
```
### 消费者代码
```java
import com.alipay.sofa.mq.api.Message;
import com.alipay.sofa.mq.client.consumer.ConsumerConfig;
import com.alipay.sofa.mq.client.consumer.MessageListener;
public class SofaMQConsumerExample {
private final String instanceId; // 消息队列实例ID
private final String topic; // 主题名
public void consumeMessages() {
ConsumerConfig config = new ConsumerConfig();
config.setInstanceId(instanceId);
config.setTopic(topic);
try (SofaMQConsumer consumer = new SofaMQConsumer(config)) {
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("接收到消息:" + message.getBody());
}
@Override
public void onError(Throwable error) {
System.err.println("消费消息出错:" + error.getMessage());
}
});
// 开始接收消息
consumer.start();
} catch (Exception e) {
System.err.println("创建消费者异常:" + e.getMessage());
}
}
}
```
请注意,这只是一个简化的示例,实际使用时需要设置连接配置、错误处理等更多细节,并确保在完成后关闭资源。
阅读全文