.net core 如何向elasticsearch中批量插入数据--2

      上一篇我们说了如何向elasticsearch中创建索引和插入数据(https://www.cnblogs.com/zpy1993-09/p/13380197.html

      今天在做一个延伸,上次说的是单行插入数据,如果数据量小的话还可以,如果大的话,那单行 插入就不能满足需求了,必须要批量插入。如果做批量插入,一般都是做个缓存,到一定时间,或者一定条数然后批量插入一次,如果按一定时间那就是定时缓存,比如数据量很大是,10分钟把缓存的数据批量插入。如果按条数那就是如果缓存1000条插入一次。具体情况要根据数据需求及时性了。

     今天我们就说一下如何按一定条数批量插入。那么缓存应该怎么做呢,方式很多。你可以见一个消息队列,或者栈,字典,数组都行。

    为了方便理解,我今天就用大家最常见的List数组来做一个缓存。

    由于我们是通过API接口往elasticsearch中批量送数据的,所以要做一个List数组的缓存,首先要考虑List的实例不能被覆盖,也就是说每次调用一次接口插入数据,List数组的实例都要是同一个。不然每调用一个接口都要new 一个新的List对象,那还怎么做缓存 。所以我们首先要保证List实例一直唯一。所以我们都会想到单利,那就是单例模式。

   由于接口一般都是通过json接收的,我们要解析json最好做一个模型的映射。

  假设通过接口插入elasticsearch中的数据是学生信息。首先我们建一个简单的学生类。

public class Student
{
  public string name{get;set;}//姓名
  public  int     number{get;set;}//学号
  public  int     age{get;set;}//年龄
}

那我们就要定义一个Student的List数组的单例:

  public class ListExample<Student>
    {
        private volatile static List<Student> instance = null;
        private static readonly object _lock = new object();
        public static List<Student> GetInstance()
        {
            if (instance == null)
            {
                lock (_lock)
                {
                    if (instance == null)
                    {
                        instance = new List<Student>();
                    }
                }
            }
            return instance;
        }
    }

这样是不是就能保证List对象实例的唯一了,但是有些人会问了,这也太麻烦了,如果多了怎么办,我总不能每一个接口我都要做一个单例吧,那岂不是太苦逼了。

   的确是这样,所以,我们要做一下优化,把这个单例方法做成通用模式,支持任何对象的实例。所以,我们自然而然的就想到泛型了。

  public class ListExample<T>
    {
        private volatile static List<T> instance = null;
        private static readonly object _lock = new object();
        public static List<T> GetInstance()
        {
            if (instance == null)
            {
                lock (_lock)
                {
                    if (instance == null)
                    {
                        instance = new List<T>();
                    }
                }
            }
            return instance;
        }
    }

这样不就通用了!

 public static class ESHelper
    {
        public static  readonly string url = "http://IP/";
        /// <summary>
        /// 批量插入
        /// </summary>
        /// <param name="obj">出入的数据</param>
        /// <param name="index">索引</param>
        public static void ManyInsert(List<Student> obj, string index)
        {
            //设置连接字符串,DefaultIndex中的表名要小写
            var settings = new ConnectionSettings(new Uri(url)).DefaultIndex(index);
            var client = new ElasticClient(settings);
            var ndexResponse = client.IndexMany<Student>(obj);  
        }
       

    }

还是和单例一样的问题,我们不可能不通用,所以我们要做的通用,还是泛型:

 public static class ESHelper<T> where T : class
    {
        public static  readonly string url = "http://IP/";//elasticsearch的IP
        /// <summary>
        /// 批量插入
        /// </summary>
        /// <param name="obj">出入的数据</param>
        /// <param name="index">索引</param>
        public static void ManyInsert(List<T> obj, string index)
        {
            //设置连接字符串,DefaultIndex中的表名要小写
            var settings = new ConnectionSettings(new Uri(url)).DefaultIndex(index);
            var client = new ElasticClient(settings);
            var ndexResponse = client.IndexMany<T>(obj);  
        }

    }

那我们可以通过向ES送入学生信息演示一下:

/// <summary>
       /// 向ES中送入数据
       /// </summary>
       /// <param name="Data">json对象</param>
       /// <returns></returns>
        [HttpPost("GetLogList")]
        public string GetLogList([FromBody] object Data)
        {
           //json 数组格式:“{“name”:"张三”,"number",123456,"age":20}”
            JObject LogList = (JObject)JsonConvert.DeserializeObject(Data.ToString());
Student student=new Student{name=LogList["name"],number=LogList["number"],age=LogList["age"]}
ListExample<Student>.GetInstance().Add(student);
if(
ListExample<Student>.GetInstance().Count==1000)
{

                ESHelper<Student>.ManyInsert(ListExample<Student>.GetInstance(), "Student-" + DateTime.Now.ToString("yyyy-MM"));//批量插入ES中
                ListExample<Student>.GetInstance().Clear();//清空缓存

            }

}

这样批量向ES数据完毕,是不是很简单。但是通过一定条数批量插入式存在弊端的,比如2000条批量插入一次,如果另一边调用接口,上传了1000条数据,突然死掉的话,那么这缓存的1000条数据就送不到ES中了,所以最好用定时往ES中批量送数据,下一章我们说一下如何定时向ES中批量插入数据。

   

    

  

原文地址:https://www.cnblogs.com/zpy1993-09/p/13512065.html