Elasticsearch Java High Level REST Client(Bulk API)

https://segmentfault.com/a/1190000019792172

针对ES 批量写入, 提供了3种方式,在 high-rest-client 中分别是 bulk  bulkAsync  bulkProcessor 3种方式。

BulkProcessor

BulkProcessor是一个线程安全的批量处理类,允许方便地设置刷新 一个新的批量请求 (基于消息大小,时间,消息数量), 容易控制并发批量的数量, 请求允许并行执行。

此processer的含义为如果消息数量到达20000 或者消息大小到大10M 或者时间达到300s 任意条件满足,客户端就会把当前的数据提交到服务端处理。减少网路开销, 效率很高。

/**
     * 实例化 BulkProcessor
     * @param client
     * @return
     */
    public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                //bulk请求前执行
                int numberOfActions = request.numberOfActions();
                logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                //bulk请求后执行
                if (response.hasFailures()) {
                    logger.error("ES Bulk [{}] executed with failures ", +executionId);
                } else {
                    logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // 失败后执行
                logger.error("ES Bulk Failed to execute bulk ", failure);
            }
        };

        BulkProcessor bulkProcessor = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
                //  达到刷新的条数
                .setBulkActions(20000)
                // 达到 刷新的大小
                .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                // 固定刷新的时间频率
                .setFlushInterval(TimeValue.timeValueSeconds(300))
                //并发线程数
                .setConcurrentRequests(5)
                // 重试补偿策略
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
        return bulkProcessor;
    }

awaitClose()方法可以用来等待,直到所有的请求都被处理完毕或者指定的等待时间过去.

boolean b = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

如果所有bulk请求都已经完成,则该方法返回true,如果所有bulk请求完成之前的等待时间已经过去,则返回false.
close()方法可用于立即关闭bulkProcessor.

这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且禁止向处理器添加任何新请求.


使用方法:

Map<String, Object> m = new HashMap<String, Object>(16);
        m.put("id", "544345");
        m.put("area_id", 1);
        m.put("camera_id", 1);
        m.put("log_time", new Date().toString());
        m.put("age", 1);
        BulkProcessor bulkProcessor = EsClientUtil.getBulkProcessor();
        IndexRequest one = new IndexRequest("posts").id("1").source(m);
        bulkProcessor.add(one);
原文地址:https://www.cnblogs.com/chong-zuo3322/p/13435101.html