kafka环境搭建

kafka环境搭建
    //关于使用 bin/kafka-server-start.sh -daemon config/server.properties启动kafka服务后
    //执行 bin/kafka-server-stop.sh无法关闭kafka服务
    修改 vim bin/kafka-server-stop.sh
    PIDS=$(ps ax | grep -i 'kafka.Kafka' | grep java | grep -v grep | awk '{print $1}')
    将如上语句中的 kafka.Kafka修改为 kafkaServer-gc

    不能停止服务的原因:
      运行脚本时:如果ps输出超过4096个字符的Linux限制,bin / zookeeper-server-stop.sh将
      无法停止zookeeper服务器进程;通过kafka-server-start.sh脚本启动了Kafka,则该进程找
      不到括kafka.Kafka,grep无法找到进程,并且它返回失败消息,没有Kafka服务器停止
    1.kafka下载
      下载地址:http://kafka.apache.org/downloads (选择相应版本及镜像地址下载)
      使用 tar -zxf kafka_2.11-0.10.2.1.tgz
      然后将文件重命名并移动到 /usr/local/kafkacloud目录下
      mv kafka_2.11-0.10.2.1 /usr/local/kafkacloud/kafka1
    2.修改配置文件
      【在/usr/local/kafkacloud/目录下执行操作命令】
      cp kafka1/config/server.properties kafka1/config/server-1.properties
      vim kafka1/config/server-1.properties
      参数修改:
        broker.id=1
        listeners=PLAINTEXT://:9091
        port=9091
        host.name=192.168.10.13
        advertised.port=9091
        advertised.host.name=192.168.10.13
        dvertised.listeners=PLAINTEXT://192.168.10.13:9091
        listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL (被注释掉打开注释)
        log.dirs=/tmp/kafka1-logs1 【该参数文本最后详解】
        zookeeper.connect=192.168.10.13:2181,192.168.10.13:2182,192.168.10.13:2183
        cp kafka/config/server.properties kafka/config/server-2.properties
        vim kafka1/config/server-1.properties
      参数修改:
        broker.id=2
        listeners=PLAINTEXT://:9092
        port=9092
        host.name=192.168.10.13
        advertised.port=9092
        advertised.host.name=192.168.10.13
        dvertised.listeners=PLAINTEXT://192.168.10.13:9092
        listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL (被注释掉打开注释)
        log.dirs=/tmp/kafka1-logs2
        zookeeper.connect=192.168.10.13:2181,192.168.10.13:2182,192.168.10.13:2183
        cp kafka/config/server.properties kafka/config/server-3.properties
        vim kafka1/config/server-1.properties
      参数修改:
        broker.id=3
        listeners=PLAINTEXT://:9093
        port=9093
        host.name=192.168.10.13
        advertised.port=9093
        advertised.host.name=192.168.10.13
        dvertised.listeners=PLAINTEXT://192.168.10.13:9093
        listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL (被注释掉打开注释)
        log.dirs=/tmp/kafka1-logs3
        zookeeper.connect=192.168.10.13:2181,192.168.10.13:2182,192.168.10.13:2183
   3.防火墙端口开放
      执行 vim /etc/sysconfig/iptables (将 9091 9092 9093端口开放)
      文件示例:
      # Manual customization of this file is not recommended.
      *filter
      :INPUT ACCEPT [0:0]
      :FORWARD ACCEPT [0:0]
      :OUTPUT ACCEPT [0:0]
      -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
      -A INPUT -p icmp -j ACCEPT
      -A INPUT -i lo -j ACCEPT
      -A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j ACCEPT
      -A INPUT -m state --state NEW -m tcp -p tcp --dport 80 -j ACCEPT
      -A INPUT -m state --state NEW -m tcp -p tcp --dport 8080 -j ACCEPT
      -A INPUT -m state --state NEW -m tcp -p tcp --dport 9091 -j ACCEPT (需要添加)
      -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT (需要添加)
      -A INPUT -m state --state NEW -m tcp -p tcp --dport 9093 -j ACCEPT (需要添加)

      :wq (保存退出)
      重启防火墙: service iptables restart
   4.写一个启动kafka小脚本
      4.1启动kafka服务
         cd /usr/local/kafkacloud/
         vim start-kafkas1.sh (启动kafka脚本文件名可随意选取)
          #!/bin/bash
          /usr/local/kafkacloud/kafka1/bin/kafka-server-start.sh -daemon /usr/local/kafkacloud/kafka1/config/server-1.properties
          /usr/local/kafkacloud/kafka1/bin/kafka-server-start.sh -daemon /usr/local/kafkacloud/kafka1/config/server-2.properties
          /usr/local/kafkacloud/kafka1/bin/kafka-server-start.sh -daemon /usr/local/kafkacloud/kafka1/config/server-3.properties
          :wq (保存退出)
          chmod +x start-kafkas1.sh :赋予可执行权限
        执行 ./start-kafkas1.sh 即可启动kafka(由于语句中加入了 -daemon 表示后置启动,启动过程中不会打印启动信息)
        ps aux | grep kafka :可以查看只有有kafka启动成功后的进程
        查看启动打印信息:
          vim /usr/local/kafkacloud/kafka1/logs/server.log 或者 vim /usr/local/kafkacloud/kafka1/logs/kafkaServer.out 进行查看
          注意:例如 kafkaServer.out普通用户不具有写入的权限时,普通用户启动kafka服务会报错(亲历)
     4.2创建一个topic及相关命令介绍
      cd /usr/local/kafkacloud/kafka1
        1. 创建一个叫 test 的 topic (--replication-factor 3:表示3个副本 --partitions 1 :表示一个分区 )
          bin/kafka-topics.sh --create --zookeeper 192.168.10.13:2181,192.168.10.13:2182,192.168.10.13:2183 --replication-factor 3 --partitions 1 --topic test
        2. 查看topic列表
          bin/kafka-topics.sh --list --zookeeper 192.168.10.13:2181,192.168.10.13:2182,192.168.10.13:2183
        3. 查看创建的topic的描述信息
          bin/kafka-topics.sh --describe --zookeeper 192.168.10.13:2181,192.168.10.13:2182,192.168.10.13:2183 --topic test
        4. 运行生产者,然后在控制台输入一些消息发送到服务器
          bin/kafka-console-producer.sh --broker-list 192.168.10.13:9094,192.168.10.13:9095,192.168.10.13:9096 --topic test
          运行后,默认一行为一条消息,回车后消息发送(在另一个 终端界面 运行消费者 即可消费信息)
        5. 运行消费者,消费消息
          bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.13:9094,192.168.10.13:9095,192.168.10.13:9096 --from-beginning --topic test
    5.配置文件中log.dirs参数解释
        以=/tmp/kafka1-logs1为例:
        启动后/tmp/kafka1-logs1该文件夹下:
        5.1 meta.properties文件
          文件内容:
            #Mon Dec 25 10:56:36 CST 2017
            version=0
            broker.id=1
            broker.id是kafka服务的身份标识
            此处的broker.id表示了kafka1-logs1 是与 config/server-1.properties对应
        5.2 test-0 (创建的test这个topic对应的文件夹 test-0)
        (也有可能有test-1 test-2 test-3....)
          根据配置文件config/server-1.properties中参数 num.partitions=1 决定分区数量】
          文件夹下有三个文件:
          00000000000000000000.index
          00000000000000000000.log (test这个topic 下发布的消息内容存放在该文件中)
          00000000000000000000.timeindex
    6.配置文件参数详解
      1、   server.properties
      broker.id :表示服务的代理id 如果未设置,则会生成唯一的代理标识。为避免zookeeper生成的代理

           标识与用户配置的代理标识之间的冲突,生成的代理标识从reserved.broker.max.id + 1开始。
      zookeeper.connect :连接zookeeper
      advertised.host.name :主机名发布到ZooKeeper供客户使用
      advertised.listeners :监听器发布到ZooKeeper供客户使用
      advertised.port :发布到ZooKeeper供客户端使用的端口
      auto.create.topics.enable :在服务器上启用自动创建主题
      auto.leader.rebalance.enable :启用自动领导者平衡。后台线程会定期检查并触发领导平衡
      compression.type :指定给定主题的最终压缩类型(可以不设置)
      delete.topic.enable :启用删除主题。如果此配置已关闭,则通过管理工具删除主题将不起作用
      host.name :仅在未设置“侦听器”时使用。使用`listeners`来代替。broker。如果这个设置,它只会绑定到这个地址。如果没有设置,它将绑定到所有接口
      listeners :监听器列表 - 逗号分隔的我们将监听的URI列表和监听器名称。如果侦听器名称不是安全协议,则还必须

          设置listener.security.protocol.map。指定主机名为0.0.0.0以绑定到所有接口。保留主机名为空以绑定到默认界面。

          合法侦听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093
      log.dir :保存日志数据的目录(对log.dirs属性的补充)
      log.dirs :日志数据保存的目录。如果未设置,则使用log.dir中的值
      log.flush.interval.ms :任何主题中的消息在刷新到磁盘之前都保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用log.flush.scheduler.interval.ms中的值
      log.retention.bytes :删除之前日志的最大大小
      log.retention.hours :保留日志文件的小时数(以小时为单位),大写为log.retention.ms属性
      log.retention.minutes :在删除日志文件之前保留日志的分钟数(以分钟为单位),次要log.retention.ms属性。如果未设置,则使用log.retention.hours中的值
      log.retention.ms :保留日志文件的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes中的值
      log.segment.bytes :单个日志文件的最大大小(默认为1073741824:表示1G)

人生没有彩排,每天都是现场直播!
原文地址:https://www.cnblogs.com/northern-light/p/8111099.html