MongoDB线程安全批量处理

Mongo批处理工具类:

package com.saike.solr.server.util;

import java.net.UnknownHostException;
import java.util.ArrayList;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.MongoOptions;

/**
 * 批处理工具类
 * @author xieyong
 *
 */
public class UtileMongDB {
    
    UtilThreadLocal<ArrayList<DBObject>> localBatch;
    /**mongo单例对象  根据官方文档mongojava是线程安全的*/
    private static Mongo mongo;
    private static DBCollection coll;
    //private static Log log = LogFactory.getLog(UtileMongDB.class);
    private static DB db;
    
    static{
           /** 实例化db*/
           MongoOptions options = new MongoOptions();
                      options.autoConnectRetry = true;
                      options.connectionsPerHost = 1000;
                      options.maxWaitTime = 5000;
                      options.socketTimeout = 0;
                      options.connectTimeout = 15000;
                      options.threadsAllowedToBlockForConnectionMultiplier = 5000;
            try {
                mongo = new Mongo(MongoDBConstant.MONGO_HOST,MongoDBConstant.MONGO_PORT);
            } catch (UnknownHostException | MongoException e) {
                e.printStackTrace();
            }
            // boolean auth = db.authenticate(myUserName, myPassword);
    }
    
    public UtileMongDB(){
        try {
            localBatch = new UtilThreadLocal<ArrayList<DBObject>>(ArrayList.class);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
    /**
     * 返回db对象
     * @return db
     */
    public static DB getDB(){
        if(db==null){
            db = mongo.getDB(MongoDBConstant.MONGO_DB);
        }
        return db;
    }
    
    /**
     * 返回mongo
     * @return mongo连接池
     */
    public static Mongo getMong(){
        return mongo;
    }
    
    /**
     * 读取集合
     * @return mongo集合
     * */
    public static DBCollection getColl(String collname){
        return getDB().getCollection(collname);
    }
    
    public static DBCollection getColl(){
        return getDB().getCollection(MongoDBConstant.MONGO_COLLECTION);
    }
    
    /**  crud操作 */
    public void addBatch(String key,String value){
        BasicDBObject basicDB = new BasicDBObject();
        basicDB.put(key, value);
        /** 这里用线程本地变量,不用会存在竞技条件*/
        localBatch.newGet().add(basicDB);
    }
    
    /**
     * 执行批处理
     * */
    public void executeInsertBatch(){
        getColl().insert(localBatch.get());
        localBatch.get().clear();
    }
    /**
     * 执行批量删除
     */
    public void executeDeleteBatch(){
        ArrayList<DBObject> array = localBatch.get();    
        for(DBObject obj:array){
            getColl().remove(obj);
        }
        localBatch.get().clear();
    }
    
    
    
    
    
    public DBCursor query(String key,String value){
        BasicDBObject basicDBObject = new BasicDBObject();
        basicDBObject.put(key,value);
        return getColl().find(basicDBObject);
    }
        
}

 

ThreadLocal的封装:

package com.saike.solr.server.util;

import java.lang.reflect.Constructor;

/**
 * 
 * @author xieyong
 *
 * @param <T> 本地线程变量对象了类型
 */
public class UtilThreadLocal<T> extends ThreadLocal<T> {
    /**参数集合*/
    Object[] obj;
    /**实例化构造函数*/
    Constructor<T> construct;
    
    /**
     * 
     * @param clazz        本地变量的class
     * @param args        构造函数的参数
     * @throws NoSuchMethodException
     * @throws SecurityException
     */
    public UtilThreadLocal(Class clazz,Object... args) throws NoSuchMethodException, SecurityException{
        this.obj = obj;
        Class[] clazzs = null;
        /** new 获取参数class供获取构造函数用*/
        if(args != null)
            if(args.length !=0){
                clazzs = new Class[args.length];
                for(int i = 0;i<args.length;i++){
                    clazzs[i] = args[i].getClass();
                }
            }
        this.construct = clazz.getConstructor(clazzs);
    }
    
    /**
     * 如果当前线程没有对象创建一个新对象
     * @return
     */
    public T newGet(){
        T tar = super.get() ;
        if(tar == null){
            try {
                tar = construct.newInstance(obj);
                super.set(tar);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        return tar;
    }
}
原文地址:https://www.cnblogs.com/JimmyXie/p/3858528.html