java 访问 kerberos 认证的 kafka

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6 
 7     <groupId>com.ht</groupId>
 8     <artifactId>kafkatest</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10     <build>
11         <plugins>
12             <plugin>
13                 <groupId>org.apache.maven.plugins</groupId>
14                 <artifactId>maven-compiler-plugin</artifactId>
15                 <configuration>
16                     <source>1.7</source>
17                     <target>1.7</target>
18                 </configuration>
19             </plugin>
20         </plugins>
21     </build>
22 
23 
24     <dependencies>
25         <dependency>
26             <groupId>org.apache.kafka</groupId>
27             <artifactId>kafka-clients</artifactId>
28             <version>0.10.0.0</version>
29         </dependency>
30     </dependencies>
31 </project>

java 代码

 1 import org.apache.kafka.clients.CommonClientConfigs;
 2 import org.apache.kafka.clients.consumer.ConsumerRecord;
 3 import org.apache.kafka.clients.consumer.ConsumerRecords;
 4 import org.apache.kafka.clients.consumer.KafkaConsumer;
 5 
 6 import java.util.Collections;
 7 import java.util.Properties;
 8 
 9 import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
10 
11 /**
12  * @author sunzq
13  * @since 2017/8/29
14  */
15 public class Application {
16     public static void main(String[] args) {
17 
18         Properties props = new Properties();
19         props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");
20         props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
21         props.put(GROUP_ID_CONFIG, "test08291103");
22 //      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");
23         props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
24         props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
25         props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
26         props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
27         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
28 
29         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
30         // topic name: test9
31         consumer.subscribe(Collections.singleton("test9"));
32         while (true) {
33             ConsumerRecords<String, String> records = consumer.poll(100);
34             for (ConsumerRecord<String, String> record : records)
35                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
36         }
37     }
38 }

启动参数

 -Djava.security.krb5.conf=c:\app\conf\krb5.conf -Djava.security.auth.login.config=c:\app\conf\kafka_jaas.conf  

windows 下记得用 \ 

原文地址:https://www.cnblogs.com/kischn/p/7447306.html