Elasticsearch 5.4.3实战--Java API调用:批量写入数据

这个其实比较简单,直接上代码.

注意:代码中的部分业务逻辑(例如 SKU 的分页查询和字段映射)只是示例,可以换成你自己的逻辑。

  1 package com.cs99lzzs.elasticsearch.service.imp;
  2 
  3 import java.sql.Timestamp;
  4 import java.text.DecimalFormat;
  5 import java.text.SimpleDateFormat;
  6 import java.util.ArrayList;
  7 import java.util.Date;
  8 import java.util.HashMap;
  9 import java.util.List;
 10 import java.util.Map;
 11 
 12 import javax.annotation.Resource;
 13 
 14 import org.apache.commons.lang.StringUtils;
 15 import org.apache.log4j.Logger;
 16 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 17 import org.elasticsearch.action.bulk.BulkResponse;
 18 import org.elasticsearch.client.Client;
 19 import org.springframework.beans.factory.annotation.Value;
 20 import org.springframework.stereotype.Service;
 21 
 22 
 23 @Service("productIndexService")
 24 public class ProductIndexServiceImp implements ProductIndexService {
 25 
 26     @Resource
 27     private TradeService               tradeService;
 28 
 29     @Resource
 30     private ProductService             productService;
 31     
 32     @Resource
 33     private ShopService                shopService;
 34     
 35     @Resource(name="esClient")
 36     Client esClient;
 37 
 38     @Value("${elasticsearch.index}")
 39     private String CLUSTER_INDEX;
 40 
 41     @Value("${elasticsearch.type}")
 42     private String CLUSTER_TYPE;
 43     
 44 
 45     private static final int           _DEFAULT_PAGE_SIZE = 50;
 46 
 47     private static final Logger        logger             = Logger.getLogger(ProductIndexServiceImp.class);
 48 
 49     @Override
 50     public void createIndex(Timestamp updateTime) {
 51         DecimalFormat decimalFormat = new DecimalFormat("#0.0");
 52         int page = 1;
 53         List<SKU> skus = null;
 54         
 55         long startTime = System.currentTimeMillis();
 56         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");
 57         logger.info("elasticsearch索引构建开始,开始时间:" + sdf.format(new Date()));
 58         
 59         while (true) {
 60             //获取当页的sku列表
 61             StringBuffer sb = new StringBuffer(1000);
 62             skus = productService.getSkus(updateTime, page, _DEFAULT_PAGE_SIZE);
 63             if(skus == null|| skus.isEmpty()){
 64                 break;
 65             }
 66             //批量写入
 67             BulkRequestBuilder bulkRequest = esClient.prepareBulk();
 68             
 69             for(SKU sku:skus){
 70                 try {
 71                     if(sku:skus == null){
 72                         logger.error("elasticsearch: skuId=" + sku.getId() + "对应的SPU或者Brand或者Cate对象为空,无需构建索引");
 73                         continue;
 74                     }
 75                     Map<String, Object> source = putToMap(sku);
 76                     bulkRequest.add(esClient
 77                             .prepareIndex(CLUSTER_INDEX, CLUSTER_TYPE, "" + elasticseachSku.getId())
 78                             .setSource(source));
 79                     sb.append(sku.getId()).append(",");
 80                 } catch (Exception e) {
 81                     logger.error("更新elasticsearch索引出现异常, skuId=" + sku.getId() + ",exception info is:" 
 82                             + e.getMessage() + ", e.Cause is " + e.getCause());
 83                 }
 84             } 
 85             
 86             BulkResponse response = bulkRequest.execute().actionGet();
 87             if (response == null || response.hasFailures()) {
 88                 logger.error("elasticsearch 批量构建索引失败, failure message is: " + response.buildFailureMessage());
 89             } else {
 90                 logger.info("elasticsearch 批量构建索引成功, skuId list is : " + sb.toString());
 91             }
 92             page ++;
 93         }
 94         logger.info("elasticsearch本次索引构建时间:" + (System.currentTimeMillis() - startTime)/1000 + "秒。");
 95         logger.info("elasticsearch索引构建任务结束,结束时间:" + sdf.format(new Date()));
 96     }
 97 
 98     /**
 99      * @param elasticseachSku
100      * @return
101      */
102     private Map<String, Object> putToMap(Sku elasticseachSku) {
103         
104         Map<String, Object> source = new HashMap<String, Object>();
105         source.put("brandZhName", elasticseachSku.getBrandZhName());
106         source.put("brandEnName", elasticseachSku.getBrandEnName());
107         source.put("brandAliases", elasticseachSku.getBrandAliases());
108         source.put("aliases", elasticseachSku.getAliases());
109         source.put("zhName", elasticseachSku.getZhName());
110         source.put("enName", elasticseachSku.getEnName());
111         source.put("brandZhName", elasticseachSku.getBrandZhName());
112         
113         /* suggester */
114         List<String> nameList = new ArrayList<String>();
115         if (StringUtils.isNotEmpty(elasticseachSku.getZhName())) {
116             nameList.add(elasticseachSku.getZhName());
117         }
118         if (StringUtils.isNotEmpty(elasticseachSku.getBrandZhName())) {
119             nameList.add(elasticseachSku.getBrandZhName());
120         }
121         if (StringUtils.isNotEmpty(elasticseachSku.getAliases())) {
122             nameList.add(elasticseachSku.getAliases());
123         }
124         if (StringUtils.isNotEmpty(elasticseachSku.getEnName())) {
125             nameList.add(elasticseachSku.getEnName());
126         }
127         if (StringUtils.isNotEmpty(elasticseachSku.getBrandEnName())) {
128             nameList.add(elasticseachSku.getBrandEnName());
129         }
130         if (StringUtils.isNotEmpty(elasticseachSku.getBrandAliases())) {
131             nameList.add(elasticseachSku.getBrandAliases());
132         }
133         if (nameList.size() > 0) {
134             source.put("suggestName", nameList);
135         }
136         
137         return source;
138     }
139 }
原文地址:https://www.cnblogs.com/cs99lzzs/p/7212474.html