elasticsearch 2.3.3版本 数据迁移工具


  最近做了一个 elasticsearch 数据迁移的工具，用于把数据从 from 集群迁移到 to 集群：

1. 读取 from 的 mapping，创建 to 的 mapping（这个索引配置很重要）

2. 通过 scroll 遍历读取 from

3. 通过 bulk 批量存到 to

4. 边读边存



package com.esdatamove;

import java.net.InetAddress;
import java.util.Map;

import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;

import com.sun.star.uno.RuntimeException;

/**es 版本 number": "2.3.3       elasticsearch数据迁移
 * 获得索引信息
 *1 获得mapping  ,创建mapping
 *2 scroll遍历,边遍历边存储
 *
 *需要两个client对象,一个是from 一个to
 *需要两个index对象,一个是from 一个to
 * 需要两个type对象,一个是from 一个to
 * @author fy
 *
 */
public class getIndexInfoAndCreateAndDataMoveUtil {
	static int count = 0;

	public static void main(String[] args) throws Exception {
		

		Client client_from = null;
		Client client_to = null;

		String name_from = "netizenlabel-218";
		String node_from = "node-218";
		String ip_from = "192.168.8.218";
		String port_from = "9300";

		Settings settings_from = Settings.settingsBuilder().put("cluster.name", name_from).put("node.name", node_from)
				.put("client.transport.sniff", false).build();
		client_from = TransportClient.builder().settings(settings_from).build().addTransportAddress(
				new InetSocketTransportAddress(InetAddress.getByName(ip_from), Integer.parseInt(port_from)));

		System.out.println(client_from);

		

		String name_to = "elasticsearch";
		String node_to = "Amazon";
		String ip_to = "127.0.0.1";
		String port_to = "9300";

		Settings settings_to = Settings.settingsBuilder().put("cluster.name", name_to).put("node.name", node_to)
				.put("client.transport.sniff", false).build();
		client_to = TransportClient.builder().settings(settings_to).build().addTransportAddress(
				new InetSocketTransportAddress(InetAddress.getByName(ip_to), Integer.parseInt(port_to)));

		System.out.println(client_to);

		/
		String index_from = "hk";
		String type_from = "eml";
		String index_to = "hk";
		String type_to = "eml";
		
        moveDateFromfromToto(client_from, index_from, type_from, client_to, index_to, type_to);
		client_from.close();
		client_to.close();

	}
	
	
	/**es数据迁移唯一接口
	 * @param client_from
	 * @param index_from
	 * @param type_from
	 * @param client_to
	 * @param index_to
	 * @param type_to
	 * @return
	 */
	public static  boolean  moveDateFromfromToto(Client client_from, String index_from, String type_from,
			Client client_to, String index_to, String type_to) {
	 boolean  createInex = 	getIndexInfoAndCreate(client_from, index_from, type_from, client_to, index_to, type_to);
	 if(!createInex) {
		 throw new RuntimeException("创建to库索引失败");
	 }
	 
	  try {
		Thread.sleep(2000);
	} catch (InterruptedException e1) {
		e1.printStackTrace();
	}
		try {
			String id = getScrollId(client_from, index_from, type_from, client_to, index_to, type_to);
			searchByScrollId(client_from,id,client_to, index_to, type_to);
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	
		
		
		return true;
	}

	/**
	 * 从from库的索引建立to库的索引
	 * 
	 * @param client
	 * @return
	 */
	private static boolean getIndexInfoAndCreate(Client client_from, String index_from, String type_from,
			Client client_to, String index_to, String type_to) {

		ImmutableOpenMap<String, MappingMetaData> mappings = client_from.admin().cluster().prepareState().execute()
				.actionGet().getState().getMetaData().getIndices().get(index_from).getMappings();
		String mapping = mappings.get(type_from).source().toString();
		System.out.println(mapping);

		System.out.println("--------------");
		// 创建mapping

		client_to.admin().indices().prepareCreate(index_to).execute().actionGet();
		PutMappingRequest mappingnew = Requests.putMappingRequest(index_to).type(type_to)
				.source(mapping.replaceFirst(type_from, type_to));
		client_to.admin().indices().putMapping(mappingnew);

		return true;

	}

	/**
	 * 获得滚动id
	 * 
	 * @param client
	 * @return
	 */
	private static String getScrollId(Client client_from, String index_from, String type_from, Client client_to,
			String index_to, String type_to) {


		SearchResponse response = client_from.prepareSearch(index_from).setTypes(type_from).setScroll(new TimeValue(30000))
				.setSize(1000).execute().actionGet();

		String scrollId = response.getScrollId();
		long totalHits = response.getHits().getTotalHits();
		 saveManyData(response.getHits().getHits(),client_to,index_to,type_to);
		for (SearchHit hit : response.getHits()) {
			count++;
		}
		System.out.println("count = " + count);
		return scrollId;
	}

	/**
	 * 通过scrollid一次遍历
	 * 
	 * @param client
	 * @param scrollId
	 */
	private static void searchByScrollId(Client client, String scrollId,Client client_to ,String index_to,String type_to) {
		TimeValue timeValue = new TimeValue(30000);
		SearchScrollRequestBuilder searchScrollRequestBuilder;
		SearchResponse response;

		while (true) {
			searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
			// 重新设定滚动时间
			searchScrollRequestBuilder.setScroll(timeValue);
			// 请求
			response = searchScrollRequestBuilder.get();
			// 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时
			if (response.getHits().getHits().length == 0) {
				break;
			} // if
				// 这一批次结果
			SearchHit[] searchHits = response.getHits().getHits();
			 saveManyData(searchHits,client_to,index_to,type_to);
			for (SearchHit searchHit : searchHits) {
				count++;
			}
			System.out.println(count);

			// 只有最近的滚动ID才能被使用
			scrollId = response.getScrollId();
		} // while

	}
	
	
	
	
	  /**bulk  多个存储
	 * @param searchHits
	 */
	private static void saveManyData(SearchHit[] searchHits,Client client_to,String index_to,String type_to) {
		  
		  BulkRequestBuilder bulk = client_to.prepareBulk();
		  for (SearchHit searchHit : searchHits) {
			    String id = searchHit.getId();
				Map<String, Object> source = searchHit.getSource();
			    IndexRequestBuilder builder = client_to.prepareIndex(index_to, type_to, id).setSource(source);
			    bulk.add(builder);
		}
		  
		  BulkResponse actionGet = bulk.execute().actionGet();
		  if(actionGet.hasFailures()) {
			  System.out.println(actionGet.buildFailureMessage());
		  }
		  
		  
	  }

}



原文地址:https://www.cnblogs.com/fangyuandoit/p/13713908.html