Kafka 命令：统计指定消费组的 LAG 总和

# Print the total lag of one consumer group: sums the 5th column of the
# `--describe` output. Non-numeric cells (header text, "-") count as 0, and
# 0 is printed when the tool returns no rows at all.
group=orchestrate-factory
# Clear KAFKA_HEAP_OPTS for the CLI invocation.
export KAFKA_HEAP_OPTS=""
# The config file is given by absolute path so the command works from any
# working directory.
/kafka/bin/kafka-consumer-groups.sh \
  --command-config /kafka/config/consumer.properties \
  --describe \
  --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 \
  --group "$group" \
  | awk '{ sum += $5 } END { print sum + 0 }'

  

Kafka 监控脚本（开启鉴权的集群，命令带 --command-config）

 

#!/bin/bash
# Collect per-partition consumer-group offsets and lag from the Kafka
# container running on this host and push them to the Pushgateway on port
# 9091 of this host. Every kafka-consumer-groups.sh call passes
# --command-config (authenticated cluster).
#
# A lock file prevents overlapping runs. On first run the script copies
# itself to /etc and registers a one-minute entry in /etc/crontab.
# Requires: docker, nsenter, GNU stat, curl, sudo.

lock_file=/tmp/kafkachecklock   # present while a collection is running
lock_max_age=153                # seconds before a leftover lock is discarded
metrics_file=/tmp/kafka_total   # payload sent to the Pushgateway
bootstrap_server=kafka-1.default.svc.cluster.local:9092

# Run a command inside the namespaces of the Kafka container.
in_container() {
    nsenter -n -m -p -u -t "$container_pid" "$@"
}

# kafka-consumer-groups.sh with the connection options shared by all calls.
consumer_groups() {
    in_container /kafka/bin/kafka-consumer-groups.sh \
        --bootstrap-server "$bootstrap_server" \
        --command-config /kafka/config/consumer.properties "$@"
}

# Succeed when $1 is an integer, i.e. usable as a sample value.
is_integer() {
    [[ $1 =~ ^-?[0-9]+$ ]]
}

# Read `--describe` rows from stdin and print three samples per row
# (current offset, log end offset, lag) for consumer group $1.
# Label values are wrapped in double quotes, as the Prometheus text
# exposition format requires.
emit_group_metrics() {
    local group=$1
    local topic partition current_offset log_end_offset lag
    local consumer_id host client_id _rest labels
    while read -r topic partition current_offset log_end_offset lag \
                  consumer_id host client_id _rest; do
        # A "-" placeholder is reported as 0.
        if [ "$lag" = "-" ]; then lag=0; fi
        if [ "$current_offset" = "-" ]; then current_offset=0; fi
        if [ "$log_end_offset" = "-" ]; then log_end_offset=0; fi
        # One non-numeric sample would make the gateway reject the whole
        # push, so rows that are not data rows are skipped.
        is_integer "$lag" || continue
        is_integer "$current_offset" || continue
        is_integer "$log_end_offset" || continue

        labels="app=\"$group\",topic=\"$topic\",partition=\"$partition\""
        labels="$labels,CONSUMER_ID=\"$consumer_id\",host=\"$host\",CLIENT_ID=\"$client_id\""
        printf 'KAFKA_CURRENT_OFFSET{%s} %s\n' "$labels" "$current_offset"
        printf 'KAFKA_LOG_END_OFFSET{%s} %s\n' "$labels" "$log_end_offset"
        printf 'KAFKA_LAG{%s} %s\n' "$labels" "$lag"
    done
}

if [ -f "$lock_file" ]; then
    # Another run is (or was) in progress: drop the lock if it is stale and
    # leave; the next scheduled run does the work.
    now=$(date +%s)
    locked_at=$(stat -c %Y "$lock_file" 2>/dev/null) || locked_at=$now
    if [ $((now - locked_at)) -gt "$lock_max_age" ]; then
        rm -f "$lock_file"
    fi
    exit 0
fi

# NOTE(review): assumes exactly one container matches; with several matches
# the nsenter calls below cannot work - confirm on multi-broker hosts.
container_id=$(docker ps | grep "kafka" | grep -vE "pause|side" | awk '{print $1}')
# First inet address listed for the interface with index 2.
local_ip=$(ip addr | grep -A5 "^2:" | grep inet | awk 'NR==1{print $2}' | cut -d "/" -f1)
rm -f "$metrics_file"

if [ -n "$container_id" ]; then
    touch "$lock_file"
    # Release the lock on any exit path, including termination by a signal.
    trap 'rm -f "$lock_file"' EXIT
    trap 'exit 1' HUP INT TERM

    container_pid=$(docker inspect -f '{{.State.Pid}}' "$container_id")

    # Make libjli.so available under /lib inside the container if it is not
    # there yet (presumably needed by the JVM started through nsenter -
    # TODO confirm).
    if ! in_container ls /lib/libjli.so >/dev/null 2>&1; then
        in_container cp /usr/lib/jvm/java-1.8-openjdk/lib/amd64/jli/libjli.so /lib/libjli.so
    fi

    : > "$metrics_file"
    # Group names are read on fd 3 so commands in the loop body keep the
    # script's own stdin.
    while IFS= read -r group <&3; do
        [ -n "$group" ] || continue
        # Drop blank lines and the header, then de-duplicate rows: the
        # gateway rejects a push containing the same series twice.
        consumer_groups --describe --group "$group" \
            | grep -vE "^$|TOPIC" \
            | awk '!seen[$0]++' \
            | emit_group_metrics "$group" >> "$metrics_file"
    done 3< <(consumer_groups --list | grep -v KMOffsetCache)

    # The job name is "kafka" followed by the last octet of the local IP.
    curl --data-binary @- "http://$local_ip:9091/metrics/job/kafka${local_ip##*.}" < "$metrics_file"
fi
rm -f "$lock_file"

#add to /etc/crontab
# The base name is used so the installed path is right even when the script
# was started through a directory path; matching on the base name also
# recognises entries written by earlier versions of this script.
script_name=${0##*/}
if ! grep -qF -- "$script_name" /etc/crontab; then
    sudo cp -f -- "$0" "/etc/$script_name"
    sudo chmod +x "/etc/$script_name"
    printf '%s\n' "*/1 * * * * root /etc/$script_name" | sudo tee -a /etc/crontab >/dev/null
fi
View Code

Kafka 监控脚本（未开启鉴权的集群，命令不带 --command-config）

 
#!/bin/bash
# Collect per-partition consumer-group offsets and lag from the Kafka
# container running on this host and push them to the Pushgateway on port
# 9091 of this host. The broker address is derived from the container name.
#
# A lock file prevents overlapping runs. On first run the script copies
# itself to /etc and registers a one-minute entry in /etc/crontab.
# Requires: docker, nsenter, GNU stat, curl, sudo.

lock_file=/tmp/kafkachecklock   # present while a collection is running
lock_max_age=153                # seconds before a leftover lock is discarded

# Run a command inside the namespaces of the Kafka container.
in_container() {
    nsenter -n -m -p -u -t "$container_pid" "$@"
}

# kafka-consumer-groups.sh with the connection options shared by all calls.
consumer_groups() {
    in_container /kafka/bin/kafka-consumer-groups.sh \
        --bootstrap-server "$bootstrap_server" "$@"
}

# Succeed when $1 is an integer, i.e. usable as a sample value.
is_integer() {
    [[ $1 =~ ^-?[0-9]+$ ]]
}

# Read `--describe` rows from stdin and print three samples per row
# (current offset, log end offset, lag) for consumer group $1.
# Label values are wrapped in double quotes, as the Prometheus text
# exposition format requires.
emit_group_metrics() {
    local group=$1
    local topic partition current_offset log_end_offset lag
    local consumer_id host client_id _rest labels
    while read -r topic partition current_offset log_end_offset lag \
                  consumer_id host client_id _rest; do
        # A "-" placeholder is reported as 0.
        if [ "$lag" = "-" ]; then lag=0; fi
        if [ "$current_offset" = "-" ]; then current_offset=0; fi
        if [ "$log_end_offset" = "-" ]; then log_end_offset=0; fi
        # One non-numeric sample would make the gateway reject the whole
        # push, so rows that are not data rows are skipped.
        is_integer "$lag" || continue
        is_integer "$current_offset" || continue
        is_integer "$log_end_offset" || continue

        labels="app=\"$group\",topic=\"$topic\",partition=\"$partition\""
        labels="$labels,CONSUMER_ID=\"$consumer_id\",host=\"$host\",CLIENT_ID=\"$client_id\""
        printf 'KAFKA_CURRENT_OFFSET{%s} %s\n' "$labels" "$current_offset"
        printf 'KAFKA_LOG_END_OFFSET{%s} %s\n' "$labels" "$log_end_offset"
        printf 'KAFKA_LAG{%s} %s\n' "$labels" "$lag"
    done
}

if [ -f "$lock_file" ]; then
    # Another run is (or was) in progress: drop the lock if it is stale and
    # leave; the next scheduled run does the work.
    now=$(date +%s)
    locked_at=$(stat -c %Y "$lock_file" 2>/dev/null) || locked_at=$now
    if [ $((now - locked_at)) -gt "$lock_max_age" ]; then
        rm -f "$lock_file"
    fi
    exit 0
fi

# NOTE(review): assumes exactly one container matches; with several matches
# the nsenter calls below cannot work - confirm on multi-broker hosts.
container_id=$(docker ps | grep "kafka" | grep -v "pause" | awk '{print $1}')
# Second "_"-separated part of the container name, cut at the first dot;
# used both as the broker host prefix and as the metrics file suffix.
broker_name=$(docker ps | grep "kafka" | grep -v "pause" | awk '{print $NF}' | awk -F "_" '{print $2}' | cut -d "." -f1)
# First inet address listed for the interface with index 2.
local_ip=$(ip addr | grep -A5 "^2:" | grep inet | awk 'NR==1{print $2}' | cut -d "/" -f1)
metrics_file=/tmp/kafka_total${broker_name}
bootstrap_server=$broker_name.default.svc.cluster.local:9092
rm -f "$metrics_file"

if [ -n "$container_id" ]; then
    touch "$lock_file"
    # Release the lock on any exit path, including termination by a signal.
    trap 'rm -f "$lock_file"' EXIT
    trap 'exit 1' HUP INT TERM

    container_pid=$(docker inspect -f '{{.State.Pid}}' "$container_id")

    # Make libjli.so available under /lib inside the container if it is not
    # there yet (presumably needed by the JVM started through nsenter -
    # TODO confirm).
    if ! in_container ls /lib/libjli.so >/dev/null 2>&1; then
        in_container cp -f /usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64/jli/libjli.so /lib/libjli.so
    fi

    : > "$metrics_file"
    # Group names are read on fd 3 so commands in the loop body keep the
    # script's own stdin.
    while IFS= read -r group <&3; do
        [ -n "$group" ] || continue
        # Drop blank lines and the header, then de-duplicate rows: the
        # gateway rejects a push containing the same series twice.
        consumer_groups --describe --group "$group" \
            | grep -vE "^$|TOPIC" \
            | awk '!seen[$0]++' \
            | emit_group_metrics "$group" >> "$metrics_file"
    done 3< <(consumer_groups --list | grep -v KMOffsetCache)

    # The job name is "kafka" followed by the last octet of the local IP.
    curl --data-binary @- "http://$local_ip:9091/metrics/job/kafka${local_ip##*.}" < "$metrics_file"
fi
rm -f "$lock_file"

#add to /etc/crontab
# The base name is used so the installed path is right even when the script
# was started through a directory path; matching on the base name also
# recognises entries written by earlier versions of this script.
script_name=${0##*/}
if ! grep -qF -- "$script_name" /etc/crontab; then
    sudo cp -f -- "$0" "/etc/$script_name"
    sudo chmod +x "/etc/$script_name"
    printf '%s\n' "*/1 * * * * root /etc/$script_name" | sudo tee -a /etc/crontab >/dev/null
fi
View Code

清理 Prometheus Pushgateway 中超过 10 分钟未推送的 job

#!/bin/bash
# Delete Pushgateway jobs whose last push is more than 10 minutes old.
#
# Reads the push_time_seconds series from the gateway on port 9091 of this
# host, issues a DELETE for every job older than the limit, and appends the
# deleted job names to /tmp/deletejobs. On first run the script copies
# itself to /etc and registers a three-minute entry in /etc/crontab.
# Requires: curl, awk, sudo.

max_age=600               # seconds since the last push before a job is deleted
now=$(date +%s)

# Name of the interface with index 2, then the address on the second line
# of `ip addr` output that mentions it.
iface=$(ip addr | grep ^2: | awk '{print $2}' | cut -d ":" -f1)
local_ip=$(ip addr | grep "$iface" | awk 'NR==2{print $2}' | cut -d "/" -f1)

# Job name and push time are taken from the same metrics line, so two jobs
# that happen to share a push time cannot be confused with each other.
# Lines without a job name are ignored.
curl -sS "http://$local_ip:9091/metrics" \
    | awk '/push_time_seconds/ && !/#/ {
          if (split($0, parts, "job=\"") < 2) next
          split(parts[2], quoted, "\"")
          if (quoted[1] == "") next
          # The value is printed in exponent notation; %d turns it into
          # whole seconds.
          printf "%d %s\n", $NF, quoted[1]
      }' \
    | while read -r pushed_at job; do
          if [ $((now - pushed_at)) -gt "$max_age" ]; then
              # NOTE(review): the job name goes into the URL as-is; names
              # with characters that need URL encoding are not handled.
              curl -sS -X DELETE "http://$local_ip:9091/metrics/job/$job"
              echo "$job" >> /tmp/deletejobs
          fi
      done

#add to /etc/crontab
# The base name is used so the installed path is right even when the script
# was started through a directory path; matching on the base name also
# recognises entries written by earlier versions of this script.
script_name=${0##*/}
if ! grep -qF -- "$script_name" /etc/crontab; then
    sudo cp -f -- "$0" "/etc/$script_name"
    sudo chmod +x "/etc/$script_name"
    printf '%s\n' "*/3 * * * * root /etc/$script_name" | sudo tee -a /etc/crontab >/dev/null
fi
View Code

查看消费组列表

# List the consumer groups known to the cluster.
# KAFKA_HEAP_OPTS is cleared (and stays exported) for the CLI calls that follow.
export KAFKA_HEAP_OPTS=""
/kafka/bin/kafka-consumer-groups.sh \
  --list \
  --command-config /kafka/config/consumer.properties \
  --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092

查看消费组详细信息

# Show the per-partition offsets and lag of the consumer group "baseBusiness".
# The config file is given by absolute path so the command works from any
# working directory.
/kafka/bin/kafka-consumer-groups.sh \
  --command-config /kafka/config/consumer.properties \
  --describe \
  --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 \
  --group baseBusiness

 kafka集群部署

https://www.cnblogs.com/saneri/p/8762168.html
#!/bin/bash
# Smoke test for the ZooKeeper + Kafka deployment in the "basis" namespace:
#   1. print the status of the three ZooKeeper nodes,
#   2. create the "test" topic and run the producer perf test,
#   3. run the consumer perf test on the same topic.
# Requires kubectl access to the cluster.

namespace=basis

# Print the name of the first pod in $namespace whose name contains $1.
find_pod() {
  kubectl get po -n "$namespace" </dev/null \
    | awk -v wanted="$1" 'index($1, wanted) { print $1; exit }'
}

# Run the bash script supplied on stdin inside the pod matching $1.
# Returns 1 without calling kubectl exec when no such pod exists.
run_in_pod() {
  local pod
  pod=$(find_pod "$1")
  if [ -z "$pod" ]; then
    echo "no pod matching '$1' found in namespace $namespace" >&2
    return 1
  fi
  # Only -i is passed: stdin is a here-document, not a terminal, so a TTY
  # (-t) cannot be allocated.
  kubectl exec -n "$namespace" -i "$pod" -- bash 2>&1
}

# check zk status
for i in {1..3}; do
  run_in_pod "zookeeper-$i" <<'EOF'
export JVMFLAGS=""
/opt/zookeeper/bin/zkServer.sh  status
EOF
done

sleep 5
# check kafka product send
# --if-not-exists keeps topic creation from failing when the script is run again.
run_in_pod kafka-deployment-1 <<'EOF'
export KAFKA_HEAP_OPTS=""
/kafka/bin/kafka-topics.sh --create --if-not-exists --zookeeper zookeeper-1.basis.svc.cluster.local:2181,zookeeper-2.basis.svc.cluster.local:2181,zookeeper-3.basis.svc.cluster.local:2181 --replication-factor 1 --partitions 3 --topic test
/kafka/bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --num-records 10000 --record-size 4096 --topic test --throughput 10000 --producer-props bootstrap.servers=kafka-1.basis.svc.cluster.local:9092,kafka-2.basis.svc.cluster.local:9092,kafka-3.basis.svc.cluster.local:9092 acks=all
EOF


sleep 5
# check kafka consumer
run_in_pod kafka-deployment-1 <<'EOF'
export KAFKA_HEAP_OPTS=""
/kafka/bin/kafka-consumer-perf-test.sh --consumer.config config/consumer.properties --topic test --broker-list kafka-1.basis.svc.cluster.local:9092,kafka-2.basis.svc.cluster.local:9092,kafka-3.basis.svc.cluster.local:9092 --group app-test-consumer --messages 10000  --threads 1
EOF
check_zk_kafka_acl.sh
原文地址:https://www.cnblogs.com/hanwei666/p/11743390.html