KAFKA - Usage Issues

This document is based on Kafka version 0.9.0.0.

Problem 1: In an on-site project, the Kafka connection is normal but no data ever arrives?

1) First, confirm the configuration is correct, including the task configuration and the IP/port;

2) Check the topic offsets: is new data coming in, and has the existing data already been consumed;

3) Check that the Kafka service itself is healthy: whether any node is down, and whether the topic is configured with replicas (see the commands after this list);

4) If Kafka is a cluster and the topic has no replicas, one node going down will make its data impossible to pull;

5) Finally, confirm the network is reachable, e.g. by pinging the IP.
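
These checks can be run with the stock command-line tools; a sketch, where the ZooKeeper address, broker address, and topic name are placeholders to adapt:

# Check partition count, replica placement, and which brokers are in the ISR
bin/kafka-topics.sh --describe --zookeeper zk1:2181 --topic HIK_SMART_METADATA_TOPIC

# Check whether the latest offset is still advancing (i.e. new data is arriving)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092 --topic HIK_SMART_METADATA_TOPIC --time -1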

A tricky case from the past: all of the checks above passed, and the program's own diagnostics found nothing wrong either, yet the symptoms matched those described above. It later turned out that the site's network was very poor.

A packet-capture tool then showed that the packet delivery success rate at that site was very low and the network was highly unstable; this network problem was what prevented data from being received.

 

Problem 2: Runtime exception: org.apache.kafka.common.errors.RecordTooLargeException:

There are some messages at [Partition=Offset]: {HIK_SMART_METADATA_TOPIC-1=8155364} whose size is larger than the fetch size 1048576 and hence cannot be ever returned.

Increase the fetch size, or decrease the maximum message size the broker will allow.

Cause: some individual messages consumed from the subordinate Kafka are larger than 1048576 bytes (1 MB). The tool cannot consume such a message, so consumption of that district's data blocks on it indefinitely (only a handful of messages exceed 1 MB).

Solution:

Add the following Kafka setting on the consumer side: max.partition.fetch.bytes=1572864 (1.5 MB)

  • max.partition.fetch.bytes: the maximum amount of data fetched from a single partition per request, in bytes; the default is 1 MB
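
A minimal consumer sketch with this setting (the broker address, group id, and topic are placeholders; byte-array deserializers are assumed):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");   // placeholder address
props.put("group.id", "metadata-consumer");       // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// Raise the per-partition fetch limit from the 1 MB default to 1.5 MB
// so the occasional oversized record can still be fetched.
props.put("max.partition.fetch.bytes", "1572864");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("HIK_SMART_METADATA_TOPIC"));

The broker-side counterpart is message.max.bytes, which caps the message size the broker accepts from producers; lowering it is the other option the exception message suggests.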

 

 

Problem 3: Error message: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

         at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:706)

         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:453)

         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)

         at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.sendRecordToKafka(KafkaToKafkaStep.java:453)

         at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep$KafkaToKafkaGroupTask.run(KafkaToKafkaStep.java:204)

         at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

         at java.util.concurrent.FutureTask.run(Unknown Source)

         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

         at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Cause: when sending data to the superior Kafka, producer.send(record) first fetches metadata from the server and only then sends the data. If the data volume is large or sends are very frequent, the metadata update can time out (60 s). The exception is caught and the record is resent; when caught, the failed record and the exception are written to the log.
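
A sketch of the catch-and-resend pattern described above (the helper name is illustrative, not the actual KafkaToKafkaStep code):

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical helper: send one record; log and resend once if the
// metadata update times out.
static void sendWithRetry(KafkaProducer<byte[], byte[]> producer,
                          ProducerRecord<byte[], byte[]> record) throws InterruptedException {
    try {
        // send() is asynchronous; get() blocks and surfaces the
        // TimeoutException wrapped in an ExecutionException.
        producer.send(record).get();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof TimeoutException) {
            System.err.println("Metadata update timed out, resending: " + record);
            e.printStackTrace();
            producer.send(record);   // single retry; real code may loop with a cap
        }
    }
}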

 

Problem 4

The district Kafka cluster has 8 servers, but the partitions of HIK_SMART_METADATA_TOPIC sit on only two of them. Restarting the other Kafka nodes does not change this, and no other topic shows this behavior.

 

Solution:

1. Recreate the topic (requires all of the topic's data to have been consumed first)

2. Rebalance the topic (a sketch follows below)

See also the attached document: kafka副本相关.docx
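
Rebalancing can be done with the stock partition-reassignment tool; a sketch, where the broker ids, ZooKeeper address, and the partition list in the JSON are placeholders to adapt:

# reassign.json - spread the partitions across more brokers
{"version":1,"partitions":[
  {"topic":"HIK_SMART_METADATA_TOPIC","partition":0,"replicas":[1,2]},
  {"topic":"HIK_SMART_METADATA_TOPIC","partition":1,"replicas":[3,4]}
]}

bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file reassign.json --execute
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file reassign.json --verify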

Problem 5

The log reports the following error:

2018-06-01 18:52:39 ERROR [node_2_Worker-18] core.JobRunShell (run:211) - Job com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.node_2_(kafka_BoLe)_(kafka_sanhui) threw an unhandled Exception:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.startStep(KafkaToKafkaStep.java:126)
at com.hikvision.bigdata.hwf.workflow.node.steps.AbstractWorkflowNodeStep.execute(AbstractWorkflowNodeStep.java:143)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
at org.apache.kafka.common.network.Selector.<init>(Selector.java:98)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:272)
... 5 more
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.<init>(Unknown Source)
at sun.nio.ch.SelectorProviderImpl.openPipe(Unknown Source)
at java.nio.channels.Pipe.open(Unknown Source)
at sun.nio.ch.WindowsSelectorImpl.<init>(Unknown Source)
at sun.nio.ch.WindowsSelectorProvider.openSelector(Unknown Source)
at java.nio.channels.Selector.open(Unknown Source)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:96)
... 7 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)

 

Conclusion: the machine's TCP connections were exhausted, so new TCP (socket) connections could not be established.

Option 1: restart the server to release the connections.

Option 2: raise the machine's TCP connection limits, increase the buffer size, and track down why the connections are not being released. Inspect them from cmd with:

netstat -an
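
For example, to count the connections stuck in a given state (Windows cmd syntax, since the stack trace above is from a Windows JVM):

netstat -an | find /c "TIME_WAIT"

If a large TIME_WAIT backlog turns out to be the culprit, the usual tuning points on older Windows versions are the MaxUserPort and TcpTimedWaitDelay registry values under HKLM\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters (a reboot is needed for them to take effect).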

 

Meaning of the state column in netstat -an output:
LISTEN: listening for connection requests from a remote TCP port
SYN-SENT: waiting for a matching connection request after having sent one
SYN-RECEIVED: waiting for the peer to confirm the connection request after having both received and sent one
ESTABLISHED: an open connection
FIN-WAIT-1: waiting for a connection termination request from the remote TCP, or an acknowledgment of the termination request sent earlier
FIN-WAIT-2: waiting for a connection termination request from the remote TCP
CLOSE-WAIT: waiting for a connection termination request from the local user
CLOSING: waiting for the remote TCP to acknowledge connection termination
LAST-ACK: waiting for acknowledgment of the termination request previously sent to the remote TCP
TIME-WAIT: waiting long enough to be sure the remote TCP received the acknowledgment of its termination request
CLOSED: no connection state at all

Some useful operations on Kafka offsets:

Querying a topic's offset range
The following command queries the minimum offset of topic test on broker suna:9092:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 --topic test --time -2

Output:

test:0:1288

Querying the maximum offset:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 --topic test --time -1

Output:

test:0:7885

The output above shows that topic test has a single partition, 0, whose offset range is [1288, 7885].

Setting a consumer group's offset
Start the ZooKeeper client:

/zookeeper/bin/zkCli.sh

Then set the offset of consumer group testgroup, topic test, partition 0 to 1288 with:

set /consumers/testgroup/offsets/test/0 1288

Note that if your Kafka is configured with a ZooKeeper root (chroot), for example /kafka, the command becomes:

set /kafka/consumers/testgroup/offsets/test/0 1288

Restart the relevant application and it will start reading from the offset you set.
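
To confirm the write took effect, you can read the value back in the same zkCli session (same paths as above):

get /consumers/testgroup/offsets/test/0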

Manually updating the offsets Kafka stores in ZooKeeper: sometimes we need to set a topic's offsets to a specific value by hand, which means updating the data in ZooKeeper. Kafka ships with a class for this, kafka.tools.UpdateOffsetsInZK, which modifies a topic's offsets in ZooKeeper. It is used as follows:

 


[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

 

 

Run without arguments, kafka.tools.UpdateOffsetsInZK prints the parameters it expects. Our consumer.properties file contains:

 


zookeeper.connect=www.iteblog.com:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=group

 


This tool can only set the offsets in ZooKeeper to earliest or latest, for example:

[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties iteblog
updating partition 0 with new offset: 276022922
updating partition 1 with new offset: 234360148
updating partition 2 with new offset: 157237157
updating partition 3 with new offset: 106968019
updating partition 4 with new offset: 80696130
updating partition 5 with new offset: 317144986
updating partition 6 with new offset: 299182459
updating partition 7 with new offset: 197012246
updating partition 8 with new offset: 230433681
updating partition 9 with new offset: 120971431
updating partition 10 with new offset: 51200673
updated the offset for 11 partitions

Source: https://www.cnblogs.com/yangh2016/p/14756047.html