trident-mongo实现更新操作

    MongoDB是大数据技术中常用的NoSQL数据库,它提供了大量的查询、聚合等操作函数,对于需要大量查询的日志系统来说,MongoDB是大数据日志存储的福音。Storm的高级编程抽象Trident也提供了与MongoDB集成的方法,但官方只提供了新增(insert)的处理,对于常用的修改操作并未提供接口。本文提供了一种使用Trident进行MongoDB修改操作的方式,并且对持久化后的数据提供了输出的拓展操作,具体代码见下方:

1.自定义状态:MyMongoState

package com.storm.trident.state;

import com.mongodb.client.model.Filters;
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.MongoDBClient;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * Custom Trident {@link State} that writes tuples to MongoDB with an update
 * (instead of the insert-only behaviour of the stock {@code MongoState}).
 *
 * <p>Each tuple is converted to a {@link Document} by the configured
 * {@link MongoMapper} and applied as a {@code $set} update to the document
 * whose {@code userId} equals the tuple's first value. The third argument
 * passed to {@code MongoDBClient.update} is {@code true}; in storm-mongodb
 * 1.x this is the upsert flag (NOTE(review): confirm against the
 * storm-mongodb version in use).
 */
public class MyMongoState implements State {

    // Log under this class, not under the stock MongoState it was copied from.
    private static final Logger LOG = LoggerFactory.getLogger(MyMongoState.class);

    /** Name of the MongoDB field used to locate the document to update. */
    private static final String KEY_FIELD = "userId";

    private final MyMongoState.Options options;
    private MongoDBClient mongoClient;
    // Topology configuration handed in by the factory; kept for reference, not read here.
    private final Map map;

    /**
     * @param map     configuration map passed through from the state factory
     * @param options connection and mapping options; validated in {@link #prepare()}
     */
    public MyMongoState(Map map, MyMongoState.Options options) {
        this.options = options;
        this.map = map;
    }

    /**
     * Serializable, fluent holder for the MongoDB URL, the target collection
     * and the tuple-to-document mapper.
     */
    public static class Options implements Serializable {
        private String url;
        private String collectionName;
        private MongoMapper mapper;

        /** Sets the MongoDB connection URL and returns this instance. */
        public MyMongoState.Options withUrl(String url) {
            this.url = url;
            return this;
        }

        /** Sets the target collection name and returns this instance. */
        public MyMongoState.Options withCollectionName(String collectionName) {
            this.collectionName = collectionName;
            return this;
        }

        /** Sets the tuple-to-document mapper and returns this instance. */
        public MyMongoState.Options withMapper(MongoMapper mapper) {
            this.mapper = mapper;
            return this;
        }
    }

    /**
     * Validates the options and opens the MongoDB client.
     * Must be called before {@link #updateState(List, TridentCollector)}.
     *
     * @throws IllegalArgumentException if the URL or collection name is
     *         empty/null or the mapper is null
     */
    protected void prepare() {
        Validate.notEmpty(options.url, "url can not be blank or null");
        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
        Validate.notNull(options.mapper, "MongoMapper can not be null");

        this.mongoClient = new MongoDBClient(options.url, options.collectionName);
    }

    /** No-op; only logs at debug level. */
    public void beginCommit(Long txid) {
        LOG.debug("beginCommit is noop.");
    }

    /** No-op; only logs at debug level. */
    public void commit(Long txid) {
        LOG.debug("commit is noop.");
    }

    /**
     * Applies one {@code $set} update per tuple, keyed on the tuple's first
     * value (read as an {@code Integer}) matched against {@code userId}.
     *
     * @param tuples    the batch to persist; an empty list results in no writes
     * @param collector unused by this method
     */
    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            Document document = options.mapper.toDocument(tuple);
            this.mongoClient.update(Filters.eq(KEY_FIELD, tuple.getInteger(0))
                    , new Document("$set", document), true);
        }
    }
}

2.自定义状态工厂:MyMongoStateFactory

package com.storm.trident.state;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;

import java.util.Map;

/**
 * {@link StateFactory} that produces ready-to-use {@link MyMongoState}
 * instances from a fixed set of options.
 */
public class MyMongoStateFactory implements StateFactory {

    /** Options handed to every state this factory creates. */
    private MyMongoState.Options options;

    /**
     * @param options connection and mapping options for the created states
     */
    public MyMongoStateFactory(MyMongoState.Options options) {
        this.options = options;
    }

    /**
     * Creates a {@link MyMongoState} with the given configuration and prepares
     * it before returning it.
     *
     * @param conf           configuration map forwarded to the state
     * @param metrics        not used by this factory
     * @param partitionIndex not used by this factory
     * @param numPartitions  not used by this factory
     * @return the prepared state
     */
    public State makeState(Map conf, IMetricsContext metrics,
                           int partitionIndex, int numPartitions) {
        final MyMongoState mongoState = new MyMongoState(conf, this.options);
        mongoState.prepare();
        return mongoState;
    }

}

3.自定义状态修改类:MyMongoStateUpdater

package com.storm.trident.state;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.List;

/**
 * State updater that persists a batch through {@link MyMongoState} and then
 * emits the first field of every persisted tuple, so that downstream
 * operations (via {@code newValuesStream()}) can react to what was written.
 */
public class MyMongoStateUpdater extends BaseStateUpdater<MyMongoState> {

    /**
     * Writes the whole batch to the state, then prints and emits one value
     * per tuple.
     *
     * <p>Every tuple in the batch is reported, not just the first one, and an
     * empty batch is handled without throwing.
     *
     * @param state     the Mongo-backed state that performs the update
     * @param tuples    the batch to persist; may be empty
     * @param collector receives the first field of each persisted tuple
     */
    public void updateState(MyMongoState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        state.updateState(tuples, collector);

        // Iterate instead of indexing element 0: avoids IndexOutOfBoundsException
        // on empty batches and reports all tuples of multi-tuple batches.
        for (TridentTuple tuple : tuples) {
            System.out.println(tuple.get(0) + "---" + tuple.get(1));
            collector.emit(new Values(tuple.get(0)));
        }
    }

}

4.自定义创建状态工厂类:TridentMongoFactory

package com.storm.trident.factory;

import com.storm.trident.state.MyMongoState;
import com.storm.trident.state.MyMongoStateFactory;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.mongodb.trident.state.MongoStateFactory;
import org.apache.storm.trident.state.StateFactory;

/**
 * Builds the Trident state factories used by the demo topology. Both variants
 * target the same MongoDB URL and collection and map the {@code userId} and
 * {@code name} tuple fields.
 */
public class TridentMongoFactory {
    public static final String url = "mongodb://127.0.0.1:27017/test";
    public static final String collectionName = "user";

    /**
     * Persists with the state classes shipped in the storm-mongodb jar.
     *
     * @return a {@link MongoStateFactory} configured for the demo collection
     */
    public static StateFactory getMongoInsertState() {
        return new MongoStateFactory(
                new MongoState.Options()
                        .withUrl(url)
                        .withCollectionName(collectionName)
                        .withMapper(userMapper()));
    }

    /**
     * Persists with the custom state so that MongoDB documents are updated.
     *
     * @return a {@link MyMongoStateFactory} configured for the demo collection
     */
    public static StateFactory getMongoModifyState() {
        return new MyMongoStateFactory(
                new MyMongoState.Options()
                        .withUrl(url)
                        .withCollectionName(collectionName)
                        .withMapper(userMapper()));
    }

    /** Creates the mapper shared by both factories: fields "userId" and "name". */
    private static MongoMapper userMapper() {
        return new SimpleMongoMapper()
                .withFields("userId", "name");
    }
}

5.自定义输出函数:PrintFunction

package com.storm.trident.function;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * Trident function that prints the first value of each tuple to standard
 * output and re-emits the tuple's values unchanged.
 */
public class PrintFunction extends BaseFunction {
    // Must be a primitive long: Java serialization ignores a serialVersionUID
    // declared as the boxed type Long and falls back to a computed default.
    public static final long serialVersionUID = 1L;

    /**
     * Prints {@code tuple.get(0)} and emits the incoming tuple as the output values.
     *
     * @param tuple     the input tuple; its first value is printed
     * @param collector receives the tuple's values
     */
    public void execute(TridentTuple tuple, TridentCollector collector) {
        System.out.println(tuple.get(0));
        collector.emit(tuple);
    }
}

6.创建Topology

package com.storm.trident.topology;

import com.google.common.collect.ImmutableList;
import com.storm.trident.factory.TridentMongoFactory;
import com.storm.trident.function.PrintFunction;
import com.storm.trident.state.MyMongoStateUpdater;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.testing.FeederBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Demo entry point: wires a feeder spout to the Mongo update state, prints the
 * ids that were persisted, and runs the topology in a local cluster.
 */
public class TridentDemoTopology {

    /**
     * Builds and runs the demo topology, feeds four sample users, then shuts
     * the local cluster down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        // Spout that is fed manually below; emits (userId, name).
        FeederBatchSpout spout = new FeederBatchSpout(ImmutableList.of("userId", "name"));

        // Update state. For insert-only persistence, use
        // TridentMongoFactory.getMongoInsertState() together with MongoStateUpdater instead.
        StateFactory modifyFactory = TridentMongoFactory.getMongoModifyState();

        // Persist every tuple, then print the userId values emitted by the updater.
        topology.newStream("spout", spout)
                .partitionPersist(modifyFactory, new Fields("userId", "name"), new MyMongoStateUpdater(), new Fields("userId"))
                .newValuesStream()
                .each(new Fields("userId"), new PrintFunction(), new Fields("id"))
                .parallelismHint(1);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("tridentDemo", config, topology.build());

            // Simulated input data.
            // NOTE(review): relies on FeederBatchSpout.feed blocking until the batch
            // has been processed, so shutting down right after the last feed is safe — confirm.
            spout.feed(ImmutableList.of(new Values(1, "zhangsan1")));
            spout.feed(ImmutableList.of(new Values(2, "lisi2")));
            spout.feed(ImmutableList.of(new Values(3, "wangwu3")));
            spout.feed(ImmutableList.of(new Values(4, "zhaoliu4")));
        } finally {
            // Release the in-process cluster so the JVM can exit.
            cluster.shutdown();
        }
    }
}

按如上代码进行操作,就可以通过Trident实现MongoDB的更新操作了。

原文地址:https://www.cnblogs.com/jpejie/p/9609153.html