消息队列 第4.2章-PHP对接Kafka-简单实例 消息队列 第4.2章-PHP对接Kafka-简单实例

2天前

一、编写生产者端

我们首先需要创建一个生产者,并向其添加代理

$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1:9092");

创建主题并生成消息

$topic = $rk->newTopic("test");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者1");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者2");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者3");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者4");

这里我只是用简单文本消息而已,实际更多的是用 json

二、编写消费者端逻辑

我们首先需要创建一个的消费者,因 Low-level consumer 即将弃用,官方建议直接 High-level consumer,代码如下

$conf = new RdKafka\Conf();

// Configure the group.id. All consumer with the same group.id will consume
// 设置分组
$conf->set('group.id', 'ldGroup');

// 设置代理
$conf->set('metadata.broker.list', '127.0.0.1');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$conf->set('auto.offset.reset', 'earliest');

// Emit EOF event when reaching the end of a partition
$conf->set('enable.partition.eof', 'true');

$consumer = new RdKafka\KafkaConsumer($conf);

// 订阅主题
$consumer->subscribe(['test']);

echo "等待消息中。。。\n";
//业务逻辑
while (true) {
   $message = $consumer->consume(120*1000);
   switch ($message->err) {
       case RD_KAFKA_RESP_ERR_NO_ERROR:
           echo '收到消息:'.$message->payload."\n";
           break;
       case RD_KAFKA_RESP_ERR__PARTITION_EOF:
           echo "No more messages; will wait for more\n";
           break;
       case RD_KAFKA_RESP_ERR__TIMED_OUT:
           echo "Timed out\n";
           break;
       default:
           throw new \Exception($message->errstr(), $message->err);
           break;
   }
}

测试一下,我们运行消费者代码 php consumer.php,然后打开新的 Terminal,再运行 php producer.php

https://file.lulublog.cn/images/3/2025/05/UEosSeZuPOO3U8EZznREjevrSRVze3.jpg

阅读 39