KafKa

一、Kafaka 介绍

Apache Kafka 是分布式发布-订阅消息系统。 它最初由 LinkedIn 公司开发, 之后成为 Apache 项目的一部分。 Kafka 是一种快速、 可扩展的、 设计内在就是 分布式的, 分区的和可复制的提交日志服务 Kafka 是一个消息系统, 原本开发自 LinkedIn, 用作 LinkedIn 的活动流
( activity stream) 和运营数据处理管道( pipeline) 的基础。 后贡献给 apache 基金会, 成为 apache 的一个顶级项目。 

补充:

1、Kafka是一个分布式发布-订阅消息系统。分布式,意味着Kafka
可以在集群中运行。
2、消息的发布者,称为生产者(Producers),负责发布消息。发布消息之前,
需要发布确定一个主题(topic)。
3、broker,消息的缓存代理,相当于消息的服务器。生产者发布
的消息缓存在broker中。
4、消息的订阅者,称为消费者(Consumers),负责订阅消费消息。它向broker
中获取订阅的消息进行消费。
5、Kafka集群的运行需要Zookeeper来协调服务,所以在运行Kafka
集群之前,必须保证Zookeeper集群能正常运行。
6、在企业的大数据开发中,
经常用Flume+Kafka+Spark Streaming(Storm)或者
Kafka+Spark Streaming+Redis的方式进行开发。

NoSQL:not only SQL(不仅仅是SQL),
常用NoSQL数据库:HBase、ES(Elastic Search)、Redis、MongDB

 

二、安装Kafka步骤

        1、安转zookeeper

        2、安装kafaka

        3、下载解压安装包kafka_2.10-0.9.0.1.tgz

        4、vi config/server.properties

        5、broker.id=0
            host.name=liuiwei3
           zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

        6、将kafka复制到其他节点 scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei2:$PWD                          scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei4:$PWD 

        7、在其他两个salve节点分别修改 

              broker.id=1
            host.name=liuiwei2
           zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

         8、启动kafka

             在三台机器上分别启动              ./bin/kafka-server-start.sh -daemon config/server.properties

         9、检验kafka是否安装成功

             在master节点(liuwei3)操作:创建一个名为 test 的主题

                bin/kafka-topics.sh --create --zookeeper liuwei3:2181,liuwei2:2181,liuwei4:2181 --replication-factor 1 --partitions 1 --topic test

               在一个终端上启动一个生产者  ./bin/kafka-console-producer.sh --broker-list liuwei3:9092,liuwei2:9092,liuwei4:9092 --topic test

              然后再键盘输入信息:hello   

              在另一个终端中启动消费者(liuwei2,liuwei4),进入交互客户端,命令如下: ./bin/kafka-console-consumer.sh --zookeeper liuwei2:2181 --topic test --from-beginning

             会在屏幕上显示相同的信息 hello

三、Kafka+Spark Streaming

object KafkaDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("KafkaDemo")
              .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))
    //创建主题集合
    val topicSet=Map(("tgtest2"->1))

    /**
      * 第一个参数:StreamingContext对象
      * 第二个参数:Zookeeper集群,注意端口:2181
      * 第三个参数:消费者Consumer所在小组
      * 第四个参数:运行的主题
      */
    //从Kafka中获取数据
    val lines=KafkaUtils.createStream(ssc,
    "tgmaster:2181,tgslave:2181","DefaultConsumerGroup",topicSet)
    val result=lines.flatMap(x=>{
      x._2.split(" ")
    }).map(word=>(word,1))
      .reduceByKey(_+_)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

四、kafka的基本操作   

1. 启动 Kafka:
bin/kafka-server-start.sh config/server.properties &
2. 创建 Topics
bin/kafka-topics.sh --create --zookeeper zookeeper31:2181
-replication-factor 1 --partitions 3 --topic mydemo1
3. 查看 Topcis 队列列表
bin/kafka-topics.sh --list --zookeeper zookeeper31:2181 --topic mydemo1
4. 查看队列明细
bin/kafka-topics.sh --describe --zookeeper zookeeper31:2181 --topic
mydemo1
5. 停止 Kafka: bin/kafka-server-stop.sh config/server.properties &
6. 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
mydemo1
7. 接收消息
bin/kafka-console-consumer.sh --zookeeper zookeeper31:2181 --topic
mydemo1

   

五、spark streaming 操作kafka

   1、基于 Receiver 的方式

这种方式使用 Receiver 来获取数据。Receiver 是使用 Kafka 的高层次 Consumer API 来实现的。 receiver 从 Kafka 中获取的数据都是存储在 Spark Executor 的 内存中的, 然后 Spark Streaming 启动的 job 会去处理那些数据。

然而, 在默认的配置下, 这种方式可能会因为底层的失败而丢失数据。 如果要启 用高可靠机制, 让数据零丢失, 就必须启用 Spark Streaming 的预写日志机制 ( Write Ahead Log, WAL) 。 该机制会同步地将接收到的 Kafka 数据写入分布式 文件系统( 比如 HDFS) 上的预写日志中。 所以, 即使底层节点出现了失败, 也 可以使用预写日志中的数据进行恢复。

如何进行 Kafka 数据源连接?
(1) 在 maven 添加依赖
groupId = org.apache.spark
artifactId = spark-streaming-kafka_2.10
version = 1.5.1
(2) 使用第三方工具类创建输入 DStream
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume]);

需要注意的要点:
(1) Kafka 中的 topic 的 partition, 与 Spark 中的 RDD 的 partition 是没有 关系的。 所以, 在 KafkaUtils.createStream()中, 提高 partition 的数量, 只 会增加一个 Receiver 中, 读取 partition 的线程的数量。 不会增加 Spark 处理 数据的并行度。
(2) 可以创建多个 Kafka 输入 DStream,使用不同的 consumer group 和 topic, 来通过多个 receiver 并行接收数据。
(3) 如果基于容错的文件系统, 比如 HDFS, 启用了预写日志机制, 接收到的 数据都会被复制一份到预写日志中。 因此, 在 KafkaUtils.createStream() 中, 设置的持久化级别是 StorageLevel.MEMORY_AND_DISK_SER。

Kafka 命令
bin/kafka-topics.sh --zookeeper tgmaster:2181,tgslave:2181 --topic  test4 --replication-factor 1 --partitions 1 --create

bin/kafka-console-producer.sh --broker-list tgmaster:9092,tgslave:9092  --topic test4

  2. 基于 Direct 的方式

这种新的不基于 Receiver 的直接方式, 是在 Spark 1.3 中引入的, 从而能够确 保更加健壮的机制。 替代掉使用 Receiver 来接收数据后, 这种方式会周期性地 查询 Kafka,来获得每个 topic+partition 的最新的 offset,从而定义每个 batch
的 offset 的范围。 当处理数据的 job 启动时, 就会使用 Kafka 的简单 consumer api 来获取 Kafka 指定 offset 范围的数据。
    

这种方式有如下优点:
(1)简化并行读取: 如果要读取多个 partition, 不需要创建多个输入 DStream 然后对它们进行 union 操作。 Spark 会创建跟 Kafka partition 一样多的 RDD partition, 并且会并行从 Kafka 中读取数据。 所以在 Kafka partition 和 RDD
partition 之间, 有一个一对一的映射关系。
(2) 高性能: 如果要保证零数据丢失, 在基于 receiver 的方式中, 需要开启 WAL 机制。 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka 自己 本身就有高可靠的机制, 会对数据复制一份, 而这里又会复制一份到 WAL 中。 而
基于 direct 的方式, 不依赖 Receiver, 不需要开启 WAL 机制, 只要 Kafka 中作 了数据的复制, 那么就可以通过 Kafka 的副本进行恢复。
(3) 一次且仅一次的事务机制:
基于 receiver 的方式, 是使用 Kafka 的高阶 API 来在 ZooKeeper 中保存消费过的 offset 的。 这是消费 Kafka 数据的传统方式。 这种方式配合着 WAL 机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一
次, 可能会处理两次。 因为 Spark 和 ZooKeeper 之间可能是不同步的。 基于 direct 的方式, 使用 kafka 的简单 api, Spark Streaming 自己就 负责追踪消费的 offset, 并保存在 checkpoint 中。 Spark 自己一定是同步的, 因此可以保证数据是消费一次且仅消费一次。

JavaPairReceiverInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext,
[key class], [value class], [key decoder class], [value decoder
class], [map of Kafka parameters], [set of topics to consume]);

Kafka 命令:
bin/kafka-topics.sh --zookeeper 192.168.1.107:2181,192.168.1.108:2181  --topic TestTopic --replication-factor 1 --partitions 1 --create

bin/kafka-console-producer.sh --broker-list
192.168.1.107:9092,192.168.1.108:9092 --topic TestTopic
192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181
metadata.broker.list



补充:  kafka相关网址

http://www.cnblogs.com/yinchengzhe/p/5111635.html

http://chengjianxiaoxue.iteye.com/blog/2190488

http://blog.csdn.net/u014373825/article/details/42711191
http://www.cnblogs.com/mengyou0304/p/4836555.html

http://www.tuicool.com/articles/ieUZne     kafka常用命令说明

http://1028826685.iteye.com/blog/2326601  kafka参数详解

                   

原文地址:https://www.cnblogs.com/liuwei6/p/6648515.html