python调用rocketmq
时间: 2023-09-08 21:11:59 浏览: 157
Python-rocketmqpython是一个基于rocketmqclientcpp封装的RocketMQPython客户端
Python调用RocketMQ的步骤如下:
1. 导入Producer或PushConsumer或PullConsumer类,这取决于你想要的消费方式。
2. 创建一个Producer或Consumer实例,并设置PID(生产者或消费者ID)。
3. 设置NameServer的地址,NameServer是RocketMQ的核心组件,用于管理和路由消息。
4. 如果是Producer,调用start()方法启动Producer;如果是Consumer,调用start()方法启动Consumer。
5. 如果是Producer,创建一个Message实例,并设置topic、keys和body等属性,然后调用send_sync()方法发送消息。
6. 如果是PushConsumer,定义一个回调函数来处理接收到的消息,然后调用subscribe()方法订阅指定的topic。
7. 如果是PullConsumer,调用pull()方法拉取消息,并遍历处理每条消息。
8. 在Producer或Consumer使用完后,调用shutdown()方法关闭Producer或Consumer。
示例代码如下:
```
# 生产者示例
from rocketmq.client import Producer, Message
producer = Producer('PID-001')
producer.set_namesrv_addr('ip:port')
producer.start()
msg = Message('rocket_mq_test_broadcast_topic')
msg.set_keys('2020-12-15')
msg.set_tags('explain')
msg.set_body('{"key":"value"}')
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
# PushConsumer示例
import time
from rocketmq.client import PushConsumer
def callback(msg):
print(msg)
consumer = PushConsumer('PID-001')
consumer.set_namesrv_addr('ip:port')
consumer.subscribe("rocket_mq_test_broadcast_topic", callback)
consumer.start()
while True:
time.sleep(30)
consumer.shutdown()
# PullConsumer示例
from rocketmq.client import PullConsumer, json
consumer = PullConsumer('PID-001')
consumer.set_namesrv_addr('ip:port')
consumer.start()
for msg in consumer.pull('rocket_mq_test_broadcast_topic'):
print(msg.tags)
print(msg.keys)
print(msg.id, msg.body)
print(msg.topic)
print(msg)
data = json.loads(str(msg)) # dict
consumer.shutdown()
```
请注意,上述示例代码中的'ip:port'需要替换为实际的RocketMQ NameServer的IP地址和端口号。<span class="em">1</span><span class="em">2</span><span class="em">3</span>
#### 引用[.reference_title]
- *1* *2* *3* [python操作rocket-mq](https://blog.csdn.net/hqh131360239/article/details/108703074)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_1"}}] [.reference_item style="max-width: 100%"]
[ .reference_list ]
阅读全文