solr入门之多线程操作solr中索引字段的解决

涉及的问题:
建索引时有一个字段是该词语出现的次数,这个字段是放在solr里的 
而我用的是多线程来进行全量导入的,这里就涉及到了多线程问题
多个线程操作同一个变量时怎样处理?

我是这样子做的 :
首先将变量本地话--分布式就放到大容器中,我这里只使用了一个map来存
词和次数的关系映射

变量本地化后就是多线程的攻克了--锁的设置-我不过在操作时加了一个锁来解决问题

这样做后整体上应该能解决变量的问题了
最后另一个线程顺序问题要解决下
当 最后一个提交索引时 获取的索引不一定是正确的索引
当两个线程中都有此字段时  都获取到了正确的结果 可是 最后提交那个
不一定是正确的最大的那个  所以还是存在一点点的误差的


import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import cn.com.mx.gome.search.core.util.PreIndexThreadPool;
import cn.com.mx.gome.search.core.util.prop.PropUtils;
import cn.com.mx.gome.suggest.cache.WordCountCache;
import cn.com.mx.gome.suggest.component.FullIndexProcessContainer;
import cn.com.mx.gome.suggest.component.FullIndexSuggestProcess;
import cn.com.mx.gome.suggest.constant.Const;
import cn.com.mx.gome.suggest.domian.BaseSuggestIndex;
import cn.com.mx.gome.suggest.service.FullIndexSuggestService;
import cn.com.mx.gome.suggest.solr.SolrServiceWrapper;
/**
 * 搜索建议  索引库导入业务层
 * @author songqinghu
 *
 */
@Service("fullIndexSuggestServiceImpl")
public class FullIndexSuggestServiceImpl extends FullIndexServiceImpl implements FullIndexSuggestService {

    private Logger logger = LoggerFactory.getLogger(FullIndexSuggestServiceImpl.class);

    @Resource //容器类
    private FullIndexProcessContainer fullIndexProcessContainer;

    @Value("${maxImumPoolSize}")//最大线程数目---通用能够提取到父类中
    private int maxImumPoolSize;

    private int skip=0;//開始的角标

    private int limit=1000;//默认的步距

    private int rows=0;//建立索引的总数目

    private long maxNum =0;

    //可回收线程池
    private  ExecutorService threadPool  =  Executors.newCachedThreadPool();

    //solr连接
    @Resource
    private SolrServiceWrapper solrServiceWrapperImpl;

    private  SolrClient suggestClient;

    @PostConstruct
    private void getClient(){
        suggestClient = solrServiceWrapperImpl.getCollection(getCollectionName());;
    }


    @Override
    public String getCollectionName() {

        return "meixin_suggest";
    }
    /**
     * 搜索建议全量索引建立 详细实现方法
     * 这种方法我想这样设计:
     * 能够动态的加入和删除导入的过程类进入 多线程导入
     * 做一个 集合类容器来完毕类的组装操作 方法通过接口的方式来进行统一的 处理过程控制关键点 
     * 
     * 1.先做一个圈子的索引建议词导入--仅仅导入圈子的名称
     * ---从数据源中获取到所有的圈子名称--对圈子名称进行处理拼音加入---组装成bean 提交到solr中--> ok 最粗糙版本号 兴许能够优化下
     * 2.商品搜索推荐词 导入---分析讨论下 (临时不在--无非三个来源  数据库  已有索引库  还有人工导入干预也是进数据库  这里要不要再设置一个插拔式的数据源接口呢?
     *   先做个正规途径的数据获取流程
     */
    @Override
    public int index(SolrClient client ,boolean morethread) {
        logger.warn("{} 開始全量导入",new Date());
        try {
            //每次进入全量导入方法  对计数器清零处理
            skip = 0;
            rows = 0;
            limit = PropUtils.getInstance().getInt(Const.INDEX_FULL_LIMIT_GROUP, 1000);
            logger.warn("时间{},初始化变量: skip:{},rows:{},limit{} ",new Date(),skip,rows,limit);
            WordCountCache.clear();//清理本地词语次数缓存
            //容器中获取种类
            List<FullIndexSuggestProcess> processContainer = fullIndexProcessContainer.getProcessContainer();
            for (FullIndexSuggestProcess fullprocess : processContainer) {
                logger.warn("时间{},导入{}索引 ",new Date(),fullprocess.getClass().getName());
                //多线程索引建立过程
                //1.获取此时最大索引数目--通用方法
                maxNum = fullprocess.getMaxNum();
                //2.组建任务线程队列 
                ArrayList<Future<Integer>> futureTasks = new ArrayList<Future<Integer>>();
                //3.向任务队列中加入任务到线程或者索引数终止
              for (int i = 0; i < maxImumPoolSize; i++) {

                    try {
                        if(!fullprocess.isEnd(skip,limit,maxNum)){//考虑兼容各种数据库问题---mongo 当相等时 尽管开启了线程可是取不到数据--能够考虑容错
                            break;
                        }else{
                            futureTasks.add(getFuture(client,fullprocess));
                        }
                    } catch (Exception e) {
                        logger.error("FullIndexSuggestServiceImpl addfuture : ",e);
                    }
                }

                //4.循环任务队列  持续加入任务到索引导入结束
                while(futureTasks.size()>0){
                    ArrayList<Future<Integer>> tmpFutureTasks = new ArrayList<Future<Integer>>();
                    for (Future<Integer> future : futureTasks) {
                        if(!future.isDone()){//该线程未运行结束 --加入到任务队列中继续运行
                            tmpFutureTasks.add(future);
                        }else{
                            //rows +=future.get();//统计索引数量--不是必需统计 数目是错的 中间可能会覆盖
                            if(fullprocess.isEnd(skip,limit,maxNum)){
                                tmpFutureTasks.add(getFuture(client,fullprocess));
                            }
                        }
                    }
                    futureTasks = tmpFutureTasks;

                    Thread.sleep(500);
                }
                //一个全量导入结束

            }
            //所有的全量导入结束--这里开启异步线程池对索引词语出现的次数进行重写
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    List<List<SolrInputDocument>> list = WordCountCache.getDocs();
                    logger.warn("时间{},開始改动词频 ,循环次数{}",new Date(),list.size());
                    try {
                        for (List<SolrInputDocument> docs : list) {
                             suggestClient.add(docs);
                        }
                    } catch (SolrServerException |IOException e) {
                        logger.error(" index threadPool.execute : " + e);
                    } 
                }
            });

        } catch (Exception e) {
            logger.error("FullIndexSuggestServiceImpl ERROR:", e);
        }
        return WordCountCache.getRows();
    }
    /**
     * 
     * @描写叙述:通用任务组装类
     * @param client  solr客户端
     * @param fullprocess 数据获取
     * @return
     * @return Future<Integer>
     * @exception
     * @createTime:2016年3月24日
     * @author: songqinghu
     */
    private Future<Integer> getFuture(SolrClient client, FullIndexSuggestProcess fullprocess) {
        //任务类
        IndexTask indexTask = new IndexTask();
        //设置參数
        indexTask.setParameters(skip, limit, client, fullprocess);
        //提交开启任务---线程池的书写
        Future<Integer> future = PreIndexThreadPool.getPool().submit(indexTask);
        //变更控制变量--这里也要做成通用型的--都设置到各子的方法中??

skip = skip + limit; //返回任务封装 return future; } private static class IndexTask implements Callable<Integer>{ private int skip; //開始 坐标 private int limit; //步距 private SolrClient client; //客户端 private FullIndexSuggestProcess fullprocess; //未知数据源 /** * * @描写叙述:设置类中用到的參数 * @return void * @exception * @createTime:2016年3月24日 * @author: songqinghu */ public void setParameters(int skip,int limit,SolrClient client,FullIndexSuggestProcess fullprocess){ this.skip = skip; this.limit = limit; this.client = client; this.fullprocess = fullprocess; } @Override public Integer call() throws Exception { //1 获取原始资源数据 对资源数据进行处理----->拼音处理--工具类-->组装返回的beans类---->组装类 List<BaseSuggestIndex> beans = fullprocess.getBeans(skip, limit); //2 推断返回的创建成功的索引数目 if(beans !=null && beans.size()>0){ //3 //提交 client.addBeans(beans); return beans.size(); }else{ return 0; } } } }


索引导入类


import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.annotation.Resource;
import javax.management.RuntimeErrorException;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import cn.com.mx.gome.search.core.util.MD5Utils;
import cn.com.mx.gome.search.core.util.Pinyin4jUtil;
import cn.com.mx.gome.search.core.util.SetToStringArrUtils;
import cn.com.mx.gome.search.quote.digger.solrbean.ProductBean;
import cn.com.mx.gome.suggest.cache.WordCountCache;
import cn.com.mx.gome.suggest.domian.BaseSuggestIndex;
import cn.com.mx.gome.suggest.solr.SolrServiceWrapper;

/**
 * 商品类 搜索建议推荐词导入
 * @author songqinghu
 *
 */
@Component("fullIndexItemSuggestProcess")
public class FullIndexItemSuggestProcess implements FullIndexSuggestProcess {

    private Logger logger = LoggerFactory.getLogger(FullIndexItemSuggestProcess.class);

    //这里注入一个工厂来生产指定的数据源获取类来获取不同的数据源类--先放着

    //solr连接
    @Resource
    private SolrServiceWrapper solrServiceWrapperImpl;

    @Value("${SORL_PRODUCT_NAME}") //从 properties 文件里注入 solr连接的名称
    private String solrProductName;

    @Value("${SORL_SUGGEST_NAME}")
    private String solrSuggestName;

    private  SolrClient itemClient;

    private SolrClient suggestClient;

    @Override
    public List<BaseSuggestIndex> getBeans(int skip, int limit) {//这里在传一个 參数用来确定数据导入方式
        Map<String, Integer> words ;
        if(false){
            //words = getDataBySQL(skip, limit); //从原始数据库中获取
        }else {
            words= getDataBySolr(skip, limit);//从solr中获取
        }

        List<BaseSuggestIndex> assembleBeans = AssembleBeans(words);

        return assembleBeans;
    }
    /**
     * 
     * @描写叙述:从solr索引库中获取数据
     * @param skip 開始
     * @param limit 步距
     * @return
     * @return List<Product>
     * @exception
     * @createTime:2016年3月25日
     * @author: songqinghu
     */
    private Map<String,Integer> getDataBySolr(int skip, int limit) {
        //查询指定范围内的数据
        SolrQuery query = new SolrQuery();
        query.set("q", "*:*");
        query.setStart(skip);
        query.setRows(limit);
        QueryResponse response;
        Map<String,Integer> words = null;
        try {
            response = itemClient.query(query);

            logger.info("skip="+skip+",limit="+limit+",获取到索引数  : "+response.getResults().getNumFound());

            List<ProductBean> productBeans = response.getBeans(ProductBean.class);
            //数据处理部分---我要将获取到的数据中的 须要的三个字段的汉字 然后进行记录出现次数和去反复
             words = suggestCollect(productBeans);
         }catch (SolrServerException | IOException  e) {
             logger.error("FullIndexItemSuggestProcess  getDataBySolr :" + e);
         }
        return words;
    }
    //将获取到的数据中的 须要的三个字段的汉字 然后进行记录出现次数和去反复
    private Map<String, Integer> suggestCollect(List<ProductBean> productBeans) {

        Map<String, Integer> words = new HashMap<String,Integer>();

        for (ProductBean productBean : productBeans) { //这里须要考虑词语切分问题---不太会处理

            String name = productBean.getName(); //商品名称 
            mapOperation(words, name);
            List<String> cateNames = productBean.getCateName();//类目名称
            for (String cateName : cateNames) {
                mapOperation(words, cateName);
            }
            String spuBrand = productBean.getSpuBrand();//品牌名称
            mapOperation(words, spuBrand);

        }
        return words;
    }
    /**
     * 对map中数据进行操作
     */
    private void mapOperation(Map<String, Integer> words,String name){
        if(words.containsKey(name)){ //存在就加一
            Integer count = words.get(name);
            words.put(name, count+1);
        }else{//不存在就存入
            words.put(name, 1);
        }
    }

    //对原始查询后的集合数据进行拆分
    public List<BaseSuggestIndex> AssembleBeans(Map<String,Integer> words){
        ArrayList<BaseSuggestIndex> baseSuggestIndexs = new ArrayList<BaseSuggestIndex>();
        Set<Entry<String, Integer>> entrySet = words.entrySet();
        for (Entry<String, Integer> entry : entrySet) {
            BaseSuggestIndex assembleBean = AssembleBean(entry);
            baseSuggestIndexs.add(assembleBean);
        }

        return baseSuggestIndexs;
    }
    //组装单个 solr文档对象
    public BaseSuggestIndex AssembleBean(Entry<String, Integer> entry){

        BaseSuggestIndex baseIndex = new BaseSuggestIndex();

        String word = entry.getKey();

        Set<String> shortpy = Pinyin4jUtil.converterToFirstSpellToSet(word);

        Set<String> allpy = Pinyin4jUtil.converterToSpellToSet(word);

        baseIndex.setWord(word);

        baseIndex.setShort_py(SetToStringArrUtils.convertToStringArr(shortpy));

        baseIndex.setAll_py(SetToStringArrUtils.convertToStringArr(allpy));
        //这里须要设置一下 使用md5加密算法 来保证 每一个字符串相应的id唯一  涉及到分类问题 可能会出现反复加入分类
        String id = getId(word);
        baseIndex.setSuggestId(id);

        baseIndex.setType("product");

        baseIndex.setCreateTime(new Date().getTime());
        //这里须要特别注意了--涉及到多线程问题了 ---当查询已经存在的词语的时候   查到次数加上当前的次数---
        //存在问题--怎样处理线程顺序保证多个线程是有序的操作呢?---存入映射关系  结束后再次进行次数更新?
        //这里必须要处理下 和实际的数据差距太大了了!!!!!!!!!!!!!
        logger.info("词名: "+word + " 本轮次数 :"+ entry.getValue());
        baseIndex.setCount(getCount4Word(id, entry.getValue()));

        return baseIndex;
    }
    /**
     * 
     * @描写叙述:获取该词语的数量加上眼下的词语数量---首次从索引库中获取  后缓存到本地中 以后从本地中获取
     * 这里还是不太准确  尽管保证了词语出现次数 通过加锁 可是线程提交时是无法进行控制的 还是存在误差的
     * @param id
     * @param count 本轮词语出现的次数
     * @return
     * @return Integer
     * @exception
     * @createTime:2016年3月25日
     * @author: songqinghu
     */
    private Integer getCount4Word(String id,Integer count ){

        return WordCountCache.putCount(id, count);
    }



    /**
     * 
     * @描写叙述:通过数据库来获取数据源
     * @param skip
     * @param limit
     * @return
     * @return List<Product>
     * @exception
     * @createTime:2016年3月25日
     * @author: songqinghu
     */
    private Map<String, Integer> getDataBySQL(int skip, int limit) {

        return null;
    }


    @Override
    public long getMaxNum() {//这里也须要依据数据源来配置下 
        if(itemClient == null){
            itemClient = solrServiceWrapperImpl.getCollection(solrProductName);
        }
        SolrQuery query = new SolrQuery();
        query.set("q", "*:*");
        long maxNum = 0;
        try {
            maxNum = itemClient.query(query).getResults().getNumFound();
        } catch (SolrServerException | IOException  e) {
            logger.error("FullIndexItemSuggestProcess  getMaxNum :" + e);
        }
        logger.info(new Date()+"  最大值: "+maxNum);
        return maxNum;
    }

    @Override
    public boolean isEnd(int skip, int limit, long maxNum) {

        //開始坐标要是小于 最大数量就继续---要不要事实更新呢?时时更新吧

        return skip < getMaxNum() ? true : false;
    }
    /**
     * 获取词语相应的索引id 
     */
    @Override
    public String getId(String word) {
        if(word ==null){
            throw new RuntimeException("id不能为空");
        }
        return "product_"+MD5Utils.MD5(word);
    }

}

本地容器类

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
 * 容器类 --对solr中涉及的词语出现的词语进行本地缓存 和 同步处理
 * 分布式时能够考虑 放入 共享容器中
 * @author songqinghu
 *
 */
import java.util.concurrent.ConcurrentHashMap;

import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.com.mx.gome.suggest.domian.BaseSuggestIndex;



public class WordCountCache {

    private  static  Logger logger = LoggerFactory.getLogger(WordCountCache.class);
    /**
     * 容器  放入word的md5加密后的 和次数
     */
    private static ConcurrentHashMap<String, Integer> wordCountMap = new ConcurrentHashMap<String,Integer>();

    private static Object lock  = new  Object();

    /**
     * 
     * @描写叙述:设置词语的次数 ---设置锁处理
     * @param word 词语(加密后的)id
     * @param count 次数
     * @return void
     * @exception
     * @createTime:2016年3月28日
     * @author: songqinghu
     */
    public static Integer putCount(String id,Integer count){
        logger.warn("WordCountCache 线程: "+ Thread.currentThread().getName() +" id: "+id+" count: "+count);
        synchronized (lock) {
            Integer result = wordCountMap.get(id);
            if(result !=null){ //存在此key 加上已经缓存的---不存在 直接存这次的
                count = result + count;
            }
            wordCountMap.put(id, count);
            return wordCountMap.get(id);
        }
    }
    /**
     * 
     * @描写叙述:是否缓存到了本地中
     * @param id
     * @return
     * @return boolean
     * @exception
     * @createTime:2016年3月28日
     * @author: songqinghu
     */
    public static boolean containsKey(String id){
        if(wordCountMap.containsKey(id)){
            return true;
        }
        return false;
    }
    /**
     * 
     * @描写叙述:清空本地的 缓存---全量结束 和  清理全量索引的时候
     * @return
     * @return boolean
     * @exception
     * @createTime:2016年3月28日
     * @author: songqinghu
     */
    public static boolean clear(){
        wordCountMap.clear();
        return true;
    }

    /**
     * 
     * @描写叙述:当全量索引导入结束后 获取缓存的词频 进行词频重写--后清空
     *  这里还要优化下 当索引量非常多 不能一次所有都提交了--设置为加入到定量的list中
     * @return
     * @return List<SolrInputDocument>
     * @exception
     * @createTime:2016年3月28日
     * @author: songqinghu
     */
    public static List<List<SolrInputDocument>> getDocs(){
        //文档容器
        List<List<SolrInputDocument>> list = new  ArrayList<List<SolrInputDocument>>();

        ArrayList<SolrInputDocument> docs = new  ArrayList<SolrInputDocument>();//測试一下

        Set<Entry<String, Integer>> entrySet = wordCountMap.entrySet();
        int i =0;
        for (Entry<String, Integer> entry : entrySet) {
            String id = entry.getKey();
            Integer count = entry.getValue();
            SolrInputDocument doc = new SolrInputDocument();
            doc.setField(BaseSuggestIndex.Fd.suggestId.name(),id);//这里的硬编码能够设置成类的形式
            Map<String, Integer> counts = new HashMap<String, Integer>(1);
            counts.put("set", count);
            doc.setField(BaseSuggestIndex.Fd.count.name(), counts);
            docs.add(doc);
            i++;
            if(i>=1000){//为1000  单次改动文档数目设置为1000--总量最后一次进不来 的处理
                ArrayList<SolrInputDocument> tmp = new  ArrayList<SolrInputDocument>();
                tmp.addAll(docs);
                list.add(tmp);
                i=0;
                docs.clear();
            }
        }
        if(i!=0){//最后一次没有进入推断里
            list.add(docs);
        }
        return list;
    }
    /**
     * 
     * @描写叙述:获取总数量--id就为总的索引数
     * @return
     * @return Integer
     * @exception
     * @createTime:2016年3月28日
     * @author: songqinghu
     */
    public static Integer getRows(){
        return wordCountMap.size();
    }


}










原文地址:https://www.cnblogs.com/jzssuanfa/p/7228334.html