Elasticsearch Java Client连接池

按照Elasticsearch API,在Java端使用是ES服务需要创建Java Client,但是每一次连接都实例化一个client,对系统的消耗很大,即使在使用完毕之后将client close掉,由于服务器不能及时回收socket资源,极端情况下会导致服务器达到最大连接数。

为了解决上述问题并提高client利用率,可以参考使用池化技术复用client。

 1 import java.io.IOException;
 2 import java.net.InetSocketAddress;
 3 import java.util.ArrayList;
 4 import java.util.HashMap;
 5 import java.util.List;
 6 import java.util.Map;
 7 import java.util.concurrent.ConcurrentHashMap;
 8 
 9 import org.elasticsearch.client.Client;
10 import org.elasticsearch.common.settings.Settings;
11 import org.elasticsearch.common.transport.InetSocketTransportAddress;
12 import org.elasticsearch.common.xcontent.XContentBuilder;
13 import org.elasticsearch.common.xcontent.XContentFactory;
14 import org.elasticsearch.index.mapper.Mapping;
15 import org.elasticsearch.transport.client.PreBuiltTransportClient;
16 
17 import com.chinadigitalvideo.esagent.servlet.WebServiceInit;
18 
19 /**
20  * Created by tgg on 16-3-17.
21  */
22 public class ClientHelper {
23     private static String ip;
24     private static int port;
25 
26     private Settings setting;
27     private Mapping mapping;
28     
29     private Map<String, Client> clientMap = new ConcurrentHashMap<String, Client>();
30 
31     private Map<String, Integer> ips = new HashMap<String, Integer>(); // hostname port
32 
33     private String clusterName = WebServiceInit.clusterName;
34 
35     private ClientHelper(String ip,Integer port) {
36         init(ip,port);
37         //TO-DO 添加你需要的client到helper
38     }
39 
40     public static final ClientHelper getInstance(String ipConf ,Integer portConf) {
41         ip=ipConf;
42         port=portConf;
43         return ClientHolder.INSTANCE;
44     }
45 
46     private static class ClientHolder {
47         private static final ClientHelper INSTANCE = new ClientHelper(ip,port);
48     }
49 
50     /**
51      * 初始化默认的client
52      */
53     public void init(String ip,int port) {
54         
55         ips.put(ip, port);
56         setting =Settings.builder()
57                 .put("client.transport.sniff",true)
58                 .put("cluster.name",clusterName).build();
59         addClient(setting, getAllAddress(ips));
60     }
61 
62     /**
63      * 获得所有的地址端口
64      *
65      * @return
66      */
67     public List<InetSocketTransportAddress> getAllAddress(Map<String, Integer> ips) {
68         List<InetSocketTransportAddress> addressList = new ArrayList<InetSocketTransportAddress>();
69         for (String ip : ips.keySet()) {
70             addressList.add(new InetSocketTransportAddress(new InetSocketAddress(ip, ips.get(ip))));
71         }
72         return addressList;
73     }
74 
75     public Client getClient() {
76         return getClient(clusterName);
77     }
78 
79     public Client getClient(String clusterName) {
80         return clientMap.get(clusterName);//通过集群名称得到一个Client
81     }
82 
83     public void addClient(Settings setting, List<InetSocketTransportAddress> transportAddress) {
84         Client client = new PreBuiltTransportClient(setting)
85                 .addTransportAddresses(transportAddress.toArray(new InetSocketTransportAddress[transportAddress.size()]));
86         
87         clientMap.put(setting.get("cluster.name"), client);
88     }
89 }
原文地址:https://www.cnblogs.com/DreamDrive/p/6710081.html