qq_35160615 2018-11-26 02:17 采纳率: 0%
浏览 2028

flume采集kafka报错怎么解决

报错信息:
Source.java:120)] Event #: 0
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 965
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 975
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 985
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 995
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 1006
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)

--------------------------------------------

配置文件
kafkaLogger.sources = kaSource

kafkaLogger.channels = memoryChannel
kafkaLogger.sinks = kaSink

The channel can be defined as follows.

kafkaLogger.sources.kaSource.channels = memoryChannel
kafkaLogger.sources.kaSource.type= org.apache.flume.source.kafka.KafkaSource
kafkaLogger.sources.kaSource.zookeeperConnect=192.168.130.4:2181,192.168.130.5:2181,192.168.130.6:2181
kafkaLogger.sources.kaSource.topic=dwd-topic
kafkaLogger.sources.kaSource.groupId = 0

kafkaLogger.channels.memoryChannel.type=memory
kafkaLogger.channels.memoryChannel.capacity = 1000
kafkaLogger.channels.memoryChannel.keep-alive = 60

kafkaLogger.sinks.kaSink.type = elasticsearch
kafkaLogger.sinks.kaSink.hostNames = 192.168.130.6:9300
kafkaLogger.sinks.kaSink.indexName = flume_mq_es_d
kafkaLogger.sinks.kaSink.indexType = flume_mq_es
kafkaLogger.sinks.kaSink.clusterName = zyuc-elasticsearch
kafkaLogger.sinks.kaSink.batchSize = 100
kafkaLogger.sinks.kaSink.client = transport
kafkaLogger.sinks.kaSink.serializer = com.commons.flume.sink.elasticsearch.CommonElasticSearchIndexRequestBuilderFactory
kafkaLogger.sinks.kaSink.serializer.parse = com.commons.log.parser.LogTextParser
kafkaLogger.sinks.kaSink.serializer.formatPattern = yyyyMMdd
kafkaLogger.sinks.kaSink.serializer.dateFieldName = time
kafkaLogger.sinks.kaSink.channel = memoryChannel

  • 写回答

1条回答 默认 最新

  • qq_35160615 2018-12-13 02:43
    关注

    KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
    java.lang.NullPointerException
    at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
    at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:748)
    这里的报错是flume1.6版本的一个bug这个bug在1.7版本得到了修复,如果没有打算升级flume版本的话,可以写给flume1.6的源码重新打jar包
    https://issues.apache.org/jira/browse/FLUME-2672 这个官方给出的解决方案
    将KafkaSourceCounter该类中的
    private static final String[] ATTRIBUTES = {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
    修改为
    private static final String[] ATTRIBUTES = {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET, COUNTER_KAFKA_EMPTY};


    dingwd

    评论

报告相同问题?

悬赏问题

  • ¥100 Jenkins自动化部署—悬赏100元
  • ¥15 关于#python#的问题:求帮写python代码
  • ¥20 MATLAB画图图形出现上下震荡的线条
  • ¥15 关于#windows#的问题:怎么用WIN 11系统的电脑 克隆WIN NT3.51-4.0系统的硬盘
  • ¥15 perl MISA分析p3_in脚本出错
  • ¥15 k8s部署jupyterlab,jupyterlab保存不了文件
  • ¥15 ubuntu虚拟机打包apk错误
  • ¥199 rust编程架构设计的方案 有偿
  • ¥15 回答4f系统的像差计算
  • ¥15 java如何提取出pdf里的文字?