Elasticsearch 分页查询:使用 scroll API 遍历全部命中结果

函数:

/**
 * Iterates over every hit matching {@code query} using the scroll API and hands
 * each hit to {@code function}.
 *
 * <p>Only the {@code venderId} source field is fetched for each hit. Iteration
 * stops when a scroll page comes back empty or when {@code function} returns
 * {@code false}. The scroll context is released in a {@code finally} block, so it
 * is cleared on early exit and on failure as well as on normal completion.
 *
 * @param query    the query selecting the documents to iterate
 * @param size     maximum number of hits returned per scroll page
 * @param function callback invoked with the zero-based running index of the hit
 *                 and the hit itself; return {@code false} to stop iterating
 * @return the total hit count reported by the initial search response (not the
 *         number of hits actually passed to {@code function}), or {@code 0} if
 *         the search or the callback threw an exception
 */
public long queryByScroll(QueryBuilder query, int size, HitFunction function) {
        // Most recent scroll id; declared outside the try so the finally block can release it.
        String scrollId = null;
        try {
            String[] includeFields = new String[]{"venderId"};
            SearchRequestBuilder builder = client
                    .prepareSearch(INDEX)
                    .setTypes(TYPE)
                    // No explicit sort is set. Per the Elasticsearch docs, scroll requests are
                    // fastest when sorted by _doc; to opt in, add:
                    // .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
                    .setScroll(TimeValue.timeValueSeconds(DEFAULT_TIME_VALUE_IN_SECONDS))
                    .setQuery(query)
                    .setFetchSource(includeFields, null)
                    .setSize(size); // max of {size} hits will be returned for each scroll

            SearchResponse response = builder.get();
            scrollId = response.getScrollId();
            long totalHits = response.getHits().getTotalHits();

            log.info("get from es, size:{}", totalHits);

            // Running index across all pages; single-threaded, so a plain int is enough.
            int nextIndex = 0;
            scroll:
            do {
                // Guarded so the response is only serialized when info logging is on.
                if (log.isInfoEnabled()) {
                    log.info("enter while, response:{}, totalSize:{}", JSON.toJSONString(response), response.getHits().getTotalHits());
                }
                for (SearchHit hit : response.getHits().getHits()) {
                    int currentIndex = nextIndex++;
                    if (!function.apply(currentIndex, hit)) {
                        if (log.isInfoEnabled()) {
                            // Log the index that was actually passed to the callback.
                            log.info("index scroll break at index: {}, id: {}", currentIndex, hit.getId());
                        }
                        break scroll;
                    }
                }
                response = client
                        .prepareSearchScroll(scrollId)
                        .setScroll(TimeValue.timeValueSeconds(DEFAULT_TIME_VALUE_IN_SECONDS))
                        .execute()
                        .actionGet();
                // Always clear the id from the latest response.
                scrollId = response.getScrollId();
                if (log.isInfoEnabled()) {
                    log.info("before exit while, response:{}", JSON.toJSONString(response));
                }
            } while (response.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

            return totalHits;
        } catch (Exception e) {
            log.error("queryByScroll error", e);
        } finally {
            // Search contexts expire on their own after the scroll timeout, but keeping them
            // open has a cost, so release the context as soon as we are done -- including
            // when the callback or a scroll request failed.
            if (scrollId != null) {
                try {
                    ClearScrollResponse clearScrollResponse = client
                            .prepareClearScroll()
                            .addScrollId(scrollId)
                            .get();
                    if (log.isInfoEnabled()) {
                        log.info("Clear scroll response:{}", clearScrollResponse.isSucceeded());
                    }
                } catch (Exception e) {
                    // A failed cleanup must not change the outcome of a completed scan.
                    log.error("queryByScroll clear scroll error", e);
                }
            }
        }
        return 0;
    }

 调用处:

// Scrolls over every document whose venderMode is "POP" (200 hits per page) and, for
// each hit, looks up the vender's contact info and shop name and stores one row via
// venderContactService.insertIfNotPresent. The callback always returns true, so the
// scan is never stopped early by the callback itself.
venderCrowdEsDao.queryByScroll(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("venderMode", "POP")),
                200,  (index, hit) -> {
                    // A hit without a venderId would make Long.valueOf("null") throw and abort
                    // the whole scan, so skip it and keep scrolling.
                    Object rawVenderId = hit.getSourceAsMap().get("venderId");
                    if (rawVenderId == null) {
                        log.info("skip hit without venderId, index: {}, id: {}", index, hit.getId());
                        return true;
                    }
                    Long venderId = Long.valueOf(String.valueOf(rawVenderId));
                    log.info("function applyL:{}", venderId);
                    // Fetch phone and email for this vender through the RPC service.
                    // NOTE(review): the original comment said to prefer the operations lead's
                    // phone, fall back to the shop lead's, and store nothing when both are
                    // empty. The code here stores a row with empty strings when the lookup
                    // returns null -- confirm which behavior is intended.
                    VenderContactPO contactResult = venderServiceRpc.getContactsInfoByVenderId(venderId);
                    VenderContactPO contactPO = new VenderContactPO(); // the object that is finally persisted
                    // NOTE(review): the original comment mentions encryption before inserting into
                    // vender_contact_info; none is visible here -- presumably done inside
                    // insertIfNotPresent, verify.
                    if (contactResult != null) {
                        contactPO.setVenderId(contactResult.getVenderId());
                        contactPO.setEmail(contactResult.getEmail());
                        contactPO.setPhoneNum(contactResult.getPhoneNum());
                    } else {
                        contactPO.setEmail("");
                        contactPO.setPhoneNum("");
                        contactPO.setVenderId(venderId);
                    }
                    contactPO.setShopName(venderServiceRpc.getShopNameByVenderId(venderId));

                    venderContactService.insertIfNotPresent(contactPO);
                    return true;
                })

  

原文地址:https://www.cnblogs.com/zhima-hu/p/14042608.html