阅读量:3
要确认消息已被正确处理,您可以使用 PHP RdKafka 扩展的 ack 方法
// 创建消费者
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'myGroup');
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new \RdKafka\KafkaConsumer($conf);
// 订阅主题
$consumer->subscribe(['myTopic']);
while (true) {
// 拉取消息
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "Reached end of partition event\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_NOT_FOUND:
echo "Partition not found\n";
break;
case RD_KAFKA_RESP_ERR__UNKNOWN:
throw new \Exception($message->errstr(), $message->err);
default:
if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
// 消息已处理,确认
$consumer->ack($message);
echo "Message consumed and acknowledged\n";
} else {
throw new \Exception($message->errstr(), $message->err);
}
break;
}
}
在这个示例中,我们创建了一个 Kafka 消费者,订阅了名为 “myTopic” 的主题。然后,我们进入一个无限循环,不断从 Kafka 拉取消息。当成功拉取到消息时($message->err == RD_KAFKA_RESP_ERR_NO_ERROR),我们调用 ack 方法来确认消息已被正确处理。如果发生错误,我们将抛出异常。
以上就是关于“PHP rdkafka怎样确认消息”的相关介绍,筋斗云是国内较早的云主机应用的服务商,拥有10余年行业经验,提供丰富的云服务器、租用服务器等相关产品服务。云服务器资源弹性伸缩,主机vCPU、内存性能强悍、超高I/O速度、故障秒级恢复;电子化备案,提交快速,专业团队7×24小时服务支持!
简单好用、高性价比云服务器租用链接:https://www.jindouyun.cn/product/cvm