kafka+storm连接

本项目为maven项目,需要添加必要的storm库,以及kafka依赖,使用storm自带的storm-kafka进行连接,根据自己集群环境

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>0.9.3</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.2.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>0.9.3</version>
		</dependency>

  实例topology:

package com.xh.kafka.test;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class KafkaSpoutTest {

	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		
		BrokerHosts brokerHosters = new ZkHosts("zookeeperip1:2181,zookeeperip2:2181/kafka/65_250-252");
		
		String topic = "log_test";
		
		//offsetZkRoot 和 offsetZkId 自定义即可
		String offsetZkRoot = "/storm_test";
		String offsetZkId = "kafka-storm";
		
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosters, topic, offsetZkRoot, offsetZkId);
		
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());		
		
				
		Config conf = new Config();
			
		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new KafkaSpout(spoutConfig));
		builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");
	
		if(args != null && args.length > 0){
			conf.setNumWorkers(3);
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}else{
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("my-topology", conf, builder.createTopology());
		}
	}

}

  此外,不管是本地运行还是集群运行,都需要修改host文件,添加,kafka集群的机器名,例如:

192.168.*.* kafka-01
192.168.**.** kafka-02
192.168.***.*** kafka-03

  否则会报错如下:

23810 [Thread-10-spout] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException

23815 [Thread-10-spout] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_65]
Caused by: java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.10-0.8.2.1.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]
... 6 common frames omitted

  

后来项目搬迁到了jstorm平台,这里补充下jstorm和kafka的连接方法:

原文地址:https://www.cnblogs.com/lemonqin/p/4444339.html