消息队列 第4.4章 PHP对接Kafka-在Swoole中使用Kafka 消息队列 第4.4章 PHP对接Kafka-在Swoole中使用Kafka

2天前

一、安装扩展

对于 Swoole 应用,官方推荐使用 phpkafka,由于是基于 swoole,所以 phpkafka 不需要 rdkafka 扩展的支持即可直接使用。

安装:https://gitee.com/longzhiyan/phpkafka

composer require longlang/phpkafka

二、生产者

生产者配置

https://file.lulublog.cn/images/3/2025/05/O86PURaKf56asAFu6b635RuKKus4sE.png

发送单个消息

use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$value = (string) microtime(true);
$key = uniqid('', true);
$producer->send('test', $value, $key);

批量发送消息

use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$partition0 = 0;
$partition1 = 1;
$producer->sendBatch([
   new ProduceMessage($topic, 'v1', 'k1', $partition0),
   new ProduceMessage($topic, 'v2', 'k2', $partition1),
]);

三、消费者

消费者配置

https://file.lulublog.cn/images/3/2025/05/kVs64y6546m5g6Xg5hQ5q6Gsy6VOVs.png

异步消费(回调)

use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;

function consume(ConsumeMessage $message)
{
   var_dump($message->getKey() . ':' . $message->getValue());
   // $consumer->ack($message); // autoCommit设为false时,手动提交
}
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test'); // 客户端ID,不同的消费者进程请使用不同的设置
$config->setGroupInstanceId('test'); // 分组实例ID,不同的消费者进程请使用不同的设置
$config->setInterval(0.1);
$consumer = new Consumer($config, 'consume');
$consumer->start();

同步消费

use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;

$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test_custom'); // 客户端ID,不同的消费者进程请使用不同的设置
$config->setGroupInstanceId('test_custom'); // 分组实例ID,不同的消费者进程请使用不同的设置
$consumer = new Consumer($config);
while(true) {
   $message = $consumer->consume();
   if($message) {
       var_dump($message->getKey() . ':' . $message->getValue());
       $consumer->ack($message); // 手动提交
   }
   sleep(1);
}
阅读 48