ElasticSearch Document API

 

 

Deleting a document

 You can see that the document with id 1 is gone.

 

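 The configuration file needs to be modified here.

 

 Do the same on slave1 and slave2; I won't repeat the steps.

Remember to restart Elasticsearch for the change to take effect; I won't cover how to restart it here.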
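
The exact change is not shown here. Judging from the comment on test9 in the reference code, it is presumably the setting that allows inline Groovy scripts in update requests. A minimal fragment, assuming an Elasticsearch 2.x node and its elasticsearch.yml (the file location depends on the installation):

```yaml
# elasticsearch.yml on master, slave1 and slave2
# Assumption: this is the setting named in the test9 comment of the reference code
script.engine.groovy.inline.update: on
```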

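Run the program

 

This call means: if the document exists, update it; if it does not exist, create it.

First run

 

 Second run (the document already exists, so its contents are updated)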
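
To make the two runs concrete, here is a small in-memory sketch of the upsert rule. It uses plain Java maps, not the Elasticsearch client: the class `UpsertSketch` and its `upsert` method are hypothetical names made up for illustration, and the field values mirror test10 in the reference code.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of upsert semantics; hypothetical, not the Elasticsearch API. */
class UpsertSketch {
    // Stands in for an index: document id -> source fields
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    /**
     * If the id is absent, stores a copy of upsertDoc as the new document.
     * If the id exists, merges partialDoc into the stored document.
     */
    Map<String, Object> upsert(String id, Map<String, Object> partialDoc,
            Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            existing = new HashMap<>(upsertDoc);
            docs.put(id, existing);
        } else {
            existing.putAll(partialDoc);
        }
        return existing;
    }

    public static void main(String[] args) {
        Map<String, Object> upsertDoc = new HashMap<>();
        upsertDoc.put("name", "Joe Smith");
        upsertDoc.put("gender", "male");

        Map<String, Object> partialDoc = new HashMap<>();
        partialDoc.put("gender", "female");

        UpsertSketch index = new UpsertSketch();
        // First run: id 1 does not exist, so the upsert document is created as-is
        System.out.println(index.upsert("1", partialDoc, upsertDoc).get("gender")); // male
        // Second run: id 1 exists, so only the partial document is merged in
        System.out.println(index.upsert("1", partialDoc, upsertDoc).get("gender")); // female
    }
}
```

Note that on the first run the partial document is not applied at all; only the upsert document is stored, which is why the gender only changes on the second run.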

 This is a batch operation that fetches multiple documents in one request.

 Add two documents and delete one.

    public void test13() throws IOException, InterruptedException,
            ExecutionException {

        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {

                    // Called just before a batch is sent
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        System.out.println(request.numberOfActions());
                    }

                    // Called when the whole batch failed with an exception
                    @Override
                    public void afterBulk(long executionId, BulkRequest request,
                            Throwable failure) {
                        System.out.println(failure.getMessage());
                    }

                    // Called after a batch completed; individual items may still have failed
                    @Override
                    public void afterBulk(long executionId, BulkRequest request,
                            BulkResponse response) {
                        System.out.println(response.hasFailures());
                    }
                })
                .setBulkActions(1000) // maximum number of actions per batch
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // maximum size of a batch in bytes
                .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush interval between batches
                .setConcurrentRequests(1) // number of concurrent bulk requests allowed
                // Lets the user define what happens when one or more bulk requests fail
                .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        String json = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
            "}";

        for (int i = 0; i < 1000; i++) {
            bulkProcessor.add(new IndexRequest("djt6", "user").source(json));
        }
        // Block until all pending requests have been processed, then release resources
        bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
        client.close();
    }
    /**
     * Using SearchType
     * @throws Exception
     */
    @Test
    public void test14() throws Exception {
        SearchResponse response = client.prepareSearch("djt")
                .setTypes("user")
                //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setSearchType(SearchType.QUERY_AND_FETCH)
                .execute()
                .actionGet();
        SearchHits hits = response.getHits();
        System.out.println(hits.getTotalHits());
    }
}

This is the bulk insert.

There are 1000 documents here; I won't count them.

Reference code: ESTestDocumentAPI.java
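
Why the 1000 documents go out as one batch can be illustrated with a simplified, in-memory model of count-based batching. This is only a sketch: `BatchSketch` is a hypothetical class, not the Elasticsearch BulkProcessor, and it ignores the byte-size limit, the flush interval and the backoff policy that the real processor also applies.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of count-based batching; hypothetical, not the Elasticsearch BulkProcessor. */
class BatchSketch {
    private final int maxActions;
    private final List<String> pending = new ArrayList<>();
    private final List<Integer> flushedSizes = new ArrayList<>();

    BatchSketch(int maxActions) {
        this.maxActions = maxActions;
    }

    /** Queues one action and flushes as soon as the batch reaches maxActions. */
    void add(String action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Flushes whatever is still queued, like closing the processor. */
    void close() {
        if (!pending.isEmpty()) {
            flush();
        }
    }

    // Records the batch size instead of sending a request
    private void flush() {
        flushedSizes.add(pending.size());
        pending.clear();
    }

    List<Integer> flushedSizes() {
        return flushedSizes;
    }

    public static void main(String[] args) {
        BatchSketch batcher = new BatchSketch(1000);
        for (int i = 0; i < 1000; i++) {
            batcher.add("doc-" + i);
        }
        batcher.close();
        System.out.println(batcher.flushedSizes()); // prints [1000]
    }
}
```

With a limit of 1000 and exactly 1000 added actions, the model flushes a single batch on the last add and has nothing left to flush on close.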

package com.dajiangtai.djt_spider.elasticsearch;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.node.NodeBuilder.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHits;
import org.junit.Before;
import org.junit.Test;

/**
 * Document API operations
 * 
 * @author 大讲台
 * 
 */
public class ESTestDocumentAPI {
    private TransportClient client;

    @Before
    public void test0() throws UnknownHostException {

        // Enable client.transport.sniff so the client discovers all nodes in the cluster
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "escluster")
                .put("client.transport.sniff", true).build();
        // on startup
        // Build the TransportClient
        client = TransportClient
                .builder()
                .settings(settings)
                .build()
                .addTransportAddress(
                        new InetSocketTransportAddress(InetAddress
                                .getByName("master"), 9300))
                .addTransportAddress(
                        new InetSocketTransportAddress(InetAddress
                                .getByName("slave1"), 9300))
                .addTransportAddress(
                        new InetSocketTransportAddress(InetAddress
                                .getByName("slave2"), 9300));
    }

    /**
     * Index a document: use Elasticsearch helpers
     * 
     * @throws IOException
     */
    @Test
    public void test1() throws IOException {
        IndexResponse response = client
                .prepareIndex("twitter", "tweet", "1")
                .setSource(
                        jsonBuilder().startObject().field("user", "kimchy")
                                .field("postDate", new Date())
                                .field("message", "trying out Elasticsearch")
                                .endObject()).get();
        System.out.println(response.getId());
        client.close();
    }

    /**
     * Index a document: do it yourself
     * 
     * @throws IOException
     */
    @Test
    public void test2() throws IOException {
        String json = "{" + "\"user\":\"kimchy\","
                + "\"postDate\":\"2013-01-30\","
                + "\"message\":\"trying out Elasticsearch\"" + "}";
        // No id is given, so Elasticsearch generates one
        IndexResponse response = client.prepareIndex("twitter", "tweet")
                .setSource(json).get();
        System.out.println(response.getId());
        client.close();
    }

    /**
     * Index a document: use map
     * 
     * @throws IOException
     */
    @Test
    public void test3() throws IOException {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out Elasticsearch");

        IndexResponse response = client.prepareIndex("twitter", "tweet")
                .setSource(json).get();
        System.out.println(response.getId());
        client.close();
    }

    /**
     * Index a document: serialize your beans
     * 
     * @throws IOException
     */
    @Test
    public void test4() throws IOException {
        // User is a plain bean (user, postDate, message) that is not shown in this listing
        User user = new User();
        user.setUser("kimchy");
        user.setPostDate(new Date());
        user.setMessage("trying out Elasticsearch");

        // instance a json mapper
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse

        // generate json
        byte[] json = mapper.writeValueAsBytes(user);

        IndexResponse response = client.prepareIndex("twitter", "tweet")
                .setSource(json).get();
        System.out.println(response.getId());
        client.close();
    }

    /**
     * Fetch a document: get
     * 
     * @throws IOException
     */
    @Test
    public void test5() throws IOException {
        GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
        System.out.println(response.getSourceAsString());

        client.close();
    }

    /**
     * Delete a document: delete
     * 
     * @throws IOException
     */
    @Test
    public void test6() throws IOException {
        client.prepareDelete("twitter", "tweet", "1").get();
        client.close();
    }

    /**
     * Update a document: Update API-UpdateRequest
     * 
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test7() throws IOException, InterruptedException,
            ExecutionException {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("twitter");
        updateRequest.type("tweet");
        updateRequest.id("AVyi3OORot7zkId708s8");
        updateRequest.doc(jsonBuilder().startObject().field("gender", "male")
                .endObject());
        client.update(updateRequest).get();
        // Prints the version held by the request object, not the version returned by the server
        System.out.println(updateRequest.version());
        client.close();
    }

    /**
     * Update a document: Update API-prepareUpdate()-doc
     * 
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test8() throws IOException, InterruptedException,
            ExecutionException {
        client.prepareUpdate("twitter", "tweet", "AVyikSKIot7zkId708s6")
                .setDoc(jsonBuilder().startObject().field("gender", "female")
                        .endObject()).get();
        client.close();
    }

    /**
     * Update a document: Update API-prepareUpdate()-script
     * Requires this setting to be enabled: script.engine.groovy.inline.update: on
     * 
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test9() throws IOException, InterruptedException,
            ExecutionException {
        client.prepareUpdate("twitter", "tweet", "AVyi4oZfot7zkId708s-")
                .setScript(
                        new Script("ctx._source.gender = \"female\"",
                                ScriptService.ScriptType.INLINE, null, null))
                .get();
        client.close();
    }

    /**
     * Update a document: Update API-UpdateRequest-upsert
     * 
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test10() throws IOException, InterruptedException,
            ExecutionException {
        // Document to create when id 1 does not exist yet
        IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1")
                                        .source(jsonBuilder()
                                        .startObject()
                                        .field("name", "Joe Smith")
                                        .field("gender", "male")
                                        .endObject());
        // Partial document to merge when id 1 already exists
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                                        .doc(jsonBuilder()
                                        .startObject()
                                        .field("gender", "female")
                                        .endObject()).upsert(indexRequest);
        client.update(updateRequest).get();
        client.close();
    }

    /**
     * Fetch several documents at once: Multi Get API
     * 
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test11() throws IOException, InterruptedException,
            ExecutionException {
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("twitter", "tweet", "1")
                .add("twitter", "tweet", "AVyi4oZfot7zkId708s-", "AVyi3OORot7zkId708s8", "AVyikSKIot7zkId708s6")
                .add("djt2", "user", "1")
                .get();

        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            // getResponse() is null when the item failed (for example, a missing index)
            GetResponse response = itemResponse.getResponse();
            if (response != null && response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }
        client.close();
    }

    /**
     * Batch operations on documents: Bulk API
     * 
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test12() throws IOException, InterruptedException,
            ExecutionException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        // either use client#prepare, or use Requests# to directly build index/delete requests
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "3")
                .setSource(jsonBuilder()
                            .startObject()
                                .field("user", "kimchy")
                                .field("postDate", new Date())
                                .field("message", "trying out Elasticsearch")
                            .endObject()
                          )
                );

        bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
                .setSource(jsonBuilder()
                            .startObject()
                                .field("user", "kimchy")
                                .field("postDate", new Date())
                                .field("message", "another post")
                            .endObject()
                          )
                );
        DeleteRequestBuilder prepareDelete = client.prepareDelete("twitter", "tweet", "AVyikSKIot7zkId708s6");
        bulkRequest.add(prepareDelete);


        BulkResponse bulkResponse = bulkRequest.get();
        // Bulk operations: one failed action does not prevent the others from succeeding
        if (bulkResponse.hasFailures()) {
            // process failures by iterating through each bulk response item
            BulkItemResponse[] items = bulkResponse.getItems();
            for (BulkItemResponse bulkItemResponse : items) {
                // Only failed items carry a failure message
                if (bulkItemResponse.isFailed()) {
                    System.out.println(bulkItemResponse.getFailureMessage());
                }
            }
        } else {
            System.out.println("bulk process success!");
        }
        client.close();
    }

    /**
     * Batch operations on documents: Using Bulk Processor
     * Optimization: disable replicas before the bulk load and add them back afterwards to speed it up
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void test13() throws IOException, InterruptedException,
            ExecutionException {

        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {

                    // Called just before a batch is sent
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        System.out.println(request.numberOfActions());
                    }

                    // Called when the whole batch failed with an exception
                    @Override
                    public void afterBulk(long executionId, BulkRequest request,
                            Throwable failure) {
                        System.out.println(failure.getMessage());
                    }

                    // Called after a batch completed; individual items may still have failed
                    @Override
                    public void afterBulk(long executionId, BulkRequest request,
                            BulkResponse response) {
                        System.out.println(response.hasFailures());
                    }
                })
                .setBulkActions(1000) // maximum number of actions per batch
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // maximum size of a batch in bytes
                .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush interval between batches
                .setConcurrentRequests(1) // number of concurrent bulk requests allowed
                // Lets the user define what happens when one or more bulk requests fail
                .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        String json = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
            "}";

        for (int i = 0; i < 1000; i++) {
            bulkProcessor.add(new IndexRequest("djt6", "user").source(json));
        }
        // Block until all pending requests have been processed, then release resources
        bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
        client.close();
    }
    /**
     * Using SearchType
     * @throws Exception
     */
    @Test
    public void test14() throws Exception {
        SearchResponse response = client.prepareSearch("djt")
                .setTypes("user")
                //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setSearchType(SearchType.QUERY_AND_FETCH)
                .execute()
                .actionGet();
        SearchHits hits = response.getHits();
        System.out.println(hits.getTotalHits());
    }
}
Original article: https://www.cnblogs.com/braveym/p/7011119.html