R5_ES读写流程

基本概念

  • refresh:es接收数据请求时先存入ES进程中的内存 Buffer ,默认每隔一秒(index.refresh_interval:1s)会从内存buffer中将数据写入 os cache,这个过程叫做refresh;
  • fsync:translog会每隔5秒或者在一个变更请求完成之后执行一次fsync操作,将translog从 os cache 刷入磁盘,这个操作比较耗时,如果对数据一致性要求不是跟高时建议将索引改为 async,如果节点宕机时会有5秒数据丢失;
  • flush:es默认每隔30分钟会将 buffer cache、 os cache 中的数据刷入磁盘同时清空translog日志文件,生成 commit point文件, 这个过程叫做flush。

说明:

  • Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此期间存储快的设备CPU可以干其他的事情。
  • Cache:缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而  Cache保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了CPU等待的时间,提高了系统的性能。

 

ES写数据主要流程


   

  1. Es 客户端选择集群中的一个节点发起写请求;
  2. Es 集群将节点标记为协调节点;
  3. 协调节点对写入的 document 进行路由,选择 primary shard 写入数据(图中只是表示简单路由到一个节点,实际会把数据路由到多个节点);
  4. 该 primary shard 上的数据写入完毕后,将数据同步到副分片中;
  5. 协调节点告诉客户端 primary shard 和 replica shard 已经写入数据完毕;

 

写数据底层原理


  

  1. 写入请求将数据同时发送到 Es 的 Buffer 缓存translog 日志,此时数据在 Es 进程的 Buffer 缓存中,无法查询到;
  2. 默认每 1 秒,Es Buffer 中的数据会被 Refres 到 segment file os cache 中,并清空 ES 进程的 Buffer 缓存,此时数据就可以被查到了,这个过程就是refresh
    • index.refresh_interval:1s
    • 新segment会被先写入到 os cache –这一步代价会比较低,稍后再被刷新到磁盘–这一步代价比较高。
    • 每隔1秒钟,es将buffer中的数据写入一个新的segment file,这个segment file中就存储最近1秒内 buffer中写入的数据
    • 操作系统里面,磁盘文件其实都有一个东西,叫做 os cache,即操作系统缓存,就是说数据写入磁盘文件之前,会先进入os cache,先进入操作系统级别的一个内存缓存中去。只要buffer中的数据被refresh操作输入os cache中,这个数据就可以被搜索到了。
  3. 重复上面的步骤,新的数据就不断进入buffer和translog,不断将buffer数据写入一个又一个新的segment file中去,每次refresh完buffer清空,translog保留
  4. 随着这个过程的推进,translog会变得越来越大。当translog达到一定的大小的时候,或者到达设置的默认时长(30min)后, 就会触发commit操作,这种操作也叫 flush(可以通过 API 触发)
    • flush操作 = refresh + 将translog中的记录刷到磁盘上 + 更新commit point信息 + 清空translog文件.
    • 1)、先强行将 Es Buffer 中的数据写入 segment file os cache 中,然后清空 Es Buffer;
    • 2)、向磁盘写入一个 commit point 文件,该文件标识着 commit point 对应的所有 segment file;
    • 3)、强行将 segment file os cache 中的数据都 fsync 到磁盘文件中去;
    • 4)、清空translog中的数据 (6.x版本为了实现sequenceIDs,不删除translog) 
    • 参数设置见: R3_Elasticsearch Index Setting

translog 文件:

  • 在执行 commit(flush)操作前,所有数据基本都是在 os cache 中的,一旦机器宕机,或者断电,内存中的数据就会全部丢失。所以将数据先写入一个专门的日志文件,当机器出现故障并重启后,可以从日志文件中将数据重新读取出来,恢复到 Es Buffer 和 os cahce 中,保证数据完整性。
  • 但是 translog 也是先写入到 translog os cache 中的,默认每经过 5 秒才会同步到本地磁盘,也就是说断电后,translog os cache 中的数也会丢失,但也是只丢失 5 秒的数据,但是性能要好一些;

  • 也可以设置为不写入 translog os cache 中,直接 fsync 到本地磁盘,但是这样性能会差一些;

commit point 文件:

  • 这个文件中记录中所有可用的 segment file,并且每个 commit point 都会维护一个 .del 文件
  • 当es做删改操作时首先会在.del文件中声明某个document已经被删除,文件内记录了在某个segment内某个文档已经被删除,当查询请求过来时在segment中被删除的文件是能够查出来的,但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉;

说明: es第一是准实时的,默认数据写入1秒后就可以搜索到。可能会丢失数据的,默认有5秒的数据,停留在buffer、translog os cache 、segment file os cache中,而不在磁盘上,

写入小结:

  1. ES的任意节点都可以作为协调节点(coordinating node)接受请求
  2. 当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard
  3. primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。
  4. 数据写入ES buffer,然后每隔1s,将数据refresh到os cache,到了os cache数据就能被搜索到。
  5. 默认每隔5s,将 translog os cache 数据写入到translog文件,  当translog达到一定量或者默认每隔30min,会触发commit操作,将缓存区的数据flush到segment file磁盘文件中。
  6. 数据写入到segment file之后,同时就建立好了倒排索引

 

删除、更新数据底层原理


  • segment 不可改变,所以 docment 并不能从之前的 segment 中移除或更新。
  • 所以每次 commit生成 commit point 时,会有一个 .del 文件,里面会列出被删除的 document(逻辑删除。而查询时,获取到的结果在返回前会经过 .del 过滤。
  • 更新时,也会标记旧的 docment 被删除,写入到 .del 文件,同时会写入一个新的文件。此时查询会查询到两个版本的数据,但在返回前会被移除掉一个。

部分更新说明:

  • lucene支持对文档的整体更新,ES为了支持局部更新,在Lucene的Store索引中存储了一个_source字段,该字段的key值是文档ID, 内容是文档的原文。
  • 当进行更新操作先从_source中获取原文与更新部分合并后,再调用lucene API进行全量更新, 对于写入了ES但是还没有refresh的文档,可以从translog中获取。
  • 另外为了防止读取文档过程后执行更新前有其他线程修改了文档,ES增加了版本机制,当执行更新操作时发现当前文档的版本与预期不符,则会重新获取文档再更新。 

 

segment 合并


 segment合并原因:

  • 由上述近实时性搜索的描述, 可知ES默认每秒都会产生一个新的segment文件,segment 数目太多会带来较大的麻烦。
  • 每一个 segment 都会消耗文件句柄、内存cpu运行周期。更重要的是, 每次搜索时都要遍历所有的segment, 这非常影响搜索性能.
  • 为解决这一问题, ES会对这些零散的segment进行merge(归并)操作, 尽量让索引中只保有少量的、体积较大的segment文件.
  • 这个过程由独立的merge线程负责, 不会影响新segment的产生。同时, 在merge段文件(segment)的过程中, 被标记为deleted的document也会被彻底物理删除.

段合并可能带来的问题?

  • 磁盘IO操作的代价
  • 速度慢的系统中,段合并会显著影响性能

merge操作的流程

  • 1、选择一些有相似大小的segment, merge成一个大的segment;

  • 2、将新的segment刷新到磁盘上;
  • 3、写入一个包含新 segment 且排除旧的和较小的 segment的新 commit point。
  • 4、打开新的segment, 完成搜索请求的转移;
  • 5、删除旧的小segment.
  • 物理删除:在 segment merge 这块,那些被逻辑删除的 document 才会被真正的物理删除。减少了最终的索引段中的 document 数目。

优化merge的配置项

1、merge策略

    merge过程是lucene有一个后台线程,它会根据merge策略来决定是否进行merge,一旦merge的条件满足,就会启动后台merge。

    merge策略分为两种,这也是大多数大数据框架所采用的,segment的大小segment中doc的数量

    在Elasticsearch 5.x 后,tired merge policy 成为了唯一的merge策略,index.merge.policy.type 被移除。

2、归并线程的数目

    合并调度程序(ConcurrentMergeScheduler)在需要时控制合并操作的执行。合并在单独的线程中运行,并且当达到最大线程数时,接下来的合并将等待。

    单个分片上可能一次合并的最大线程数:当设备硬件为传统磁盘时,启动1个线程。当设备硬件为SSD固态硬盘时,启动的线程数为max(1, min(4, core/2))

  • index.merge.scheduler.max_thread_count:1 

3、其它参数

  • 1 index.merge.policy.floor_segment:2GB               // 默认 2MB,小于这个大小的 segment,优先被归并。
    2 index.merge.policy.max_merge_at_once: 10           // 默认一次最多归并 10 个 segment
    3 index.merge.policy.max_merge_at_once_explicit: 30 // 默认 forcemerge 时一次最多归并 30 个 segment。
    4 index.merge.policy.max_merged_segment: 5GB                 // 默认 5 GB,大于这个大小的segment,不用参与归并。forcemerge 除外

 

forcemerge 接口 

   既然默认的最大 segment 大小是 5GB。那么一个比较庞大的数据索引,就必然会有为数不少的 segment 永远存在,这对文件句柄,内存等资源都是极大的浪费。

   但是由于归并任务太消耗资源,所以一般不太选择加大index.merge.policy.max_merged_segment 配置,而是在负载较低的时间段,通过 forcemerge 接口,强制归并 segment。

  • # curl -XPOST http://127.0.0.1:9200/logstash-2015-06.10/_forceme
    rge?max_num_segments=1

    由于 forcemerge 线程对资源的消耗比普通的归并线程大得多,所以,绝对不建议对还在写入数据的热索引执行这个操作。

    典型的场景是记录日志,这中情况下日志是按照每天,周,月存入索引。旧的索引一般是只可读的,它们是不可能修改的。 这种情况下,把每个索引的段降至1是有效的。搜索过程就会用到更少的资源,性能更好:

 

ES 查询与检索


 1、 查询流程:

  • GET my-index/_doc/0
  1. Client 将请求发送到任意节点 node,此时 node 节点就是协调节点(coordinating node) 

  2. 协调节点对 id 进行路由,从而判断该数据在哪个shard。
  3. 在 primary shard 和 replica shard 之间 随机选择一个,请求获取 doc。
  4. 接收请求的节点会将数据返回给协调节点,协调节点会将数据返回给Client。

  说明: 可以通过 preference 参数指定执行操作的节点或分片。默认为随机

2、检索流程

  • 1 GET /my-index/_search
  • search流程分为两个步骤  query+fetch, 因为不知道文档id,需要根据传入的搜索关键词,查询出符合条件的文档id(query阶段),然后根据得到的文档id,查询出文档信息(fetch阶段)
  1. Client 将请求发送到任意节点 node,此时 node 节点就是协调节点(coordinating node) 

  2. 协调节点进行分词等操作后,去查询所有的 shard (primary shard 和 replica shard 选择一个)
  3. 所有 shard 将满足条件的数据 id 排序字段 等信息返回给路由节点
  4. 路由节点重新进行排序,截取数据后,获取到真正需要返回的数据的 id
  5. 路由节点再次请求对应的 shard (此时有 id 了,可以直接定位到对应shard)
  6. 获取到全量数据,返回给 Client

3、ES读数据建议

  避免大结果集和深翻,ES 提供了 Scroll 和 Scroll-Scan 这两种查询模式。

   1、Scroll

  • Scroll:是为检索大量的结果而设计的。例如,我们需要查询 1~100 页的数据,每页 100 条数据。

  • 如果使用Search查询:每次都需要在每个分片上查询得分最高的 from+100 条数据,然后协同节点把收集到的 n×(from+100)条数据聚合起来再进行一次排序。

  • 每次返回from+1开始的100条数据,并且要重复执行100次。

  2、Scroll-Scan

  • 如果使用Scroll查询:在各个分片上查询10000条数据,协同节点聚合n×10000条数据进行合并、排序,并将排名前10000的结果快照起来。这样做的好处是减少了查询和排序的次数。

总结:


  • 写请求是写入 primary shard,然后同步给所有的 replica shard;读请求可以从 primary shard 或 replica shard 读取,采用的是随机轮询算法。

 Flag:

  • 1、在flush前,数据在写入时,同时有一份写到了 translog os cache , 在此过程中查询时为什么不能直接从 translog os cache 中获取?
    • Re: Translog 以 key value的形式存写, Key是Id, Value是Doc的内容。
    • 当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取
    • 当通过id查询、更新、删除一个文档时, 从segment中检索之前, 先检查translog中的最新变化 —— ES总是能够实时地获取到文档的最新版本.
  • 2、为什么segment 是不可变的?
    • Re: 在 lucene 中,为了实现高索引速度,故使用了segment 分段架构存储。
    • 一批写入数据保存在一个段中,其中每个段是磁盘中的单个文件。
    • 由于两次写入之间的文件操作非常繁重,因此将一个段设为不可变的,以便所有后续写入都转到New段。
  • 3、热冷分离后,冷库不再写入,使用 forcemerge 将索引的segment 合并为1 ?
  • 4、单个segment 大小的限制 ? 单个shared 上? 单个结点? 单个index?
  • 5、读数时原理写得稍简单了,cache 、 disk ?

参考资料


原文地址:https://www.cnblogs.com/tgzhu/p/14449087.html