Elasticsearch 全量索引导入

注:这里属于重复造轮子,以下导入逻辑可以改用 Spring Batch 实现。

说明

maven依赖

官方客户端文档(与下方依赖版本 6.5.0 对应):https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.5/index.html

<dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.5.0</version>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>6.5.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-codec</artifactId>
                        <groupId>commons-codec</groupId>
                    </exclusion>
                    <!--此处要排除掉自带的,这个自带的版本低,会报错-->
                    <exclusion>
                        <artifactId>elasticsearch</artifactId>
                        <groupId>org.elasticsearch</groupId>
                    </exclusion>
                </exclusions>
            </dependency>

Util类

配置类

package com.crb.ocms.product.domain.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @Project crb-product-domain
 * @PackageName com.crb.ocms.product.domain.config
 * @ClassName ESConfiguration
 * @Author liqiang
 * @Date 2019/3/6 5:52 PM
 * @Description Elasticsearch settings bound from properties under the "esconfig" prefix.
 * Lombok's {@code @Data} generates the accessors, so each field name is also the
 * getter/setter name and the bound property key; renaming a field (including the
 * misspelled {@code treadDataSize} and {@code redissKey}) changes both.
 */
@ConfigurationProperties(prefix = "esconfig")
@Data
public class ESConfiguration {
    /**
     * Alias name of the index.
     */
    private String aliasName;
    
    /**
     * Index name.
     */
    private String indexName;
    
    
    /**
     * Maximum number of threads used by the full index import.
     */
    private int threadSize;
    
    /**
     * Amount of data each thread processes per run during the parallel full import.
     */
    private int treadDataSize;
    
    /**
     * Mapping type name.
     */
    private String typeName;
    
    /**
     * Elasticsearch URL.
     * NOTE(review): ESHLRestUtil does not read this; it connects using host and port.
     */
    private String esUrl;

    /**
     * Elasticsearch host (IP).
     */
    private String host;

    /**
     * Elasticsearch port.
     */
    private Integer port;

    /**
     * Redis key used to flag whether an import is currently in progress.
     */
    private String redissKey;

    /**
     * Index mapping JSON.
     */
    private String mappingJson;
}

util工具类

package com.crb.ocms.product.service.impl;

import com.crb.ocms.product.domain.config.ESConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;
import java.util.regex.Pattern;


/**
 * @Project crb-product-service
 * @PackageName com.crb.ocms.demo.service.test.impl
 * @ClassName ESHLRestUtil
 * @Author liqiang
 * @Date 2019/2/25 13:45
 * @Description es工具类
 */
@Slf4j
@Component
@EnableConfigurationProperties(ESConfiguration.class)
public class ESHLRestUtil {
    private RestHighLevelClient client = null;

    public RestHighLevelClient getClient() {
        return client;
    }

    public void setClient(RestHighLevelClient client) {
        this.client = client;
    }
    ESConfiguration esConfiguration;

    @Autowired
    public ESHLRestUtil(ESConfiguration configuration) {
        client = new SimpleRestHighLevelClient(RestClient.builder(new HttpHost(configuration.getHost(), configuration.getPort(), "http")));
        this.esConfiguration=configuration;
    }
    public SimpleRestHighLevelClient getSimpleClient(){
        return (SimpleRestHighLevelClient)client;
    }


    /**
     *根据indexname获得index名字 支持通配符匹配
     * @param indexName
     * @return
     */
    public   String [] getIndexNames(String indexName)  {

        GetIndexResponse getIndexResponse= null;
        try {
            getIndexResponse = getIndexInfo(indexName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return getIndexResponse!=null?getIndexResponse.getIndices():null;
    }

    /**
     * 根据index名字获得index信息
     * @param indexName
     * @return
     */
    public GetIndexResponse  getIndexInfo(String indexName) throws IOException {
      //GetIndexRequest request = new GetIndexRequest().indices(indexName);
//        try {
//            GetIndexResponse getIndexResponse =client.indices().get(request, RequestOptions.DEFAULT);
//            return  getIndexResponse;
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
        //master_timeout
        /**
         * 因为现在是api使用6.5 线上是6.24 master_timeout   使用以下方式 替换掉
         */
        GetIndexRequest request = new GetIndexRequest().indices(indexName);
        String[] indices = request.indices() == null ? Strings.EMPTY_ARRAY : request.indices();
        String endpoint =indexName;
        Request httpequest = new Request("GET", endpoint);
        httpequest.addParameter("ignore_unavailable", "false");
        httpequest.addParameter("expand_wildcards", "open");
        httpequest.addParameter("allow_no_indices", "true");
        org.apache.http.HttpEntity entity = new NStringEntity("", ContentType.APPLICATION_JSON);
        httpequest.setEntity(entity);
        Response httpResponse= getClient().getLowLevelClient().performRequest(httpequest);
        GetIndexResponse getIndexResponse=getSimpleClient().parseGetIndexRespons(httpResponse);
        return  getIndexResponse;
    }

    /**
     * 获得最大index名字
     * @param names indexName_number 格式
     * @return
     */
    public  Integer getMaxIndexNumber(String []names) {
        if(names==null||names.length<=0){
            return null;
        }
        List<Integer> indexNumber=new ArrayList<Integer>();
        for (String name:
                names) {
            String[] arrs=name.split("_");
            if(arrs.length<=1){
                continue;
            }
            indexNumber.add(Integer.valueOf(arrs[1]));
        }
        indexNumber.sort(new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                 return o2 - o1;
            }
        });
        return CollectionUtils.isEmpty(indexNumber)?null:indexNumber.get(0);
    }
    /**
     * 验证索引是否存在
     *
     * @param index 索引名称
     * @return
     * @throws Exception
     */
    public boolean indexExists(String index) throws Exception {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        return exists;
    }

    /**
     * 创建index
     *
     * @param index
     * @param indexType
     * @param properties 结构: {name:{type:text}} {age:{type:integer}}
     * @return
     * @throws Exception
     */
    public boolean indexCreate(String index, String indexType,
                               Map properties) throws Exception {

        if (indexExists(index)) {
            return true;
        }
        CreateIndexRequest request = new CreateIndexRequest(index);
        request.settings(Settings.builder().put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2));
        Map jsonMap = new HashMap<>();
        Map mapping = new HashMap<>();
        mapping.put("properties", properties);
        jsonMap.put(indexType, mapping);
        request.mapping(indexType, jsonMap);

        CreateIndexResponse createIndexResponse = client.indices().create(
                request,RequestOptions.DEFAULT);
        boolean acknowledged = createIndexResponse.isAcknowledged();
        return acknowledged;
    }

    /**
     * 删除指定索引
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean deleteIndex(String indexName) throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
        AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
        return deleteIndexResponse.isAcknowledged();
    }
    /**
     * 创建索引
     * @param index       索引名字
     * @param settiongs settiongs
     * @return
     * @throws Exception
     * @author lqiang
     */
    public boolean indexCreate(String index, String settiongs
    ) throws Exception {

        if (indexExists(index)) {
            return true;
        }
        CreateIndexRequest request = new CreateIndexRequest(index);
//        request.settings(Settings.builder().put("index.number_of_shards", 3)
//                .put("index.number_of_replicas", 2));
        request.source(settiongs, XContentType.JSON);

        CreateIndexResponse createIndexResponse = client.indices().create(
                request,RequestOptions.DEFAULT);
        boolean acknowledged = createIndexResponse.isAcknowledged();
        return acknowledged;
    }

    /**
     * 创建更新文档
     *
     * @param index
     * @param indexType
     * @param documentId
     * @param josonStr
     * @return
     * @throws Exception
     */
    public boolean documentCreate(String index, String indexType,
                                  String documentId, String josonStr) throws Exception {
        IndexRequest request = new IndexRequest(index, indexType, documentId);

        request.source(josonStr, XContentType.JSON);
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED
                || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            return true;
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo
                    .getFailures()) {
                throw new Exception(failure.reason());
            }
        }
        return false;
    }

    /**
     * 创建更新文档
     *
     * @param index
     * @param indexType
     * @param documentId
     * @param map
     * @return
     * @throws Exception
     */
    public boolean documentCreate(String index, String indexType,
                                  String documentId, Map map) throws Exception {
        IndexRequest request = new IndexRequest(index, indexType, documentId);

        request.source(map);
        IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT);

        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED
                || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            return true;
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo
                    .getFailures()) {
                throw new Exception(failure.reason());
            }
        }
        return false;
    }


    /**
     * 批量创建更新文档
     *
     * @param index
     * @param indexType
     * @param list      建索引时传入_id作为docuemntId
     * @return
     * @throws Exception
     */
    public boolean documentCreateBulk(String index, String indexType,
                                      List<Map<String, Object>> list) throws Exception {

        if (list.size() > 0) {

            BulkRequest bulkRequest = new BulkRequest();

            for (Map map : list) {
                IndexRequest indexRequest;

                Object idObj = map.get("_id");
                if (idObj != null && StringUtils.isNotBlank(idObj.toString())) {
                    map.remove("_id");
                    String documentId = idObj.toString();
                    indexRequest = new IndexRequest(index, indexType, documentId);
                } else {
                    indexRequest = new IndexRequest(index, indexType);
                }
                indexRequest.source(map);
                bulkRequest.add(indexRequest);

            }

            BulkResponse bulkResponse = client.bulk(bulkRequest,RequestOptions.DEFAULT);

            if (bulkResponse.hasFailures()) {
                System.out.println("索引异常信息:" + bulkResponse.buildFailureMessage());
                return false;
            }
        }

        return true;

    }

    /**
     * 创建更新文档
     *
     * @param index
     * @param indexType
     * @param documentId
     * @param routing
     * @param map
     * @return
     * @throws Exception
     */
    public boolean documentCreate(String index, String indexType,
                                  String documentId, String routing, Map map) throws Exception {
        IndexRequest request = new IndexRequest(index, indexType, documentId);

        request.routing(routing);
        request.source(map);
        IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT);

        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED
                || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            return true;
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo
                    .getFailures()) {
                throw new Exception(failure.reason());
            }
        }
        return false;
    }

    /**
     * 创建索引
     *
     * @param index
     * @param indexType
     * @param josonStr
     * @return
     * @throws Exception
     */
    public String documentCreate(String index, String indexType, String josonStr)
            throws Exception {
        IndexRequest request = new IndexRequest(index, indexType);

        request.source(josonStr, XContentType.JSON);
        IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT);

        String id = indexResponse.getId();
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED
                || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return id;
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            return id;
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo
                    .getFailures()) {
                throw new Exception(failure.reason());
            }
        }
        return null;
    }

    /**
     * 创建索引
     *
     * @param index
     * @param indexType
     * @param map
     * @return
     * @throws Exception
     */
    public String documentCreate(String index, String indexType,
                                 Map map) throws Exception {
        IndexRequest request = new IndexRequest(index, indexType);

        request.source(map);
        IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT);

        String id = indexResponse.getId();
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED
                || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return id;
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            return id;
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo
                    .getFailures()) {
                throw new Exception(failure.reason());
            }
        }
        return null;
    }

    public boolean documentDelete(String index, String indexType,
                                  String documentId) throws Exception {
        DeleteRequest request = new DeleteRequest(index, indexType, documentId);
        DeleteResponse deleteResponse = client.delete(request,RequestOptions.DEFAULT);
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            return true;
        }
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            return true;
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo
                    .getFailures()) {
                throw new Exception(failure.reason());
            }
        }
        return false;
    }

    /**
     * 为指定index指定别名
     * @param indexName index名字
     * @param aliasesName 别名名字
     * @return
     */
    public boolean aliases(String indexName,String aliasesName){
            IndicesAliasesRequest request=new IndicesAliasesRequest();
            IndicesAliasesRequest.AliasActions aliasAction =
                    new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                            .index(indexName)
                            .alias(aliasesName);
            request.addAliasAction(aliasAction);
            try {
                AcknowledgedResponse indicesAliasesResponse =
                        getClient().indices().updateAliases(request, RequestOptions.DEFAULT);
                return indicesAliasesResponse.isAcknowledged();
            } catch (IOException e) {
                log.info("指定别名失败");
                return  false;
            }
    }

    /**
     * 指定别名 并删除oldIndex的别名
     * @param oldindex
     * @param newIndex
     * @param aliasesName
     * @return
     */
    public boolean aliases(String oldindex, String newIndex,String aliasesName) {
        IndicesAliasesRequest request=new IndicesAliasesRequest();
        //旧的删除
        IndicesAliasesRequest.AliasActions aliasAction =
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE)
                        .index(oldindex)
                        .alias(aliasesName);
        request.addAliasAction(aliasAction);
        //新的绑定
        aliasAction =
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                        .index(newIndex)
                        .alias(aliasesName);
        request.addAliasAction(aliasAction);
        try {
            AcknowledgedResponse indicesAliasesResponse =
                    getClient().indices().updateAliases(request, RequestOptions.DEFAULT);
            return indicesAliasesResponse.isAcknowledged();
        } catch (IOException e) {
            return  false;
        }
    }
    /**
     * 根据id获得文档信息
     * @param index indexName
     * @param type typeName
     * @param id id
     * @return
     * @throws IOException
     */
    public GetResponse getDoucmentById(String index, String type, String id) throws IOException {
        GetRequest getRequest = new GetRequest(
                index,//索引
                type,//类型
                id);//文档ID
        GetResponse getResponse = client.get(getRequest,RequestOptions.DEFAULT);
        return getResponse;
    }
    /**
     * 简单的单表根据条件进行查询
     *
     * @param index      index
     * @param type       type
     * @param parameters 参数
     * @return
     */
    public List<String> queryByMatch(String index, String type, Map<String, String> parameters) {
        List<String> jsons=new ArrayList<String>();
        try {
            SearchRequest searchRequest=new SearchRequest();
            BoolQueryBuilder booleanQueryBuilder= QueryBuilders.boolQuery();
            SearchSourceBuilder searchSourceBuilder=new SearchSourceBuilder();
            for (String key :
                        parameters.keySet()) {
                    booleanQueryBuilder.must().add(QueryBuilders.termQuery(key,parameters.get(key)));
            }
            searchSourceBuilder.query(booleanQueryBuilder);
            searchRequest.source(searchSourceBuilder);

            SearchResponse response = client.search(searchRequest,RequestOptions.DEFAULT);
            if(response.getHits().getHits()!=null){
                for (SearchHit searchHit:
                response.getHits().getHits() ) {
                    jsons.add(searchHit.getSourceAsString());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return jsons;
    }
    /**
     * 只包含字母
     *
     * @return 验证成功返回true,验证失败返回false
     */
    public static boolean checkLetter(String cardNum) {
        String regex = "^[A-Za-z]+$";
        return Pattern.matches(regex, cardNum);
    }

    /**
     * 验证中文
     *
     * @param chinese 中文字符
     * @return 验证成功返回true,验证失败返回false
     */
    public static boolean checkChinese(String chinese) {
        String regex = "^[u4E00-u9FA5]+$";
        return Pattern.matches(regex, chinese);
    }


    /**
     * Description:提示词,支持中文、拼音、首字母等
     * <p>
     * 1、检测搜索词是中文还是拼音
     * 2、若是中文,直接按照name字段提示
     * 3、若是拼音(拼音+汉字),先按照name.keyword_pinyin获取,若是无结果按照首字母name.keyword_first_py获取
     * @param index
     * @param type
     * @param field 提示字段名字
     * @param text 文本
     * @return
     * @author liqiang
     */
    public Set<String> getSuggestWord(String index, String type, String field, String text) throws IOException {
        String postField=field;
        if (checkLetter(text)) {
            postField = field + ".keyword_pinyin";
        } else if (checkChinese(text)) {
            postField = field;
        } else {
            postField = field + ".keyword_pinyin";
        }
        Set<String> suggestTexts= postSuggestWord(index,type,postField,text);
        if(org.springframework.util.CollectionUtils.isEmpty(suggestTexts)){
            return postSuggestWord(index,type,field+".keyword_first_py",text);
        }
        return suggestTexts;
    }
    /**
     * Description:提示词,支持中文、拼音、首字母等
     * @param index
     * @param type
     * @param field 提示字段名字
     * @param text 文本
     * @return
     * @author liqiang
     */
    private Set<String> postSuggestWord(String index, String type, String field, String text) throws IOException {

        // 1、创建search请求
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        // 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.fetchSource(false);//不返回_source数据
        sourceBuilder.size(0);//忽略hits
        //做查询建议
        //词项建议
        SuggestionBuilder termSuggestionBuilder =
                SuggestBuilders.completionSuggestion(field).text(text);
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("suggest_productName", termSuggestionBuilder);
        sourceBuilder.suggest(suggestBuilder);
        searchRequest.source(sourceBuilder);
        Set<String> suggestTextList = new HashSet<String>();
        //3、发送请求
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        //4、处理响应
        //搜索结果状态信息
        if (RestStatus.OK.equals(searchResponse.status())) {
            // 获取建议结果
            Suggest suggest = searchResponse.getSuggest();
            CompletionSuggestion termSuggestion = suggest.getSuggestion("suggest_productName");
            for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) {
                for (CompletionSuggestion.Entry.Option option : entry) {
                    suggestTextList.add(option.getText().string());
                }
            }
        }
        return suggestTextList;
    }
    /**
     * @Project demo
     * @PackageName com.crb.ocms.product.service.impl
     * @ClassName SimpleRestHighLevelClient
     * @Author qiang.li
     * @Date 2019/4/2 10:45 AM
     * @Description es版本降级导致的高级api会生成部分属性 低版本不能识别  将解析结果的方法暴露在外面
     */
    public class SimpleRestHighLevelClient extends RestHighLevelClient {
        public SimpleRestHighLevelClient(RestClientBuilder restClientBuilder) {
            super(restClientBuilder);
        }

        /**
         * 父类解析的解析响应内容是受保护的 所以定义一个继承
         * 解决版本不一致的 自定义请求 解析
         * @param response
         * @param entityParser
         * @param <Req>
         * @param <Resp>
         * @return
         * @throws IOException
         */
        public <Req extends Validatable, Resp> Resp parseEntity(Response response,CheckedFunction<XContentParser, Resp, IOException> entityParser)throws IOException {

            return this.parseEntity(response.getEntity(),entityParser);
        }

        /**
         * 解析获得idnex的响应
         * @param response
         * @return
         * @throws IOException
         */
        public GetIndexResponse parseGetIndexRespons(Response response) throws IOException {
            return  parseEntity(response,GetIndexResponse::fromXContent);
        }

        /**
         * 解析查询响应
         * @param response
         * @return
         * @throws IOException
         */
        public SearchResponse parseSearchResponse(Response response) throws IOException {
            return parseEntity(response,SearchResponse::fromXContent);
        }

        /**
         * 解析索引迁移响应
         * @param response
         * @return
         * @throws IOException
         */
        public BulkByScrollResponse parseBulkByScrollResponse(Response response) throws IOException {
            return  parseEntity(response,BulkByScrollResponse::fromXContent);
        }



    }

}

封装的处理器

工作原理

先将满足指定条件的数据通过 reindex 迁移到新索引,然后在新索引上进行导入(多线程并行导入);导入完毕后删除老索引及其别名,再为新索引绑定别名,从而实现不停机更新。

抽象接口

package com.crb.ocms.product.serviceTool;

import org.elasticsearch.index.reindex.ReindexRequest;

import java.io.IOException;
import java.util.concurrent.Future;

/**
 * @Project crb-product-service
 * @PackageName com.crb.ocms.product.serviceTool
 * @ClassName ESImportService
 * @Author liqiang
 * @Date 2019/3/29 1:28 PM
 * @Description Contract for Elasticsearch full-import handlers.
 */
public interface ESImportService {

    /**
     * Executes the given reindex request.
     *
     * @param reindexRequest describes the source query and the destination index
     * @return the reindex outcome (presumably {@code true} on success — confirm in implementations)
     * @throws IOException on transport errors
     */
    boolean reindex(ReindexRequest reindexRequest) throws IOException;

    /**
     * Starts the full import asynchronously.
     *
     * @return a future holding the import's boolean outcome
     */
    Future<Boolean> importAllAsyn();
}

抽象的处理器

使用模板方法模式,将通用代码抽取到抽象类中。

package com.crb.ocms.product.serviceTool;

import com.crb.ocms.product.domain.config.ESConfiguration;
import com.crb.ocms.product.domain.entity.MdProduct;
import com.crb.ocms.product.domain.redisskey.MdEsProductRedisKeyEnum;
import com.crb.ocms.product.domain.util.exceptions.OCmsExceptions;
import com.crb.ocms.product.service.impl.ESHLRestUtil;
import com.hazelcast.util.StringUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.redisson.api.RedissonClient;
import org.springframework.data.domain.PageRequest;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Project crb-product-service
 * @PackageName com.crb.ocms.product.serviceTool
 * @ClassName ESAbstrctImpor
 * @Author liqiang
 * @Date 2019/3/29 1:35 PM
 * @Description 抽象的es导入处理器
 */
@Log4j2
public abstract class ESAbstractImport implements  ESImportService {
   private RestTemplate restTemplate;
    private ESConfiguration esConfiguration;
    private ESHLRestUtil eshlRestUtil;
    private RedissonClient redissonClient;
    /**
     * 当前index名字
     */
    private String templateIndexName;
    /**
     * 之前的index名字
     */
    private String beforeIndexName;
    public ESAbstractImport(RestTemplate restTemplate, ESConfiguration esConfiguration,RedissonClient redissonClient){
        esConfiguration.setIndexName("cmsproudct");
        esConfiguration.setAliasName("cmsproductAliasName");
        this.restTemplate=restTemplate;
        this.esConfiguration=esConfiguration;
        this.eshlRestUtil=new ESHLRestUtil(esConfiguration);
        this.redissonClient=redissonClient;
    }


    /**
     * 获得创建索引的mapping抽象方法
     * @return
     */
    public abstract String  getMapping();

    /**
     * 获得所有迁移requestBody抽象方法
     * @return
     */
    public abstract  ReindexRequest getReindex();

    /**
     * es索引秦阿姨

     * @return
     */
    @Override
    public boolean reindex(ReindexRequest reindexRequest) throws IOException {

      //  BulkByScrollResponse bulkByScrollResponse= eshlRestUtil.getClient().reindex(reindexRequest,RequestOptions.DEFAULT);
        /**
         * 因为现在是api使用6.5 线上是6.24 查询语句多生成zero_terms_query   使用以下方式 替换掉
         */
        String endpoint ="/_reindex";
        Request httpequest = new Request("POST", endpoint);
        BytesRef source = XContentHelper.toXContent(reindexRequest, XContentType.JSON, false).toBytesRef();
        org.apache.http.HttpEntity entity = new NStringEntity(new String(source.bytes).replaceAll(""zero_terms_query":"NONE","," "), ContentType.APPLICATION_JSON);
        httpequest.setEntity(entity);
        Response httpResponse=eshlRestUtil.getClient().getLowLevelClient().performRequest(httpequest);
        BulkByScrollResponse getIndexResponse=eshlRestUtil.getSimpleClient().parseBulkByScrollResponse(httpResponse);
        return true;
    }

    /**
     * 导入数据 根据设置的线程大小根据数据大小算出线程数量 通过多线程并行处理
     * @return
     */
    public boolean importAll() throws IOException {
        log.info("正在产品全量导入2:"+esConfiguration.getEsUrl());
        if (redissonClient.getMap(esConfiguration.getRedissKey()).isExists()) {
            throw new OCmsExceptions("全量索引正在导入中....");
        }
        String []indexNames=eshlRestUtil.getIndexNames(esConfiguration.getIndexName()+"*");

        if (indexNames!=null&&indexNames.length>0) {
            Integer number= eshlRestUtil.getMaxIndexNumber(indexNames);
            beforeIndexName=number==null?esConfiguration.getIndexName():esConfiguration.getIndexName()+"_"+number;
            templateIndexName=esConfiguration.getIndexName()+"_"+(number==null?1:(number+1));
        } else {
            templateIndexName = esConfiguration.getIndexName();
        }
        boolean isSuccess = true;
        long count = getCount();
        //根据线程处理大小获得页码
        int index = (new Double(Math.ceil(count*1.0 / esConfiguration.getTreadDataSize()))).intValue();
        if (index <= 0 && count > 0) {
            index = 1;
        }
        //获得最大处理线程大小
        int threadSize = index > esConfiguration.getThreadSize() ? esConfiguration.getThreadSize() : index;
        //导入处理器
        ESAbstractImport.ProcessImportIndex processImportIndex = new ESAbstractImport.ProcessImportIndex(esConfiguration.getTreadDataSize(), index, templateIndexName);
        ThreadPoolExecutor executorService=null;
        try {
            //第一次导入直接创建索引
            if (beforeIndexName == null) {
                log.info("开始创建索引");
                eshlRestUtil.indexCreate(templateIndexName,getMapping());
                log.info("创建索引成功");
            } else {
                log.info("开始创建索引1");
                eshlRestUtil.indexCreate(templateIndexName,getMapping());
                log.info("创建索引成功1");
                log.info("开始创建索引2");
                //非第一次导入将子集数据迁移进来
                reindex(getReindex());
                log.info("开始创建索引2");
            }

            /** 线程池的自定义配置,IO密集型任务. */
            executorService = new ThreadPoolExecutor(
                    // 核心线程数
                    threadSize,
                    // 最大线程数
                    100,
                    // 存活时间,30s
                    30,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(150),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            redissonClient.getMap(esConfiguration.getRedissKey()).put("templateIndexName", templateIndexName);

            //统一保存异步处理结果
            List<Future<Boolean>> processResult = new ArrayList<Future<Boolean>>();
            for (int i = 0; i < threadSize; i++) {
                processResult.add(executorService.submit(processImportIndex));
            }

            for (Future<Boolean> future :
                    processResult) {
                if (!future.get()) {
                    isSuccess = future.get();
                    break;
                }
            }
            //表示重建成功
            if (isSuccess) {
                //表示第一次初始化 指定别名
                if (StringUtil.isNullOrEmpty(beforeIndexName)) {
                    boolean isFail = false;
                    int i = 0;
                    do {
                        isFail = !eshlRestUtil.aliases(templateIndexName,esConfiguration.getAliasName());
                        if(i>0) {
                            //休眠
                            Thread.sleep(i * 1000);
                        }
                        i++;
                    } while (isFail && i < 10);
                    if (isFail) {
                        log.info("开始删除索引2");
                        eshlRestUtil.deleteIndex(templateIndexName);
                        log.info("开始删除成功");
                    }
                } else {
                    boolean isFail = false;
                    int i = 0;
                    do {
                        //重新绑定别名 失败重试十次
                        isFail = !eshlRestUtil.aliases(beforeIndexName,templateIndexName,esConfiguration.getAliasName());
                        if(i>0) {
                            //休眠
                            Thread.sleep(i * 1000);
                        }
                        i++;
                    } while (isFail && i < 10);
                    if (isFail) {
                        //删除备份索引
                        eshlRestUtil.deleteIndex(templateIndexName);
                        throw new OCmsExceptions("索引重做失败,请重试");
                    } else {
                        //删除备份索引
                        eshlRestUtil.deleteIndex(beforeIndexName);
                    }
                }
            } else {
                for (Future<Boolean> future :
                        processResult) {
                    if (!future.get()) {
                        //没有重做成功删除创建的新索引
                        eshlRestUtil.deleteIndex(templateIndexName);
                        future.isCancelled();//取消其他正在执行的线程
                    }
                }
            }
        } catch (Exception e) {
            String uuid = UUID.randomUUID().toString();
            log.error(uuid + "
" + e.getMessage());
            eshlRestUtil.deleteIndex(templateIndexName);
            throw new OCmsExceptions("<ES文档导入失败>异常编码" + uuid + "
" + e.getMessage());
        } finally {
            if (executorService != null) {
                executorService.shutdown();
            }
            redissonClient.getBucket(esConfiguration.getRedissKey()).delete();

        }
        return isSuccess;
    }

    /**
     * 异步的导入方法
     * @return
     */
    @Override
    public Future<Boolean> importAllAsyn() {
        ExecutorService executorService=new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        return executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return importAll();
            }
        });
    }

    /**
     * 抽象的处理导入方法
     * @param pageRequest
     * @return
     * @throws IOException
     */
    public abstract   boolean processImport(PageRequest pageRequest) throws IOException;


    public abstract Long getCount();

    /**
     * 用于处理批量索引导入
     */
    class ProcessImportIndex implements Callable<Boolean> {

        //数据总页数
        private int pageCount;
        //总条数
        private int pageSize;

        private String indexName;
        private AtomicLong currentIndex = new AtomicLong();
        //当前处理页数
        AtomicInteger currentPage = new AtomicInteger();

        public ProcessImportIndex(int pageSize, int pageCount, String indexName) {
            this.pageSize = pageSize;
            this.pageCount = pageCount;
            this.indexName = indexName;
        }

        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        @Override
        public Boolean call() throws Exception {
            try {
                return process();
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }

        public boolean process() throws IOException {
            int page = currentPage.getAndIncrement();
            redissonClient.getMap(MdEsProductRedisKeyEnum.IMPORT_INDEX).put("pageCount", pageCount);
            while (page < pageCount) {
                redissonClient.getMap(MdEsProductRedisKeyEnum.IMPORT_INDEX).put("currentPage", page+1);
                processImport(new PageRequest(page, pageSize));
                page = currentPage.getAndIncrement();
            }
            return true;
        }

        public AtomicInteger getCurrentPage() {
            return currentPage;
        }

        public void setCurrentPage(AtomicInteger currentPage) {
            this.currentPage = currentPage;
        }


    }

    public String getTemplateIndexName() {
        return templateIndexName;
    }

    public void setTemplateIndexName(String templateIndexName) {
        this.templateIndexName = templateIndexName;
    }

    public interface BulkCallback {
        public void process(MdProduct mdProduct);
    }

    public RestTemplate getRestTemplate() {
        return restTemplate;
    }

    public void setRestTemplate(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    public ESConfiguration getEsConfiguration() {
        return esConfiguration;
    }

    public void setEsConfiguration(ESConfiguration esConfiguration) {
        this.esConfiguration = esConfiguration;
    }

    public ESHLRestUtil getEshlRestUtil() {
        return eshlRestUtil;
    }

    public void setEshlRestUtil(ESHLRestUtil eshlRestUtil) {
        this.eshlRestUtil = eshlRestUtil;
    }

    public RedissonClient getRedissonClient() {
        return redissonClient;
    }

    public void setRedissonClient(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    public String getBeforeIndexName() {
        return beforeIndexName;
    }

    public void setBeforeIndexName(String beforeIndexName) {
        this.beforeIndexName = beforeIndexName;
    }
}

使用例子

package com.crb.ocms.product.serviceTool;

import com.crb.ocms.product.domain.config.ESConfiguration;
import com.crb.ocms.product.domain.entity.MdProduct;
import com.crb.ocms.product.domain.repository.MdProductRepository;
import com.crb.ocms.product.domain.vo.req.FindProductCharacterStocksVo;
import com.crb.ocms.product.domain.vo.resp.ProductCharacterStocksVo;
import com.crb.ocms.product.service.MdProductService;
import com.crb.ocms.product.service.feign.IcProductStoreAccountService;
import com.crb.ocms.product.service.impl.MdESProductServiceImpl;
import lombok.extern.log4j.Log4j2;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.redisson.api.RedissonClient;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.ExampleMatcher;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/**
 * @Project crb-product-service
 * @PackageName com.crb.ocms.product.serviceTool
 * @ClassName ProductStockImport
 * @Author liqiang
 * @Date 2019/3/29 2:15 PM
 * @Description Full import of per-region product stock documents
 * ({@code info.name = ic_product_store_account}) into Elasticsearch.
 */
@Log4j2
@Deprecated
public class ProductStockImport extends  ProductIndexImport {
    MdProductRepository mdProductRepository;
    IcProductStoreAccountService icProductStoreAccountService;
    String regionCode;
    public ProductStockImport(RestTemplate restTemplate, ESConfiguration esConfiguration, RedissonClient redissonClient, MdProductRepository mdProductRepository, IcProductStoreAccountService icProductStoreAccountService, String regionCode) {
        super(restTemplate, esConfiguration, redissonClient);
        this.mdProductRepository=mdProductRepository;
        this.regionCode=regionCode;
        this.icProductStoreAccountService=icProductStoreAccountService;
    }

    /**
     * Reindex condition: copy every document from the previous index into the new one, except the
     * stock documents (matched on the {@code info} field), which this import rebuilds.
     *
     * @return the reindex request
     */
    @Override
    public ReindexRequest getReindex() {
        ReindexRequest reindexRequest = new ReindexRequest();
        reindexRequest.setSourceIndices(getBeforeIndexName());
        reindexRequest.setSourceQuery(QueryBuilders.boolQuery().mustNot(QueryBuilders.matchPhraseQuery("info", "ic_product_store_account")));
        reindexRequest.setDestIndex(getTemplateIndexName());
        return reindexRequest;
    }

    /**
     * Imports the stock documents for one page of products in {@link #regionCode}.
     *
     * <p>Disabled: this data has been migrated to the distribution-rights import (已经迁移到经销权 先禁用).
     *
     * @param pageRequest page of products to process
     * @return {@code true} once the page has been indexed (or had nothing to index)
     * @throws IOException if building a document or the bulk request fails, or if any bulk item fails
     */
    @Deprecated
    @Override
    public boolean processImport(PageRequest pageRequest) throws IOException {
        Page<MdProduct> result = mdProductRepository.findAll(buildRegionExample(), pageRequest);
        for (MdProduct md : result) {
            // NOTE(review): this default is not used by the documents built below.
            if (md.getFullPalletSaleFlag() == null) {
                md.setFullPalletSaleFlag(0);
            }
        }
        List<Long> productIds = result.getContent().stream().map(MdProduct::getMdProductId).collect(Collectors.toList());

        FindProductCharacterStocksVo stockQuery = new FindProductCharacterStocksVo();
        stockQuery.setProductIds(productIds);
        stockQuery.setRegionCode(regionCode);
        List<ProductCharacterStocksVo> stocks = icProductStoreAccountService.findProductCharacterStocks(stockQuery);

        BulkRequest bulkRequest = new BulkRequest();
        for (ProductCharacterStocksVo stock : stocks) {
            // characterId -1 rows are not indexed
            if (stock.getCharacterId().longValue() == -1L) {
                continue;
            }
            bulkRequest.add(buildStockIndexRequest(stock));
        }
        log.info(bulkRequest.getDescription());
        if (bulkRequest.numberOfActions() == 0) {
            // An empty bulk request is rejected by the client ("no requests added"), which would fail
            // the whole import; there is simply nothing to write for this page.
            return true;
        }
        BulkResponse bulkResponse = getEshlRestUtil().getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            // Surface partial failures instead of letting the import continue with missing documents.
            throw new IOException("ic_product_store_account bulk import failed: " + bulkResponse.buildFailureMessage());
        }
        return true;
    }

    /** Builds the index request for one stock row; routed by {@code productId_regionCode}. */
    private IndexRequest buildStockIndexRequest(ProductCharacterStocksVo stock) throws IOException {
        String parentKey = stock.getProductId() + "_" + regionCode;
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        builder.field("productId", stock.getProductId());
        builder.field("factoryId", stock.getFactoryId());
        builder.field("characterId", stock.getCharacterId());
        builder.field("sumCount", stock.getSumCount());
        builder.field("regionCode", regionCode);
        builder.field("info").startObject()
                .field("name", "ic_product_store_account")
                .field("parent", parentKey)
                .endObject();
        builder.endObject();
        String indexId = "ic_product_store_account_" + stock.getProductId() + "_" + stock.getCharacterId() + "_" + stock.getFactoryId();
        IndexRequest indexRequest = new IndexRequest(getTemplateIndexName(), getEsConfiguration().getTypeName(), indexId);
        indexRequest.source(builder);
        indexRequest.routing(parentKey);
        return indexRequest;
    }

    /** Query-by-example probe matching products of {@link #regionCode} exactly. */
    private Example<MdProduct> buildRegionExample() {
        MdProduct probe = new MdProduct();
        probe.setRegionCode(regionCode);
        ExampleMatcher matcher = ExampleMatcher.matching()
                .withMatcher("regionCode", ExampleMatcher.GenericPropertyMatchers.exact())
                .withIgnorePaths("optCounter", "createdDate", "updatedDate", "id");
        return Example.of(probe, matcher);
    }

    /**
     * Total number of products in {@link #regionCode}.
     *
     * @return product count
     */
    @Override
    public Long getCount() {
        return mdProductRepository.count(buildRegionExample());
    }
}
子类的 processImport 只需关心一件事:查询出指定分页的数据,然后将其导入到 ES。
原文地址:https://www.cnblogs.com/LQBlog/p/10566845.html