Storm 结合 Kafka 示例（pom 配置与拓扑代码）

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hc</groupId>
    <artifactId>mysqldetect</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>mysqldetect</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- storm-core is "provided": the Storm cluster supplies it at runtime,
             so it must NOT be bundled into the jar-with-dependencies. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- Exclusions keep the kafka broker jar from dragging in a second
             zookeeper and a log4j binding that clash with Storm's own. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Build with: mvn clean package assembly:single
                 (no <execution> binding here, so the assembly goal is invoked
                 explicitly). The version is pinned for reproducible builds;
                 without it Maven resolves an arbitrary current release. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

DetectTopology.java

package com.hc.mysqldetect;

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class DetectTopology {

 public static void main(String[] args) throws Exception {

  BrokerHosts brokerHosts = new ZkHosts("10.100.66.20:2181");
  SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "mysql_log", "/mysql_log", "kafkaspout");
  spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
  KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

  Properties props = new Properties();
  props.put("bootstrap.servers", "10.100.66.20:9092");
  props.put("acks", "1");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props)
    .withTopicSelector(new DefaultTopicSelector("mysql_inject"))
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("kafkaSpout", kafkaSpout);
  builder.setBolt("detectBolt", new DetectBolt()).shuffleGrouping("kafkaSpout");
  builder.setBolt("kafkaBolt", kafkaBolt).shuffleGrouping("detectBolt");

  Config conf = new Config();

 conf.put(Config.WORKER_HEAP_MEMORY_MB, 1152);//默认768
 conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 180);//默认30


  conf.setDebug(true);
  if (args.length == 0) {
   String topologyName = "detectTopology";
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology(topologyName, conf, builder.createTopology());
   Utils.sleep(10000);
   cluster.killTopology(topologyName);
   cluster.shutdown();
  } else {
   conf.setNumWorkers(3);
   StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
  }

 }

}

MessageScheme.java

package com.hc.mysqldetect;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageScheme implements Scheme {

 private static final long serialVersionUID = 1L;
 private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

 public Fields getOutputFields() {
  return new Fields("msg");
 }

 public List<Object> deserialize(ByteBuffer buffer) {
  String msg = getString(buffer);
  // logger.info("get one message is {}", msg);
  return new Values(msg);

 }

 private String getString(ByteBuffer buffer) {
  Charset charset = null;
  CharsetDecoder decoder = null;
  CharBuffer charBuffer = null;
  try {
   charset = Charset.forName("UTF-8");
   decoder = charset.newDecoder();
   // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
   charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
   return charBuffer.toString();
  } catch (Exception ex) {
   ex.printStackTrace();
   return "";
  }
 }

}

DetectBolt.java

package com.hc.mysqldetect;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class DetectBolt extends BaseBasicBolt {

 private static final long serialVersionUID = 1L;
 private static final Logger logger = LoggerFactory.getLogger(DetectBolt.class);

 public void execute(Tuple tuple, BasicOutputCollector collector) {
  String msg = (String) tuple.getValue(0);
  String out = "Message got is '" + msg + "'!";
  // logger.info("out={}", out);
  collector.emit(new Values(out));
 }

 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("message"));
 }

}

原文地址:https://www.cnblogs.com/hanfeihan1992/p/8258714.html