没有合适的资源?快使用搜索试试~ 我知道了~
首页RabbitMQ消息模式之Confirm确认消息
理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障 消费端代码: package com.ue.confirm; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Queuein
资源详情
资源评论
资源推荐

RabbitMQ消息模式之消息模式之Confirm确认消息确认消息
理解理解Confirm消息确认机制消息确认机制
消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的
可靠性投递的核心保障
消费端代码:
package com.ue.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
public static void main(String[] args) throws Exception {
//1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.14.758");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、获取Connection
Connection connection = connectionFactory.newConnection();
//3、通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4、声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}
生产端代码:
package com.ue.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
//1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.14.758");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");


















安全验证
文档复制为VIP权益,开通VIP直接复制

评论0