The filebeat file-handle retention problem

These two articles are classic references on the topic:

https://developer.aliyun.com/article/637173

https://blog.csdn.net/weixin_34342578/article/details/89585085

When the log volume is huge, file handles held by filebeat fill up the disk

Current situation:
Business logs are collected through a filebeat → logstash → elasticsearch pipeline; the ELK version is 6.3.2.
The production system uses Log4j as its logging framework.
Filebeat is deployed on multiple nodes and collects the business logs written by Log4j. On each node the single log file test.log is capped at 50 MB, and the output volume is so large that the 50 MB cap is reached within tens of seconds, sometimes just a few seconds, after which the file is rotated to test.log.1 ... test.log.N. Because filebeat has not finished shipping the contents of test.log by the time it is renamed (the registry file under filebeat's data directory still holds an entry for the renamed file), the handle on test.log.1 is not released. So when the log volume is huge and filebeat cannot keep up, a large number of test.log.N handles stay open; the disk space of those rotated files cannot be reclaimed while their handles are held, and the open handles also consume a large amount of memory, until the operating system runs out of memory.
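For reference, the rotation behaviour described above corresponds to a Log4j 1.x RollingFileAppender roughly along the following lines; the file path, backup count, and layout are illustrative, not taken from the original setup:

    # Hypothetical log4j.properties sketch of the size-based rotation described above
    log4j.rootLogger=INFO, file
    log4j.appender.file=org.apache.log4j.RollingFileAppender
    log4j.appender.file.File=/var/log/app/test.log
    log4j.appender.file.MaxFileSize=50MB
    log4j.appender.file.MaxBackupIndex=10
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c - %m%n

When test.log reaches 50 MB it is renamed to test.log.1 and existing backups shift up, which is exactly the rename-while-still-being-harvested pattern that leaves filebeat holding stale handles.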
Analysis:
The business log volume is huge and the downstream logstash cannot meet the throughput requirement, which back-pressures the upstream filebeat. The consequence of the back pressure is that filebeat's collection performance drops sharply and a large backlog of renamed log files piles up in the registry file.
Approach:
1. Add Kafka as a log buffer
2. Scale out the logstash nodes
Deploying Kafka: we found 5 machines, each with 16 CPU cores, 16 GB of memory, and a 77 GB disk, to build a Kafka cluster (deployment guides are easy to find online), then pointed filebeat's output at Kafka; for details see the official documentation at https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html. A sketch of the corresponding output section follows.
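A minimal filebeat.yml sketch of this step; the broker host names and the topic name are placeholders rather than values from the article, and the tuning options shown are standard filebeat Kafka output settings:

    output.kafka:
      hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092", "kafka4:9092", "kafka5:9092"]
      topic: "business-log"          # the log topic that logstash will consume
      partition.round_robin:
        reachable_only: false
      required_acks: 1               # wait for the leader only, favouring throughput
      compression: gzip
      max_message_bytes: 1000000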
Once the cluster was up, we created the log topic with partitions set to 30 and restarted filebeat. After it had run for a while the backlog in the registry file was gone, but watching the consumer side of Kafka we saw the consumer lag growing rapidly, so our initial judgment was that logstash throughput was insufficient.
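Assuming a pre-2.2 Kafka distribution (where kafka-topics.sh still takes --zookeeper) and placeholder host, topic, and group names, the topic creation and lag check look roughly like this:

    # Create the log topic with 30 partitions
    bin/kafka-topics.sh --create --zookeeper zk1:2181 \
      --replication-factor 2 --partitions 30 --topic business-log

    # Watch the lag of the logstash consumer group
    bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
      --describe --group logstash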
Scaling out logstash: change logstash's input to Kafka; for details see the official documentation at https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html. A sketch of the pipeline follows.
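A minimal logstash pipeline sketch under the same assumptions (broker addresses, topic, group id, and the elasticsearch host are placeholders; the options used are standard kafka input / elasticsearch output settings):

    input {
      kafka {
        bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
        topics            => ["business-log"]
        group_id          => "logstash"
        consumer_threads  => 10      # per-node threads; keep the total across nodes <= partition count
        codec             => "json"
      }
    }
    output {
      elasticsearch {
        hosts => ["http://es-node1:9200"]
      }
    }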
We quickly scaled out to 3 logstash nodes, each running 10 threads consuming the log topic from Kafka. After restarting logstash, the lag in Kafka was not relieved. We then suspected that Kafka had too few partitions and therefore not enough consumer parallelism, so we increased partitions from 30 to 300 and raised the thread count on each logstash node to 100. After observing for a while, the lag was still not relieved. At one point we suspected that logstash performance was simply too poor; during this period the elasticsearch nodes crashed with OOM several times, and we were close to despair. Then we started to notice that logstash was printing a large number of INFO logs like the following:
[screenshot: Logstash INFO logs showing bulk requests to Elasticsearch being rejected]
This means the requests sent to elasticsearch were being rejected. Looking at elasticsearch at that point, we found a large number of errors:

2018-11-29T16:39:25,152[o.e.a.b.TransportBulkAction] [data-node3] failed to execute pipeline for a bulk request
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]

    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
    at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

2018-11-29T16:39:25,161[o.e.x.m.MonitoringService] [data-node3] monitoring execution failed

org.elasticsearch.xpack.monitoring.exporter.ExportException: Exception when closing export bulk

    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1$1.<init>(ExportBulk.java:95) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1.onFailure(ExportBulk.java:93) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:206) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:200) ~[?:?]
    at org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:96) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:164) ~[?:?]
    at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.lambda$doFlush$1(LocalBulk.java:115) ~[?:?]
    at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.ContextPreservingActionListener.onFailure(ContextPreservingActionListener.java:50) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction$1.onFailure(TransportAction.java:91) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.lambda$processBulkIndexIngestRequest$4(TransportBulkAction.java:502) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.ingest.PipelineExecutionService$1.onFailure(PipelineExecutionService.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.onRejection(AbstractRunnable.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.onRejection(ThreadContext.java:715) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:104) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
    at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulks

    at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:156) ~[?:?]
    ... 41 more

Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulk [default_local]

    ... 40 more

Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]

    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
    ... 31 more

This again means that a large number of requests were being rejected. Only then did we realize that the bottleneck was on the elasticsearch side: elasticsearch could not keep up with the bulk requests and was rejecting the requests from logstash.
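The rejection message itself points at the saturated write thread pool (queue capacity = 200, all 80 threads active). A quick way to watch this live is the standard cat thread_pool API; the request below is a generic Elasticsearch 6.x call, not something quoted from the article:

    GET _cat/thread_pool/write?v&h=node_name,name,active,queue,rejected

A steadily growing rejected count on the data nodes confirms that indexing capacity, not logstash, is the limiting factor.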
Later, after continuous tuning of elasticsearch, the consumer lag piled up in Kafka was finally relieved. By that point a whole week had passed.
Exactly how elasticsearch was tuned to relieve the problem will be recorded in the next blog post.

Summary: the filebeat handle-retention problem that causes disk space to be eaten up

Essentially it is caused by logs being produced at a very high rate while the downstream logstash or ES cannot keep up.

Filebeat is greedy: logs are produced so fast that filebeat has not yet handed the data off to the downstream Kafka for processing.

If at this point you manually delete the file, filebeat still holds its handle, so the space it occupies is not actually freed.

Reproducing this scenario is actually quite simple:

1. First, the file being collected must contain more than 4096 lines; when I tried to reproduce with fewer lines, the problem did not seem to occur (this is likely related to filebeat's default in-memory queue size of 4096 events: the handle only gets stuck once the queue is full and the output is blocked).

2. Configure filebeat with a wrong Kafka address, so that the harvested logs keep piling up and can never be processed downstream.

3. Manually delete the file being collected while it is still being harvested (see the sketch after this list).
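A minimal repro sketch of steps 2 and 3, assuming a hypothetical log path and a deliberately unreachable broker address:

    filebeat.inputs:
    - type: log
      paths:
        - /var/log/app/test.log        # illustrative path
    output.kafka:
      hosts: ["10.0.0.99:9092"]        # deliberately wrong / unreachable broker
      topic: "business-log"

With this running, delete the log file while it is being harvested; lsof against the filebeat process will still show the file as open (marked deleted), and the disk space is not reclaimed.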

The fix:

Set the following in the filebeat image (configuration):

close_timeout: 3h

If a file has still not been fully shipped after 3 hours of harvesting, its handle is forcibly closed; this option is the key point for solving the problem in the case above.

Note that enabling this option avoids the problem described above, but it carries a risk of data loss. You cannot have it both ways: log lines that have not yet been sent successfully when the handle is closed may be lost (for instance, if the file is rotated away or deleted before filebeat can pick it up again).
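A minimal sketch of where this lands in filebeat.yml, using the same illustrative log path as above; close_timeout is the only setting actually called out in this case:

    filebeat.inputs:
    - type: log
      paths:
        - /var/log/app/test.log*       # illustrative path, covering rotated files
      close_timeout: 3h                # force-close the harvester after 3h even if shipping is unfinished

Related standard options such as close_removed and close_renamed can also release handles earlier when files are deleted or rotated, with the same trade-off of possibly losing unsent lines.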

Original article: https://www.cnblogs.com/kebibuluan/p/15665702.html