Customizing logstash-output-kafka to Add Extra Transaction Parameters


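An older project of ours uses Logstash to process data and write it to Kafka, and this pipeline has been running for several years. After we added more detailed monitoring, we found that a small amount of data was being lost, and we suspect that part of the cause is that the Kafka writes are not idempotent.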

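Idempotent writes to Kafka require the following three parameters (together with acks = all, which the configuration in this article already uses):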

retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
enable.idempotence=true
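As a quick sanity check, the three settings above can be expressed as a small Ruby function over a Hash of producer properties. This is a minimal sketch: `idempotence_violations` is a hypothetical helper, the thresholds mirror the values recommended in this article rather than every value Kafka would accept, and the `acks` check reflects the Kafka producer documentation's requirement that an idempotent producer use `acks = all`.

```ruby
# Hypothetical helper: list the producer settings that break the
# idempotent-write setup described in this article.
# Keys are Kafka producer property names; values are strings, as they
# would appear in java.util.Properties. A missing key counts as "not configured".
def idempotence_violations(props)
  problems = []
  # Retry (effectively) forever: Integer.MAX_VALUE.
  problems << "retries should be 2147483647" unless props["retries"].to_i == 2_147_483_647
  # One in-flight request per connection, as recommended above.
  problems << "max.in.flight.requests.per.connection should be 1" unless props["max.in.flight.requests.per.connection"].to_i == 1
  # Idempotence must be switched on explicitly.
  problems << "enable.idempotence should be true" unless props["enable.idempotence"] == "true"
  # Kafka also requires acks=all (or its alias -1) for an idempotent producer.
  problems << "acks should be all" unless %w[all -1].include?(props["acks"])
  problems
end

# Values as logged by the stock plugin in the runtime log below.
defaults = {
  "acks" => "all",
  "retries" => "2147483647",
  "max.in.flight.requests.per.connection" => "5",
  "enable.idempotence" => "false"
}
puts idempotence_violations(defaults)
```

With the stock plugin's values, only the in-flight limit and the idempotence flag are reported, which matches the two settings the rest of this article sets out to add.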

The official plugin already lets you set retries.

The Logstash runtime log shows that its defaults are:

enable.idempotence = false
max.in.flight.requests.per.connection = 5
Runtime log:
 
Sending Logstash's logs to /usr/share/logstash/logs which is now configured via log4j2.properties
[2020-12-07T11:29:57,094][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-12-07T11:29:57,932][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2020-12-07T11:30:00,977][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>24, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2020-12-07T11:30:01,021][WARN ][logstash.outputs.kafka   ] Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting. {:retries=>2147483647}
[2020-12-07T11:30:01,098][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values:
	acks = all
	batch.size = 16384
	bootstrap.servers = [192.168.5.100:9092, 192.168.5.101:9092, 192.168.5.102:9092]
	buffer.memory = 33554432
	client.id =
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 10
	reconnect.backoff.ms = 10
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[2020-12-07T11:30:01,165][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.0.0
  

Official documentation: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

Integration version information

Here is the catch: up to and including the latest release, the official plugins-outputs-kafka plugin does not support configuring max.in.flight.requests.per.connection or enable.idempotence.

First, I tried following the plugin's naming rule for parameters, replacing . with _, and added these two settings:

        max_in_flight_requests_per_connection => 1
        enable_idempotence => "true"

The service reported an error and failed to start.
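The naming rule tried above can be modelled in a few lines. The failure is presumably because the stock plugin does not declare these two options, and Logstash rejects settings that a plugin does not declare. The sketch below is a simplified, hypothetical model of that behaviour, not Logstash's own validation code; `KNOWN_OPTIONS` is a small illustrative subset of the stock plugin's options, and `option_name` and `unknown_settings` are hypothetical helpers.

```ruby
# Illustrative subset of options declared by the stock kafka output
# (not the plugin's full option list).
KNOWN_OPTIONS = %w[acks batch_size bootstrap_servers max_request_size retries topic_id].freeze

# Map a Kafka producer property name to the plugin's option naming style.
def option_name(kafka_property)
  kafka_property.tr(".", "_")
end

# Return the setting names a strict validator would reject as unknown.
def unknown_settings(settings)
  settings.keys.reject { |name| KNOWN_OPTIONS.include?(name) }
end

wanted = %w[max.in.flight.requests.per.connection enable.idempotence]
settings = wanted.map { |prop| [option_name(prop), nil] }.to_h
p unknown_settings(settings)
```

Both converted names fall outside the declared set, which is why the fix has to happen in the plugin code rather than in the pipeline configuration.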

Official source code: https://github.com/logstash-plugins/logstash-output-kafka (the official source has since moved to https://github.com/logstash-plugins/logstash-integration-kafka, which still does not support these two parameters)

There is a similar upstream issue:

https://github.com/logstash-plugins/logstash-output-kafka/issues/195

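When an official component does not support what you need, there are generally two ways out of the dead end:

  • Build an alternative setup to replace it
  • Customize the code to add the feature (provided it is open source, or the code can be reverse-engineered)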

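The same applies here. Re-developing and replacing the Kafka-writing part of Logstash would be costly. Fortunately, I had written custom implementations of some Logstash plugins before, and this change only adds configuration options, so it should be easy to solve.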


Preparation

Determine the version information

  • The Kafka server we deployed is version 1.0.0

  • The Logstash version currently in use:

logstash --version
logstash 6.3.0
  • The bundled logstash-output-kafka plugin is logstash-output-kafka-7.0.10

  • logstash-output-kafka-7.0.10 ships with kafka-clients-1.0.0.jar by default:

ls -alh vendor/jar-dependencies/runtime-jars/*
-rw-rw-r-- 1 logstash root 1.6M Jun 12  2018 vendor/jar-dependencies/runtime-jars/kafka-clients-1.0.0.jar
-rw-rw-r-- 1 logstash root 479K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-1.2.17.jar
-rw-rw-r-- 1 logstash root  43K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-1.2-api-2.6.2.jar
-rw-rw-r-- 1 logstash root 195K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-api-2.6.2.jar
-rw-rw-r-- 1 logstash root 1.2M Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-core-2.6.2.jar
-rw-rw-r-- 1 logstash root 362K Jun 12  2018 vendor/jar-dependencies/runtime-jars/lz4-java-1.4.jar
-rw-rw-r-- 1 logstash root  41K Jun 12  2018 vendor/jar-dependencies/runtime-jars/slf4j-api-1.7.25.jar
-rw-rw-r-- 1 logstash root 9.8K Jun 12  2018 vendor/jar-dependencies/runtime-jars/slf4j-log4j12-1.7.21.jar
-rw-rw-r-- 1 logstash root 1.5M Jun 12  2018 vendor/jar-dependencies/runtime-jars/snappy-java-1.1.4.jar
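These version checks can also be scripted. A minimal sketch, assuming the output formats shown above (`logstash 6.3.0` from `logstash --version`, and jar names of the form `kafka-clients-<version>.jar` in the runtime-jars listing); `logstash_version` and `kafka_clients_version` are hypothetical helpers.

```ruby
# Extract the Logstash version from the output of `logstash --version`.
def logstash_version(version_output)
  version_output[/^logstash (\d+(?:\.\d+)+)/, 1]
end

# Extract the kafka-clients version from a listing of runtime jars.
def kafka_clients_version(listing)
  listing[/kafka-clients-(\d+(?:\.\d+)+)\.jar/, 1]
end

# Two lines copied from the listing above.
listing = <<~LS
  -rw-rw-r-- 1 logstash root 1.6M Jun 12  2018 vendor/jar-dependencies/runtime-jars/kafka-clients-1.0.0.jar
  -rw-rw-r-- 1 logstash root 479K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-1.2.17.jar
LS

puts logstash_version("logstash 6.3.0")
puts kafka_clients_version(listing)
```

Both helpers return `nil` when the expected pattern is absent, so a missing jar shows up as `nil` instead of a wrong version.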

These details can also be found in the source code.

Logstash Kafka integration information:

https://rubygems.org/gems/logstash-output-kafka/versions/7.0.10

https://www.elastic.co/guide/en/logstash-versioned-plugins/current/v10.7.0-plugins-outputs-kafka.html

Because the project still uses an older version, whose source is logstash-output-kafka rather than logstash-integration-kafka, we make the change in logstash-output-kafka first.

Modify the code to add the settings (see the commit for the change); it is just four simple lines:

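# Declare the two new options (Kafka property names with "." replaced by "_")
config :enable_idempotence, :validate => :boolean, :required => true
config :max_in_flight_requests_per_connection, :validate => :number, :default => 1, :required => true

# Copy them into the producer properties only when a value is present
# (`kafka` is the plugin's alias for org.apache.kafka.clients.producer.ProducerConfig)
props.put(kafka::MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, max_in_flight_requests_per_connection.to_s) unless max_in_flight_requests_per_connection.nil?
props.put(kafka::ENABLE_IDEMPOTENCE_CONFIG, enable_idempotence.to_s) unless enable_idempotence.nil?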
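The two `props.put(...) unless ....nil?` lines follow the plugin's existing pattern: a setting is copied into the producer properties only when it has a value. Outside JRuby, the same logic can be tried with a plain Hash standing in for `java.util.Properties`. In this sketch the ProducerConfig constants are replaced by their literal Kafka property strings, and `build_producer_props` is a hypothetical name.

```ruby
# Literal values of the ProducerConfig constants used in the plugin code.
MAX_IN_FLIGHT = "max.in.flight.requests.per.connection"
ENABLE_IDEMPOTENCE = "enable.idempotence"

# Hash-based stand-in for the plugin's props.put(...) calls.
def build_producer_props(max_in_flight_requests_per_connection:, enable_idempotence:)
  props = {}
  # Values are stringified, matching the .to_s calls in the plugin code;
  # nil values are skipped so Kafka falls back to its own defaults.
  props[MAX_IN_FLIGHT] = max_in_flight_requests_per_connection.to_s unless max_in_flight_requests_per_connection.nil?
  props[ENABLE_IDEMPOTENCE] = enable_idempotence.to_s unless enable_idempotence.nil?
  props
end

p build_producer_props(max_in_flight_requests_per_connection: 1, enable_idempotence: true)
```

Note that `false.nil?` is `false`, so an explicit `enable_idempotence => false` is still passed through to Kafka; only a missing value is skipped.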

Build and package the plugin; on success, the corresponding gem file is generated:

logstash-output-kafka-7.0.10.gem

Switching to a RubyGems mirror in China makes packaging faster:

https://gems.ruby-china.com/

https://developer.aliyun.com/mirror/rubygems
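One way to use a mirror is to point the plugin's Gemfile at it. The sketch below rewrites the `source` line, assuming the Gemfile declares `source 'https://rubygems.org'` (consistent with the `Fetching gem metadata from https://rubygems.org/` line in the log below); `use_mirror` is a hypothetical helper, and the mirror URL is the first one listed above.

```ruby
# Matches a Gemfile line like: source 'https://rubygems.org' (either quote style).
DEFAULT_SOURCE = %r{^source\s+(["'])https://rubygems\.org/?\1}

# Replace the default rubygems.org source with a mirror URL.
def use_mirror(gemfile_text, mirror)
  gemfile_text.sub(DEFAULT_SOURCE, "source \"#{mirror}\"")
end

gemfile = <<~GEMFILE
  source 'https://rubygems.org'
  gemspec
GEMFILE

puts use_mirror(gemfile, "https://gems.ruby-china.com")
```

If you prefer not to edit the file, Bundler's `bundle config mirror.https://rubygems.org https://gems.ruby-china.com` redirects the same source without touching the Gemfile.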

Packaging log:
 
bjdeMacBook-Pro-Work:logstash-output-kafka cclient$ bundle install
unsupported Java version "11", defaulting to 1.7
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jruby.util.io.FilenoUtil to method sun.nio.ch.SelChImpl.getFD()
WARNING: Please consider reporting this to the maintainers of org.jruby.util.io.FilenoUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Ignoring executable-hooks-1.4.2 because its extensions are not built. Try: gem pristine executable-hooks --version 1.4.2
Ignoring gem-wrappers-1.4.0 because its extensions are not built. Try: gem pristine gem-wrappers --version 1.4.0
Ignoring jruby-launcher-1.1.5-java because its extensions are not built. Try: gem pristine jruby-launcher --version 1.1.5
Ignoring rainbow-2.2.2 because its extensions are not built. Try: gem pristine rainbow --version 2.2.2
Ignoring ruby-debug-ide-0.6.1 because its extensions are not built. Try: gem pristine ruby-debug-ide --version 0.6.1
Ignoring executable-hooks-1.4.2 because its extensions are not built. Try: gem pristine executable-hooks --version 1.4.2
Ignoring gem-wrappers-1.4.0 because its extensions are not built. Try: gem pristine gem-wrappers --version 1.4.0
Ignoring jruby-launcher-1.1.5-java because its extensions are not built. Try: gem pristine jruby-launcher --version 1.1.5
Ignoring rainbow-2.2.2 because its extensions are not built. Try: gem pristine rainbow --version 2.2.2
Ignoring ruby-debug-ide-0.6.1 because its extensions are not built. Try: gem pristine ruby-debug-ide --version 0.6.1
Fetching gem metadata from https://rubygems.org/.........
Resolving dependencies..........................
Error loading RubyGems plugin "/Users/cclient/.rvm/gems/jruby-9.1.13.0@global/gems/executable-hooks-1.4.2/lib/rubygems_plugin.rb": no such file to load -- executable-hooks/wrapper (LoadError)
Error loading RubyGems plugin "/Users/cclient/.rvm/gems/jruby-9.1.13.0@global/gems/gem-wrappers-1.4.0/lib/rubygems_plugin.rb": no such file to load -- gem-wrappers (LoadError)
Fetching rake 13.0.1
Installing rake 13.0.1
Using bundler 1.17.1
Using numerizer 0.1.1
Using chronic_duration 0.10.6
Using clamp 0.6.5
Fetching coderay 1.1.3
Installing coderay 1.1.3
Fetching concurrent-ruby 1.1.7
Installing concurrent-ruby 1.1.7
Fetching diff-lcs 1.4.4
Installing diff-lcs 1.4.4
Fetching multi_json 1.15.0
Installing multi_json 1.15.0
Using elasticsearch-api 5.0.5
Fetching multipart-post 2.1.1
Installing multipart-post 2.1.1
Fetching faraday 1.0.1
Installing faraday 1.0.1
Using elasticsearch-transport 5.0.5
Using elasticsearch 5.0.5
Fetching ffi 1.13.1 (java)
Installing ffi 1.13.1 (java)
Using filesize 0.0.4
Fetching fivemat 1.3.7
Installing fivemat 1.3.7
Using gem_publisher 1.5.0
Using gems 0.8.3
Using i18n 0.6.9
Using insist 1.0.0
Using jar-dependencies 0.3.12
Fetching jrjackson 0.4.13 (java)
Installing jrjackson 0.4.13 (java)
Using jruby-openssl 0.9.19 (java)
Using kramdown 1.14.0
Fetching openssl_pkcs8_pure 0.0.0.2
Installing openssl_pkcs8_pure 0.0.0.2
Fetching manticore 0.7.0 (java)
Installing manticore 0.7.0 (java)
Using minitar 0.5.4
Using method_source 0.8.2
Using slop 3.6.0
Using spoon 0.0.6
Using pry 0.10.4 (java)
Using puma 2.16.0 (java)
Using rack 1.6.6
Using ruby-maven-libs 3.3.9
Using ruby-maven 3.3.12
Using rubyzip 1.1.7
Using rack-protection 1.5.5
Fetching tilt 2.0.10
Installing tilt 2.0.10
Using sinatra 1.4.8
Using stud 0.0.23
Using thread_safe 0.3.6 (java)
Using polyglot 0.3.5
Using treetop 1.4.15
Using logstash-core 5.6.4 (java)
Using logstash-core-plugin-api 2.1.28 (java)
Fetching logstash-codec-json 3.0.5
Installing logstash-codec-json 3.0.5
Using logstash-codec-plain 3.0.6
Fetching rspec-support 3.10.0
Installing rspec-support 3.10.0
Fetching rspec-core 3.10.0
Installing rspec-core 3.10.0
Fetching rspec-expectations 3.10.0
Installing rspec-expectations 3.10.0
Fetching rspec-mocks 3.10.0
Installing rspec-mocks 3.10.0
Fetching rspec 3.10.0
Installing rspec 3.10.0
Using rspec-wait 0.0.9
Using logstash-devutils 1.3.6 (java)
Using logstash-output-kafka 6.2.4 from source at `.`
Fetching poseidon 0.0.5
Installing poseidon 0.0.5
Fetching snappy-jars 1.1.0.1.2 (java)
Installing snappy-jars 1.1.0.1.2 (java)
Fetching snappy 0.1.0 (java)
Installing snappy 0.1.0 (java)
Bundle complete! 5 Gemfile dependencies, 59 gems now installed.
Use `bundle info [gemname]` to see where a bundled gem is installed.
  
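Install the custom-built `logstash-output-kafka-7.0.10.gem` in place of the original official plugin:
  • Uninstall the original official logstash-output-kafka plugin

    logstash-plugin remove logstash-output-kafka

  • Install the custom logstash-output-kafka plugin (use the file name of the gem you actually built; the command below shows the 7.3.2 build, which the table at the end pairs with Logstash 6.8.13)

    logstash-plugin install --no-verify logstash-output-kafka-7.3.2.gem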

Apply the configuration:

output{
    kafka {
        acks => "all"
        codec => "json"
        topic_id => "test_topic"
        bootstrap_servers =>"a:9092,b:9092,c:9092"
        batch_size => 2048
        max_request_size =>512000
        max_in_flight_requests_per_connection => 1
        enable_idempotence => "true"
    }
}

Start the service and confirm that the parameters have taken effect.
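One way to confirm this is to parse the `ProducerConfig values:` block that Logstash prints at startup, as in the runtime log earlier. A minimal sketch, assuming each setting appears on its own indented `key = value` line; `parse_producer_config` is a hypothetical helper, and the sample lines are illustrative of what a correctly configured instance should print, not captured output.

```ruby
# Parse the indented "key = value" lines of a Kafka ProducerConfig dump into a Hash.
def parse_producer_config(log_text)
  log_text.each_line.each_with_object({}) do |line, config|
    # Setting lines start with whitespace; timestamped log headers do not.
    match = line.chomp.match(/\A\s+([\w.]+)\s*=\s*(.*)\z/)
    next unless match
    config[match[1]] = match[2].strip
  end
end

# Illustrative lines in the format of the runtime log shown earlier.
sample = [
  "[2020-12-07T11:30:01,098][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values:",
  "\tacks = all",
  "\tenable.idempotence = true",
  "\tmax.in.flight.requests.per.connection = 1",
  "\tretries = 2147483647"
].join("\n")

config = parse_producer_config(sample)
puts config["enable.idempotence"]                     # prints "true"
puts config["max.in.flight.requests.per.connection"]  # prints "1"
```

Running the same function over the stock plugin's log would instead yield `false` and `5`, the defaults shown at the start of this article.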

Problem solved.

The modified code and gem are available at:

https://github.com/cclient/logstash-output-kafka

I also built two Docker images that bundle this gem:

https://hub.docker.com/repository/docker/cclient/logstash

Other notes

Pay close attention to version compatibility between logstash-output-kafka and kafka-clients:

Logstash version | logstash-output-kafka version       | kafka-clients version
6.3.0            | logstash-output-kafka-7.0.10        | 1.0
-                | logstash-output-kafka-6.2.4.gem     | 0.11
6.8.13           | logstash-output-kafka-7.3.2.gem     | 2.1.0
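When porting the change to another Logstash release, the pairings in the table can be kept as a small lookup. A sketch that encodes only the three rows above; `KAFKA_OUTPUT_COMPAT` and `compat_for` are hypothetical names, and a version that is not listed returns `nil` rather than a guess.

```ruby
# Rows from the table above. The 6.2.4 row has no Logstash version, so it is nil.
KAFKA_OUTPUT_COMPAT = [
  { logstash: "6.3.0",  plugin: "7.0.10", kafka_clients: "1.0" },
  { logstash: nil,      plugin: "6.2.4",  kafka_clients: "0.11" },
  { logstash: "6.8.13", plugin: "7.3.2",  kafka_clients: "2.1.0" }
].freeze

# Look up the recorded plugin/client pairing for a Logstash version (nil if unknown).
def compat_for(logstash_version)
  return nil if logstash_version.nil?
  KAFKA_OUTPUT_COMPAT.find { |row| row[:logstash] == logstash_version }
end

row = compat_for("6.3.0")
puts "logstash-output-kafka #{row[:plugin]} pairs with kafka-clients #{row[:kafka_clients]}"
```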

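I used Logstash heavily myself for a year, and after handover it supported various business lines for more than two years; it is still used in a few other scenarios.

Early on it greatly improved our productivity, but we also ran into some pain points and bottlenecks, and our technology stack has since replaced Logstash with NiFi.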

Newer versions and other plugins can be adapted along the same lines.

This is, in a sense, my farewell to Logstash (Ruby).

Original article: https://www.cnblogs.com/zihunqingxin/p/14459610.html