springboot+kafka(centos7集群部署kafka)

 

 
1、kafka简介
  1.1:Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息

2:Kafka  是一种高吞吐量 的分布式发布订阅消息系统,主要特性:

  2.1.:通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  2.2:高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  2.3:支持通过Kafka服务器和消费机集群来分区消息。
  2.4:支持Hadoop并行数据加载

3:基本概念
  3.1、Producer:消息生产者,向kafka broker发消息的客户端。

  3.2、Consumer:消息消费者,向kafka broker取消息的客户端。

  3.3、Topic:特定类型的消息流,可以理解为一个队列。
  3.4、Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG       中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
  3.5、Broker:已发布的消息保存在一组服务器中,称之为Kafka集群。一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  3.6、Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给                          consumer,不保证一个topic的整体(多个partition间)的顺序。
  3.7、Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

4:配置准备
  4.1:准备三台Linux服务器

机器名IP
hadoop207 192.168.168.207
hadoop208 192.168.168.208
hadoop209 192.168.168.209


    

 

 

 

  4.2:zookeeper环境准备
    由于Kafka需要通过Zookeeper进行分布式系统的协调和促进,通过Zookeeper协调Broker、生产者和消费者。所以安装前需在每台机器上安装好Zookeeper(Kafka自带有Zookeeper,但一般使用外置安装)

 4.2.1:zookeeper官网下载地址:https://zookeeper.apache.org/releases.html

 4.2.2:自行安装jdk
    

 4.2.3:解压zookeeper(tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/)    

 4.2.4:修改配置文件zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/opt/module/zookeeper-3.5.7/zooData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#######################cluster##########################
#server.2=hadoop202:2888:3888
#server.3=hadoop203:2888:3888
#server.4=hadoop204:2888:3888
#server.5=hadoop205:2888:3888
#server.6=hadoop206:2888:3888
server.7=hadoop207:2888:3888
server.8=hadoop208:2888:3888
server.9=hadoop209:2888:3888

4.2.5:一台机器配置好后,可以使用rsync同步文件,也可以自定义xsyn集群分发脚本,同步到另外两台机器(chmod 777 xsync -> xsyn /opt/module)

#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in hadoop207 hadoop208 hadoop209
do
    echo ====================  $host  ====================
    #3. 遍历所有目录,挨个发送

    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
            then
                #5. 获取父目录
                pdir=$(cd -P $(dirname $file); pwd)

                #6. 获取当前文件的名称
                fname=$(basename $file)
                ssh $host "sudo mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done

4.2.6:zookeeper集群启动、停止脚本(vim zk.sh -> zk.sh start)

#!/bin/bash

case $1 in
"start"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo -------------$i starting-----------------------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
}
;;
"stop"){
    for i in hadoop207 hadoop208 hadoop209
    do
       echo --------------$i stoping------------------------------ 
       ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done

}
;;
"status"){
    for i in hadoop207 hadoop208 hadoop209
    do
       echo --------------$i status------------------------------- 
       ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done

}
;;
esac

 

 5:官网下载kafka地址:https://kafka.apache.org/downloads,解压到/opt/module

 

 5.1:修改配置文件(cd /opt/module/kafka_2.13-3.0.0/config ->vim server.properties)
  1、 修改broker.id,确保每台机器的broker.id不一致,本文3台服务器的broker.id分别设置为1、2、3;

  2、 port默认为9092,可以根据需要进行修改,一般情况下保持3台服务器一致;

  3、 修改host.name为本机真实IP;
  4、 num.partitions默认为1,可根据集群要求进行修改,本文修改为4;
  5、 修改zookeeper.connect,其值为所有服务器上zookeeper的IP端口串,如下所示:
    zookeeper.connect=192.168.168.207:2181,192.168.168.208:2181,192.168.168.209:2181
  6、 log.dirs= 配置kafka日志目录

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

#删除 topic 功能
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/module/kafka_2.13-3.0.0/logs/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop207:2181,hadoop208:2181,hadoop209:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

6:集群启动脚本编写(vim kafka.sh)

#!/bin/bash

case $1 in
"start"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo -------------$i starting-----------------------------
        ssh $i '/opt/module/kafka_2.13-3.0.0/bin/kafka-server-start.sh /opt/module/kafka_2.13-3.0.0/config/server.properties'

    done
}
;;
"stop"){
    for i in hadoop207 hadoop208 hadoop209
    do
       echo --------------$i stoping------------------------------ 
       ssh $i "/opt/module/kafka_2.13-3.0.0/bin/kafka-server-stop.sh stop"
    done

}
;;
esac

 6.1:jps查看启动进程

 7:kafka可视化界面下载地址:http://download.kafka-eagle.org/
  7.1:下载后解压:tar -zxvf kafka-eagle-bin-2.0.8.tar.gz -C /opt/module

  7.2:修改配置文件(vim /opt/module/kafka-eafle-bin-2.0.8/conf/system-config.properties)
  

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
#efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=hadoop207:2181,hadoop208:2181,hadoop209:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://120.79.35.166:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

7.2:启动(./ke.sh start

  

 

 7.3: 访问8048端口 用户名admin  默认密码 123456

  

8:springboot整合kafka简单demo
 8.1:pom.xml

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

8.2:application.yml

server:
  servlet:
    context-path: /
  port: 8088
spring:
  kafka:
    bootstrap-servers: 192.168.168.207:9092,192.168.168.208:9092,192.168.168.209:9092
    #生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
    producer:
      #每批次发送消息的数量
      batch-size: 16
      #设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 0
      #producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者的配置
    consumer:
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      # latest 从最新的数据消费,也就是新产生的数据,
      auto-offset-reset: earliest
      #是否开启自动提交
      enable-auto-commit: true
      #自动提交的时间间隔
      auto-commit-interval: 100
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #在/usr/local/etc/kafka/consumer.properties中有配置
      group-id: test-consumer-group

8.3:Consumer

/**
 * 定义一个消费者监听topic中的消息
 * @author ywb
 * @createdDate 2021/12/27 11:15
 * @updatedDate
 */
@Slf4j
@Component
public class Consumer {

    private static Gson gson = new GsonBuilder().create();

    @KafkaListener(topics = {"ywb"})
    public void listen(ConsumerRecord<?, ?> record){

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            log.info("consumer  get  message : {}",gson.toJson(message));
        }

    }
}

8.4:Producer

/**
 * @author ywb
 * @createdDate 2021/12/27 11:16
 * @updatedDate
 */
@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    //发送消息方法
//    @Transactional(rollbackFor = RuntimeException.class)
    public void send() {
        Message message = new Message();
        message.setId("KFK_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("ywb", gson.toJson(message));
//        int i = 1/0;
//        new RuntimeException("error");
    }

}

8.5:Message

/**
 * @author ywb
 * @createdDate 2021/12/27 11:16
 * @updatedDate
 */
public class Message {

    private String id;

    private String msg;

    private Date sendTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

8.6:SendController

/**
 * @author ywb
 * @createdDate 2021/12/27 11:16
 * @updatedDate
 */
@RestController
@RequestMapping("/kafka")
public class SendController {

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
        producer.send();
        return "{\"code\":0}";
    }
}

8.7:生产者发送请求http://localhost:8088/kafka/send,消费者开始消费

2022-01-07 16:24:40.789  INFO 38592 --- [ntainer#0-0-C-1] com.ratel.kafka.consumer.Consumer        : consumer  get  message : "{\"id\":\"KFK_1641543880749\",\"msg\":\"94a89968-40e2-49c3-ac55-4b3b97041e70\",\"sendTime\":\"Jan 7, 2022 4:24:40 PM\"}"
2022-01-07 16:24:44.380  INFO 38592 --- [ntainer#0-0-C-1] com.ratel.kafka.consumer.Consumer        : consumer  get  message : "{\"id\":\"KFK_1641543884372\",\"msg\":\"87f27450-709d-4559-91ad-72d52ee4619f\",\"sendTime\":\"Jan 7, 2022 4:24:44 PM\"}"

 

 

原文地址:https://www.cnblogs.com/ywbmaster/p/15775533.html