Trident-MySQL

使用事务型 TridentTopology 将数据持久化到 MySQL

1、构建拓扑JDBCTopology类
package storm.trident.mysql;

import java.util.Arrays;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Transactional Trident topology that sums a money amount per telephone number
 * and persists the running totals to MySQL through {@link JDBCState}.
 *
 * @author mengyao
 */
@SuppressWarnings("all")
public class JDBCTopology {

    /**
     * Builds the topology and submits it to an in-process {@link LocalCluster}
     * under the name {@code test1}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        // Spout data source: seven fixed "<tel>\t<money>" records, cycling disabled.
        // The separator is spelled as an explicit \t escape: KeyValueFun splits on a
        // tab, and a literal tab character inside a string literal is easily turned
        // into spaces by editors or copy/paste, which made every record unparsable.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("tels"), 7,
                new Values("189111\t3"),
                new Values("135111\t7"),
                new Values("189111\t2"),
                new Values("158111\t5"),
                new Values("159111\t6"),
                new Values("159111\t3"),
                new Values("158111\t5")
                );
        spout.setCycle(false);

        // Persistence settings for the JDBC-backed state.
        JDBCStateConfig config = new JDBCStateConfig();
        config.setDriver("com.mysql.jdbc.Driver");
        config.setUrl("jdbc:mysql://localhost:3306/test");
        config.setUsername("root");
        config.setPassword("123456");
        config.setBatchSize(10);
        config.setCacheSize(10);
        config.setType(StateType.TRANSACTIONAL);
        config.setCols("tel");      // key column
        config.setColVals("sum");   // aggregated value column
        config.setTable("tbl_tel");

        // tels -> (tel, money) -> group by tel -> sum(money) persisted as "sum".
        topology.newStream("spout", spout)
            .each(new Fields("tels"), new KeyValueFun(), new Fields("tel", "money"))
            .groupBy(new Fields("tel"))
            .persistentAggregate(JDBCState.getFactory(config), new Fields("money"), new SumCombinerAgg(), new Fields("sum"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test1", new Config(), topology.build());
    }

}

/**
 * Trident function that splits a raw {@code "<tel><whitespace><money>"} record
 * into its two fields and emits them as strings.
 */
@SuppressWarnings("all")
class KeyValueFun extends BaseFunction {

    /**
     * Emits {@code (tel, money)} for a well-formed record. Records that are null
     * or contain fewer than two whitespace-separated fields are reported on
     * stderr and skipped, instead of throwing and failing the whole batch.
     *
     * @param tuple     input tuple whose first field is the raw record string
     * @param collector collector that receives the two extracted fields
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String record = tuple.getString(0);
        // Split once, on any run of whitespace, so tab- and space-separated
        // records are both accepted.
        String[] fields = (record == null) ? new String[0] : record.trim().split("\\s+");
        if (fields.length < 2) {
            System.err.println("KeyValueFun: skipping malformed record: " + record);
            return;
        }
        collector.emit(new Values(fields[0], fields[1]));
    }
}

/**
 * Combiner aggregator that adds up {@code Long} amounts.
 */
@SuppressWarnings("all")
class SumCombinerAgg implements CombinerAggregator<Long> {

    /**
     * Identity value of the sum.
     *
     * @return zero
     */
    @Override
    public Long zero() {
        return 0L;
    }

    /**
     * Parses the first field of the tuple, a decimal string, into the initial value.
     *
     * @param tuple tuple whose first field holds the amount as text
     * @return the parsed amount
     */
    @Override
    public Long init(TridentTuple tuple) {
        final String amount = tuple.getString(0);
        return Long.parseLong(amount);
    }

    /**
     * Adds two partial sums and prints the result to stdout.
     *
     * @param val1 first partial sum
     * @param val2 second partial sum
     * @return {@code val1 + val2}
     */
    @Override
    public Long combine(Long val1, Long val2) {
        final Long total = val1 + val2;
        System.out.println(total);
        return total;
    }
}

2、构建基于IBackingMap的JDBCState类
package storm.trident.mysql;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.TransactionalValue;
import org.apache.storm.trident.state.map.CachedMap;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.NonTransactionalMap;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.TransactionalMap;

/**
 * {@link IBackingMap} that keeps Trident map state in a MySQL table.
 *
 * <p>The table named by {@link JDBCStateConfig#getTable()} is expected to have a
 * key column ({@code getCols()}), a value column ({@code getColVals()}) and the
 * fixed columns {@code txid} and {@code time}. Rows are written with
 * {@code INSERT ... ON DUPLICATE KEY UPDATE}, so the key column needs a unique
 * index for a later batch to overwrite the row instead of adding a second one.
 *
 * <p>SQL is assembled as text because {@link JDBCUtil} only accepts complete
 * statements; keys are therefore quoted and escaped by {@link #quote(String)}.
 *
 * @param <T> the value type handed to and returned from the wrapping map
 */
@SuppressWarnings("all")
public class JDBCState<T> implements IBackingMap<T> {

    /** Per-instance settings (previously a static field shared by every instance). */
    private final JDBCStateConfig config;

    JDBCState(JDBCStateConfig config){
        this.config = config;
    }

    /**
     * Reads the stored value for each key, one query per key.
     *
     * @param keys grouping keys; only the first element of each key is used
     * @return one entry per key, in order: an {@link OpaqueValue},
     *         {@link TransactionalValue} or plain {@code Long} depending on the
     *         configured {@link StateType}; a key with no row yields value 0 and txid 0
     */
    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        String sqlPrefix = new StringBuilder("SELECT ").append(config.getCols())
                .append(",").append(config.getColVals())
                .append(",txid")
                .append(" FROM ").append(config.getTable())
                .append(" WHERE ")
                .append(config.getCols())
                .append("=")
                .toString();

        JDBCUtil jdbcUtil = new JDBCUtil(config.getDriver(),config.getUrl(),config.getUsername(),config.getPassword());

        List<Object> result = new ArrayList<Object>(keys.size());
        for (List<Object> list : keys) {
            // The result map is keyed by the String read from the database, so the
            // lookup must use the String form of the key as well.
            String key = String.valueOf(list.get(0));
            String sql = sqlPrefix + quote(key);
            Map<String, Object> map = jdbcUtil.queryForMap(sql);
            System.out.println(sql+" 【"+map);
            Bean itemBean = (Bean)map.get(key);
            long txid=0L;
            long val=0L;
            if (itemBean!=null) {
                val=itemBean.getSum();
                txid=itemBean.getTxid();
            }
            if (config.getType()==StateType.OPAQUE) {
                result.add(new OpaqueValue(txid, val));
            } else if (config.getType()==StateType.TRANSACTIONAL) {
                result.add(new TransactionalValue(txid, val));
            } else {
                result.add(val);
            }
        }
        return (List<T>) result;
    }

    /**
     * Writes all key/value pairs with a single multi-row upsert.
     *
     * <p>For {@code TRANSACTIONAL} state the value and txid of the
     * {@link TransactionalValue} are stored. For {@code OPAQUE} state only the
     * current value and current txid are stored (the table has no column for the
     * previous value, matching what {@link #multiGet(List)} reads back). For
     * non-transactional state the raw value is stored with txid 0.
     *
     * @param keys grouping keys; only the first element of each key is used
     * @param vals values to store, parallel to {@code keys}
     */
    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        StringBuilder rows = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            Object stored;
            Object txid;
            if (config.getType()==StateType.TRANSACTIONAL) {
                TransactionalValue val = (TransactionalValue)vals.get(i);
                stored = val.getVal();
                txid = val.getTxid();
            } else if (config.getType()==StateType.OPAQUE) {
                OpaqueValue val = (OpaqueValue)vals.get(i);
                stored = val.getCurr();
                txid = val.getCurrTxid();
            } else {
                stored = vals.get(i);
                txid = 0L;
            }
            if (rows.length() > 0) {
                rows.append(",");
            }
            rows.append("(")
                .append(quote(String.valueOf(keys.get(i).get(0))))
                .append(",").append(literal(stored))
                .append(",").append(literal(txid))
                .append(",NOW())");
        }
        if (rows.length() == 0) {
            // Nothing to write; "INSERT ... VALUES" without rows is not valid SQL.
            return;
        }

        String valueCol = config.getColVals();
        String sql = new StringBuilder("INSERT INTO ").append(config.getTable())
                .append("(").append(config.getCols())
                .append(",").append(valueCol)
                .append(",txid")
                .append(",time")
                .append(") VALUES ")
                .append(rows)
                // Overwrite the existing row when the key is already present.
                .append(" ON DUPLICATE KEY UPDATE ")
                .append(valueCol).append("=VALUES(").append(valueCol).append(")")
                .append(",txid=VALUES(txid)")
                .append(",time=VALUES(time)")
                .toString();
        System.out.println(sql);

        JDBCUtil jdbcUtil = new JDBCUtil(config.getDriver(),config.getUrl(),config.getUsername(),config.getPassword());
        jdbcUtil.insert(sql);
    }

    /** Renders a text value as a single-quoted MySQL string literal with backslashes and quotes escaped. */
    private static String quote(String raw) {
        return "'" + raw.replace("\\", "\\\\").replace("'", "''") + "'";
    }

    /** Renders a column value as SQL: NULL for null, bare digits for numbers, a quoted literal otherwise. */
    private static String literal(Object value) {
        if (value == null) {
            return "NULL";
        }
        if (value instanceof Number) {
            return value.toString();
        }
        return quote(value.toString());
    }

    /**
     * Creates the state factory to pass to {@code persistentAggregate}.
     *
     * @param config settings; a snapshot is taken, later changes are not seen
     * @return a factory producing map state backed by {@link JDBCState}
     */
    public static Factory getFactory(JDBCStateConfig config) {
        return new Factory(config);
    }

    /**
     * {@link StateFactory} that wraps a {@link JDBCState} in a {@link CachedMap}
     * and the map implementation matching the configured {@link StateType}.
     *
     * <p>A {@code StateFactory} is Java-serialized and shipped to the workers.
     * Static fields are not part of the serialized form, so the settings are
     * held in instance fields. They are copied field by field because
     * {@link JDBCStateConfig} does not declare {@code Serializable}; keep this
     * list in sync with that class.
     */
    static class Factory implements StateFactory {
        private final String driver;
        private final String url;
        private final String username;
        private final String password;
        private final String table;
        private final String cols;
        private final String colVals;
        private final int batchSize;
        private final int cacheSize;
        private final StateType type;

        public Factory(JDBCStateConfig config) {
            this.driver = config.getDriver();
            this.url = config.getUrl();
            this.username = config.getUsername();
            this.password = config.getPassword();
            this.table = config.getTable();
            this.cols = config.getCols();
            this.colVals = config.getColVals();
            this.batchSize = config.getBatchSize();
            this.cacheSize = config.getCacheSize();
            this.type = config.getType();
        }

        /** Rebuilds a config object from the serialized snapshot. */
        private JDBCStateConfig restoreConfig() {
            JDBCStateConfig restored = new JDBCStateConfig();
            restored.setDriver(driver);
            restored.setUrl(url);
            restored.setUsername(username);
            restored.setPassword(password);
            restored.setTable(table);
            restored.setCols(cols);
            restored.setColVals(colVals);
            restored.setBatchSize(batchSize);
            restored.setCacheSize(cacheSize);
            restored.setType(type);
            return restored;
        }

        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            JDBCStateConfig config = restoreConfig();
            final CachedMap map = new CachedMap(new JDBCState(config), config.getCacheSize());
            System.out.println(config);
            if(config.getType()==StateType.OPAQUE) {
                return OpaqueMap.build(map);
            } else if(config.getType()==StateType.TRANSACTIONAL){
                return TransactionalMap.build(map);
            }else {
                return NonTransactionalMap.build(map);
            }
        }
    }

}

3、构建 JDBCState 所使用的 JDBCStateConfig 配置类
package storm.trident.mysql;

import java.util.List;

import org.apache.storm.trident.state.StateType;

/**
 * Mutable settings bean for the JDBC-backed Trident state: connection details,
 * target table and columns, cache size and state type.
 */
@SuppressWarnings("all")
public class JDBCStateConfig {

    /** JDBC connection URL. */
    private String url;
    /** Fully qualified JDBC driver class name. */
    private String driver;
    /** Database user. */
    private String username;
    /** Database password; never included in {@link #toString()}. */
    private String password;
    /** Table the state is stored in. */
    private String table;
    /** Batch size setting; not read by the state implementation in this file. */
    private int batchSize;
    /** Name of the key column. */
    private String cols;
    /** Name of the value column. */
    private String colVals;
    /** Size passed to the cache that wraps the backing map; defaults to 100. */
    private int cacheSize = 100;
    /** Kind of Trident state to build; defaults to {@code OPAQUE}. */
    private StateType type = StateType.OPAQUE;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public String getCols() {
        return cols;
    }

    public void setCols(String cols) {
        this.cols = cols;
    }

    public String getColVals() {
        return colVals;
    }

    public void setColVals(String colVals) {
        this.colVals = colVals;
    }

    public int getCacheSize() {
        return cacheSize;
    }

    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    public StateType getType() {
        return type;
    }

    public void setType(StateType type) {
        this.type = type;
    }

    /**
     * Describes the settings for diagnostics. The password is masked because
     * this string is printed to the console when the state is created.
     *
     * @return a one-line summary labelled with this class's name
     */
    @Override
    public String toString() {
        String maskedPassword = (password == null) ? "null" : "******";
        return "JDBCStateConfig [url=" + url + ", driver=" + driver + ", username=" + username + ", password="
                + maskedPassword + ", table=" + table + ", batchSize=" + batchSize + ", cols=" + cols
                + ", colVals=" + colVals + ", cacheSize=" + cacheSize + ", type=" + type + "]";
    }

}

4、构建JDBC工具类和实体Bean
package storm.trident.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class JDBCUtil {

    private String driver;
    private String url;
    private String username;
    private String password;
    private Connection connection;
    private PreparedStatement ps;
    private ResultSet rs;
    

    public JDBCUtil(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
        init();
    }
    
    void init(){
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public boolean insert(String sql) {
        int state = 0;
        try {
            connection = DriverManager.getConnection(url, username, password);
            ps = connection.prepareStatement(sql);
            state = ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                ps.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (state>0) {
            return true;
        }
        return false;
    }
    
    public Map<String, Object> queryForMap(String sql) {
        Map<String, Object> result = new HashMap<String, Object>();
        try {
            connection = DriverManager.getConnection(url, username, password);
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();
            if(rs.next()){
                Bean iteBean=new Bean(rs.getString("tel"), rs.getLong("sum"), rs.getLong("txid"), null);                    
                result.put(rs.getString("tel"), iteBean);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                ps.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

}

package storm.trident.mysql;

/**
 * Plain data holder for one row: telephone number, summed amount, transaction
 * id and timestamp text.
 */
public class Bean {

    private String tel;
    private long sum;
    private long txid;
    private String time;

    /** Creates a bean whose fields keep their Java default values. */
    public Bean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param tel  telephone number
     * @param sum  summed amount
     * @param txid transaction id
     * @param time timestamp text, may be null
     */
    public Bean(String tel, long sum, long txid, String time) {
        this.tel = tel;
        this.sum = sum;
        this.txid = txid;
        this.time = time;
    }

    // ---- accessors ----

    public String getTel() {
        return this.tel;
    }

    public long getSum() {
        return this.sum;
    }

    public long getTxid() {
        return this.txid;
    }

    public String getTime() {
        return this.time;
    }

    // ---- mutators ----

    public void setTel(String tel) {
        this.tel = tel;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }

    public void setTxid(long txid) {
        this.txid = txid;
    }

    public void setTime(String time) {
        this.time = time;
    }

    /**
     * Renders every field in a single line.
     *
     * @return text of the form {@code Bean [tel=..., sum=..., txid=..., time=...]}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Bean [");
        text.append("tel=").append(tel);
        text.append(", sum=").append(sum);
        text.append(", txid=").append(txid);
        text.append(", time=").append(time);
        return text.append("]").toString();
    }

}
原文地址:https://www.cnblogs.com/mengyao/p/6094253.html