spring Boot 整合 Elasticsearch

先来几个测试方法, pom在最下面:

完整代码 : https://github.com/lifan12589/infinite-possibilities/tree/master/springboot_stu/es-Api

创建索引:

    @Test
    @SneakyThrows
    public void createIndex() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));

        CreateIndexRequest request = new CreateIndexRequest("test_index");

        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)//3个分片
                .put("index.number_of_replicas", 2)//2个副本
        );
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        if (createIndexResponse.isAcknowledged()) {
            System.out.println("创建index成功!");
        } else {
            System.out.println("创建index失败!");
        }

        client.close();
    }

查询索引名称:

    @Test
    @SneakyThrows
    public void getIndex() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));

        GetIndexRequest request = new GetIndexRequest("se*");//查询se开头的索引名称
        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
        String[] indices = response.getIndices();
        for (String indexName : indices) {
            System.out.println("index name:" + indexName);
        }
        client.close();
    }

删除索引:

    @Test
    @SneakyThrows
    public void delIndex() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));
        DeleteIndexRequest request = new DeleteIndexRequest("test_index");//要删除的索引名称
        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            System.out.println("删除index成功!");
        } else {
            System.out.println("删除index失败!");
        }
        client.close();
    }

插入数据:

@Test
    @SneakyThrows
    public void insertData() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));

        //从数据库查询
        List<ProductInfo> list = productInfoMapper.selectAll();

        System.out.println("list : "+list);
        //插入数据,index不存在则自动根据匹配到的template创建。index没必要每天创建一个,如果是为了灵活管理,最低建议每月一个 yyyyMM。
        IndexRequest request = new IndexRequest("index_product");
        //最好不要自定义id 会影响插入速度。
        ProductInfo product = list.get(0);
        Gson gson = new Gson();
        request.id(product.getId().toString());
        request.source(gson.toJson(product)
                , XContentType.JSON);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        System.out.println(response);
        client.close();
    }

批量插入:

@Test
    @SneakyThrows
    public void batchInsertData() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));
        //查询数据库  批量插入数据,更新和删除同理
        BulkRequest request = new BulkRequest("index_product");
        Gson gson = new Gson();
        //从数据库查询
        List<ProductInfo> lists = productInfoMapper.selectAll();

        for (ProductInfo list: lists) {
            System.out.println(gson.toJson(list));
            request.add(new IndexRequest().source(gson.toJson(list)
                    , XContentType.JSON));
        }

        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        System.out.println("数量:" + response.getItems().length);
        client.close();
    }

根据 id 查询:

@Test
    @SneakyThrows
    public void getById() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));
        //注意 这里查询使用的是别名。
        GetRequest request = new GetRequest("index_product", "mQYKzncBSw_3e2MkFkmH");

        //只查询特定字段。如果需要查询所有字段则不设置该项。
        String[] includes = {"itemname", "itemcode","inputDate"};
        String[] excludes = {"desc"};
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        request.fetchSourceContext(fetchSourceContext);

        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        System.out.println(response);
        client.close();

    }

根据 id 查询多条数据:

 @Test
    public void multiGetById() throws IOException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));
        //多个结果  根据id查询
        MultiGetRequest request = new MultiGetRequest();

        request.add("index_product", "8Sj00XcB2CbVnAcWca2M");
        request.add("index_product", "9Sj00XcB2CbVnAcWca2M");
        //两种写法
//        request.add(new MultiGetRequest.Item(
//                "test_index",
//                "ewbmzXcBSw_3e2MkIkk-"));
        MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
        for (MultiGetItemResponse itemResponse : response) {
            System.out.println(itemResponse.getResponse().getSourceAsString());
        }
        client.close();
    }

根据 id 删除:

  @Test
    public void delById() throws IOException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));
        DeleteRequest request = new DeleteRequest("ac-new", "6");
        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(response);
        client.close();
    }

根据某个属性就行批量更改:

@Test
    public void updateByQuery() throws IOException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http"),
                        new HttpHost("localhost", 9202, "http")));
        UpdateByQueryRequest request = new UpdateByQueryRequest("index_product");
        //默认情况下,版本冲突会中止 UpdateByQueryRequest 进程,但是你可以用以下命令来代替
        //设置版本冲突继续
//        request.setConflicts("proceed");

        //设置更新条件   加 keyword 可以精确匹配,不会被分词
        request.setQuery(QueryBuilders.matchQuery("itemname.keyword","具体属性值"));
//        //限制更新条数
        request.setMaxDocs(8);
        request.setScript(
                new Script(ScriptType.INLINE, "painless", "ctx._source.itemname+='#';", Collections.emptyMap()));
        BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
        System.out.println(response);
        client.close();
    }

嗅探器:

@Test
    public void sniffer() throws IOException {

        // region 监听器
        SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

        //获取客户端
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost",9200,"http"),
                new HttpHost("localhost",9201,"http"),
                new HttpHost("localhost",9202,"http"))
                .setFailureListener(sniffOnFailureListener)
                .build();

        //设置 https
        NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
                restClient,
                ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
                ElasticsearchNodesSniffer.Scheme.HTTPS
        );

        //为 restClient 绑定 嗅探器
        Sniffer sniffer = Sniffer.builder(restClient)
                .setSniffIntervalMillis(5000)  //设置连续两次普通嗅探执行之间的间隔(以毫秒为单位)
                .setSniffAfterFailureDelayMillis(30000)  //设置失败后计划执行嗅探的延迟(以毫秒为单位)
                .setNodesSniffer(nodesSniffer)
                .build();

        sniffOnFailureListener.setSniffer(sniffer);

        //先关嗅探器, 后关客户端
        sniffer.close();
        restClient.close();
    }

查看节点信息:

 @Test
    @SneakyThrows
    public void snifferT(){
        RestHighLevelClient client = ESClient.getEsClient().getHighLevelClient();

        Iterator<Node> nodes = client.getLowLevelClient().getNodes().iterator();
        while (nodes.hasNext()){
            Node node = nodes.next();
            System.out.println("初始化节点 : " + node);
        }

        Thread.sleep(60000);
        System.out.println("准备二次扫描:");
        nodes = client.getLowLevelClient().getNodes().iterator();
        while (nodes.hasNext()){
            Node node = nodes.next();
            System.out.println("二次扫描 : "+node);
        }

        Thread.sleep(60000);
        System.out.println("准备三次扫描:");
        nodes = client.getLowLevelClient().getNodes().iterator();
        while (nodes.hasNext()){
            Node node = nodes.next();
            System.out.println("三次扫描 : "+node);
        }
        ESClient.getEsClient().closeClient();

    }

查询数据库数据,插入ES:

@Test
    @SneakyThrows
    public void bulkInit(){
        RestHighLevelClient client = ESClient.getEsClient().getHighLevelClient();
        GetIndexRequest request = new GetIndexRequest("bulk_index");
        Boolean exists = client.indices().exists(request,RequestOptions.DEFAULT);

        if(!exists) {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest("bulk_index");
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards", 3) //3个 主分片
                    .put("index.number_of_replicas", 2)); //2个副本 (每个主分片对应2个副本)

            CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest,RequestOptions.DEFAULT);
            int i = createIndexResponse.hashCode();
            boolean acknowledged = createIndexResponse.isAcknowledged();
            System.out.println("创建索引 ACK : "+acknowledged+"   索引 code : "+i);
        }

            List<BulkInfo> list = bulkInfoMapper.selectAll();

            BulkRequest bulkRequest = new BulkRequest("bulk_index");
            Gson gson = new Gson();
            for (int i=0;i<list.size();i++){

                //数据库时间转换
                Date date = new Date(list.get(i).getInputDate().getTime());
                SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
                String sd = sdf.format(date);
                //替换时间字段
                String gsons = gson.toJson(list.get(i));
                JSONObject json = new JSONObject(gsons);
                json.put("inputDate",sd);

                bulkRequest.add(new IndexRequest().id(Integer.toString(i)).source(json.toString(),XContentType.JSON));
            }

            BulkResponse response = client.bulk(bulkRequest,RequestOptions.DEFAULT);
            System.out.println("插入条数 : "+response.getItems().length);

           ESClient.getEsClient().closeClient();

    }

ESClient:

package com.infinitePossibilities.util;

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.Sniffer;
import java.io.IOException;

public class ESClient {

    private static ESClient esClient;
    private String host = "localhost:9200,localhost:9201,localhost:9202";
    private RestClientBuilder builder;
    private static Sniffer sniffer;
    private  static RestHighLevelClient highLevelClient;

    private ESClient(){

    }

    public static ESClient getEsClient(){

        if(esClient == null){

            synchronized (ESClient.class){
                if(esClient == null){
                   esClient = new ESClient();
                   esClient.initBuilder();
                }
            }
        }
       return esClient;
    }


    public RestClientBuilder initBuilder(){

        String [] hosts = host.split(",");

        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for(int i = 0;i<hosts.length;i++){
            String [] host = hosts[i].split(":");
            httpHosts[i] = new HttpHost(host[0],Integer.parseInt(host[1]),"http");
        }

        //region 在Builder中设置请求头
        //  1.设置请求头
        builder = RestClient.builder(httpHosts);
        Header[] defaultHeader = new Header[]{
                new BasicHeader("Content-type","application/json")
        };
        builder.setDefaultHeaders(defaultHeader);

        return builder;
    }

    public RestHighLevelClient getHighLevelClient(){

        if(highLevelClient == null){
            synchronized (ESClient.class){
                if(highLevelClient == null){
                    highLevelClient = new RestHighLevelClient(builder);
                    //开启嗅探器
                    sniffer = Sniffer.builder(highLevelClient.getLowLevelClient())
                            .setSniffIntervalMillis(5000)  //设置连续两次普通嗅探执行之间的间隔(以毫秒为单位)
                            .setSniffAfterFailureDelayMillis(15000)  //设置失败后计划执行嗅探的延迟(以毫秒为单位)
                            .build();
                }
            }
        }
        return highLevelClient;
    }

    public void closeClient(){

        if(null!=highLevelClient){

            try {
                sniffer.close();
                highLevelClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

pom :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.infinitePossibilities</groupId>
    <artifactId>es</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>es</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-maven-plugin</artifactId>
            <version>1.4.0</version>
            <type>maven-plugin</type>
        </dependency>

        <!--MyBatis-Plus启动器
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1</version>
        </dependency>    -->

        <!-- mysql:MyBatis相关依赖-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!--        ES transport client-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client-sniffer</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.6.2</version>
        </dependency>

        <!--        mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- mysql:阿里巴巴数据库连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.12</version>
        </dependency>

        <!--        lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
原文地址:https://www.cnblogs.com/lifan12589/p/14823257.html