// Elasticsearch operations class.

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;

/**
 * Helper methods for talking to an Elasticsearch cluster through one shared
 * {@link TransportClient}: building the client, deleting documents and indices,
 * listing indices, and deriving index names / time bounds from index names of
 * the form {@code <table>-<startTime>-<endTime>}.
 *
 * Created by guanxiangqing on 2016/9/24.
 */
public class ESStorage {

    private static final Logger LOGGER = LoggerFactory.getLogger(ESStorage.class);

    /**
     * Configuration key whose value is the Elasticsearch cluster name.
     */
    private static final String ES_CLUSTER_NAME = "es.cluster.name";

    /**
     * Configuration key whose value is the comma-separated {@code host:port} list of cluster nodes.
     */
    private static final String ES_CLUSTER_ADDR = "es.cluster.addr";

    /**
     * Number of hits requested per search round when deleting documents by query.
     * NOTE(review): Elasticsearch 2.1+ rejects from+size above index.max_result_window
     * (default 10000) - confirm the target indices raise that limit.
     */
    private static final int DELETE_BATCH_SIZE = 20000;

    /**
     * The shared client; volatile so that the unsynchronized null check in
     * {@link #buildClientByTransport()} never observes a stale value.
     */
    private static volatile TransportClient client;

    /**
     * Guards the one-time creation of {@link #client}.
     */
    private static final Object LOCK = new Object();

    /**
     * Returns the shared Elasticsearch client, creating it on first use.
     *
     * The cluster name and node addresses are read via {@code INSTANCE.getStringValue(...)}.
     * NOTE(review): {@code INSTANCE} is not declared in this file; presumably a statically
     * imported configuration singleton - confirm.
     *
     * @return the shared client
     * @throws UnknownHostException     if a configured host name cannot be resolved
     * @throws IllegalArgumentException if the address list is missing or malformed
     */
    public static Client buildClientByTransport() throws UnknownHostException {
        if (client == null) {
            synchronized (LOCK) {
                // Re-check under the lock: another thread may have built the client
                // while this one was waiting.
                if (client == null) {
                    String esClusterName = INSTANCE.getStringValue(ES_CLUSTER_NAME);
                    String esClusterAddr = INSTANCE.getStringValue(ES_CLUSTER_ADDR);

                    // Resolve every address before building the client so that a bad
                    // entry cannot leave a half-configured client behind.
                    ArrayList<InetSocketTransportAddress> addresses = parseTransportAddresses(esClusterAddr);

                    Settings settings = Settings.settingsBuilder()
                            .put("cluster.name", esClusterName)
                            .put("client.transport.sniff", true)   // discover the remaining nodes automatically
                            .build();

                    TransportClient newClient = TransportClient.builder().settings(settings).build();
                    for (InetSocketTransportAddress address : addresses) {
                        newClient.addTransportAddress(address);
                    }
                    client = newClient;
                }
            }
        }
        return client;
    }

    /**
     * Parses a value such as {@code cloudwave3:9300,cloudwave2:9300,cloudwave1:9300}
     * into transport addresses. Blank entries are skipped.
     */
    private static ArrayList<InetSocketTransportAddress> parseTransportAddresses(String esClusterAddr)
            throws UnknownHostException {
        if (esClusterAddr == null || esClusterAddr.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing value for configuration key " + ES_CLUSTER_ADDR);
        }

        ArrayList<InetSocketTransportAddress> addresses = new ArrayList<InetSocketTransportAddress>();
        for (String entry : esClusterAddr.split(",")) {
            String hostAndPort = entry.trim();
            if (hostAndPort.isEmpty()) {
                continue;
            }
            String[] parts = hostAndPort.split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException(
                        "Expected host:port in " + ES_CLUSTER_ADDR + " but got '" + hostAndPort + "'");
            }
            String host = parts[0].trim();
            int port = Integer.parseInt(parts[1].trim());
            addresses.add(new InetSocketTransportAddress(InetAddress.getByName(host), port));
        }

        if (addresses.isEmpty()) {
            throw new IllegalArgumentException(
                    "No host:port entries found in " + ES_CLUSTER_ADDR + ": '" + esClusterAddr + "'");
        }
        return addresses;
    }

    /**
     * Deletes a single document.
     *
     * @param index index name
     * @param type  document type
     * @param id    document id
     * @throws Exception if the client cannot be built or the delete request fails
     */
    public static void deleteEsIndexByIndexAndTypeAndId(String index, String type, String id) throws Exception {
        Client esClient = buildClientByTransport();
        DeleteResponse deleteResponse = esClient.prepareDelete(index, type, id)
                .execute()
                .actionGet();
        // The message is logged for every completed request; it does not inspect
        // whether the document actually existed.
        LOGGER.info("Delete index:" + index + " type: " + type + " id :" + id +
                " successfully, and the version of the delete operation is " + deleteResponse.getVersion());
    }

    /**
     * Deletes several documents, one delete request per id.
     *
     * @param index    index name
     * @param type     document type
     * @param queryIds ids of the documents to delete
     * @throws Exception if any single delete fails; remaining ids are then not processed
     */
    public static void deleteEsIndexByIndexAndTypeAndIdWithBath(String index, String type, ArrayList<String> queryIds) throws Exception {
        for (String queryId : queryIds) {
            deleteEsIndexByIndexAndTypeAndId(index, type, queryId);
        }
    }

    /**
     * Deletes a whole index.
     *
     * @param index index name
     * @throws Exception if the client cannot be built or the request fails
     *                   (including when the index does not exist)
     */
    public void deleteEsIndex(String index) throws Exception {
        Client esClient = buildClientByTransport();
        DeleteIndexResponse delete = esClient.admin().indices()
                .delete(new DeleteIndexRequest(index)).actionGet();
        LOGGER.info("The ElasticSearch index of {} is deleted.{}", index, delete);
    }

    /**
     * Deletes every index in {@code indexList} that currently exists in the cluster.
     * Names that are not present are skipped silently; an index that disappears
     * between the existence check and the delete is logged and skipped.
     *
     * @param indexList names of the indices to delete
     * @throws Exception if the client cannot be built or a request fails
     */
    public void deleteEsIndexByBatch(ArrayList<String> indexList) throws Exception {
        Client esClient = buildClientByTransport();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        ArrayList<String> esAllIndex = acquireAllEsIndexList();

        for (String index : indexList) {
            if (!esAllIndex.contains(index)) {
                continue;
            }
            try {
                esClient.admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
                LOGGER.info("The ElasticSearch index of {} is deleted.{}", index, sdf.format(new Date()));
            } catch (IndexNotFoundException e) {
                LOGGER.info("The ElasticSearch index of {}is not find", index);
            }
        }
    }

    /**
     * Builds index names from consecutive pairs of boundaries:
     * {@code <tableName lower-cased>-<weekDayList[i]>00-<weekDayList[i+1]>00}.
     *
     * @param weekDayList ordered boundary values; {@code n} values yield {@code n - 1} names
     * @param tableName   table name, lower-cased before use
     * @return the generated index names, empty when fewer than two boundaries are given
     */
    public ArrayList<String> acquireEsIndexList(String[] weekDayList, String tableName) {
        ArrayList<String> esIndexList = new ArrayList<String>();
        String tableNameLowCase = tableName.toLowerCase();

        for (int i = 0; i < weekDayList.length - 1; i++) {
            esIndexList.add(tableNameLowCase + "-" + weekDayList[i] + "00-" + weekDayList[i + 1] + "00");
        }
        return esIndexList;
    }

    /**
     * Lists the names of all indices known to the cluster state.
     *
     * @return trimmed index names
     * @throws Exception if the client cannot be built or the request fails
     */
    public static ArrayList<String> acquireAllEsIndexList() throws Exception {
        Client esClient = buildClientByTransport();
        String[] indexList = esClient.admin().cluster().prepareState().execute().actionGet()
                .getState().getMetaData().concreteAllIndices();

        ArrayList<String> esIndexLists = new ArrayList<String>(indexList.length);
        for (String indexName : indexList) {
            esIndexLists.add(indexName.trim());
        }
        return esIndexLists;
    }

    /**
     * Searches for documents whose {@code fieldName} is less than or equal to
     * {@code millsTime} and returns the ids of the hits in the response.
     *
     * No explicit size is set on the request, so only the server's default page of
     * hits is returned (presumably 10 - confirm against the Elasticsearch version in use).
     *
     * @param index     index to search
     * @param esType    document type to search
     * @param fieldName name of the time field
     * @param millsTime upper bound (inclusive), epoch milliseconds as a string
     * @return ids of the returned hits
     * @throws Exception if the client cannot be built or the search fails
     */
    public static ArrayList<String> acquireQueryIDsByTimeAttribute(String index, String esType, String fieldName, String millsTime) throws Exception {
        Client esClient = buildClientByTransport();
        SearchResponse response = esClient.prepareSearch(index)
                .setTypes(esType)
                .setQuery(buildQuery(fieldName, millsTime))
                .execute()
                .actionGet();

        // Previously dumped to stdout; keep it available for troubleshooting only.
        LOGGER.debug("Search response: {}", response);
        return collectHitIds(response);
    }

    /**
     * Deletes every document whose {@code fieldName} is less than or equal to
     * {@code millsTime}, working in rounds of {@link #DELETE_BATCH_SIZE} hits.
     *
     * @param index     index to search and delete from
     * @param esType    document type
     * @param fieldName name of the time field
     * @param millsTime upper bound (inclusive), epoch milliseconds as a string
     * @throws Exception if the client cannot be built or a request fails
     */
    public static void acquireQueryIDsByTimeAttributeAndDeleteEsData(String index, String esType, String fieldName, String millsTime) throws Exception {
        Client esClient = buildClientByTransport();
        long esHitsNumber = acquireQueryTotalhistsNumbers(index, esType, fieldName, millsTime);
        LOGGER.info("The total Number hits of ElasticSearch is {}", esHitsNumber);
        if (esHitsNumber <= 0) {
            return;
        }

        QueryBuilder queryBuilder = buildQuery(fieldName, millsTime);
        long rounds = esHitsNumber / DELETE_BATCH_SIZE + 1;
        for (long round = 0; round < rounds; round++) {
            SearchResponse response = esClient.prepareSearch(index)
                    .setTypes(esType)
                    .setQuery(queryBuilder)
                    .setSize(DELETE_BATCH_SIZE)
                    .execute()
                    .actionGet();

            ArrayList<String> esIds = collectHitIds(response);
            if (esIds.isEmpty()) {
                // Nothing left that matches; further rounds would only repeat the query.
                break;
            }
            deleteEsIndexByIndexAndTypeAndIdWithBath(index, esType, esIds);

            // Each round re-runs the same query from the start, so the deletions must be
            // visible to search before the next round or the same ids come back again.
            esClient.admin().indices().prepareRefresh(index).execute().actionGet();
        }
    }

    /**
     * Returns how many documents have {@code fieldName} less than or equal to {@code millsTime}.
     *
     * @param index     index to search
     * @param esType    document type
     * @param fieldName name of the time field
     * @param millsTime upper bound (inclusive), epoch milliseconds as a string
     * @return total number of matching documents
     * @throws Exception if the client cannot be built or the search fails
     */
    public static long acquireQueryTotalhistsNumbers(String index, String esType, String fieldName, String millsTime) throws Exception {
        Client esClient = buildClientByTransport();
        SearchResponse response = esClient.prepareSearch(index)
                .setTypes(esType)
                .setQuery(buildQuery(fieldName, millsTime))
                .setSize(0)   // only the total is needed, not the hits themselves
                .execute()
                .actionGet();
        return response.getHits().getTotalHits();
    }

    /**
     * Builds a range query matching documents whose {@code fieldName} is
     * less than or equal to {@code millsTime}.
     *
     * @param fieldName field to compare
     * @param millsTime inclusive upper bound
     * @return the range query
     */
    public static QueryBuilder buildQuery(String fieldName, String millsTime) {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(fieldName).lte(millsTime);
        return rangeQueryBuilder;
    }

    /**
     * Returns the greatest element of the list by natural {@link String} ordering,
     * e.g. the newest index name when names share a prefix and end in timestamps.
     * The list itself is left untouched.
     *
     * @param collectionsElements non-empty list of values
     * @return the greatest value
     * @throws IllegalArgumentException if the list is null or empty
     */
    public static String acquireMaxValueFromCollectionsElement(ArrayList<String> collectionsElements) {
        if (collectionsElements == null || collectionsElements.isEmpty()) {
            throw new IllegalArgumentException("collectionsElements must contain at least one element");
        }
        return Collections.max(collectionsElements);
    }

    /**
     * Derives the name of the index that follows {@code currentEsIndex}:
     * {@code <table>-<endTime of current>-<WeekDayUtil.getNextDate(first 8 chars of endTime, weekDays)>00}.
     *
     * @param currentEsIndex index name of the form {@code <table>-<startTime>-<endTime>}
     * @param weekDays       passed through to {@code WeekDayUtil.getNextDate}
     * @return the next index name
     * @throws Exception if {@code WeekDayUtil.getNextDate} fails
     * @throws IllegalArgumentException if {@code currentEsIndex} does not have the expected form
     */
    public static String acquireNextEsIndexByCurrentEsIndex(String currentEsIndex, String weekDays) throws Exception {
        String[] parsed = parseEsIndexName(currentEsIndex);
        String tableName = parsed[0];
        String lastTime = parsed[1];
        String yearMonthDay = lastTime.substring(0, 8);
        return tableName + "-" + lastTime + "-" + WeekDayUtil.getNextDate(yearMonthDay, weekDays) + "00";
    }

    /**
     * Returns, as a decimal string, the epoch milliseconds marking the end of the month
     * in which the end time of {@code currentEsIndex} falls.
     *
     * The value is obtained by parsing {@code "<last day of that month> 24:00:00"} in the
     * default time zone, so with the default lenient parser it is midnight at the start of
     * the following month.
     * NOTE(review): the original comment gave 2016-06-30 23:59:59 / 1467302399000 as the
     * example, one second earlier than what this code produces - confirm which is intended.
     *
     * @param currentEsIndex index name of the form {@code <table>-<startTime>-<endTime>},
     *                       where {@code endTime} starts with {@code yyyyMMdd}
     * @return epoch milliseconds as a string
     * @throws Exception if the intermediate date string cannot be parsed
     * @throws IllegalArgumentException if {@code currentEsIndex} does not have the expected form
     */
    public static String acquireMillsTimeWithCurrentDMonthLastDayByCurrentEsIndex(String currentEsIndex) throws Exception {
        String lastTime = parseEsIndexName(currentEsIndex)[1];
        String year = lastTime.substring(0, 4);
        String month = lastTime.substring(4, 6);

        // Calendar months are zero-based, so the 1-based month number selects the
        // FOLLOWING month; day 0 of that month is the last day of the wanted month.
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MONTH, Integer.parseInt(month));
        calendar.set(Calendar.YEAR, Integer.parseInt(year));
        calendar.set(Calendar.DAY_OF_MONTH, 0);

        String lastDay = new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
        long millis = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(lastDay + " 24:00:00").getTime();
        return String.valueOf(millis);
    }

    /**
     * Splits an index name of the form {@code <table>-<startTime>-<endTime>} from the right,
     * so that table names containing '-' are handled.
     *
     * @return a two-element array: {@code [tableName, endTime]}, both trimmed
     */
    private static String[] parseEsIndexName(String esIndexName) {
        if (esIndexName == null) {
            throw new IllegalArgumentException("ES index name must not be null");
        }
        int lastDash = esIndexName.lastIndexOf('-');
        int middleDash = lastDash > 0 ? esIndexName.lastIndexOf('-', lastDash - 1) : -1;
        if (middleDash < 0) {
            throw new IllegalArgumentException(
                    "Expected <table>-<startTime>-<endTime> but got '" + esIndexName + "'");
        }
        String tableName = esIndexName.substring(0, middleDash).trim();
        String endTime = esIndexName.substring(lastDash + 1).trim();
        if (endTime.length() < 8) {
            throw new IllegalArgumentException(
                    "End time of '" + esIndexName + "' must start with yyyyMMdd");
        }
        return new String[] {tableName, endTime};
    }

    /**
     * Collects the ids of all hits in a search response; a null response yields an empty list.
     */
    private static ArrayList<String> collectHitIds(SearchResponse response) {
        ArrayList<String> esIds = new ArrayList<String>();
        if (response != null) {
            for (SearchHit hit : response.getHits().getHits()) {
                esIds.add(hit.getId());
            }
        }
        return esIds;
    }

    /**
     * Manual smoke test.
     *
     * Usage: {@code ESStorage <table>-<startTime>-<endTime>}
     *
     * Builds (and closes) a transport client against the hard-coded hosts, then prints
     * the pieces parsed out of the given index name, the derived next index name and
     * the end-of-month timestamp.
     */
    public static void main(String args []) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: ESStorage <table>-<startTime>-<endTime>");
            return;
        }

        // Test 1: client construction. The client is not used further, so release it
        // immediately instead of leaking its threads and connections.
        Settings settings = Settings.settingsBuilder().put("cluster.name", "").build();
        TransportClient testClient = TransportClient.builder().settings(settings).build();
        try {
            testClient
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave1"), 9300))
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave2"), 9300))
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("cloudwave3"), 9300));
        } finally {
            testClient.close();
        }

        // Test 6: split the index name and extract year / month / day.
        String[] parsed = parseEsIndexName(args[0]);
        String tableName = parsed[0];
        String lastTime = parsed[1];

        System.out.println("tableName:" + tableName);
        System.out.println("lastTime:" + lastTime);

        String year = lastTime.substring(0, 4);
        String month = lastTime.substring(4, 6);
        String day = lastTime.substring(6, 8);
        System.out.println("year:" + year);
        System.out.println("month:" + month);
        System.out.println("day:" + day);

        String yearMonthDay = lastTime.substring(0, 8);
        String nextDate = WeekDayUtil.getNextDate(yearMonthDay, "星期四");
        System.out.println("解析之后的值:" + tableName + "-" + lastTime + "-" + nextDate);
        System.out.println(tableName + "-" + lastTime + "-" + nextDate + "00");

        // Zero-based Calendar month + day 0 => last day of the 1-based month parsed above.
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MONTH, Integer.parseInt(month));
        calendar.set(Calendar.YEAR, Integer.parseInt(year));
        calendar.set(Calendar.DAY_OF_MONTH, 0);

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        System.out.println("当前月份最后一天时间1:" + format.format(calendar.getTime()));
        String lastTime_temp = format.format(calendar.getTime()) + " 23:59:59";
        System.out.println("当前月份最后一天时间2:" + lastTime_temp);

        SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long lastTimeMillis = format2.parse(lastTime_temp).getTime();
        System.out.println("当前月份最后一天时间3:" + lastTimeMillis);
        System.out.println("当前月份最后一天时间4:" + format2.format(lastTimeMillis));
    }
}
// Original article: https://www.cnblogs.com/jinniezheng/p/6387737.html