Storm 与 Kafka 整合

/**
 * Wires a {@code KafkaSpout} to two bolts ({@code UpperBolt} then {@code ExtBolt})
 * and submits the resulting topology to an in-process {@code LocalCluster}.
 */
public class KafkaTopo {

	/** Connection string handed to {@code ZkHosts}. */
	private static final String ZK_CONNECT = "m2:2181,m7:2181,m8:2181";
	/** Kafka topic name passed to {@code SpoutConfig}. */
	private static final String TOPIC = "test2";
	/** Value passed as the {@code zkRoot} argument of {@code SpoutConfig}. */
	private static final String ZK_ROOT = "/kafka-storm";
	/** Used both as the {@code SpoutConfig} id and as the spout's component id. */
	private static final String SPOUT_ID = "KafkaSpout";
	/** Component id of the first bolt. */
	private static final String UPPER_BOLT_ID = "UpperBolt";
	/** Component id of the second bolt. */
	private static final String EXT_BOLT_ID = "ExtBolt";
	/** Name under which the topology is submitted. */
	private static final String TOPOLOGY_NAME = "WordCount";

	/**
	 * Entry point: builds the spout configuration, the topology and the
	 * runtime configuration, then submits everything to a local cluster.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		SpoutConfig kafkaConfig = buildSpoutConfig();
		TopologyBuilder topology = buildTopology(kafkaConfig);
		Config stormConf = buildConfig();

		// LocalCluster submits the topology to a local simulator, which is
		// convenient for development and debugging.
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology(TOPOLOGY_NAME, stormConf, topology.createTopology());

		// To run on a real Storm cluster instead, submit through StormSubmitter:
		// StormSubmitter.submitTopology("sufei-topo", stormConf, topology.createTopology());
	}

	/** Creates the Kafka spout configuration with {@code MessageScheme} as its scheme. */
	private static SpoutConfig buildSpoutConfig() {
		BrokerHosts hosts = new ZkHosts(ZK_CONNECT);
		SpoutConfig kafkaConfig = new SpoutConfig(hosts, TOPIC, ZK_ROOT, SPOUT_ID);
		// kafkaConfig.forceFromStart = true;  (left disabled)
		kafkaConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
		return kafkaConfig;
	}

	/**
	 * Declares the spout and both bolts on a fresh builder.
	 *
	 * @param kafkaConfig configuration for the Kafka spout
	 * @return the populated builder; the caller invokes {@code createTopology()}
	 */
	private static TopologyBuilder buildTopology(SpoutConfig kafkaConfig) {
		TopologyBuilder topology = new TopologyBuilder();
		// The spout reads from the Kafka message queue and emits to the next
		// bolt; it is the ready-made KafkaSpout, not a custom implementation.
		topology.setSpout(SPOUT_ID, new KafkaSpout(kafkaConfig));
		topology.setBolt(UPPER_BOLT_ID, new UpperBolt())
				.shuffleGrouping(SPOUT_ID);
		// ExtBolt is declared with a parallelism hint of 4, grouped on "name".
		topology.setBolt(EXT_BOLT_ID, new ExtBolt(), 4)
				.fieldsGrouping(UPPER_BOLT_ID, new Fields("name"));
		return topology;
	}

	/** Builds the runtime configuration: 4 workers, 0 ackers, debug off. */
	private static Config buildConfig() {
		Config stormConf = new Config();
		stormConf.setNumWorkers(4);
		// NOTE(review): zero ackers presumably turns off tuple tracking — confirm this is intended.
		stormConf.setNumAckers(0);
		stormConf.setDebug(false);
		return stormConf;
	}

}

  

原文地址:https://www.cnblogs.com/heml/p/6074960.html