PHP Kafka 生产和消费


<?php
# Producer: publishes ten test messages to the "test3" topic on a local broker.
# NOTE(review): the consumer example in this document reads from "test", not
# "test3" — use the same topic name on both sides to see these messages.

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$topic = $rk->newTopic("test3");

for ($i = 0; $i < 10; $i++) {
    # RD_KAFKA_PARTITION_UA leaves the partition unassigned so the configured
    # partitioner chooses one; the second argument (msgflags) must be 0.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    # Serve queued events (e.g. delivery reports) without blocking.
    $rk->poll(0);
}

# produce() only enqueues the message. Keep polling until the outbound queue
# is empty, otherwise the script may exit before anything reaches the broker.
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}

<?php
# Consumer: reads partition 0 of the "test" topic from the beginning and
# prints every message payload, one per line.

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$topic = $rk->newTopic("test");

# Start consuming partition 0 from the earliest available offset.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    # Arguments: partition, timeout in milliseconds.
    $msg = $topic->consume(0, 1000);

    # null means the timeout elapsed with nothing to read. Older librdkafka
    # versions report "end of partition" as an error code instead of null;
    # it is not a failure, so keep waiting for new messages in both cases.
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        continue;
    }

    # Any other non-zero error code is treated as fatal: report it and stop.
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    }

    echo $msg->payload, "\n";
}
 
原文地址:https://www.cnblogs.com/chengfengchi/p/12836185.html