flume采集kafka报错怎么解决

报错信息:
Source.java:120)] Event #: 0
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 965
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 975
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 985
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 995
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 1006
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)

--------------------------------------------

配置文件
kafkaLogger.sources = kaSource

kafkaLogger.channels = memoryChannel
kafkaLogger.sinks = kaSink

The channel can be defined as follows.

kafkaLogger.sources.kaSource.channels = memoryChannel
kafkaLogger.sources.kaSource.type= org.apache.flume.source.kafka.KafkaSource
kafkaLogger.sources.kaSource.zookeeperConnect=192.168.130.4:2181,192.168.130.5:2181,192.168.130.6:2181
kafkaLogger.sources.kaSource.topic=dwd-topic
kafkaLogger.sources.kaSource.groupId = 0

kafkaLogger.channels.memoryChannel.type=memory
kafkaLogger.channels.memoryChannel.capacity = 1000
kafkaLogger.channels.memoryChannel.keep-alive = 60

kafkaLogger.sinks.kaSink.type = elasticsearch
kafkaLogger.sinks.kaSink.hostNames = 192.168.130.6:9300
kafkaLogger.sinks.kaSink.indexName = flume_mq_es_d
kafkaLogger.sinks.kaSink.indexType = flume_mq_es
kafkaLogger.sinks.kaSink.clusterName = zyuc-elasticsearch
kafkaLogger.sinks.kaSink.batchSize = 100
kafkaLogger.sinks.kaSink.client = transport
kafkaLogger.sinks.kaSink.serializer = com.commons.flume.sink.elasticsearch.CommonElasticSearchIndexRequestBuilderFactory
kafkaLogger.sinks.kaSink.serializer.parse = com.commons.log.parser.LogTextParser
kafkaLogger.sinks.kaSink.serializer.formatPattern = yyyyMMdd
kafkaLogger.sinks.kaSink.serializer.dateFieldName = time
kafkaLogger.sinks.kaSink.channel = memoryChannel

1个回答

KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
这里的报错是flume1.6版本的一个bug这个bug在1.7版本得到了修复,如果没有打算升级flume版本的话,可以写给flume1.6的源码重新打jar包
https://issues.apache.org/jira/browse/FLUME-2672 这个官方给出的解决方案
将KafkaSourceCounter该类中的
private static final String[] ATTRIBUTES = {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
修改为
private static final String[] ATTRIBUTES = {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET, COUNTER_KAFKA_EMPTY};


dingwd

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
用flume读取kafka数据到hdfs,source创建时报错Kafka topic must be specified

计划使用flume读取kafka的数据传送到hdfs上,结果错误如下 ![图片说明](https://img-ask.csdn.net/upload/202003/20/1584694939_566000.png) 但是我的kafka里确实有对应的topic,名字为topic_start,下面是我的配置文件 ![图片说明](https://img-ask.csdn.net/upload/202003/20/1584695029_280148.png) 不知道问题出在哪里,目前测试结果为kafka里的topic,flume都无法读取出来,求大佬帮忙解决一下这个问题。目前我毫无头绪,如果不行只能考录重新安装flume等办法了

Flume和kafka连接的问题

本人使用flume1.60版本和kafka0.8.2.2版本进行连接,配置如下: a0.sources.r1.type = xiaomu.flume.source.TailFileSource a0.sources.r1.filePath = /root/access2.txt a0.sources.r1.posiFile = /root/posi2.txt a0.sources.r1.interval = 2000 a0.sources.r1.charset = UTF-8 a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a0.channels.c1.capacity = 1000 a0.channels.c1.transactionCapacity = 1000 a0.channels.c1.brokerList=slave1:9092,slave4:9092,slave3:9092 a0.channels.c1.topic=usertest3 a0.channels.c1.zookeeperConnect=slave2:2181,slave5:2181,slave6:2181 a0.channels.c1.parseAsFlumeEvent = false 但是我echo "xxx" >> access2.txt之后,在kafka那边就不一样了,比如我打xiaomu,就会出来两行,一行是xiaomum,第二行是空白,还有时候是一行但是开头有一个方框,如图所示: ![图片说明](https://img-ask.csdn.net/upload/201807/21/1532184292_313303.png) 这个怎么解决呀?求助各位大神了!

flume+kafka+hdfs 整合问题

本想搭建一个 flume+hdfs+kafka+storm+mysql 的日志实时分析和存储的系统,但是flume日志收集这块一直不通,查看flume的日志也没有报错,不知道该怎么解决了,求大家帮帮忙,贴出集群配置和配置文件如下: 共5台机器:node1~node5,其中node3~node5为日志收集的agent,node1~node2为flume的collector,最终存储两份,一份到kafka,一份到hdfs。 agent的配置文件如下: #def agent.sources = src_spooldir agent.channels = file memory agent.sinks = collector_avro1 collector_avro2 # sources agent.sources.src_spooldir.type = spooldir agent.sources.src_spooldir.channels = file memory agent.sources.src_spooldir.spoolDir = /data/flume/spoolDir agent.sources.src_spooldir.selector.type = multiplexing agent.sources.src_spooldir.fileHeader = true # channels agent.channels.file.type = file agent.channels.file.checkpointDir = /data/flume/checkpoint agent.channels.file.dataDirs = /data/flume/data agent.channels.memory.type = memory agent.channels.memory.capacity = 10000 agent.channels.memory.transactionCapacity = 10000 agent.channels.memory.byteCapacityBufferPercentage = 20 agent.channels.memory.byteCapacity = 800000 # sinks agent.sinks.collector_avro1.type = avro agent.sinks.collector_avro1.channel = file agent.sinks.collector_avro1.hostname = node1 agent.sinks.collector_avro1.port = 45456 agent.sinks.collector_avro2.type = avro agent.sinks.collector_avro2.channel = memory agent.sinks.collector_avro2.hostname = node2 agent.sinks.collector_avro2.port = 4545 collector端的配置文件如下: #def agent.sources = src_avro agent.channels = file memory agent.sinks = hdfs kafka # sources agent.sources.src_avro.type = avro agent.sources.src_avro.channels = file memory agent.sources.src_avro.bind = node1 agent.sources.src_avro.port = 45456 agent.sources.src_avro.selector.type = replicating # channels agent.channels.file.type = file agent.channels.file.checkpointDir = /data/flume/checkpoint agent.channels.file.dataDirs = /data/flume/data agent.channels.memory.type = memory agent.channels.memory.capacity = 10000 agent.channels.memory.transactionCapacity = 10000 agent.channels.memory.byteCapacityBufferPercentage = 20 agent.channels.memory.byteCapacity = 800000 # sinks agent.sinks.hdfs.type = hdfs agent.sinks.hdfs.channel = file agent.sinks.hdfs.hdfs.path = hdfs://node1/flume/events/%y-%m-%d/%H%M/%S agent.sinks.hdfs.hdfs.filePrefix = log_%Y%m%d_%H agent.sinks.hdfs.hdfs.fileSuffix = .txt agent.sinks.hdfs.hdfs.useLocalTimeStamp = true agent.sinks.hdfs.hdfs.writeFormat = Text agent.sinks.hdfs.hdfs.rollCount = 0 agent.sinks.hdfs.hdfs.rollSize = 1024 agent.sinks.hdfs.hdfs.rollInterval = 0 agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka.channel = memory agent.sinks.kafka.kafka.topic = test agent.sinks.kafka.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092 agent.sinks.kafka.kafka.flumeBatchSize = 20 agent.sinks.kafka.kafka.producer.acks = 1 agent.sinks.kafka.kafka.producer.linger.ms = 1 agent.sinks.kafka.kafka.producer.compression.type = snappy 最终 hdfs和kafka都没有接收到数据。

关于flume和kafka结合效率的问题

最近做了个测试。是flume+kafka的。是读取文件夹的。31M的文件读了很长时间。大概20分钟。不知道什么原因。哪位大神知道啊。指导下。 下面是flume的配置 #agent section producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq #producer.sources.s.channels = c producer.sources.s.type =spooldir producer.sources.s.spoolDir = /home/lb/data producer.sources.s.channels = c # Each sink's type must be defined producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partition.key=1 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=async producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=lbTestToptic #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 100000 producer.channels.c.transactionCapacity = 10000 希望有知道原因的大神给个帮助啊。谢谢

大数据:flume-ng启动报错

flume-ng1.5.0启动报错java.lang.OutOfMemoryError: Direct buffer memory。 flume-env.sh内存配置4G绝对足够了,请求解决方法

能否用spark streaming和flume或kafka对实时网络数据进行检测

目前已经有一个训练好的机器学习分类模型,存在于HDFS上,可以对LibSVMFile格式的数据进行检测。它是对很多的一段时间内的流量数据(比如1s,很多个1s)提取特征训练之后得到的。 我们知道streaming是将输入流分成微切片,微切片能否可以是从pcap文件读取呢?因为提取特征包括训练模型的时候是需要对pcap文件操作的。 flume和kafka都是可以传输txt的,能不能传输pcap文件呢?要将输入的网络数据流像tcpdump一样可以存为pcap文件,又有像kafka一样的缓存功能可以用哪些技术呢? 最后就是能否用spark streaming利用分类模型对网络数据流进行提特征并预测,而且与防火墙联动,这在技术上是否可行?

kafka连接flume因为hostname的配置报错?

flume连接kafka的时候kafka的server.properties的hostname使用localhost可以收集到flume发送过来的数据,但是想远程连接服务器,所以将localhost改成了服务器的ip地址,就连接不上了。 flume中的example.conf中的地址以及kafka消费者的启动语句中的都已经改掉了。

运行flume的agent,出现如下错误

我的代码: ``` agent.sources = s1 agent.channels = c1 agent.sinks = k1 agent.sources.s1.type=spooldir agent.sources.s1.spoolDir=/tmp/logs/tomcat2kafka agent.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000 agent.channels.c1.transactionCapacity=100 #设置Kafka接收 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka的broker地址和端口号 agent.sinks.k1.brokerList=222.30.194.254:9092 #设置Kafka的Topic agent.sinks.k1.topic=kafkatest2 #设置序列化方式 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder agent.sinks.k1.channel=c1 ``` 错误提示: ``` [ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:240)] Failed to publish events org.apache.kafka.common.errors.InterruptException: Flush interrupted. at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546) at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57) at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425) at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544) ... 4 more ``` 网上是真没有相应的答案,无奈了,给分求助

flume配置了kakfaChannel后,启动报错!求大神帮忙

是在虚拟机上配置搭建的服务,flume部署在172.235.10.10上, kafka部署在172.235.10.11上。配置文件如下: flume: a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 8000 a1.sinks.k1.type = file_roll a1.sinks.k1.channel=c1 a1.sinks.k1.directory=/usr/log/flume a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.brokerList=172.235.10.11:9002 a1.channels.c1.topic=test2 a1.channels.c1.zookeeperConnect=172.235.10.11:2181 zookeeper是使用了kafka自带的,配置文件没做修改 kafka配置server.properties修改: host.name=172.235.10.11 zookeeper.connect=172.235.10.11:2181 kafka启动正常,生产者和消费者也是可以通话的 但在启动flume的时候却报这样的一个错误: Error while getting events from Kafka. This is usually caused by trying to read a non-flume event. Ensure the setting for parseAsFlumeEvent is correct 求大神帮忙

做一个flume收集到另一个flume,再传给hdfs,但是现在flume连接hdfs出现如下错误

![![图片说明](https://img-ask.csdn.net/upload/201512/29/1451378748_777711.png) ![图片说明](https://img-ask.csdn.net/upload/201512/29/1451379043_209254.png) 错误主要是这个:Failed to start agent because dependencies were not found in classpath.上图是报错,麻烦大神解决 下面是配置文件 #master_agent master_agent.channels = c2 master_agent.sources = s2 master_agent.sinks = k2 #master_agent avrosources master_agent.sources.s2.type = avro master_agent.sources.s2.bind = master1 master_agent.sources.s2.port = 41415 master_agent.sources.s2.channels = c2 #master_agent filechannels master_agent.channels.c2.type = file master_agent.channels.c2.capacity = 100000 master_agent.channels.c2.transactionCapacity = 1000 #master_agent hdfssinks master_agent.sinks.k2.type = hdfs master_agent.sinks.k2.channel = c2 master_agent.sinks.k2.hdfs.path = hdfs://master1:9000/hdfs master_agent.sinks.k2.hdfs.filePrefix = test- master_agent.sinks.k2.hdfs.inUsePrefix = _ master_agent.sinks.k2.hdfs.inUseSuffix = .tmp master_agent.sinks.k2.hdfs.fileType = DataStream master_agent.sinks.k2.hdfs.writeFormat = Text master_agent.sinks.k2.hdfs.batchSize = 1000 master_agent.sinks.k2.hdfs.callTimeout = 6000

flume增量采集动态日志

请教一个flume增量采集日志问题,谢谢。 遇到一个应用,它生成的日志形式比较特殊,每天产生一个带日期的前缀,并根据文件容量扩展带序号的后缀。 例如2019年11月18日会首先生成20191118.log001,然后当日志文件写满20MB后,会生成20191118.log002,每个文件最大容量20MB,依次类推增加后缀的数字。 我们想用flume实时追加采集日志,如果source里定义: a1.sources.r1.type = exec a1.sources.r1.command = tail -f /tmp/20191118.log001 则可以实时收取,但只能取到一个日志文件,该文件写满20MB后,生成20191118.log2,flume就采集不到了。 如果使用: a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /tmp a1.sources.r1.includePattern = S2019*.* 则可以采集所有文件,但只能将文件采集一次,无法做到实时增量采集。 请教对于这样的日志,如何使用flume采集增量日志,谢谢。

log4j向flume发送数据乱码

flume部署在linux上,log4J2目部署在windows上。用log4j的flumeAppender把数据发送到kafka上,但通过logsink和filesink打印出来的都是乱码,log4J在Windows本地打印出来是正常的。请问这个怎么解决。。。。 我在linux上用flume的avro client把一个UTF-8编码的文本发送到flume,依然是乱码。但用GBK的文本就是正常的。 但是我在win上吧log4j的编码都改过,flume打印出来的还是乱码。

Flume运行报错,显示没有配置过滤器和正则表达式无效

Flume的配置文件: a1.sources=s1 a1.channels=c1 a1.sinks=k1 a1.sources.s1.type = spooldir a1.sources.s1.channels = c1 a1.sources.s1.spoolDir =/home/frankyu/serverlogs a1.source.s1.ignorePattern= ^(.)*\\.tmp$ a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop1:9000/flume a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 10 a1.sinks.k1.channel = c1 a1.channels.c1.type = memory 报错信息: 2019-03-14 02:27:33,799 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'a1' has no configfilters. 2019-03-14 02:27:33,790 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1161)] Invalid property specified: source.s1.ignorePattern 2019-03-14 02:27:33,796 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:126)] Configuration property ignored: a1.source.s1.ignorePattern = ^(.)*\.tmp$ 感谢帮助!

flume采集数据到hdfs性能问题

本人目前遇到flume采集写入hdfs性能等各种问题,大致如下。在10上的xx/xx目录下的数据进行读取 sink到08上的flume 由08上的flume写到07的hdfs上 30多m的文件写了好久。有时候会内存溢出等问题![图片说明](https://img-ask.csdn.net/upload/201503/12/1426162664_624860.jpg) # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = r09n08 a1.sources.r1.port = 55555 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp #hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://r09n07:8020/project/dame/input/%Y%m%d/%H a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.filePrefix = hdfs- a1.sinks.k1.hdfs.rollInterval = 0 #a1.sinks.k1.hdfs.fileSuffix = .log #a1.sinks.k1.hdfs.round = true #a1.sinks.k1.hdfs.roundValue = 1 #a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollSize = 67108864 a1.sinks.k1.hdfs.rollCount = 0 #a1.sinks.k1.hdfs.writeFormat = Text # Use a channel which buffers events in file a1.channels = c1 a1.channels.c1.type = memory #a1.channels.c1.checkpointDir=/home/nids/wg/apache-flume-1.5.2-bin/checkpoint #a1.channels.c1.dataDirs=/home/nids/wg/apache-flume-1.5.2-bin/datadir a1.sinks.k1.hdfs.batchSize = 10000 #a1.sinks.k1.hdfs.callTimeout = 6000 #a1.sinks.k1.hdfs.appendTimeout = 6000 #a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 10000 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 上面是08机器上的配置文件 ``` 下面是10机器上的配置文件 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe the sink a1.sinks.k1.type = logger #### a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/nids/wg/apache-flume-1.5.2-bin/ceshi12 a1.sources.r1.fileHeader =false a1.sources.r1.channels = c1 #### # Describe/configure the source #a1.sources.r1.type = avro a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # avro sink a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = r09n08 a1.sinks.k1.port = 55555 # Use a channel which buffers events in file a1.channels = c1 a1.channels.c1.type = memory #a1.channels.c1.checkpointDir = /home/nids/wg/apache-flume-1.5.2-bin/checkpoint #a1.channels.c1.dataDirs = /home/nids/wg/apache-flume-1.5.2-bin/datadir a1.sinks.k1.hdfs.batchSize = 10000 #a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 10000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 求各位高手解答。有时候只写了一部分数据就不再继续了,对单个文件执行时没有问题就是对目录扫描 channel是 memory类型时性能极差。不知道问题出在哪里 ```

kafka 消费端 处理数据比较慢,会不会出现数据积压?

如题,kafka消费端接收到数据后 要进行部分业务逻辑操作,可能会有3秒左右,处理很慢 的话,对程序有什么影响呢?新手提问, 望各位大神不吝赐教!

flume自定义source采集到的数据出现了空行

flume自定义source后,采集到hdfs上的数据出现了空行,有谁遇见过么?

kafka启动报错,java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.multi(Ljava/lang/Iterable;Lorg/apache/zookeeper/AsyncCallback$MultiCallback;Ljava/lang/Object;)V

试过很多方法,降级zk使其和kafka依赖的版本保持一致; zk3.414 ,kafka2.3 删除了scala的环境变量,依然不行; java_home只有一个 java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.multi(Ljava/lang/Iterable;Lorg/apache/zookeeper/AsyncCallback$MultiCallback;Ljava/lang/Object;)V at kafka.zookeeper.ZooKeeperClient.send(ZooKeeperClient.scala:238) at kafka.zookeeper.ZooKeeperClient.$anonfun$handleRequests$2(ZooKeeperClient.scala:160) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259) at kafka.zookeeper.ZooKeeperClient.$anonfun$handleRequests$1(ZooKeeperClient.scala:160) at kafka.zookeeper.ZooKeeperClient.$anonfun$handleRequests$1$adapted(ZooKeeperClient.scala:156) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:156) at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1660) at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1647) at kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1642) at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1712) at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689) at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97) at kafka.server.KafkaServer.startup(KafkaServer.scala:262) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38) at kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)

spark+flume运行jar包报错

spark+flume运行jar包报错 ![图片说明](https://img-ask.csdn.net/upload/201709/13/1505302384_331459.png) 这个jar包在类中就没有用到啊

flume-ng能否自定义数据读取完成标识?

flume读取文件时会为文件添加一个读取完成的标示,例如:文件python_20161027.log, 读取完成后会添加一个.COMPLETED的标示,把文件变成了python_20161027.log.COMPLETED 这样破坏了原有的文档结构,例如一些本来可以直接读取的txt文件,被flume采集后就不能再直接读取了,而且还会出现一些其他的问题。 我在使用flume的过程中发现,如果在上游一个程序在不停的写log日志,下游用flume实时采集日志有可能会报java.lang.IllegalStateException: File name has been re-used with different files. 这是因为我们上游的程序是以重定向的方式来写log日志的,当flume读取日志后,把日志名变成了python_20161027.log.COMPLETED后,上游程序再次生成log日志时,先判断是否存在python_20161027.log文件,如果没有的话就会创建python_20161027.log文件,而flume再次读取python_20161027.log文件时,还要生成python_20161027.log.COMPLETED文件。但是因为文件目录下已经存在此文件了,所以就会报上面的错误 我想问问各位大神,有没有什么办法可以让flume采集日志文件后,不改变原有的文件名,从而避免上面的问题

大学四年自学走来,这些私藏的实用工具/学习网站我贡献出来了

大学四年,看课本是不可能一直看课本的了,对于学习,特别是自学,善于搜索网上的一些资源来辅助,还是非常有必要的,下面我就把这几年私藏的各种资源,网站贡献出来给你们。主要有:电子书搜索、实用工具、在线视频学习网站、非视频学习网站、软件下载、面试/求职必备网站。 注意:文中提到的所有资源,文末我都给你整理好了,你们只管拿去,如果觉得不错,转发、分享就是最大的支持了。 一、电子书搜索 对于大部分程序员...

在中国程序员是青春饭吗?

今年,我也32了 ,为了不给大家误导,咨询了猎头、圈内好友,以及年过35岁的几位老程序员……舍了老脸去揭人家伤疤……希望能给大家以帮助,记得帮我点赞哦。 目录: 你以为的人生 一次又一次的伤害 猎头界的真相 如何应对互联网行业的「中年危机」 一、你以为的人生 刚入行时,拿着傲人的工资,想着好好干,以为我们的人生是这样的: 等真到了那一天,你会发现,你的人生很可能是这样的: ...

Java基础知识面试题(2020最新版)

文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的对比基础语法数据类型Java有哪些数据类型switc...

我以为我学懂了数据结构,直到看了这个导图才发现,我错了

数据结构与算法思维导图

String s = new String(" a ") 到底产生几个对象?

老生常谈的一个梗,到2020了还在争论,你们一天天的,哎哎哎,我不是针对你一个,我是说在座的各位都是人才! 上图红色的这3个箭头,对于通过new产生一个字符串(”宜春”)时,会先去常量池中查找是否已经有了”宜春”对象,如果没有则在常量池中创建一个此字符串对象,然后堆中再创建一个常量池中此”宜春”对象的拷贝对象。 也就是说准确答案是产生了一个或两个对象,如果常量池中原来没有 ”宜春” ,就是两个。...

技术大佬:我去,你写的 switch 语句也太老土了吧

昨天早上通过远程的方式 review 了两名新来同事的代码,大部分代码都写得很漂亮,严谨的同时注释也很到位,这令我非常满意。但当我看到他们当中有一个人写的 switch 语句时,还是忍不住破口大骂:“我擦,小王,你丫写的 switch 语句也太老土了吧!” 来看看小王写的代码吧,看完不要骂我装逼啊。 private static String createPlayer(PlayerTypes p...

Linux面试题(2020最新版)

文章目录Linux 概述什么是LinuxUnix和Linux有什么区别?什么是 Linux 内核?Linux的基本组件是什么?Linux 的体系结构BASH和DOS之间的基本区别是什么?Linux 开机启动过程?Linux系统缺省的运行级别?Linux 使用的进程间通信方式?Linux 有哪些系统日志文件?Linux系统安装多个桌面环境有帮助吗?什么是交换空间?什么是root帐户什么是LILO?什...

Linux命令学习神器!命令看不懂直接给你解释!

大家都知道,Linux 系统有非常多的命令,而且每个命令又有非常多的用法,想要全部记住所有命令的所有用法,恐怕是一件不可能完成的任务。 一般情况下,我们学习一个命令时,要么直接百度去搜索它的用法,要么就直接用 man 命令去查看守冗长的帮助手册。这两个都可以实现我们的目标,但有没有更简便的方式呢? 答案是必须有的!今天给大家推荐一款有趣而实用学习神器 — kmdr,让你解锁 Linux 学习新姿势...

和黑客斗争的 6 天!

互联网公司工作,很难避免不和黑客们打交道,我呆过的两家互联网公司,几乎每月每天每分钟都有黑客在公司网站上扫描。有的是寻找 Sql 注入的缺口,有的是寻找线上服务器可能存在的漏洞,大部分都...

史上最全的 python 基础知识汇总篇,没有比这再全面的了,建议收藏

网友们有福了,小编终于把基础篇的内容全部涉略了一遍,这是一篇关于基础知识的汇总的文章,请朋友们收下,不用客气,不过文章篇幅肯能会有点长,耐心阅读吧爬虫(七十)多进程multiproces...

讲一个程序员如何副业月赚三万的真实故事

loonggg读完需要3分钟速读仅需 1 分钟大家好,我是你们的校长。我之前讲过,这年头,只要肯动脑,肯行动,程序员凭借自己的技术,赚钱的方式还是有很多种的。仅仅靠在公司出卖自己的劳动时...

女程序员,为什么比男程序员少???

昨天看到一档综艺节目,讨论了两个话题:(1)中国学生的数学成绩,平均下来看,会比国外好?为什么?(2)男生的数学成绩,平均下来看,会比女生好?为什么?同时,我又联想到了一个技术圈经常讨...

85后蒋凡:28岁实现财务自由、34岁成为阿里万亿电商帝国双掌门,他的人生底层逻辑是什么?...

蒋凡是何许人也? 2017年12月27日,在入职4年时间里,蒋凡开挂般坐上了淘宝总裁位置。 为此,时任阿里CEO张勇在任命书中力赞: 蒋凡加入阿里,始终保持创业者的冲劲,有敏锐的...

总结了 150 余个神奇网站,你不来瞅瞅吗?

原博客再更新,可能就没了,之后将持续更新本篇博客。

副业收入是我做程序媛的3倍,工作外的B面人生是怎样的?

提到“程序员”,多数人脑海里首先想到的大约是:为人木讷、薪水超高、工作枯燥…… 然而,当离开工作岗位,撕去层层标签,脱下“程序员”这身外套,有的人生动又有趣,马上展现出了完全不同的A/B面人生! 不论是简单的爱好,还是正经的副业,他们都干得同样出色。偶尔,还能和程序员的特质结合,产生奇妙的“化学反应”。 @Charlotte:平日素颜示人,周末美妆博主 大家都以为程序媛也个个不修边幅,但我们也许...

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

新一代神器STM32CubeMonitor介绍、下载、安装和使用教程

关注、星标公众号,不错过精彩内容作者:黄工公众号:strongerHuang最近ST官网悄悄新上线了一款比较强大的工具:STM32CubeMonitor V1.0.0。经过我研究和使用之...

如果你是老板,你会不会踢了这样的员工?

有个好朋友ZS,是技术总监,昨天问我:“有一个老下属,跟了我很多年,做事勤勤恳恳,主动性也很好。但随着公司的发展,他的进步速度,跟不上团队的步伐了,有点...

我入职阿里后,才知道原来简历这么写

私下里,有不少读者问我:“二哥,如何才能写出一份专业的技术简历呢?我总感觉自己写的简历太烂了,所以投了无数份,都石沉大海了。”说实话,我自己好多年没有写过简历了,但我认识的一个同行,他在阿里,给我说了一些他当年写简历的方法论,我感觉太牛逼了,实在是忍不住,就分享了出来,希望能够帮助到你。 01、简历的本质 作为简历的撰写者,你必须要搞清楚一点,简历的本质是什么,它就是为了来销售你的价值主张的。往深...

大学一路走来,学习互联网全靠这几个网站,最终拿下了一把offer

大佬原来都是这样炼成的

离职半年了,老东家又发 offer,回不回?

有小伙伴问松哥这个问题,他在上海某公司,在离职了几个月后,前公司的领导联系到他,希望他能够返聘回去,他很纠结要不要回去? 俗话说好马不吃回头草,但是这个小伙伴既然感到纠结了,我觉得至少说明了两个问题:1.曾经的公司还不错;2.现在的日子也不是很如意。否则应该就不会纠结了。 老实说,松哥之前也有过类似的经历,今天就来和小伙伴们聊聊回头草到底吃不吃。 首先一个基本观点,就是离职了也没必要和老东家弄的苦...

为什么你不想学习?只想玩?人是如何一步一步废掉的

不知道是不是只有我这样子,还是你们也有过类似的经历。 上学的时候总有很多光辉历史,学年名列前茅,或者单科目大佬,但是虽然慢慢地长大了,你开始懈怠了,开始废掉了。。。 什么?你说不知道具体的情况是怎么样的? 我来告诉你: 你常常潜意识里或者心理觉得,自己真正的生活或者奋斗还没有开始。总是幻想着自己还拥有大把时间,还有无限的可能,自己还能逆风翻盘,只不是自己还没开始罢了,自己以后肯定会变得特别厉害...

什么时候跳槽,为什么离职,你想好了么?

都是出来打工的,多为自己着想

为什么程序员做外包会被瞧不起?

二哥,有个事想询问下您的意见,您觉得应届生值得去外包吗?公司虽然挺大的,中xx,但待遇感觉挺低,马上要报到,挺纠结的。

当HR压你价,说你只值7K,你该怎么回答?

当HR压你价,说你只值7K时,你可以流畅地回答,记住,是流畅,不能犹豫。 礼貌地说:“7K是吗?了解了。嗯~其实我对贵司的面试官印象很好。只不过,现在我的手头上已经有一份11K的offer。来面试,主要也是自己对贵司挺有兴趣的,所以过来看看……”(未完) 这段话主要是陪HR互诈的同时,从公司兴趣,公司职员印象上,都给予对方正面的肯定,既能提升HR的好感度,又能让谈判气氛融洽,为后面的发挥留足空间。...

面试阿里p7,被按在地上摩擦,鬼知道我经历了什么?

面试阿里p7被问到的问题(当时我只知道第一个):@Conditional是做什么的?@Conditional多个条件是什么逻辑关系?条件判断在什么时候执...

你期望月薪4万,出门右拐,不送,这几个点,你也就是个初级的水平

先来看几个问题通过注解的方式注入依赖对象,介绍一下你知道的几种方式@Autowired和@Resource有何区别说一下@Autowired查找候选者的...

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

大三实习生,字节跳动面经分享,已拿Offer

说实话,自己的算法,我一个不会,太难了吧

程序员垃圾简历长什么样?

已经连续五年参加大厂校招、社招的技术面试工作,简历看的不下于万份 这篇文章会用实例告诉你,什么是差的程序员简历! 疫情快要结束了,各个公司也都开始春招了,作为即将红遍大江南北的新晋UP主,那当然要为小伙伴们做点事(手动狗头)。 就在公众号里公开征简历,义务帮大家看,并一一点评。《启舰:春招在即,义务帮大家看看简历吧》 一石激起千层浪,三天收到两百多封简历。 花光了两个星期的所有空闲时...

立即提问
相关内容推荐