kafka+SparkStreaming以及Kafka+Kerberos+Sentry+SparkStreaming代码调试问题与分析

【调试背景】
目前测试kafka集群有两套,版本为 0.10.x。有一套是添加了Kerberos+Sentry认证,另一套没有添加。
现在需要通过sparkStreaming接入kafka做实时分析。
【总体结论】
实验1:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,无Kerberos+Sentry认证,用createStream,可以从zk中获取broker,接入成功;
实验2:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,有Kerberos+Sentry认证,用createStream,无法zk中获取broker,接入失败,报空指针;
实验3:1.6.x版本spark的jar包,0.8.x.x的spark-streaming-kafka,有Kerberos+Sentry认证,用createDirectStream,直接设置broker,接入失败,报EOFException;
实验4:2.1.x版本spark的jar包,010版本的spark-streaming-kafka,有Kerberos+Sentry认证,用createDirectStream,直接设置broker,需要修改“KafkaUtils”源码,接入成功;
PS     :2.1.x版本spark的jar包,010版本的spark-streaming-kafka,无Kerberos+Sentry认证,用createDirectStream,直接设置broker,接入成功;
【实验1】可以正常运行
(1)kafka环境:无Kerberos+Sentry认证
(2)使用jar包:

 
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
(3)核心代码:
package com.xx.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class PrintRecMsg {
    
    public static void main(String[] args) {
 
        Map<String, Integer> topicmap = new HashMap<>();
        topicmap.put(args[0], 2);
        SparkConf sparkConf = new SparkConf().setAppName("PrintRecMsg").setMaster("local[2]");

        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
        String zkQuorum ="host1:2181,host2:2181,host3:2181";
        String group = "mygroup";
        
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

        JavaDStream<String> msg = lines.map(new Function<Tuple2<String,String>, String>() {

            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._1() + "," + tuple2._2();
            }
        });
        msg.print(20);
        jssc.start();
        jssc.awaitTermination();
    }
}

 

 
(4)分析:
创建的流只需要三个和Kafka有关的参数:zk集群地址,消费者组,topicMap。
createStream是走的Zookeeper去获取对应集群broker信息,然后进行消费。
 
【实验2】无法运行
(1)kafka环境:kafka+Kerberos+Sentry认证
(2)使用jar包:(同上)
(3)核心代码:由于需要添加额外的kafka参数,因此采用了另一个“createStream”的重载方法。
public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]"); 
        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); 

        Map<String, Integer> topicmap = new HashMap<>();
        topicmap.put("gaoweiurl", 2);
        
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181";);
        kafkaParams.put("group.id", "mygroup");
        kafkaParams.put("auto.offset.reset", "largest");
        /** 以下是和Kerberos+sentry认证相关  **/
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.mechanism", "GSSAPI");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        System.setProperty("java.security.auth.login.config", "/xx/xx/kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/xx/xx/krb5.conf");
        
        JavaPairInputDStream<String, String> lines = KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicmap, StorageLevel.MEMORY_AND_DISK_SER_2());
        
        JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {

            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._1() + "," + tuple2._2();
            }
        });
        msg.print(20);
        jssc.start();
        jssc.awaitTermination();
    } 
 (4)结果及分析:
(4)问题及分析
 
 
 
Zookeeper可以正常连接,而且日志显示,已经成功通过Kerberos+sentry认证。
但是在开始消费消息的时候,一直报一个错误:
通过异常堆栈一行行的查找源码,首先:
==>“at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)”
抛出了空指针异常。进入代码发现:
说明传入host为空。
==>“at kafka.cluster.Broker.connectionString(Broker.scala:62)”
说明是创建Broker对象的时候,调用“connectionString(host,port)”出现的。
查看这个方法,是Broker的一个成员方法
创建Broker的时候,入参host为空,那么是谁创建Broker对象呢?
==>“at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)”
 
这行代码说明,是有人先创建了“brokers”,然后在执行“fetchTopicMetadata”方法的时候,
执行broker.map(_.connectionString),而brokers的“host”为空,所以空指针异常。
这里的结果已经明确了,但是,还没有找到brokers对象创建的地方。
异常堆栈在这个地方的时候就断了。剩下的就是找到brokers的创建位置。
接下来从下往上看异常:
==>“at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)”
线程“LeaderFinderThread”启动之后,开始执行doWork()方法。
然后创建了brokers,然后通过“ClientUtlis.fetchTopicMetadata”触发了前面的空指针错误。
那么“getAllBrokersInCluster(zkClient)”方法是怎么生成brokers的呢?
首先,这个例子是通过Zookeeper获取broker的,这个地方基本上可以确定是通过zk获取kafka的broker信息。
通过
进入“getAllBrokersInCluster”方法:
其中,BrokerIdspath为:
接下来,
总之,就是会读取“brokers/ids/id”中的信息,返回brokerInfo,然后创建broker。
回到Broker,进入“createBroker”方法查看:
最终,看看配置了Kerberos+Sentry的kafka的broker信息吧,
呵呵,比较下不加Kerberos+sentry的kafka的broker信息:
呵呵。
 
【实验3】无法运行
(1)kafka环境:kafka+Kerberos+Sentry认证
(2)使用jar包:(同上)
(3)核心代码:使用createDirectStream,不用Zookeeper,直接连接broker。
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class PrintDirectMsgDirect {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

        int numThreads = 2;
        Map<String, Integer> topicmap = new HashMap<>();
        topicmap.put("gaoweiurl", numThreads);
        Set<String> topicSet = new HashSet<>();
        topicSet.add("gaoweiurl");
        
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("bootstrap.servers", "node86:9092,node99:9092,node101:9092");
        kafkaParams.put("metadata.broker.list", "node86:9092,node99:9092,node101:9092");
        kafkaParams.put("group.id", "spark-executor-kafka_shjs_wlpt");
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("enable.auto.commit", "true");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.mechanism", "GSSAPI");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        System.setProperty("java.security.auth.login.config", "D:\Kerberos\kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\Kerberos\krb5.conf");
        JavaPairInputDStream<String, String> lines= KafkaUtils.createDirectStream(jssc,  String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicSet);
    
        JavaDStream<String> msg = lines.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return tuple2._1() + "," + tuple2._2();
            }
        });
        msg.print(20);
        jssc.start();
        jssc.awaitTermination();
    }
}
(4)结果及分析
然后,百度了下,呵呵。
Spark的1.x版本不支持“Secure Kafka”。
【实验4】失败后成功
换了高版本的spark和sparkStreaming和sparkStreamingKafka的jar包
(1)kafka环境:kafka+Kerberos+Sentry认证
(2)使用jar包:全部是高版本
 <spark.version>2.1.0</spark.version>
        <spark-streaming-kafka.version>2.1.0</spark-streaming-kafka.version> 
<!-- spark core 核心依赖包 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!-- spark streaming 依赖包 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>

            <!-- spark streaming kafka 依赖包 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
                <version>${spark-streaming-kafka.version}</version>
            </dependency>
(3)核心代码:
package com.ustcinfo.ishare.bdp.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
 * @Description: 所有spark任务的总入口
 * @author: Beethoven.S
 * @date: 2017/9/14 13:50 
 * @e-mail: sheng.gang@ustcinfo.com
 */

object sparkJobExecutor {

    /** krb5.conf配置文件 **/
    val KRB5_CONF: String = "D:\Kerberos\krb5.conf"
    /** JAAS配置文件 **/
    val KAFKA_JAAS_CONF: String = "D:\Kerberos\kafka-jaas.conf"
    /** kafka broker地址,多个broker用逗号分开 **/
    val KAFKA_BROKERS: String = "node86:9092,node99:9092,node101:9092"

    def main(args: Array[String]) {
        /** 添加Kerberos认证所需的JAAS配置文件到运行时环境 **/
        System.setProperty("java.security.auth.login.config", KAFKA_JAAS_CONF)
        /** 添加krb5配置文件到运行时环境 **/
        System.setProperty("java.security.krb5.conf", KRB5_CONF)

        val sparkConf = new SparkConf().setMaster("local[4]").setAppName("sparkStreaming")
        val streamingContext = new StreamingContext(sparkConf, Seconds(10))
        val stream = KafkaUtils.createDirectStream[String, String](
            streamingContext,
            PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](Array("gaoweiurl"), Map(
                "bootstrap.servers" -> "node86:9092,node99:9092,node101:9092",
                "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
                "group.id" -> "kafka_shjs_wlpt",
                "enable.auto.commit" -> "true",
                "auto.offset.reset" -> "earliest",
                "sasl.kerberos.service.name" -> "kafka",
                "security.protocol" -> "SASL_PLAINTEXT"
            ))
        )
        stream.foreachRDD(kv => {
            println("============> ")
            kv.foreach(x => println("RDD==> " + x))
        })


        streamingContext.start()
        streamingContext.awaitTermination()
    }
}
 
然后,在没有数据写入的时候,很正常,但是一旦开始接收数据之后,就会出现如下错误:
没错,sparkStreaming开始监听并且连接的时候,用的消费者组ID确实是我代码中配置的:
但是,一旦开始接收消息,通过RDD读取数据的时候,groupId居然被自动添加了“spark-executor-”的前缀。
(我这个开始设置的“spark-executor-kafka_shjs_wlpt”这个就是看看是不是有了前缀就不再加的,结果,
还是自动的添加了前缀,成了“spark-executor-spark-executor-kafka_shjs_wlpt”)。
然后,在spark运行的时候发现了这句话:
 
找到“KafkaUtils”代码中,追踪到万恶之源:
而且是,只要收到消息,创建RDD的时候会这么干:
 
解决方式就是覆盖这个源码:
 
注意覆盖的时候,包的名称路径必须要和源码路径一模一样,否则会出现scala的私有依赖引用问题。
然后再次,执行:
kafka的数据可以正常读取。
 
 
原文地址:https://www.cnblogs.com/shenggang/p/7997792.html