如何获取kafka服务器“127.0.0.1“,topic:“test“获取消息
时间: 2024-04-12 11:27:46 浏览: 133
要在 Kafka 服务器 "127.0.0.1" 上的主题 "test" 中获取消息,你可以使用 rdkafka 扩展提供的 KafkaConsumer 类。以下是一个示例代码,演示如何连接到 Kafka 服务器并消费消息:
```php
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', '127.0.0.1');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);
while (true) {
$message = $consumer->consume(1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . PHP_EOL;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
break;
default:
echo '错误:' . $message->errstr() . PHP_EOL;
break;
}
}
?>
```
在上面的示例中,我们首先创建了一个 KafkaConsumer 对象,并设置了 bootstrap.servers 为 "127.0.0.1",这是 Kafka 服务器的地址。然后使用 `subscribe` 方法订阅了主题 "test"。
在 while 循环中,我们使用 `consume` 方法来获取消息。参数 1000 表示等待的超时时间为 1 秒。根据返回的消息的错误代码,我们进行相应的处理。成功接收到消息时,你可以在 `echo` 语句中执行你的自定义逻辑。
请确保在运行此代码之前,你已经正确安装并启动了 Kafka 服务器,并且正确配置了主题 "test"。如果有任何问题,请随时向我提问。
阅读全文