php kafka生产者,消费者操作

php7.2

kafka7.8.1

生产者代码

<?php
$conf = new RdKafkaConf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafkaProducer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafkaTopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);


//RD_KAFKA_PARTITION_UA自动选择分区
//$option可选
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "kafka123456789", 'kafka');
//kafka队列中的长度 

$len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }

 这里我并没有开启集群,只是单机操作。发送 成功

消费者代码

<?php

/**
 * 消费者消费消息
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafkaConsumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("127.0.0.1:9092");

$oObjTopic = $objRdKafka->newTopic('test');

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "
";
        break;
    } else {
        echo $oMsg->payload, "
";
    }
}

消费者获取到消息

原文地址:https://www.cnblogs.com/heijinli/p/13826874.html