PHP 实践kafka 生产消息

首先安装kafka扩展链接附上:https://www.cnblogs.com/dalaowang/p/13469004.html

其次安装kafka扩展包,用composer安装:

composer require "nmred/kafka-php"

安装完成后会在vendor目录先生成这个文件:

把kafka封装成单例模式进行调用:

<?php

namespace apputils;

class Kafka
{
    private static $instance;
    private  $client;
    private static $result;
    private  $db;
    private static $collectionName;

    private $broker_list = '127.0.0.1';  //现在是一个,也可以多个,用逗号隔开
    private $topic = 'test';                      //定义topic
    private $partition = 0;                         //定义topic的物理分组,这里是0

    private $producer = null;                  //存储producer对象

    //构造器私有化:禁止从类外部实例化
    private function __construct()
    {
        if (empty($this->broker_list))
        {
            echo 'broker not config';
        }
        $rk = new RdKafkaProducer();  //实例化对象

        if (empty($rk)) {
            echo 'producer error1';
        }
        $rk->setLogLevel(LOG_DEBUG);  //设置错误级别
        if(!$rk->addBrokers($this->broker_list)) {//设置Broker地址
            echo 'producer error2';
        }
        $this->producer = $rk;

    }

    //克隆方法私有化:禁止从外部克隆对象
    private function __clone()
    {

    }

    //生成当前类的唯一实例
    public static function get_instance()
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    //生产者的方法(生产者把日志向消息队列发送)
    public function send($message = [])
    {
        $topic = $this->producer->newTopic($this->topic);  //创建topic

        $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode([$message]));//生产
    }

}

上面的代码基本不用改,还是需要根据实际情况而定

控制器调用方式:

public function kafkaTest(){
        apputilsKafka::get_instance()->send(['string1']);
        echo 'OK~';
    }
原文地址:https://www.cnblogs.com/dalaowang/p/13491987.html