Configuring Kerberos authentication and enabling ACLs on Kafka: a practical guide

Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7131626.html

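   Kafka introduced cluster security features in version 0.9. I recently had to build a new Kafka cluster and decided to use SASL/GSSAPI (Kerberos) as the foundation of its access-control system. The new cluster runs version 0.10.2.0.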

   A teammate had already set up a dedicated Kerberos server, so I could skip building one myself.

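 (1) First, on the Kerberos server, create a principal and a keytab for each broker host. Then copy each kafka.keytab file generated by the commands below to the same location on the corresponding broker machine, for example /etc/kafka.keytab. Note that xst -norandkey is only available in kadmin.local, so these commands have to be run on the KDC host itself.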

addprinc -randkey kafka/kafkahost1@EXAMPLE.COM
addprinc -randkey kafka/kafkahost2@EXAMPLE.COM
addprinc -randkey kafka/kafkahost3@EXAMPLE.COM
.........


xst -norandkey -k /opt/kafkahost1/kafka.keytab kafka/kafkahost1@EXAMPLE.COM
xst -norandkey -k /opt/kafkahost2/kafka.keytab kafka/kafkahost2@EXAMPLE.COM
xst -norandkey -k /opt/kafkahost3/kafka.keytab kafka/kafkahost3@EXAMPLE.COM
.....
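
Since the same two commands are repeated for every broker, they can be generated instead of typed by hand. The following is a minimal sketch, assuming the /opt/<host>/kafka.keytab layout and the EXAMPLE.COM realm used above; the function name gen_kadmin_cmds is made up for illustration. It only prints the commands so they can be reviewed before being pasted into kadmin.local.

```shell
#!/bin/sh
# Realm taken from the article; override by exporting REALM.
REALM="${REALM:-EXAMPLE.COM}"

# Print the two kadmin.local commands for one broker host.
# gen_kadmin_cmds is a hypothetical helper name, not part of Kerberos.
gen_kadmin_cmds() {
    host="$1"
    printf 'addprinc -randkey kafka/%s@%s\n' "$host" "$REALM"
    printf 'xst -norandkey -k /opt/%s/kafka.keytab kafka/%s@%s\n' "$host" "$host" "$REALM"
}
```

For example, `for h in kafkahost1 kafkahost2 kafkahost3; do gen_kadmin_cmds "$h"; done` prints one addprinc/xst pair per host.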

(2) Configure the broker's server properties file

listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
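# The service name must match the principals created above: for kafka/kafkahost3@EXAMPLE.COM the service name is kafka
sasl.kerberos.service.name=kafka
# ACL-related: the brokers talk to each other under the kafka service identity, so it must be a super user to fetch cluster metadata
super.users=User:kafka
# ACL-related: ACLs only take effect once an authorizer is configured
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer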
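
Once SimpleAclAuthorizer is enabled, a principal that is not a super user can only access a topic after an ACL has been granted (unless allow.everyone.if.no.acl.found is set to true). The sketch below wraps the stock bin/kafka-acls.sh tool. The ZooKeeper address zkhost:2181, the helper names run, grant_producer and grant_consumer, and the DRY_RUN switch are assumptions made for illustration; KAFKA_HOME defaults to the /opt/kafka path used elsewhere in this article.

```shell
#!/bin/sh
# KAFKA_HOME follows the /opt/kafka path used in this article.
# ZK_CONNECT (zkhost:2181) is a hypothetical ZooKeeper address.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
ZK_CONNECT="${ZK_CONNECT:-zkhost:2181}"

# Run a command, or only print it when DRY_RUN=1.
run() {
    if [ "${DRY_RUN:-0}" = "1" ]; then
        printf '%s\n' "$*"
    else
        "$@"
    fi
}

# Grant the rights a producer needs on a topic: $1 = user, $2 = topic.
grant_producer() {
    run "$KAFKA_HOME/bin/kafka-acls.sh" \
        --authorizer-properties "zookeeper.connect=$ZK_CONNECT" \
        --add --allow-principal "User:$1" --producer --topic "$2"
}

# Grant the rights a consumer needs: $1 = user, $2 = topic, $3 = consumer group.
grant_consumer() {
    run "$KAFKA_HOME/bin/kafka-acls.sh" \
        --authorizer-properties "zookeeper.connect=$ZK_CONNECT" \
        --add --allow-principal "User:$1" --consumer --topic "$2" --group "$3"
}
```

With the default principal mapping, a Kerberos principal such as alice@EXAMPLE.COM is seen by the authorizer as User:alice, so `DRY_RUN=1; grant_producer alice dxTT` prints the command that would grant that user producer rights on the dxTT topic.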

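 (3) Create the kafka_server_jaas.conf file. The ZooKeeper ensemble used by this cluster does not have Kerberos enabled, so there is no Client section. The KafkaClient section is there for the scripts under bin, such as kafka-console-consumer.sh.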

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/kafkahost1@EXAMPLE.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/kafkahost1@EXAMPLE.COM"
    useTicketCache=true;
};
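
The principal in this file differs on every broker (kafkahost1, kafkahost2, and so on), so the file has to be adapted per host. A minimal sketch of generating it follows, assuming the /etc/kafka.keytab location and EXAMPLE.COM realm from above; write_server_jaas is a hypothetical helper name.

```shell
#!/bin/sh
# Realm taken from the article; override by exporting REALM.
REALM="${REALM:-EXAMPLE.COM}"

# Write a JAAS file for one broker: $1 = broker host name, $2 = output path.
# write_server_jaas is a hypothetical helper, not a Kafka tool.
write_server_jaas() {
    host="$1"
    target="$2"
    cat > "$target" <<EOF
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/${host}@${REALM}";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/kafka.keytab"
    principal="kafka/${host}@${REALM}"
    useTicketCache=true;
};
EOF
}
```

On kafkahost2, for instance, `write_server_jaas kafkahost2 /opt/kafka/config/kafka_server_jaas.conf` would produce the same file as above with the principal adjusted.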

 (4) Edit kafka-run-class.sh under bin and add the Kerberos startup options after exec $JAVA. The service can then be started with the usual scripts:

-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf  
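
As an alternative to editing the script, the stock kafka-run-class.sh also appends the contents of the KAFKA_OPTS environment variable to the java command line, so the same options can be supplied from the environment. A sketch, using the paths from this article:

```shell
#!/bin/sh
# Paths are the ones used in this article; adjust them to your installation.
KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
export KAFKA_OPTS

# Then start the broker as usual from the Kafka directory, for example:
#   bin/kafka-server-start.sh -daemon config/server.properties
```

This keeps the distributed scripts unmodified, at the cost of having to export the variable in every shell (or service unit) that launches a Kafka script.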

 

(5) Using the client scripts

With Kerberos enabled, some of Kafka's admin scripts need extra arguments before they will work.

First, create a configuration file named client.properties:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI

The commands are then invoked as follows:

bin/kafka-consumer-groups.sh --bootstrap-server kafkahost1:9092 --list --command-config client.properties

bin/kafka-console-producer.sh --broker-list kafkahost1:9092 --topic dxTT --producer.config client.properties

bin/kafka-console-consumer.sh --bootstrap-server kafkahost1:9092 --topic dxTT --consumer.config client.properties

 

Troubleshooting notes:

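After Kerberos was configured on the brokers, the controller kept reporting that it could not connect to the brokers (including its own instance). The error looked roughly like this: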

[2018-01-25 17:48:41,864] WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka60:9092 (id: 60 rack: null) failed
at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2018-01-25 17:48:41,970] WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)

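Cause: turning on DEBUG logging for kafka-authorizer.log reveals the underlying error. The production JRE was missing the crypto support that Kerberos authentication needs, here AES-256, and updating the relevant JRE libraries fixes it. On Oracle Java 8 builds that ship with the limited crypto policy, this most likely means installing the JCE Unlimited Strength Jurisdiction Policy files.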

[2018-01-25 17:55:31,155] DEBUG Connection with /host disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:250)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:71)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at kafka.network.Processor.poll(SocketServer.scala:494)
at kafka.network.Processor.run(SocketServer.scala:432)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:199)
at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:235)
... 6 more
Caused by: GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)
at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:856)
at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:342)
at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:285)
at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:167)
... 7 more
Caused by: KrbException: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled
at sun.security.krb5.EncryptionKey.findKey(EncryptionKey.java:522)
at sun.security.krb5.KrbApReq.authenticate(KrbApReq.java:273)
at sun.security.krb5.KrbApReq.<init>(KrbApReq.java:149)
at sun.security.jgss.krb5.InitSecContextToken.<init>(InitSecContextToken.java:108)
at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:829)
... 10 more
