Elasticsearch Java High Level REST Client工具类

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ultiwill</groupId>
    <artifactId>ultiwill-es7</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <logback.version>1.2.3</logback.version>
        <slf4j.version>1.7.26</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>

        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <!--json-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.32</version>
        </dependency>
    </dependencies>



</project>
View Code

EsClientUtil:

package com.ultiwill.utils;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @author chong.zuo
 * @date 2020/8/3 17:18
 */
public class EsClientUtil {
    private static final Logger logger = LoggerFactory.getLogger(EsClientUtil.class);

    /**
     * 每次都取client太耗时,大约需要2秒左右,所以只取一次,放在内存中,不关闭,一直用
     */
    private static RestHighLevelClient client;
    private static BulkProcessor bulkProcessor;

    /**
     * 组装ES的hosts
     *
     * @return
     */
    private static HttpHost[] assembleESAddress() {
        HttpHost httpHost1 = new HttpHost("192.168.100.110", 9201, "http");
        HttpHost httpHost2 = new HttpHost("192.168.100.110", 9202, "http");
        List<HttpHost> list = new ArrayList<HttpHost>();
        list.add(httpHost1);
        list.add(httpHost2);
        HttpHost[] ipHost = new HttpHost[list.size()];
        HttpHost[] httpHosts = list.toArray(ipHost);
        return httpHosts;
    }

    /**
     * 获取client连接
     */
    public static RestHighLevelClient getClient() {
        if (client == null) {
            synchronized (EsClientUtil.class) {
                try {
                    if (client == null) {
                        /** 用户认证对象 */
                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        /** 设置账号密码 */
                        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "123456"));
                        /** 创建rest client对象 */
                        RestClientBuilder builder = RestClient.builder(assembleESAddress())
                                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                                    @Override
                                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                                    }
                                });
                        client = new RestHighLevelClient(builder);
                    }
                } catch (Exception e) {
                    logger.error("EsClient创建失败...." + client, e);
                }
            }
        }
        return client;
    }


    /**
     * 关闭client连接
     */
    public static void closeClient() {
        if (client != null) {
            synchronized (EsClientUtil.class) {
                try {
                    client.close();
                    logger.info("ES Client 关闭成功...");
                } catch (Exception e) {
                    logger.error("ES Client关闭失败...", e);
                }
            }
        }
    }


    /**
     * 单条保存
     *
     * @param index
     * @param id
     * @param m
     */
    private static void saveData(String index, String id, Map<String, Object> m) {
        try {
            RestHighLevelClient client = getClient();
            IndexRequest indexRequest = new IndexRequest(index)
                    .id(id)
                    .source(m);
            client.index(indexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 获取单例 BulkProcessor 批量处理类
     */
    public static BulkProcessor getBulkProcessor() {
        if (bulkProcessor == null) {
            synchronized (EsClientUtil.class) {
                try {
                    if (bulkProcessor == null) {
                        bulkProcessor = bulkProcessor(getClient());
                    }
                } catch (Exception e) {
                    logger.error("BulkProcessor创建失败...." + bulkProcessor, e);
                }
            }
        }
        return bulkProcessor;
    }


    /**
     * 实例化 BulkProcessor
     *
     * @param client
     * @return
     */
    public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                //bulk请求前执行
                int numberOfActions = request.numberOfActions();
                logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                //bulk请求后执行
                if (response.hasFailures()) {
                    logger.error("ES Bulk [{}] executed with failures ", +executionId);
                } else {
                    logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // 失败后执行
                logger.error("ES Bulk Failed to execute bulk ", failure);
            }
        };

        BulkProcessor bulkProcessor = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
                //  达到刷新的条数
                .setBulkActions(20000)
                // 达到 刷新的大小
                .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                // 固定刷新的时间频率
                .setFlushInterval(TimeValue.timeValueSeconds(300))
                //并发线程数
                .setConcurrentRequests(5)
                // 重试补偿策略
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
        return bulkProcessor;
    }


    public static void main(String[] args) {
        Date d = new Date();
        String id = d.getTime() + "";
        id = "12356789";
        Map<String, Object> m = new HashMap<String, Object>(16);
        m.put("id", id);
        m.put("area_id", 1);
        m.put("camera_id", 1);
        m.put("log_time", new Date().toString());
        m.put("age", 1);
        EsClientUtil.saveData("global_house_list", id, m);
        EsClientUtil.closeClient();

        /*BulkProcessor bulkProcessor = EsClientUtil.getBulkProcessor();
        IndexRequest one = new IndexRequest("posts").id("1").source(m);
        bulkProcessor.add(one);*/
    }


}
View Code

saveToEsBulk:

  /**
    * 使用bulkProcessor 批量写数据入es
  * */
  def save2ESByBulkProcessor(messages:util.ArrayList[util.Map[String,Any]],taskName:String):Unit={
    if(messages.nonEmpty){
      val bulk = IceBockEsConfig.getBulkProcessor(taskName)
      for(message <- messages){
        val indexRequest = new IndexRequest()
        indexRequest.index(message.remove("indexName").toString).`type`(message.remove("indexType").toString).opType(DocWriteRequest.OpType.INDEX)
        if (message.containsKey("indexId")){
          indexRequest.id(message.remove("indexId").toString)
        }
        indexRequest.source(message)
        bulk.add(indexRequest)
      }
    }
  }
View Code
原文地址:https://www.cnblogs.com/chong-zuo3322/p/13435042.html