Elasticsearch Java API 很全的整理以及架构剖析

Elasticsearch 的API 分为 REST Client API(http请求形式)以及 transportClient API两种。相比来说transportClient API效率更高,transportClient 是通过Elasticsearch内部RPC的形式进行请求的,连接可以是一个长连接,相当于是把客户端的请求当成

Elasticsearch 集群的一个节点,当然 REST Client API 也支持http keepAlive形式的长连接,只是非内部RPC形式。但是从Elasticsearch 7 后就会移除transportClient 。主要原因是transportClient 难以向下兼容版本。

本文中所有的讲解和操作都是基于jdk 1.8 和elasticsearch 6.2.4版本。

备注:本文参考了很多Elasticsearch 的官方文档以及部l网络资料做的综合整理。  本人github 参考代码:https://github.com/597365581/bigdata_tools/tree/master/yongqing-bigdata-tools/yongqing-elasticsearch-tool

一、High REST Client

High Client 基于 Low Client, 主要目的是暴露一些 API,这些 API 可以接受请求对象为参数,返回响应对象,而对请求和响应细节的处理都是由 client 自动完成的。

API 在调用时都可以是同步或者异步两种形式
同步 API 会导致阻塞,一直等待数据返回
异步 API 在命名上会加上 async 后缀,需要有一个 listener 作为参数,等这个请求返回结果或者发生错误时,这个 listener 就会被调用,listener主要是解决自动回调的问题,有点像安卓 开发里面的listener监听回调。

Elasticsearch REST APi 官方 地址:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html

Maven 依赖

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

client初始化:

RestHighLevelClient 实例依赖 REST low-level client builder

public class ElasticSearchClient {
private String[] hostsAndPorts;

public ElasticSearchClient(String[] hostsAndPorts) {
this.hostsAndPorts = hostsAndPorts;
}
public RestHighLevelClient getClient() {
        RestHighLevelClient client = null;
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        if (hostsAndPorts.length > 0) {
            for (String hostsAndPort : hostsAndPorts) {
                String[] hp = hostsAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
            }
            client = new RestHighLevelClient(
                    RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        } else {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }
        return client;
    }
}

文档 API(High level rest 客户端支持下面的 文档(Document) API):

  • 单文档 API:
  • index API
  • Get API
  • Delete API
  • Update API
  • 多文档 API:
  • Bulk API
  • Multi-Get API

1、Index API:
IndexRequest:
封装好的参考方法:

private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
        IndexRequest indexRequest = null;
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        if (null == docId) {
            indexRequest = new IndexRequest(index, indexType);
        } else {
            indexRequest = new IndexRequest(index, indexType, docId);
        }
        return indexRequest;
    }

    /**
     * 同步执行索引
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @throws IOException
     */
    public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
    }

    /**
     * 异步执行
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param indexResponseActionListener
     * @throws IOException
     */
    public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
    }

API解释:  

IndexRequest request = new IndexRequest(
        "posts",  // 索引 Index
        "doc",  // Type 
        "1");  // 文档 Document Id 
String jsonString = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
        "}";
request.source(jsonString, XContentType.JSON); // 文档源格式为 json string

Document Source
document source 可以是下面的格式

本文作者:张永清,转载请注明出处:Elasticsearch Java API 很全的整理

Map类型的输入:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);  // 会自动将 Map 转换为 JSON 格式

XContentBuilder : 这是 Document Source 提供的帮助类,专门用来产生 json 格式的数据:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

Object 键对:

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch"); 

同步索引:

IndexResponse indexResponse = client.index(request);

异步索引:异步执行函数需要添加 listener, 而对于 index 而言,这个 listener 的类型就是 ActionListener

client.indexAsync(request, listener); 

异步方法执行后会立刻返回,在索引操作执行完成后,ActionListener 就会被回调:

执行成功,调用 onResponse 函数
执行失败,调用 onFailure 函数

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

IndexResponse:
不管是同步回调还是异步回调,如果调用成功,都会返回 IndexRespose 对象。 

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
   // 文档第一次创建 
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
   // 文档之前已存在,当前是重写
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片数量少于总分片数量 
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();  // 处理潜在的失败信息
    }
}

在索引时有版本冲突的话,会抛出 ElasticsearchException

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1); // 这里是文档版本号
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
       // 冲突了 
    }
}

如果将 opType 设置为 create, 而且如果索引的文档与已存在的文档在 index, type 和 id 上均相同,也会抛出冲突异常。

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}

2、GET API
GET 请求
每个 GET 请求都必须需传入下面 3 个参数:

  • Index
  • Type
  • Document id
GetRequest getRequest = new GetRequest(
        "posts", 
        "doc",  
        "1");  

可选参数
下面的参数都是可选的, 里面的选项并不完整,如要获取完整的属性,请参考 官方文档

不获取源数据,默认是获取的

request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); 

配置返回数据中包含指定字段

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

配置返回数据中排除指定字段

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

实时 默认为 true

request.realtime(false);

版本

request.version(2); 

版本类型

request.versionType(VersionType.EXTERNAL);

同步执行

GetResponse getResponse = client.get(getRequest);

异步执行
此部分与 index 相似, 只有一点不同, 返回类型为 GetResponse

Get Response
返回的 GetResponse 对象包含要请求的文档数据(包含元数据和字段)

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString(); // string 形式   
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map 
    byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 字节形式 
} else {
   // 没有发现请求的文档 
}

在请求中如果包含特定的文档版本,如果与已存在的文档版本不匹配, 就会出现冲突

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本冲突        
    }
}
封装好的参考方法:
  /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes  返回需要包含的字段,可以传入空
     * @param excludes  返回需要不包含的字段,可以传入为空
     * @param excludes  version
     * @param excludes  versionType
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }

3、Exists API

如果文档存在 Exists API 返回 true, 否则返回 fasle。

Exists Request

GetRequest 用法和 Get API 差不多,两个对象的可选参数是相同的。由于 exists() 方法只返回 true 或者 false, 建议将获取 _source 以及任何存储字段的值关闭,尽量使请求轻量级。

GetRequest getRequest = new GetRequest(
    "posts",  // Index
    "doc",    // Type
    "1");     // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false));  // 禁用 _source 字段
getRequest.storedFields("_none_"); // 禁止存储任何字段   

同步请求

boolean exists = client.exists(getRequest);

异步请求
异步请求与 Index API 相似,此处不赘述,只粘贴代码。如需详细了解,请参阅官方地址

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
    @Override
    public void onResponse(Boolean exists) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.existsAsync(getRequest, listener); 

封装的参考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return getClient().exists(getRequest);
    }

4、Delete API

Delete Request
DeleteRequest 必须传入下面参数

DeleteRequest request = new DeleteRequest(
        "posts",   // index 
        "doc",     // doc
        "1");      // document id

可选参数
超时时间

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");    

版本

request.version(2); 

版本类型

request.versionType(VersionType.EXTERNAL); 
同步执行
DeleteResponse deleteResponse = client.delete(request);

异步执行

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};


client.deleteAsync(request, listener);
Delete Response

 

DeleteResponse 可以检索执行操作的信息

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功分片数目小于总分片
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 处理潜在失败
    }
}

也可以来检查文档是否存在

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 文档不存在
}
版本冲突时也会抛出 `ElasticsearchException

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本冲突
    }
}

封装好的参考方法:

本文作者:张永清,转载请注明出处:Elasticsearch Java API 很全的整理

  /**
     * @param index
     * @param indexType
     * @param docId
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
        if (null != timeValue) {
            deleteRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            deleteRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            deleteRequest.version(version);
        }
        if (null != versionType) {
            deleteRequest.versionType(versionType);
        }
        return getClient().delete(deleteRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
        return deleteDoc(index, indexType, docId, null, null, null, null);
    }

5、Update API

Update Request
UpdateRequest 的必需参数如下

UpdateRequest request = new UpdateRequest(
        "posts",  // Index
        "doc",  // 类型
        "1");   // 文档 Id

使用脚本更新

部分文档更新:
在更新部分文档时,已存在文档与部分文档会合并。

部分文档可以有以下形式:

JSON 格式:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        ""updated":"2017-01-01"," +
        ""reason":"daily update"" +
        "}";
request.doc(jsonString, XContentType.JSON); 

Map 格式:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); 

XContentBuilder 对象:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);  
Object key-pairs

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

Upserts:如果文档不存在,可以使用 upserts 方法将文档以新文档的方式创建。

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

upserts 方法支持的文档格式与 update 方法相同。

可选参数:
超时时间

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");  

冲突后重试次数

request.retryOnConflict(3);

获取数据源,默认是开启的

request.fetchSource(true); 

包括特定字段

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

排除特定字段

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

指定版本

request.version(2); 

禁用 noop detection

request.scriptedUpsert(true); 

设置如果更新的文档不存在,就必须要创建一个

request.docAsUpsert(true); 

同步执行

UpdateResponse updateResponse = client.update(request);

异步执行

ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.updateAsync(request, listener); 

Update Response

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 文档已创建
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文档已更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文档已删除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 文档不受更新的影响
}

如果在 UpdateRequest 中使能了获取源数据,响应中则包含了更新后的源文档信息。

GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
    String sourceAsString = result.sourceAsString();  // 将获取的文档以 string 格式输出
    Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式输出
    byte[] sourceAsBytes = result.source();  // 字节形式
} else {
    // 默认情况下,不会返回文档源数据
}

也可以检测是否分片失败

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片数量小于总分片数量
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 得到分片失败的原因
    }
}

如果在执行 UpdateRequest 时,文档不存在,响应中会包含 404 状态码,而且会抛出 ElasticsearchException 。

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 处理文档不存在的情况
    }
}

如果版本冲突,也会抛出 ElasticsearchException

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 处理版本冲突的情况
    }
}

封装好的参考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @param docAsUpsert
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
        updateRequest.doc(dataMap);
        if (null != timeValue) {
            updateRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            updateRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            updateRequest.version(version);
        }
        if (null != versionType) {
            updateRequest.versionType(versionType);
        }
        updateRequest.docAsUpsert(docAsUpsert);
        //冲突时重试的次数
        updateRequest.retryOnConflict(3);
        if (null == includes && null == excludes) {
            return getClient().update(updateRequest);
        } else {
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
        }
    }

    /**
     * 更新时不存在就插入
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
    }

    /**
     * 存在才更新
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
    }

6、Bulk API 批量处理

批量请求
使用 BulkRequest 可以在一次请求中执行多个索引,更新和删除的操作。

BulkRequest request = new BulkRequest();  
request.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo")); // 将第一个 IndexRequest 添加到批量请求中
request.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar")); // 第二个
request.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz")); // 第三个

在同一个 BulkRequest 也可以添加不同的操作类型

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可选参数
超时时间

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for"); 

设置在批量操作前必须有几个分片处于激活状态

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);  // 全部分片都处于激活状态
request.waitForActiveShards(ActiveShardCount.DEFAULT);  // 默认
request.waitForActiveShards(ActiveShardCount.ONE);  // 一个

同步请求

BulkResponse bulkResponse = client.bulk(request);

异步请求

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.bulkAsync(request, listener); 

Bulk Response
BulkResponse 中包含执行操作后的信息,并允许对每个操作结果迭代。

for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍历所有的操作结果
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 获取操作结果的响应,可以是  IndexResponse, UpdateResponse or DeleteResponse, 它们都可以惭怍是 DocWriteResponse 实例

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作后的响应结果

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作后的响应结果

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作后的响应结果
    }
}

此外,批量响应还有一个非常便捷的方法来检测是否有一个或多个操作失败

if (bulkResponse.hasFailures()) { 
    // 表示至少有一个操作失败
}

在这种情况下,我们要遍历所有的操作结果,检查是否是失败的操作,并获取对应的失败信息

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { // 检测给定的操作是否失败
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 获取失败信息

    }
}

Bulk Processor
BulkProcessor 是为了简化 Bulk API 的操作提供的一个工具类,要执行操作,就需要下面组件

RestHighLevelClient 用来执行 BulkRequest 并获取 BulkResponse`
BulkProcessor.Listener 对 BulkRequest 执行前后以及失败时监听
BulkProcessor.builder 方法用来构建一个新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // 在每个 BulkRequest 执行前调用
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // 在每个 BulkRequest 执行后调用
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // 失败时调用
    }
};

BulkProcessor.Builder 提供了多个方法来配置 BulkProcessor
如何来处理请求的执行。

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // 指定多少操作时,就会刷新一次
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0);  // 指定多大容量,就会刷新一次
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允许并发执行的数量 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 
BulkProcessor 创建后,各种请求就可以添加进去:

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

BulkProcessor 执行时,会对每个 bulk request调用 BulkProcessor.Listener , listener 提供了下面方法来访问 BulkRequest 和 BulkResponse:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); // 在执行前获取操作的数量
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { // 执行后查看响应中是否包含失败的操作
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); // 请求失败时打印信息
    }
};

请求添加到 BulkProcessor , 它的实例可以使用下面两种方法关闭请求。

awaitClose() 在请求返回后或等待一定时间关闭

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

close() 立刻关闭

bulkProcessor.close();

两个方法都会在关闭前对处理器中的请求进行刷新,并避免新的请求添加进去。

封装好的参考方法:

    /**
     * 批量操作
     *
     * @param indexBeanList
     * @param timeValue
     * @param refreshPolicy
     * @return
     * @throws IOException
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException {
        BulkRequest bulkRequest = getBulkRequest(indexBeanList);
        if (null != timeValue) {
            bulkRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            bulkRequest.setRefreshPolicy(refreshPolicy);
        }
        return getClient().bulk(bulkRequest);
    }

    private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
        BulkRequest bulkRequest = new BulkRequest();
        indexBeanList.forEach(indexBean -> {
            if ("1".equals(indexBean.getOperateType())) {
                bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
            } else if ("2".equals(indexBean.getOperateType())) {
                if ((null != indexBean.getDocId())) {
                    throw new ElasticsearchException("update action docId must not be null");
                }
                bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else if ("3".equals(indexBean.getOperateType())) {
                if ((null != indexBean.getDocId())) {
                    throw new ElasticsearchException("delete action docId must not be null");
                }
                bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else {
                throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support");
            }
        });
        return bulkRequest;
    }

    /**
     * 批量操作
     *
     * @param indexBeanList
     * @return
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
        return bulkRequest(indexBeanList, null, null);
    }

    /**
     * 批量异步操作
     *
     * @param indexBeanList
     * @param bulkResponseActionListener
     */

    public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) {
        getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
    }

7、Search APIs:

Java High Level REST Client 支持下面的 Search API:

  • Search API
  • Search Scroll API
  • Clear Scroll API
  • Multi-Search API
  • Ranking Evaluation API

Search API
Search Request
searchRequest 用来完成和搜索文档,聚合,建议等相关的任何操作同时也提供了各种方式来完成对查询结果的高亮操作。

最基本的查询操作如下

SearchRequest searchRequest = new SearchRequest(); 
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // 添加 match_all 查询
searchRequest.source(searchSourceBuilder); // 将 SearchSourceBuilder  添加到 SeachRequest 中

可选参数

SearchRequest searchRequest = new SearchRequest("posts");  // 设置搜索的 index
searchRequest.types("doc");  // 设置搜索的 type

除了配置 index 和 type 外,还有一些其他的可选参数

searchRequest.routing("routing"); // 设置 routing 参数
searchRequest.preference("_local");  // 配置搜索时偏爱使用本地分片,默认是使用随机分片

什么是 routing 参数?
当索引一个文档的时候,文档会被存储在一个主分片上。在存储时一般都会有多个主分片。Elasticsearch 如何知道一个文档应该放置在哪个分片呢?这个过程是根据下面的这个公式来决定的:

shard = hash(routing) % number_of_primary_shards
routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值
number_of_primary_shards 是主分片数量
所有的文档 API 都接受一个叫做 routing 的路由参数,通过这个参数我们可以自定义文档到分片的映射。一个自定义的路由参数可以用来确保所有相关的文档——例如所有属于同一个用户的文档——都被存储到同一个分片中。

使用 SearchSourceBuilder
对搜索行为的配置可以使用 SearchSourceBuilder 来完成,来看一个实例

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();  // 默认配置
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // 设置搜索,可以是任何类型的 QueryBuilder
sourceBuilder.from(0); // 起始 index
sourceBuilder.size(5); // 大小 size
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // 设置搜索的超时时间

设置完成后,就可以添加到 SearchRequest 中。

SearchRequest searchRequest = new SearchRequest();
searchRequest.source(sourceBuilder);

构建查询条件
查询请求是通过使用 QueryBuilder 对象来完成的,并且支持 Query DSL。

DSL (domain-specific language) 领域特定语言,是指专注于某个应用程序领域的计算机语言。

可以使用构造函数来创建 QueryBuilder

MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy"); 

QueryBuilder 创建后,就可以调用方法来配置它的查询选项:

matchQueryBuilder.fuzziness(Fuzziness.AUTO);  // 模糊查询
matchQueryBuilder.prefixLength(3); // 前缀查询的长度
matchQueryBuilder.maxExpansions(10); // max expansion 选项,用来控制模糊查询

也可以使用QueryBuilders 工具类来创建 QueryBuilder 对象。这个类提供了函数式编程风格的各种方法用来快速创建 QueryBuilder 对象。

QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                                        .fuzziness(Fuzziness.AUTO)
                                                .prefixLength(3)
                                                .maxExpansions(10);

fuzzy-matching 拼写错误时的匹配:

好的全文检索不应该是完全相同的限定逻辑,相反,可以扩大范围来包括可能的匹配,从而根据相关性得分将更好的匹配放在前面。

例如,搜索 quick brown fox 时会匹配一个包含 fast brown foxes 的文档

不论什么方式创建的 QueryBuilder ,最后都需要添加到 `SearchSourceBuilder 中

searchSourceBuilder.query(matchQueryBuilder);

构建查询 文档中提供了一个丰富的查询列表,里面包含各种查询对应的QueryBuilder 对象以及QueryBuilder helper 方法,大家可以去参考。

关于构建查询的内容会在下篇文章中讲解,敬请期待。

指定排序
SearchSourceBuilder 允许添加一个或多个SortBuilder 实例。这里包含 4 种特殊的实现, (Field-, Score-, GeoDistance- 和 ScriptSortBuilder)

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // 根据分数 _score 降序排列 (默认行为)
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));  // 根据 id 降序排列

过滤数据源
默认情况下,查询请求会返回文档的内容 _source ,当然我们也可以配置它。例如,禁止对 _source 的获取

sourceBuilder.fetchSource(false);

也可以使用通配符模式以更细的粒度包含或排除特定的字段:

String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);

高亮请求
可以通过在 SearchSourceBuilder 上设置 HighlightBuilder 完成对结果的高亮,而且可以配置不同的字段具有不同的高亮行为。

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder highlightBuilder = new HighlightBuilder(); 
HighlightBuilder.Field highlightTitle =
        new HighlightBuilder.Field("title"); // title 字段高亮
highlightTitle.highlighterType("unified");  // 配置高亮类型
highlightBuilder.field(highlightTitle);  // 添加到 builder
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);

聚合请求
要实现聚合请求分两步

创建合适的 `AggregationBuilder
作为参数配置在 `SearchSourceBuilder 上

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
        .field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age")
        .field("age"));
searchSourceBuilder.aggregation(aggregation);

建议请求 Requesting Suggestions

SuggestionBuilder 实现类是由 SuggestBuilders 工厂类来创建的。

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
SuggestionBuilder termSuggestionBuilder =
    SuggestBuilders.termSuggestion("user").text("kmichy"); 
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder); 
searchSourceBuilder.suggest(suggestBuilder);

对请求和聚合分析
分析 API 可用来对一个特定的查询操作中的请求和聚合进行分析,此时要将SearchSourceBuilder 的 profile标志位设置为 true

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.profile(true);

只要 SearchRequest 执行完成,对应的 SearchResponse 响应中就会包含 分析结果

同步执行
同步执行是阻塞式的,只有结果返回后才能继续执行。

SearchResponse searchResponse = client.search(searchRequest);

异步执行
异步执行使用的是 listener 对结果进行处理。

ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
        // 查询成功
    }

    @Override
    public void onFailure(Exception e) {
        // 查询失败
    }
};

SearchResponse

查询执行完成后,会返回 SearchResponse 对象,并在对象中包含查询执行的细节和符合条件的文档集合。

归纳一下, SerchResponse 包含的信息如下

请求本身的信息,如 HTTP 状态码,执行时间,或者请求是否超时

RestStatus status = searchResponse.status(); // HTTP 状态码
TimeValue took = searchResponse.getTook(); // 查询占用的时间
Boolean terminatedEarly = searchResponse.isTerminatedEarly(); // 是否由于 SearchSourceBuilder 中设置 terminateAfter 而过早终止
boolean timedOut = searchResponse.isTimedOut(); // 是否超时

查询影响的分片数量的统计信息,成功和失败的分片

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    // failures should be handled here
}

检索 SearchHits
要访问返回的文档,首先要在响应中获取其中的 SearchHits

SearchHits hits = searchResponse.getHits();

SearchHits 中包含了所有命中的全局信息,如查询命中的数量或者最大分值:

long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();

查询的结果嵌套在 SearchHits 中,可以通过遍历循环获取

SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
    // do something with the SearchHit
}
SearchHit 提供了如 index , type, docId 和每个命中查询的分数
String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();

而且,还可以获取到文档的源数据,以 JSON-String 形式或者 key-value map 对的形式。在 map 中,字段可以是普通类型,或者是列表类型,嵌套对象。

String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject =
        (Map<String, Object>) sourceAsMap.get("innerObject");

Search API 查询关系
上面的 QueryBuilder , SearchSourceBuilder 和 SearchRequest 之间都是嵌套关系, 可以参考下图:

8、全文查询 Full Text Queries

什么是全文查询?
像使用 match 或者 query_string 这样的高层查询都属于全文查询,

查询 日期(date) 或整数(integer) 字段,会将查询字符串分别作为日期或整数对待。
查询一个( not_analyzed )未分析的精确值字符串字段,会将整个查询字符串作为单个词项对待。
查询一个( analyzed )已分析的全文字段,会先将查询字符串传递到一个合适的分析器,然后生成一个供查询的词项列表
组成了词项列表,后面就会对每个词项逐一执行底层查询,将查询结果合并,并且为每个文档生成最终的相关度评分。

Match
match 查询的单个词的步骤是什么?
检查字段类型,查看字段是 analyzed, not_analyzed
分析查询字符串,如果只有一个单词项, match 查询在执行时就会是单个底层的 term 查询
查找匹配的文档,会在倒排索引中查找匹配文档,然后获取一组包含该项的文档
为每个文档评分
构建 Match 查询
match 查询可以接受 text/numeric/dates 格式的参数,分析,并构建一个查询。

GET /_search
{
    "query": {
        "match" : {
            "message" : "this is a test"
        }
    }
}

上面的实例中 message 是一个字段名。

对应的 QueryBuilder class : MatchQueryBuilder

具体方法 : QueryBuilders.matchQuery()

全文查询 API 列表

Search QueryQueryBuilder ClassMethod in QueryBuilders
Match MatchQueryBuilder QueryBuilders.matchQuery()
Match Phrase MatchPhraseQueryBuilder QueryBuilders.matchPhraseQuery()
Match Phrase Prefix MatchPhrasePrefixQueryBuilder QueryBuilders.matchPhrasePrefixQuery()
Multi Match MultiMatchQueryBuilder QueryBuilders.multiMatchQuery()
Common Terms CommonTermsQueryBuilder QueryBuilders.commonTermsQuery()
Query String QueryStringQueryBuilder QueryBuilders.queryStringQuery()
Simple Query String SimpleQueryStringBuilder QueryBuilders.simpleQueryStringQuery()

基于词项的查询
这种类型的查询不需要分析,它们是对单个词项操作,只是在倒排索引中查找准确的词项(精确匹配)并且使用 TF/IDF 算法为每个包含词项的文档计算相关度评分 _score。

Term
term 查询可用作精确值匹配,精确值的类型则可以是数字,时间,布尔类型,或者是那些 not_analyzed 的字符串。

对应的 QueryBuilder class 是TermQueryBuilder

具体方法是 QueryBuilders.termQuery()

Terms
terms 查询允许指定多个值进行匹配。如果这个字段包含了指定值中的任何一个值,就表示该文档满足条件。

对应的 QueryBuilder class 是 TermsQueryBuilder

具体方法是 QueryBuilders.termsQuery()

Wildcard
wildcard 通配符查询是一种底层基于词的查询,它允许指定匹配的正则表达式。而且它使用的是标准的 shell 通配符查询:

? 匹配任意字符
* 匹配 0 个或多个字符
wildcard 需要扫描倒排索引中的词列表才能找到所有匹配的词,然后依次获取每个词相关的文档 ID。

由于通配符和正则表达式只能在查询时才能完成,因此查询效率会比较低,在需要高性能的场合,应当谨慎使用。

对应的 QueryBuilder class 是 WildcardQueryBuilder

具体方法是 QueryBuilders.wildcardQuery()

基于词项 API 列表

Search QueryQueryBuilder ClassMethod in QueryBuilders
Term TermQueryBuilder QueryBuilders.termQuery()
Terms TermsQueryBuilder QueryBuilders.termsQuery()
Range RangeQueryBuilder QueryBuilders.rangeQuery()
Exists ExistsQueryBuilder QueryBuilders.existsQuery()
Prefix PrefixQueryBuilder QueryBuilders.prefixQuery()
Wildcard WildcardQueryBuilder QueryBuilders.wildcardQuery()
Regexp RegexpQueryBuilder QueryBuilders.regexpQuery()
Fuzzy FuzzyQueryBuilder QueryBuilders.fuzzyQuery()
Type TypeQueryBuilder QueryBuilders.typeQuery()
Ids IdsQueryBuilder QueryBuilders.idsQuery()

复合查询
什么是复合查询?
复合查询会将其他的复合查询或者叶查询包裹起来,以嵌套的形式展示和执行,得到的结果也是对各个子查询结果和分数的合并。可以分为下面几种:

constant_score query

经常用在使用 filter 的场合,所有匹配的文档分数都是一个不变的常量

bool query

可以将多个叶查询和组合查询再组合起来,可接受的参数如下

must : 文档必须匹配这些条件才能被包含进来
must_not 文档必须不匹配才能被包含进来
should 如果满足其中的任何语句,都会增加分数;即使不满足,也没有影响
filter 以过滤模式进行,不评分,但是必须匹配
dis_max query

叫做分离最大化查询,它会将任何与查询匹配的文档都作为结果返回,但是只是将其中最佳匹配的评分作为最终的评分返回。

function_score query

允许为每个与主查询匹配的文档应用一个函数,可用来改变甚至替换原始的评分

boosting query

用来控制(提高或降低)复合查询中子查询的权重。

Search QueryQueryBuilder ClassMethod in QueryBuilders
Constant Score ConstantScoreQueryBuilder QueryBuilders.constantScoreQuery()
Bool BoolQueryBuilder QueryBuilders.boolQuery()
Dis Max DisMaxQueryBuilder QueryBuilders.disMaxQuery()
Function Score FunctionScoreQueryBuilder QueryBuilders.functionScoreQuery()
Boosting BoostingQueryBuilder QueryBuilders.boostingQuery()

特殊查询
Wrapper Query
这里比较重要的一个是 Wrapper Query,是说可以接受任何其他 base64 编码的字符串作为子查询。

主要应用场合就是在 Rest High-Level REST client 中接受 json 字符串作为参数。比如使用 gson 等 json 库将要查询的语句拼接好,直接塞到 Wrapper Query 中查询就可以了,非常方便。

Wrapper Query 对应的 QueryBuilder class 是WrapperQueryBuilder

具体方法是 QueryBuilders.wrapperQuery()

9、关于 REST Client的完整工具类代码

本文作者:张永清,转载请注明出处:Elasticsearch Java API 很全的整理

public class IndexBean {
    //index name
    private String index;
    //index type
    private String indexType;
    //index doc id
    private String docId;
    // 1 IndexRequest 2 UpdateRequest  3 DeleteRequest
    private String operateType;

    public String getOperateType() {
        return operateType;
    }

    public void setOperateType(String operateType) {
        this.operateType = operateType;
    }

    public String getIndex() {
        return index;
    }

    public void setIndex(String index) {
        this.index = index;
    }

    public String getIndexType() {
        return indexType;
    }

    public void setIndexType(String indexType) {
        this.indexType = indexType;
    }

    public String getDocId() {
        return docId;
    }

    public void setDocId(String docId) {
        this.docId = docId;
    }
}
/**
 * 自定义的es异常类
 */
public class ElasticsearchException extends RuntimeException {
    public ElasticsearchException(String s, Exception e) {
        super(s, e);
    }
    public ElasticsearchException(String s){
        super(s);
    }
}
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * es操作
 *  
 */
public class ElasticSearchClient {
    private String[] hostsAndPorts;

    public ElasticSearchClient(String[] hostsAndPorts) {
        this.hostsAndPorts = hostsAndPorts;
    }

    public RestHighLevelClient getClient() {
        RestHighLevelClient client = null;
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        if (hostsAndPorts.length > 0) {
            for (String hostsAndPort : hostsAndPorts) {
                String[] hp = hostsAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
            }
            client = new RestHighLevelClient(
                    RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        } else {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }
        return client;
    }

    private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
        IndexRequest indexRequest = null;
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        if (null == docId) {
            indexRequest = new IndexRequest(index, indexType);
        } else {
            indexRequest = new IndexRequest(index, indexType, docId);
        }
        return indexRequest;
    }

    /**
     * 同步执行索引
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @throws IOException
     */
    public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
    }

    /**
     * 异步执行
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param indexResponseActionListener
     * @throws IOException
     */
    public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes  返回需要包含的字段,可以传入空
     * @param excludes  返回需要不包含的字段,可以传入为空
     * @param excludes  version
     * @param excludes  versionType
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return getClient().exists(getRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
        if (null != timeValue) {
            deleteRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            deleteRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            deleteRequest.version(version);
        }
        if (null != versionType) {
            deleteRequest.versionType(versionType);
        }
        return getClient().delete(deleteRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
        return deleteDoc(index, indexType, docId, null, null, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @param docAsUpsert
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
        updateRequest.doc(dataMap);
        if (null != timeValue) {
            updateRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            updateRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            updateRequest.version(version);
        }
        if (null != versionType) {
            updateRequest.versionType(versionType);
        }
        updateRequest.docAsUpsert(docAsUpsert);
        //冲突时重试的次数
        updateRequest.retryOnConflict(3);
        if (null == includes && null == excludes) {
            return getClient().update(updateRequest);
        } else {
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
        }
    }

    /**
     * 更新时不存在就插入
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
    }

    /**
     * 存在才更新
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
    }

    /**
     * 批量操作
     *
     * @param indexBeanList
     * @param timeValue
     * @param refreshPolicy
     * @return
     * @throws IOException
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException {
        BulkRequest bulkRequest = getBulkRequest(indexBeanList);
        if (null != timeValue) {
            bulkRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            bulkRequest.setRefreshPolicy(refreshPolicy);
        }
        return getClient().bulk(bulkRequest);
    }

    private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
        BulkRequest bulkRequest = new BulkRequest();
        indexBeanList.forEach(indexBean -> {
            if ("1".equals(indexBean.getOperateType())) {
                bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
            } else if ("2".equals(indexBean.getOperateType())) {
                if ((null != indexBean.getDocId())) {
                    throw new ElasticsearchException("update action docId must not be null");
                }
                bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else if ("3".equals(indexBean.getOperateType())) {
                if ((null != indexBean.getDocId())) {
                    throw new ElasticsearchException("delete action docId must not be null");
                }
                bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else {
                throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support");
            }
        });
        return bulkRequest;
    }

    /**
     * 批量操作
     *
     * @param indexBeanList
     * @return
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
        return bulkRequest(indexBeanList, null, null);
    }

    /**
     * 批量异步操作
     *
     * @param indexBeanList
     * @param bulkResponseActionListener
     */

    public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) {
        getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
    }


    private SearchRequest getSearchRequest(String index, String indexType) {
        SearchRequest searchRequest;
        if (null == index) {
            throw new ElasticsearchException("index name must not be null");
        }
        if (null != indexType) {
            searchRequest = new SearchRequest(index, indexType);
        } else {
            searchRequest = new SearchRequest(index);
        }
        return searchRequest;
    }

    /**
     * @param index
     * @param indexType
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType) throws IOException {
        return getClient().search(getSearchRequest(index, indexType));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder) throws IOException {
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, null, null)));
    }

    private SearchSourceBuilder getSearchSourceBuilder(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, String sortField, SortBuilder sortBuilder, Boolean fetchSource) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        if (null != termQueryBuilder) {
            searchSourceBuilder.query(termQueryBuilder);
        }
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(size);
        if (null != sortField) {
            searchSourceBuilder.sort(sortField);
        }
        if (null != sortBuilder) {
            searchSourceBuilder.sort(sortBuilder);
        }
        //设置超时时间
        searchSourceBuilder.timeout(new TimeValue(120, TimeUnit.SECONDS));
        if (null != fetchSource) {
            searchSourceBuilder.fetchSource(fetchSource);
        }
        return searchSourceBuilder;
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, null, null).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param matchQueryBuilder
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, null, null, null).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param matchQueryBuilder
     * @param sortField
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, String sortField) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, sortField, null, null).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param matchQueryBuilder
     * @param sortField
     * @param fetchSource
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, String sortField, Boolean fetchSource) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, sortField, null, fetchSource).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param matchQueryBuilder
     * @param sortBuilder
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, null, sortBuilder, null).query(matchQueryBuilder)));
    }


    /**
     * 支持排序
     *
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @param sortField
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, null).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @param sortBuilder
     * @param fetchSource       开关
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder, Boolean fetchSource) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, fetchSource).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @param sortBuilder
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, null).query(matchQueryBuilder)));
    }

    /**
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @param sortField
     * @param fetchSource
     * @return
     * @throws IOException
     */
    public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField, Boolean fetchSource) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, fetchSource).query(matchQueryBuilder)));
    }

    /**
     * 异步操作
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @param sortBuilder
     * @param listener
     * @throws IOException
     */

    public void asyncSearchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder,ActionListener<SearchResponse> listener) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        getClient().searchAsync(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, null).query(matchQueryBuilder)),listener);
    }

    /**
     * 异步操作
     * @param index
     * @param indexType
     * @param from
     * @param size
     * @param termQueryBuilder
     * @param matchQueryBuilder
     * @param sortField
     * @param listener
     * @throws IOException
     */
    public void asyncSearchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField,ActionListener<SearchResponse> listener) throws IOException {
        if (null == matchQueryBuilder) {
            throw new ElasticsearchException("matchQueryBuilder is null");
        }
        getClient().searchAsync(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, null).query(matchQueryBuilder)),listener);
    }
}

二、transportClient API

未完待续,近期继续整理

三、Elasticsearch 架构

1、基础概念:

1)、集群(Cluster): 包含一个或多个具有相同 cluster.name 的节点.

  • 集群内节点协同工作,共享数据,并共同分担工作负荷。
  • 由于节点是从属集群的,集群会自我重组来均匀地分发数据. 
  • cluster Name是很重要的,因为每个节点只能是群集的一部分,当该节点被设置为相同的名称时,就会自动加入群集。
  • 集群中通过选举产生一个mater节点,它将负责管理集群范畴的变更,例如创建或删除索引,添加节点到集群或从集群删除节点。master 节点无需参与文档层面的变更和搜索,这意味着仅有一个 master 节点并不会因流量增长而成为瓶颈。任意一个节点都可以成为 master 节点。我们例举的集群只有一个节点,因此它会扮演 master 节点的角色。
  • 作为用户,我们可以访问包括 master 节点在内的集群中的任一节点。每个节点都知道各个文档的位置,并能够将我们的请求直接转发到拥有我们想要的数据的节点。无论我们访问的是哪个节点,它都会控制从拥有数据的节点收集响应的过程,并返回给客户端最终的结果。这一切都是由 Elasticsearch 透明管理的

2)、节点(node):  一个节点是一个逻辑上独立的服务,可以存储数据,并参与集群的索引和搜索功能, 一个节点也有唯一的名字,群集通过节点名称进行管理和通信.

3)、索引(Index): 索引与关系型数据库实例(Database)相当。索引只是一个 逻辑命名空间,它指向一个或多个分片(shards),内部用Apache Lucene实现索引中数据的读写

4)、文档类型(Type):相当于数据库中的table概念。每个文档在ElasticSearch中都必须设定它的类型。文档类型使得同一个索引中在存储结构不同文档时,只需要依据文档类型就可以找到对应的参数映射(Mapping)信息,方便文档的存取

5)、文档(Document) :相当于数据库中的row, 是可以被索引的基本单位。例如,你可以有一个的客户文档,有一个产品文档,还有一个订单的文档。文档是以JSON格式存储的。在一个索引中,您可以存储多个的文档。请注意,虽然在一个索引中有多分文档,但这些文档的结构是一致的,并在第一次存储的时候指定, 文档属于一种 类型(type),各种各样的类型存在于一个 索引 中。你也可以通过类比传统的关系数据库得到一些大致的相似之处:

6)、Mapping: 相当于数据库中的schema,用来约束字段的类型,不过 Elasticsearch 的 mapping 可以自动根据数据创建

7)、分片(shard) :是 工作单元(worker unit) 底层的一员,用来分配集群中的数据,它只负责保存索引中所有数据的一小片。

  • 分片是一个独立的Lucene实例,并且它自身也是一个完整的搜索引擎。
  • 文档存储并且被索引在分片中,但是我们的程序并不会直接与它们通信。取而代之,它们直接与索引进行通信的
  • 把分片想象成一个数据的容器。数据被存储在分片中,然后分片又被分配在集群的节点上。当你的集群扩展或者缩小时,elasticsearch 会自动的在节点之间迁移分配分片,以便集群保持均衡
  • 分片分为 主分片(primary shard) 以及 从分片(replica shard) 两种。在你的索引中,每一个文档都属于一个主分片
  • 从分片只是主分片的一个副本,它用于提供数据的冗余副本,在硬件故障时提供数据保护,同时服务于搜索和检索这种只读请求
  • 索引中的主分片的数量在索引创建后就固定下来了,但是从分片的数量可以随时改变。
  • 一个索引默认设置了5个主分片,每个主分片有一个从分片对应

2、ES模块

1)、 Gateway: 代表ES的持久化存储方式,包含索引信息,ClusterState(集群信息),mapping,索引碎片信息,以及transaction log等

  • 对于分布式集群来说,当一个或多个节点down掉了,能够保证我们的数据不能丢,最通用的解放方案就是对失败节点的数据进行复制,通过控制复制的份数可以保证集群有很高的可用性,复制这个方案的精髓主要是保证操作的时候没有单点,对一个节点的操作会同步到其他的复制节点上去。
  • ES一个索引会拆分成多个碎片,每个碎片可以拥有一个或多个副本(创建索引的时候可以配置),这里有个例子,每个索引分成3个碎片,每个碎片有2个副本,如下:
$ curl -XPUT http://localhost:9200/twitter/ -d '
index :
    number_of_shards : 3
    number_of_replicas : 2
  • 每个操作会自动路由主碎片所在的节点,在上面执行操作,并且同步到其他复制节点,通过使用“non blocking IO”模式所有复制的操作都是并行执行的,也就是说如果你的节点的副本越多,你网络上的流量消耗也会越大。复制节点同样接受来自外面的读操作,意义就是你的复制节点越多,你的索引的可用性就越强,对搜索的可伸缩行就更好,能够承载更多的操作
  • 第一次启动的时候,它会去持久化设备读取集群的状态信息(创建的索引,配置等)然后执行应用它们(创建索引,创建mapping映射等),每一次shard节点第一次实例化加入复制组,它都会从长持久化存储里面恢复它的状态信息

2)、 Lucence Directory:  是lucene的框架服务发现以及选主 ZenDiscovery: 用来实现节点自动发现,还有Master节点选取,假如Master出现故障,其它的这个节点会自动选举,产生一个新的Master

它是Lucene存储的一个抽象,由此派生了两个类:FSDirectory和RAMDirectory,用于控制索引文件的存储位置。使用FSDirectory类,就是存储到硬盘;使用RAMDirectory类,则是存储到内存

 一个Directory对象是一份文件的清单。文件可能只在被创建的时候写一次。一旦文件被创建,它将只被读取或者删除。在读取的时候进行写入操作是允许的

3)、Discovery

  • 节点启动后先ping(这里的ping是 Elasticsearch 的一个RPC命令。如果 discovery.zen.ping.unicast.hosts 有设置,则ping设置中的host,否则尝试ping localhost 的几个端口, Elasticsearch 支持同一个主机启动多个节点)
  • Ping的response会包含该节点的基本信息以及该节点认为的master节点
  • 选举开始,先从各节点认为的master中选,规则很简单,按照id的字典序排序,取第一个
  • 如果各节点都没有认为的master,则从所有节点中选择,规则同上。这里有个限制条件就是 discovery.zen.minimum_master_nodes,如果节点数达不到最小值的限制,则循环上述过程,直到节点数足够可以开始选举
  • 最后选举结果是肯定能选举出一个master,如果只有一个local节点那就选出的是自己
  • 如果当前节点是master,则开始等待节点数达到 minimum_master_nodes,然后提供服务, 如果当前节点不是master,则尝试加入master.
  • ES支持任意数目的集群(1-N),所以不能像 Zookeeper/Etcd 那样限制节点必须是奇数,也就无法用投票的机制来选主,而是通过一个规则,只要所有的节点都遵循同样的规则,得到的信息都是对等的,选出来的主节点肯定是一致的. 但分布式系统的问题就出在信息不对等的情况,这时候很容易出现脑裂(Split-Brain)的问题,大多数解决方案就是设置一个quorum值,要求可用节点必须大于quorum(一般是超过半数节点),才能对外提供服务。而 Elasticsearch 中,这个quorum的配置就是 discovery.zen.minimum_master_nodes 。

4)、memcached

  • 通过memecached协议来访问ES的接口,支持二进制和文本两种协议.通过一个名为transport-memcached插件提供
  • Memcached命令会被映射到REST接口,并且会被同样的REST层处理,memcached命令列表包括:get/set/delete/quit

5)、River :代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的。

本文作者:张永清,转载请注明出处:Elasticsearch Java API 很全的整理  最近发现很多转载了本博客的文章不注明出处,作者将追究法律责任

原文地址:https://www.cnblogs.com/laoqing/p/11693144.html