Kafka Monitoring: Kafka Eagle

I. Kafka Eagle

1.1 Installing Eagle

1) Modify the Kafka startup script

Edit kafka-server-start.sh:

[hadoop@hadoop102 kafka]$ vim bin/kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

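# Change it to the following, which also exports JMX_PORT for Eagle's metric charts.
# Note: on Java 8, where PermGen was removed, -XX:PermSize is ignored with a warning.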
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
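
Before distributing the script, it can help to confirm that the edit actually exports JMX_PORT on an uncommented line. The following is a minimal sketch; `has_jmx_port` is a hypothetical helper name, not part of Kafka or Eagle.

```shell
# Sketch: confirm that a kafka-server-start.sh copy exports JMX_PORT.
# has_jmx_port is a hypothetical helper, not shipped with Kafka.
has_jmx_port() {
    script_path="$1"
    # Match an uncommented line such as: export JMX_PORT="9999"
    grep -Eq '^[[:space:]]*export[[:space:]]+JMX_PORT=' "$script_path"
}
```

From the Kafka directory, `has_jmx_port bin/kafka-server-start.sh && echo "JMX_PORT is set"` prints the message only when the export line is present.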

Note: after making this change, distribute the script to the other nodes before starting Kafka.

[hadoop@hadoop102 kafka]$ cat /usr/local/bin/xsync 
#!/bin/bash
# 1. Get the number of arguments; exit if none were given
pcount=$#
if ((pcount == 0)); then
    echo "no args"
    exit 1
fi

# 2. Get the file name
p1=$1
fname=$(basename "$p1")
echo "fname=$fname"

# 3. Resolve the parent directory to an absolute path
pdir=$(cd -P "$(dirname "$p1")" && pwd)
echo "pdir=$pdir"

# 4. Get the current user name
user=$(whoami)

# 5. Sync the file to hadoop103 and hadoop104
for ((host = 103; host < 105; host++)); do
    echo "------------------- hadoop$host --------------"
    rsync -rvl "$pdir/$fname" "$user@hadoop$host:$pdir"
done
[hadoop@hadoop102 kafka]$ xsync bin/kafka-server-start.sh

2) Upload the archive kafka-eagle-bin-1.3.7.tar.gz to the cluster and extract it

[hadoop@hadoop102 software]$ ll kafka-eagle-bin-1.3.7.tar.gz 
-rw-r--r-- 1 hadoop hadoop 84934270 Aug 24  2019 kafka-eagle-bin-1.3.7.tar.gz
[hadoop@hadoop102 software]$ tar xf kafka-eagle-bin-1.3.7.tar.gz 
[hadoop@hadoop102 software]$ cd kafka-eagle-bin-1.3.7/
[hadoop@hadoop102 kafka-eagle-bin-1.3.7]$ ls
kafka-eagle-web-1.3.7-bin.tar.gz
[hadoop@hadoop102 kafka-eagle-bin-1.3.7]$ tar xf kafka-eagle-web-1.3.7-bin.tar.gz -C /opt/module/
[hadoop@hadoop102 kafka-eagle-bin-1.3.7]$ mv /opt/module/kafka-eagle-web-1.3.7/ /opt/module/eagle

3) Make the startup script executable

[hadoop@hadoop102 eagle]$ ls
bin  conf  db  font  kms  logs
[hadoop@hadoop102 eagle]$ chmod 777 bin/ke.sh 
[hadoop@hadoop102 eagle]$ ll bin/
total 12
-rw-r--r-- 1 hadoop hadoop 1848 Aug 22  2017 ke.bat
-rwxrwxrwx 1 hadoop hadoop 7190 Jul 30  2019 ke.sh

4) Set the environment variables

[hadoop@hadoop102 eagle]$ sudo vim /etc/profile
#eagle
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

[hadoop@hadoop102 eagle]$ source /etc/profile

# Distribute /etc/profile; it must be identical on all three nodes

5) Edit the configuration file system-config.properties

image

image

######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
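
The settings above can be sanity-checked before Eagle is started. The following is a minimal sketch; `get_prop` and `check_ke_config` are hypothetical helper names, and the list of keys is simply the set shown in this step (with the cluster alias assumed to be cluster1).

```shell
# Sketch: read keys from system-config.properties and report missing ones.
# get_prop and check_ke_config are hypothetical helpers, not part of Eagle.
get_prop() {
    # Print the value of key $2 from properties file $1 (first match wins).
    awk -v k="$2" 'index($0, k "=") == 1 { print substr($0, length(k) + 2); exit }' "$1"
}

check_ke_config() {
    conf_file="$1"
    rc=0
    # Keys taken from the configuration shown above (alias assumed: cluster1).
    for key in kafka.eagle.zk.cluster.alias cluster1.zk.list \
               kafka.eagle.driver kafka.eagle.url \
               kafka.eagle.username kafka.eagle.password; do
        if [ -z "$(get_prop "$conf_file" "$key")" ]; then
            echo "missing: $key"
            rc=1
        fi
    done
    return "$rc"
}
```

Assuming the file lives under the conf directory of the Eagle installation, `check_ke_config "$KE_HOME/conf/system-config.properties"` returns non-zero and names each key that is absent or empty. A JDBC URL that has been wrapped onto two lines still passes this check, so it is worth inspecting the output of `get_prop` for kafka.eagle.url by eye.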

6) Install MySQL with Docker

Install Docker:

curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
wget -O /etc/yum.repos.d/docker-ce.repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo
sed -i 's#download.docker.com#mirrors.tuna.tsinghua.edu.cn/docker-ce#g' /etc/yum.repos.d/docker-ce.repo
yum install docker-ce lrzsz -y
docker version
mkdir -p /etc/docker/ && touch /etc/docker/daemon.json
cat >/etc/docker/daemon.json <<EOF
{
  "registry-mirrors": ["https://t09ww3n9.mirror.aliyuncs.com"]
}
EOF
systemctl start docker
systemctl enable docker

Install MySQL:

sudo docker run -p 3306:3306 --name mysql \
-v /mydata/mysql/log:/var/log/mysql \
-v /mydata/mysql/data:/var/lib/mysql \
-v /mydata/mysql/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=root \
-d mysql:5.7

Create the database:

mysql> create database ke;
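
The statement above can also be run without opening an interactive MySQL prompt. The following is a minimal sketch, assuming the container is named mysql and the root password is root, as in the docker run command above; `create_ke_database` is a hypothetical helper, and the `DOCKER` variable is an assumed override that exists only to allow a dry run.

```shell
# Sketch: create the Eagle schema inside the MySQL container.
# Assumptions: container name "mysql", root password "root" (from docker run).
# DOCKER is a hypothetical override, e.g. DOCKER=echo for a dry run.
create_ke_database() {
    db_name="${1:-ke}"
    # IF NOT EXISTS makes the call safe to repeat.
    "${DOCKER:-docker}" exec mysql \
        mysql -uroot -proot -e "CREATE DATABASE IF NOT EXISTS ${db_name};"
}
```

Calling `create_ke_database` with no argument creates the ke database referenced by kafka.eagle.url; `DOCKER=echo create_ke_database` only prints the command that would be run.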

7) Start Eagle

Note: ZooKeeper and Kafka must be running before you start Eagle.

[hadoop@hadoop102 eagle]$ bin/ke.sh start

image

image

image

1.2 Basic Eagle usage

1) Start a consumer

package com.dianchou.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * @author lawrence
 * @create 2021-02-05
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Cluster to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Enable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval (ms)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key and value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("group-test"));

        // Fetch records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            // Print each record in the batch
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.printf("offset = %d, key = %s, value = %s%n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
            }
        }
    }
}

2) View the topic and consumers in Eagle

image

image

image

image

3) Start a producer

package com.dianchou.kafka.producer;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author lawrence
 * @create 2021-02-04
 */
public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // Create the Kafka producer configuration
        Properties properties = new Properties();
        // Cluster to connect to
        //properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // Acknowledgement level
        //properties.put("acks", "all");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // Number of retries
        properties.put("retries", 3);
        // Batch size: 16 KB
        properties.put("batch.size", 16384);
        // Linger time: 1 ms
        properties.put("linger.ms", 1);
        // RecordAccumulator buffer size: 32 MB
        properties.put("buffer.memory", 33554432);
        // Key serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer from the configuration
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10000; i++) {
            // Send without a callback; get() blocks until each record is acknowledged
            producer.send(new ProducerRecord<>("group-test", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}

4) View the results

image

image
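
The consumer lag shown in Eagle can be cross-checked from the command line with Kafka's own kafka-consumer-groups.sh tool. The following is a minimal sketch that totals the LAG column of its `--describe` output; `sum_lag` is a hypothetical helper, and it locates the column from the header row because the column order differs between Kafka versions.

```shell
# Sketch: total the LAG column of `kafka-consumer-groups.sh --describe` output.
# sum_lag is a hypothetical helper; it reads the tool's output on stdin.
sum_lag() {
    awk '
        # Until the header row is seen, look for the position of the LAG column.
        lag_col == 0 {
            for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
            next
        }
        # Add numeric lag values; rows showing "-" (no committed offset) are skipped.
        $lag_col ~ /^[0-9]+$/ { total += $lag_col }
        END { print total + 0 }
    '
}
```

From the Kafka directory, a call such as `bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group group-test | sum_lag` prints one number, which should roughly match the lag Eagle reports for the group-test consumer group while the producer is running.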

5) Broker monitoring

image

6) Kafka monitoring

image

7) ZooKeeper

image

8) Topics

image

Author: Lawrence

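Copyright of this article belongs to the author. Reposting is welcome, but unless the author agrees otherwise, this notice must be retained and a link to the original must appear prominently on the page.
Original article: https://www.cnblogs.com/hujinzhong/p/14378525.html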