一、编写生产者端
我们首先需要创建一个生产者,并向其添加代理
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1:9092");
创建主题并生成消息
$topic = $rk->newTopic("test");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者1");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者2");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者3");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "我是生产者4");
这里我只是用简单文本消息而已,实际更多的是用 json
二、编写消费者端逻辑
我们首先需要创建一个的消费者,因 Low-level consumer 即将弃用,官方建议直接 High-level consumer,代码如下
$conf = new RdKafka\Conf();
// Configure the group.id. All consumer with the same group.id will consume
// 设置分组
$conf->set('group.id', 'ldGroup');
// 设置代理
$conf->set('metadata.broker.list', '127.0.0.1');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$conf->set('auto.offset.reset', 'earliest');
// Emit EOF event when reaching the end of a partition
$conf->set('enable.partition.eof', 'true');
$consumer = new RdKafka\KafkaConsumer($conf);
// 订阅主题
$consumer->subscribe(['test']);
echo "等待消息中。。。\n";
//业务逻辑
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo '收到消息:'.$message->payload."\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
测试一下,我们运行消费者代码 php consumer.php,然后打开新的 Terminal,再运行 php producer.php