Integrating Storm with Kafka: Saving Results to a MySQL Database

When integrating Kafka with Storm, messages reach the Kafka message broker in various ways, for example as log data collected by Flume. Kafka buffers the messages, and the real-time computation engine Storm then analyzes them: a Storm Spout reads the messages from Kafka and hands them to the Bolt components for processing.

1. Configure the Maven dependencies
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>3.0.3</version>
</dependency>

<dependency>
    <groupId>commons-collections</groupId>
    <artifactId>commons-collections</artifactId>
    <version>3.2.1</version>
</dependency>

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>13.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>

<!-- MySQL-related dependencies -->
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.0.5</version>
</dependency>

<!-- Druid data source -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.27</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.29</version>
</dependency>
<dependency>
    <groupId>c3p0</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.1.2</version>
</dependency>
 
 
2. Write the Storm program
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Splits every line read from the Kafka spout into words and emits a
 * (word, 1) tuple for each word.
 */
public class WordSplitBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map mapConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // The Kafka spout emits each message as a single string field.
        String line = tuple.getString(0);
        String[] arr = line.split(" ");
        for (String s : arr) {
            collector.emit(new Values(s, 1));
        }
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void cleanup() {
    }

}
 
 
 

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * Accumulates word counts in memory and writes them to MySQL when the
 * topology is shut down (cleanup() is only invoked reliably in local mode).
 */
public class WordCountBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    private long lastEmitTime = 0; // timestamp of the last flush
    private long duration = 5000;  // time slice: flush once every 5 seconds

    private OutputCollector collector;
    Map<String, Integer> map;

    public void prepare(Map mapConf, TopologyContext context, OutputCollector collector) {
        this.map = new HashMap<String, Integer>();
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = tuple.getInteger(1);
        if (!map.containsKey(word)) {
            map.put(word, count);
        } else {
            // The word was seen before: add the incoming count to the stored total.
            map.put(word, map.get(word) + count);
        }
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is the end of the topology and emits no tuples.
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void cleanup() {
        // Persist the accumulated counts to MySQL when the topology shuts down.
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
            try {
                MyDbUtils.update(MyDbUtils.INSERT_LOG, entry.getKey(), entry.getValue());
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

}
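Note that the lastEmitTime and duration fields declared above are never used: the counts only reach MySQL when cleanup() runs, and on a real cluster cleanup() is not guaranteed to be called. As a sketch (not part of the original article), execute() could be rewritten to use those two fields and flush the accumulated counts to the database once per five-second time slice:

    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = tuple.getInteger(1);
        Integer old = map.get(word);
        map.put(word, old == null ? count : old + count);
        collector.ack(tuple);

        // Flush the in-memory counts to MySQL once per time slice (duration = 5000 ms).
        long now = System.currentTimeMillis();
        if (now - lastEmitTime >= duration) {
            lastEmitTime = now;
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                try {
                    MyDbUtils.update(MyDbUtils.INSERT_LOG, entry.getKey(), entry.getValue());
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            map.clear();
        }
    }

With this variant each database row holds the counts for one five-second slice rather than a running total.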

 

 

 

 
3. Write the MyDbUtils helper class
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;

/**
 * Small JDBC helper based on commons-dbutils: executes updates and simple
 * queries against the MySQL database configured in JdbcUtils.
 */
public class MyDbUtils {
    private static String className = "com.mysql.jdbc.Driver";

    private static QueryRunner queryRunner = new QueryRunner();
    // Built when this class is first loaded, so JdbcUtils must be initialized before MyDbUtils is used.
    public static final String INSERT_LOG = "INSERT INTO " + JdbcUtils.getTable_name() + "(word,count) VALUES (?,?)";

    static {
        try {
            // Register the MySQL JDBC driver once.
            Class.forName(className);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void update(String sql, Object... params) throws SQLException {
        // Execute an INSERT/UPDATE statement; the connection is closed automatically.
        try (Connection connection = getConnection()) {
            queryRunner.update(connection, sql, params);
        }
    }

    public static List<String> executeQuerySql(String sql) {
        // Run a query and return the first column of every row as a String.
        List<String> result = new ArrayList<String>();
        try (Connection connection = getConnection()) {
            List<Object[]> rows = queryRunner.query(connection, sql, new ArrayListHandler());
            for (Object[] row : rows) {
                result.add(row[0].toString());
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return result;
    }

    private static Connection getConnection() throws SQLException {
        // Open a MySQL connection with the settings held in JdbcUtils.
        return DriverManager.getConnection(JdbcUtils.getUrl(), JdbcUtils.getUser(), JdbcUtils.getPassword());
    }
}
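MyDbUtils assumes that a table with a word column and a count column already exists in MySQL; the article does not show the schema. The following one-off setup is only an illustration: the database test, the credentials root/123456 and the table name wordcount are assumptions to adapt to your environment.

public class CreateTable {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings; replace them with your own.
        new JdbcUtils("jdbc:mysql://localhost:3306/test", "root", "123456", "wordcount");
        // Create the table that MyDbUtils.INSERT_LOG writes into.
        MyDbUtils.update("CREATE TABLE IF NOT EXISTS wordcount ("
                + "id INT AUTO_INCREMENT PRIMARY KEY, "
                + "word VARCHAR(100) NOT NULL, "
                + "`count` INT NOT NULL)");
    }
}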
 
4. Write the JdbcUtils class
/**
 * Holds the JDBC connection settings (URL, user, password, table name) as
 * static fields so that MyDbUtils can read them from anywhere in the topology.
 */
public class JdbcUtils {

    private static String url;
    private static String user;
    private static String password;
    private static String table_name;

    public JdbcUtils(String url, String user, String password, String table_name) {
        // Assign the static fields explicitly; they are shared by the whole (local) JVM.
        JdbcUtils.url = url;
        JdbcUtils.user = user;
        JdbcUtils.password = password;
        JdbcUtils.table_name = table_name;
    }

    public static String getTable_name() {
        return table_name;
    }
    public static void setTable_name(String table_name) {
        JdbcUtils.table_name = table_name;
    }
    public static String getUrl() {
        return url;
    }
    public static void setUrl(String url) {
        JdbcUtils.url = url;
    }
    public static String getUser() {
        return user;
    }
    public static void setUser(String user) {
        JdbcUtils.user = user;
    }
    public static String getPassword() {
        return password;
    }
    public static void setPassword(String password) {
        JdbcUtils.password = password;
    }

}
 
5. Write the Mytopology program
import java.util.UUID;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Builds the Kafka -> WordSplitBolt -> WordCountBolt topology and runs it on a
 * local cluster.
 */
public class Mytopology {
    public void Mytopologys(String zkConnString, String topic, String url, String user, String password, String table_name) throws InterruptedException {
        // Store the JDBC settings in JdbcUtils' static fields so MyDbUtils can use them.
        JdbcUtils ju = new JdbcUtils(url, user, password, table_name);

        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Read messages from the Kafka topic via ZooKeeper, decoding them as plain strings.
        BrokerHosts hosts = new ZkHosts(zkConnString);
        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new WordSplitBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("word-counter", new WordCountBolt()).fieldsGrouping("word-spitter", new Fields("word"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());

        // Let the topology run for 15 seconds, then shut down; shutdown() triggers
        // WordCountBolt.cleanup(), which writes the counts to MySQL.
        Thread.sleep(15000);
        cluster.shutdown();
    }
}
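The StormSubmitter import above is never used: the topology only ever runs on a LocalCluster. As a hedged sketch, the LocalCluster block could be replaced by a real cluster submission; keep in mind that on a cluster every worker runs in its own JVM, so the static JDBC settings stored via new JdbcUtils(...) on the submitting machine would not be visible to the bolts and would have to be distributed differently (for example through the topology Config).

    // Sketch only: config and builder are constructed exactly as in Mytopologys(...).
    try {
        StormSubmitter.submitTopology("KafkaStormSample", config, builder.createTopology());
    } catch (Exception e) {
        e.printStackTrace();
    }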

6. Finally, test whether the program works
Note: before running this program, start Flume first and make sure Kafka is configured and running.
public class Test1 {
    public static void main(String[] args) throws Exception {
        Mytopology mp = new Mytopology();
        // Arguments: ZooKeeper connect string, Kafka topic, JDBC URL, DB user, DB password, table name.
        mp.Mytopologys("zkConnString", "topic", "database url", "database user", "database password", "tableName");
    }
}
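To feed test messages into the topic without going through Flume, the kafka-clients dependency declared in step 1 can also be used directly. The broker address localhost:9092 and the topic name "topic" below are placeholders; a minimal producer sketch:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Each line is split into words by WordSplitBolt and counted by WordCountBolt.
        producer.send(new ProducerRecord<String, String>("topic", "hello storm hello kafka"));
        producer.close();
    }
}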
 
 
Original article: https://www.cnblogs.com/zzmmyy/p/7987029.html