Elasticsearch批量插入数据

转自:https://blog.csdn.net/qq_34382260/article/details/80483881

1. 创建本地TransportClient:
    static TransportClient client = null;

private static Logger logger = LoggerFactory.getLogger(ESUtils.class);

public final static String HOST = "localhost";

public final static int PORT = 9300;

static Settings settings = Settings.builder().put("cluster.name","elasticsearch")
.put("client.transport.sniff",true)
.put("transport.type","netty3")
.put("http.type", "netty3")
.build();
static {
getConnection();
}
/**
 * Creates the shared {@code client} and adds the transport address
 * {@code HOST:PORT} to it.
 *
 * <p>The host name is resolved before the client is constructed, so a failed
 * lookup cannot leave a newly created client open and unreferenced. When the
 * lookup fails the error is logged and {@code client} keeps its previous value.
 */
@SuppressWarnings({ "resource", "unchecked" })
public static void getConnection() {
    try {
        // Resolve first: if this throws, no client has been allocated yet.
        InetAddress address = InetAddress.getByName(HOST);
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(address, PORT));
    } catch (UnknownHostException e) {
        logger.error("Elasticsearch host could not be resolved: " + HOST, e);
    }
    // client may still be null here if resolution failed; the placeholder
    // form formats null safely instead of dereferencing it.
    logger.debug("Elasticsearch connect info:{}", client);
}
/** Closes the shared transport client if one has been created. */
public void closeClient() {
    if (client == null) {
        // Nothing was opened, so there is nothing to release.
        return;
    }
    client.close();
}
/**
 * Prints the shared client to standard output.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Object connection = client;
    System.out.println(connection);
}
2. 创建索引:
    public void addIndexByName(String index, String type, TransportClient client) throws IOException {
XContentBuilder mapper = XContentFactory.jsonBuilder().startObject().startObject("settings")
.startObject("index")
.field("number_of_shards",3).field("number_of_replicas",0)
.endObject().endObject()
.startObject("mappings").startObject(type)
.field("dynamic","false")
.startObject("_all").field("enabled",false)
.endObject()
.startObject("properties")
.startObject("mnc").field("type","keyword").field("ignore_above",128).endObject()
.startObject("lac").field("type","keyword").field("ignore_above",128).endObject()
.startObject("ci").field("type","keyword").field("ignore_above",128).endObject()
.startObject("latitude").field("type","keyword").field("ignore_above",128).endObject()
.startObject("longtitude").field("type","keyword").field("ignore_above",128).endObject()
.startObject("uli").field("type","keyword").field("ignore_above",128).endObject()
.startObject("area_code").field("type","keyword").field("ignore_above",128).endObject()
.startObject("validity").field("type","keyword").field("ignore_above",128).endObject()
.startObject("address").field("type","keyword").field("ignore_above",128).endObject()
.startObject("province").field("type","keyword").field("ignore_above",128).endObject()
.startObject("city").field("type","keyword").field("ignore_above",128).endObject()
.startObject("district").field("type","keyword").field("ignore_above",128).endObject()
.startObject("township").field("type","keyword").field("ignore_above",128).endObject()
.startObject("acc").field("type","keyword").field("ignore_above",128).endObject()
.endObject().endObject().endObject().endObject();
System.out.println(mapper);
CreateIndexRequestBuilder cirb = client.admin().indices().prepareCreate(index).setSource(mapper);
CreateIndexResponse reponse = cirb.execute().actionGet();
mapper.close();
if(reponse.isAcknowledged()) {
System.out.println("创建索引成功");
}else {
System.out.println("创建索引失败");
}
}
3. Model实体:
/**
 * Plain bean holding one cell record. Every property is a String and is
 * exposed through a getter/setter pair. The field names match the property
 * keys declared in the index mapping built by addIndexByName.
 */
public class Cellinfo {

private String mnc;
private String lac;
private String ci;
private String latitude;
// Spelling "longtitude" matches the key used in the index mapping; keep the two in sync.
private String longtitude;
private String uli;
private String area_code;
private String validity;
private String address;
private String province;
private String city;
private String district;
private String township;
private String acc;
/** Creates an empty record; no field is assigned here. */
public Cellinfo() {
super();
// No initialization is performed in this constructor.
}
/** Returns the mnc value. */
public String getMnc() {
return mnc;
}
/** Sets the mnc value. */
public void setMnc(String mnc) {
this.mnc = mnc;
}
// NOTE(review): the line below is an elision marker from the article, presumably standing in for the remaining getters/setters.
...
/** Returns the acc value. */
public String getAcc() {
return acc;
}
/** Sets the acc value. */
public void setAcc(String acc) {
this.acc = acc;
}

    }

4. 批量插入数据:
    private void save(List<Cellinfo> list) {
BulkRequestBuilder bulkRequest = ESUtils.client.prepareBulk();
int count = 0;
for(Cellinfo cell : list) {
JSONObject json = JSONObject.fromObject(cell);
bulkRequest.add(ESUtils.client.prepareIndex("cellinfo", "nx_type").setSource(json));

if(++count%1000 == 0) {
bulkRequest.execute().actionGet();
bulkRequest = ESUtils.client.prepareBulk();
System.out.println("插入-->"+1000+"--条!");
}

}
bulkRequest.execute().actionGet();
System.out.println("插入完毕,共计:"+count+"条...");

}
5. maven依赖:
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<!-- Elasticsearch transport client artifact. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.2.4</version>
</dependency>
<!-- Logging dependency -->
<!-- NOTE(review): declared with test scope and at version 1.7.21, while slf4j-api below is 1.7.25 - confirm both are intended. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<!-- NOTE(review): no Gson usage appears in the snippets of this article - confirm it is needed. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<!-- SLF4J API. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- Log4j 1.x, declared alongside log4j-core 2.8.2 above. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<!-- NOTE(review): presumably the source of the JSONObject class used when building bulk documents - confirm. -->
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
</dependency>
---------------------
作者:一品_人生
来源:CSDN
原文:https://blog.csdn.net/qq_34382260/article/details/80483881
版权声明:本文为博主原创文章,转载请附上博文链接!

原文地址:https://www.cnblogs.com/sqw8080/p/10943934.html