kafka基本用法

参考:

https://www.jianshu.com/p/fd8ce54e1156 (php操作kafka)

https://blog.csdn.net/panchang199266/article/details/82113453 (kafka安装部署)

https://www.cnblogs.com/winnerREN/p/13407396.html(kafka集群部署指南)

https://www.cnblogs.com/sodawoods-blogs/p/8969774.html (kafka消费模型)

常识:

1,kafak依赖zookeeper,zookerper基于java

1,安装java环境,linux一般默认安装了java

java -version #查看java版本

2,安装zookeeper(kafka已自带zookeeper)

参考:https://www.runoob.com/w3cnote/zookeeper-setup.html

下载:

wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -zxvf zookeeper-3.4.14.tar.gz
cd zookeeper-3.4.14
cd conf/
cp zoo_sample.cfg zoo.cfg

配置:

vim zoo.cfg

tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

启动:

cd ..
cd bin/
sh zkServer.sh start

3,安装kafka

参考:http://kafka.apache.org/quickstart

下载:

wget  https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
//或
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -zxvf tar zxvf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

启动:

#先启动zookeeper
#这是前台启动,启动以后,当前就无法进行其他操作(不推荐)
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties

#后台启动(推荐)
./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>/dev/null 2>&1 &
./bin/kafka-server-start.sh ./config/server.properties 1>/dev/null 2>&1 & 
./bin/kafka-server-start.sh -daemon config/server.properties

使用:

#创建一个主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
#查看主题
./bin/kafka-topics.sh --list --zookeeper localhost:2181
#描述主题
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
#修改主题
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3 #kafka分区数量只许增加不许减少
#删除主题
./bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181
#查看topic的消息数
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 #--time -1 表示最大位移 -2表示最早位移

#发送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test # --from-beginning可以查看历史消息,消费者应该在生产者之前就绪,不然就丢失之前的数据

#生产者吞吐量
./bin/kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=-1
#消费这吞吐量
./bin/kafka-consumer-perf-test.sh --topic test --broker-list localhost:9092 --messages 500000

4,安装php的kafka的扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git
 
#生成configure文件
/Users/shiyibo/LNMP/php/bin/phpize 
 
#编译安装
 ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config
make
make install 
 
#在php.ini 文件中配置 rdkafka扩展
vim /Users/shiyibo/LNMP/php/etc/php.ini
extension=rdkafka.so
 
#查看扩展是否生效
$php -m | grep kafka

应用举例

生产者

$objRdKafka = new RdKafkaProducer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

// 从终端接收输入 
$oInputHandler = fopen('php://stdin', 'r');

while (true) {
    echo "
Enter  messages:
";
    $sMsg = trim(fgets($oInputHandler));

   // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    // 发送消息
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

消费者

$objRdKafka = new RdKafkaConsumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("test");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "
";
        break;
    } else {
        echo $oMsg->payload, "
";
    }
}
原文地址:https://www.cnblogs.com/tkzc2013/p/14799971.html