zookeeper
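The Scala helper below uses Apache Curator to save and restore Kafka consumer offsets in ZooKeeper for Spark Streaming's Kafka 0.10 direct API. Offsets are written under /kafka/offsets/<group>/<topic>/<partition> after each batch, and on startup the stored positions are clamped to the earliest/latest offsets still available on the brokers. A usage sketch follows the listing.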

package com.bnls.test.common

import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable


object ZookeeperHelper {

  val LOG = LoggerFactory.getLogger(ZookeeperHelper.getClass)

  // ZooKeeper connection string: comma-separated host:port list
  val zk_connectstring = "10.60.81.168:2181,10.60.81.167:2181,10.60.81.166:2181"
  // ZooKeeper namespace (chroot) for this application
  val zk_namespace = "mykafka"
  // Base path under which offsets are stored
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // ZK client, started eagerly with an exponential-backoff retry policy
  val zkClient = {
    val client = CuratorFrameworkFactory.builder.connectString(zk_connectstring)
      .retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace(zk_namespace).build()
    client.start()
    client
  }

  // Ensure a path exists in ZK; create it (with any missing parents) if it does not
  def ensureZKPathExists(path: String) = {
    if (zkClient.checkExists().forPath(path) == null) {
      zkClient.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  // Persist the new offsets to ZK, one node per topic/partition
  def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {

    for (o <- offsetRange) {
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"

      // Create the partition node on first write, then store the end offset of the processed range
      LOG.info("Writing offset to ZK - Topic: " + o.topic + ", Partition: " + o.partition + ", Offset: " + o.untilOffset)
      ensureZKPathExists(zkPath)
      zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
    }
    LOG.info("New offsets saved successfully")
  }

  // Read the offsets saved in ZK as the starting position for the DStream.
  // If the path does not exist yet, it is created and the stream starts from 0.
  // Note: only the first topic in the set is read here.
  def getZKOffset(kafkaParam: Map[String, String], topic: Set[String], groupName: String): Map[TopicAndPartition, Long] = {

    // API difference between Kafka 0.8 and 0.10: 0.10 uses TopicPartition, 0.8 uses TopicAndPartition
    var offsets: Map[TopicAndPartition, Long] = Map()

    val topic1 = topic.head

    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"
    // Make sure the topic path exists
    ensureZKPathExists(zkTopicPath)

    // The children of the topic node are the partition ids
    val childrens = zkClient.getChildren().forPath(zkTopicPath)

    // Read each partition node's data, i.e. the stored offset
    for (p <- childrens) {
      val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
      val offSet = new String(offsetData).toLong
      offsets += TopicAndPartition(topic1, p.toInt) -> offSet
    }
    offsets
  }

  // Ask the brokers for reset offsets: the earliest ("smallest") or latest ("largest")
  // offset per partition, depending on auto.offset.reset in kafkaParam
  def getResetOffsets(kafkaParam: Map[String, String], topics: Set[String]): Map[TopicAndPartition, Long] = {
    // KafkaClusterHelper is a copy of Spark's private KafkaCluster
    val cluster = new KafkaClusterHelper(kafkaParam)

    var offsets: Map[TopicAndPartition, Long] = Map()

    // reset is Some("smallest") or Some("largest")
    val reset = kafkaParam.get("auto.offset.reset").map(x => x.toString.toLowerCase())
    val topicAndPartitions: Set[TopicAndPartition] = cluster.getPartitions(topics).right.get

    if (reset == Some("smallest")) {
      val leaderOffsets = cluster.getEarliestLeaderOffsets(topicAndPartitions).right.get
      topicAndPartitions.foreach(tp => {
        offsets += tp -> leaderOffsets(tp).offset
      })
    } else if (reset == Some("largest")) {
      val leaderOffsets = cluster.getLatestLeaderOffsets(topicAndPartitions).right.get
      topicAndPartitions.foreach(tp => {
        offsets += tp -> leaderOffsets(tp).offset
      })
    }
    offsets
  }

  // Compute the starting offsets for the consumer: read the offsets stored in ZK and
  // clamp them to the [earliest, latest] range still available on the brokers.
  // Returns the offset map plus a flag: 0 if no offsets were found, 1 otherwise.
  def getConSumerOffsets(kafkaParam: Map[String, String], topicSet1: Set[String], groupName: String): (Map[TopicPartition, Long], Int) = {

    val brokers = kafkaParam("bootstrap.servers")

    val kafkaSmallestParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
    val kafkaLargestParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "largest")

    var offSets: mutable.Buffer[(TopicPartition, Long)] = mutable.Buffer()

    val smallOffsets = getResetOffsets(kafkaSmallestParams, topicSet1)
    val largestOffsets = getResetOffsets(kafkaLargestParams, topicSet1)
    // cOffset: the offsets read back from external storage (ZK)
    val consumerOffsets = getZKOffset(kafkaParam, topicSet1, groupName)

    smallOffsets.foreach({
      case (tp, sOffset) => {
        val cOffset = consumerOffsets.getOrElse(tp, 0L) // 0 if ZK has no entry for this partition yet
        val lOffset = largestOffsets(tp)
        if (sOffset > cOffset) {
          // Stored offset is older than the earliest retained offset (data expired): start from earliest
          offSets.append((new TopicPartition(tp.topic, tp.partition), sOffset))
        } else if (cOffset > lOffset) {
          // Stored offset is beyond the latest offset: fall back to latest
          offSets.append((new TopicPartition(tp.topic, tp.partition), lOffset))
        } else {
          // Stored offset is within range: resume from it
          offSets.append((new TopicPartition(tp.topic, tp.partition), cOffset))
        }
      }
    })
    if (offSets.isEmpty) {
      (offSets.toMap, 0)
    } else {
      (offSets.toMap, 1)
    }
  }


}
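
A minimal usage sketch (not part of the original listing) showing how ZookeeperHelper might be wired into a Spark Streaming job: restore the starting position with getConSumerOffsets, then persist the end offsets with storeOffsets after each batch. The topic name, group id, and broker address are placeholders, and the per-record processing is a stub.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object OffsetRecoveryDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("zk-offset-demo"), Seconds(5))

    val groupName = "demo-group"                    // placeholder group id
    val topics = Set("demo-topic")                  // placeholder topic
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "10.60.81.168:9092",   // placeholder broker list
      "group.id" -> groupName,
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[StringDeserializer].getName,
      "enable.auto.commit" -> "false")              // offsets are managed in ZK, not by Kafka

    // Restore the starting position from ZK, clamped to what the brokers still hold
    val (fromOffsets, flag) = ZookeeperHelper.getConSumerOffsets(kafkaParams, topics, groupName)

    // If ZK held offsets, start from them; otherwise let Kafka pick the position
    val stream = if (flag == 1)
      KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
        Subscribe[String, String](topics, kafkaParams, fromOffsets))
    else
      KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
        Subscribe[String, String](topics, kafkaParams))

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(record => println(record.value()))  // stand-in for real processing
      // Persist the end offsets of this batch back to ZK
      ZookeeperHelper.storeOffsets(offsetRanges, groupName)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}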
Source: https://www.cnblogs.com/heguoxiu/p/10064292.html