一、安装扩展
对于 Swoole 应用,官方推荐使用 phpkafka,由于是基于 swoole,所以 phpkafka 不需要 rdkafka 扩展的支持即可直接使用。
安装:https://gitee.com/longzhiyan/phpkafka
composer require longlang/phpkafka
二、生产者
生产者配置
发送单个消息
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$value = (string) microtime(true);
$key = uniqid('', true);
$producer->send('test', $value, $key);
批量发送消息
use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$topic = 'test';
$partition0 = 0;
$partition1 = 1;
$producer->sendBatch([
new ProduceMessage($topic, 'v1', 'k1', $partition0),
new ProduceMessage($topic, 'v2', 'k2', $partition1),
]);
三、消费者
消费者配置
异步消费(回调)
use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
function consume(ConsumeMessage $message)
{
var_dump($message->getKey() . ':' . $message->getValue());
// $consumer->ack($message); // autoCommit设为false时,手动提交
}
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test'); // 客户端ID,不同的消费者进程请使用不同的设置
$config->setGroupInstanceId('test'); // 分组实例ID,不同的消费者进程请使用不同的设置
$config->setInterval(0.1);
$consumer = new Consumer($config, 'consume');
$consumer->start();
同步消费
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test_custom'); // 客户端ID,不同的消费者进程请使用不同的设置
$config->setGroupInstanceId('test_custom'); // 分组实例ID,不同的消费者进程请使用不同的设置
$consumer = new Consumer($config);
while(true) {
$message = $consumer->consume();
if($message) {
var_dump($message->getKey() . ':' . $message->getValue());
$consumer->ack($message); // 手动提交
}
sleep(1);
}