flume采集kafka报错怎么解决

报错信息:
Source.java:120)] Event #: 0
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 965
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 975
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 985
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 995
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 1006
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)

--------------------------------------------

配置文件
kafkaLogger.sources = kaSource

kafkaLogger.channels = memoryChannel
kafkaLogger.sinks = kaSink

The channel can be defined as follows.

kafkaLogger.sources.kaSource.channels = memoryChannel
kafkaLogger.sources.kaSource.type= org.apache.flume.source.kafka.KafkaSource
kafkaLogger.sources.kaSource.zookeeperConnect=192.168.130.4:2181,192.168.130.5:2181,192.168.130.6:2181
kafkaLogger.sources.kaSource.topic=dwd-topic
kafkaLogger.sources.kaSource.groupId = 0

kafkaLogger.channels.memoryChannel.type=memory
kafkaLogger.channels.memoryChannel.capacity = 1000
kafkaLogger.channels.memoryChannel.keep-alive = 60

kafkaLogger.sinks.kaSink.type = elasticsearch
kafkaLogger.sinks.kaSink.hostNames = 192.168.130.6:9300
kafkaLogger.sinks.kaSink.indexName = flume_mq_es_d
kafkaLogger.sinks.kaSink.indexType = flume_mq_es
kafkaLogger.sinks.kaSink.clusterName = zyuc-elasticsearch
kafkaLogger.sinks.kaSink.batchSize = 100
kafkaLogger.sinks.kaSink.client = transport
kafkaLogger.sinks.kaSink.serializer = com.commons.flume.sink.elasticsearch.CommonElasticSearchIndexRequestBuilderFactory
kafkaLogger.sinks.kaSink.serializer.parse = com.commons.log.parser.LogTextParser
kafkaLogger.sinks.kaSink.serializer.formatPattern = yyyyMMdd
kafkaLogger.sinks.kaSink.serializer.dateFieldName = time
kafkaLogger.sinks.kaSink.channel = memoryChannel

1个回答

KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
这里的报错是flume1.6版本的一个bug这个bug在1.7版本得到了修复,如果没有打算升级flume版本的话,可以写给flume1.6的源码重新打jar包
https://issues.apache.org/jira/browse/FLUME-2672 这个官方给出的解决方案
将KafkaSourceCounter该类中的
private static final String[] ATTRIBUTES = {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
修改为
private static final String[] ATTRIBUTES = {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET, COUNTER_KAFKA_EMPTY};


dingwd

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问