flume多个Source对应一个Channel

flume多个Source对应一个Channel,会有什么结果?会造成部分Source启动或收集数据失败吗?

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
flume自定义source采集到的数据出现了空行
flume自定义source后,采集到hdfs上的数据出现了空行,有谁遇见过么?
flume的spooldir source只能监听本机目录吗
flume的spooldir source是用来监听指定指定目录下的文件 ,通过spoolDir 该配置项指定目录 , 所以这个配置项是仅仅只能配置成本机的目录吗?可以配置成远程的目录吗?或者类似FTP那种方式?
flume增量采集动态日志
请教一个flume增量采集日志问题,谢谢。 遇到一个应用,它生成的日志形式比较特殊,每天产生一个带日期的前缀,并根据文件容量扩展带序号的后缀。 例如2019年11月18日会首先生成20191118.log001,然后当日志文件写满20MB后,会生成20191118.log002,每个文件最大容量20MB,依次类推增加后缀的数字。 我们想用flume实时追加采集日志,如果source里定义: a1.sources.r1.type = exec a1.sources.r1.command = tail -f /tmp/20191118.log001 则可以实时收取,但只能取到一个日志文件,该文件写满20MB后,生成20191118.log2,flume就采集不到了。 如果使用: a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /tmp a1.sources.r1.includePattern = S2019*.* 则可以采集所有文件,但只能将文件采集一次,无法做到实时增量采集。 请教对于这样的日志,如何使用flume采集增量日志,谢谢。
Flume运行报错,试了网上很多办法都没有效果,求解。
![图片说明](https://img-ask.csdn.net/upload/202001/10/1578656542_760185.png)
flume可以监控hdfs上的指定目录吗
现在需要监控hdfs上的一个目录 把新增文件传到另一个hdfs上 目前想到的就是flume 大佬们求帮助,或者其他组建有能完成的吗。
Flume宕机问题求解?(面试)
请问flume宕机怎么办,网上又说flume有事件机制,不会丢失数据,但面试的时候这么回答, 面试官好像不大满意啊
flume采集kafka报错怎么解决
报错信息: Source.java:120)] Event #: 0 2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 965 2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0 2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 975 2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0 2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 985 2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0 2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 995 2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0 2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 1006 2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0 2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {} java.lang.NullPointerException at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261) at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49) at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139) at java.lang.Thread.run(Thread.java:748) -------------------------------------------- 配置文件 kafkaLogger.sources = kaSource kafkaLogger.channels = memoryChannel kafkaLogger.sinks = kaSink # The channel can be defined as follows. kafkaLogger.sources.kaSource.channels = memoryChannel kafkaLogger.sources.kaSource.type= org.apache.flume.source.kafka.KafkaSource kafkaLogger.sources.kaSource.zookeeperConnect=192.168.130.4:2181,192.168.130.5:2181,192.168.130.6:2181 kafkaLogger.sources.kaSource.topic=dwd-topic kafkaLogger.sources.kaSource.groupId = 0 kafkaLogger.channels.memoryChannel.type=memory kafkaLogger.channels.memoryChannel.capacity = 1000 kafkaLogger.channels.memoryChannel.keep-alive = 60 kafkaLogger.sinks.kaSink.type = elasticsearch kafkaLogger.sinks.kaSink.hostNames = 192.168.130.6:9300 kafkaLogger.sinks.kaSink.indexName = flume_mq_es_d kafkaLogger.sinks.kaSink.indexType = flume_mq_es kafkaLogger.sinks.kaSink.clusterName = zyuc-elasticsearch kafkaLogger.sinks.kaSink.batchSize = 100 kafkaLogger.sinks.kaSink.client = transport kafkaLogger.sinks.kaSink.serializer = com.commons.flume.sink.elasticsearch.CommonElasticSearchIndexRequestBuilderFactory kafkaLogger.sinks.kaSink.serializer.parse = com.commons.log.parser.LogTextParser kafkaLogger.sinks.kaSink.serializer.formatPattern = yyyyMMdd kafkaLogger.sinks.kaSink.serializer.dateFieldName = time kafkaLogger.sinks.kaSink.channel = memoryChannel
关于flume和kafka结合效率的问题
最近做了个测试。是flume+kafka的。是读取文件夹的。31M的文件读了很长时间。大概20分钟。不知道什么原因。哪位大神知道啊。指导下。 下面是flume的配置 #agent section producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq #producer.sources.s.channels = c producer.sources.s.type =spooldir producer.sources.s.spoolDir = /home/lb/data producer.sources.s.channels = c # Each sink's type must be defined producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partition.key=1 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=async producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=lbTestToptic #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 100000 producer.channels.c.transactionCapacity = 10000 希望有知道原因的大神给个帮助啊。谢谢
flume-ng 1.4 elasticsearch sink 报错
哪位知道这个怎么回事啊? 使用flume-ng 使用elasticsearch 作为sink的时候报错。 20 三月 2014 22:28:09,417 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150) - Creating channels 20 三月 2014 22:28:09,438 INFO [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:40) - Creating instance of channel c1 type memory 20 三月 2014 22:28:09,451 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205) - Created channel c1 20 三月 2014 22:28:09,453 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:39) - Creating instance of source r1, type spooldir 20 三月 2014 22:28:09,478 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:40) - Creating instance of sink: k1, type: elasticsearch 20 三月 2014 22:28:09,486 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145) - Failed to start agent because dependencies were not found in classpath. Error follows. java.lang.NoClassDefFoundError: org/elasticsearch/common/transport/TransportAddress at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:190) at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67) at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41) at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.transport.TransportAddress at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
flume 的hdfs sink效率低的问题
哈喽,大家好,我现在遇到了一个问题。 我的flume向hdfs中写文件时,效率比较低 大约1G/3分钟 我单独测试时用put方式 1分钟能达到8G 如果用file sink也能达到1分钟1G 日志没有任何异常 只是DEBUG的时候发现每次提交一个块用时将近20秒 有高手能帮忙分析下是什么原因么 client.sources = r1 client.channels = c1 client.sinks = k1 client.sources.r1.type = spooldir client.sources.r1.spoolDir = /var/data/tmpdata client.sources.r1.fileSuffix = .COMPLETED client.sources.r1.deletePolicy = never client.sources.r1.batchSize = 500 client.sources.r1.channels = c1 client.channels.c1.type = memory client.channels.c1.capacity = 1000000 client.channels.c1.transactionCapacity = 50000 client.channels.c1.keep-alive = 3 client.sinks.k1.type = hdfs client.sinks.k1.hdfs.path = /flume/events/%Y%m%d/%H client.sinks.k1.hdfs.useLocalTimeStamp = true client.sinks.k1.hdfs.rollInterval = 3600 client.sinks.k1.hdfs.rollSize = 1000000000 client.sinks.k1.hdfs.rollCount = 0 client.sinks.k1.hdfs.batchSize = 500 client.sinks.k1.hdfs.callTimeout = 30000 client.sinks.k1.hdfs.fileType = DataStream client.sinks.k1.channel = c1 12 Aug 2015 16:14:24,739 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:14:54,740 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:15:24,740 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:15:54,741 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:16:24,742 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:16:54,742 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:17:24,743 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:17:54,744 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:18:24,745 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:18:54,746 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 12 Aug 2015 16:19:24,746 DEBUG [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:126) - Checking file:../conf/flume-client.conf for changes 日志没有问题 就是慢
关于Flume-ng的netcat配置问题
参考网上的相关教程,我的netcat配置如下: ``` agent1.sources.source1.type = netcat agent1.sources.source1.bind = localhost agent1.sources.source1.port = 44444 ``` 其他的配置就省略了。我启动服务后也正常,出现如下正常日志: ``` 2017-05-09 21:40:21,951 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] ``` 然后在windows上开启一个console,telnet 192.168.200.143 44444,结果提示我无法连接主机端口(ps:192.168.200.143就是Flume的主机IP)。 一顿懵逼后,想了想,我并没有开启过44444端口,于是换了下8089端口,这个端口我开了服务,重启启动,报了一堆错: ``` Caused by: java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126) ``` 地址已经被使用,瞬间崩溃!那个地址的配置不正是监听服务器8089端口的数据情况莫,怎么会说地址被占用,难道启动的时候flume会自己开启8089端口? 好吧,我重新改了下配置,改成监听我windows机器的端口: ``` agent1.sources.source1.type = netcat agent1.sources.source1.bind = 192.168.205.143 #远程windows机器 agent1.sources.source1.port = 9000 #windows开启的9000服务 ``` 再次启动,又是报错: ``` Caused by: java.net.BindException: Cannot assign requested address at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.flume.source.NetcatSource.start(NetcatSource.java:162) ``` 我彻底崩溃,完全被这个配置搞晕了。 在这里我有个问题需要弄清楚,望大神们帮我解答,多谢! netcat的绑定地址和端口,这个配置到底是什么意思? 1)是Flume自己会根据配置的地址和端口去创建socketServer端口服务,然后客户端程序向这个端口发送日志数据?这显然不符合Flume主动采集日志的特性。 2)还是Flume根据配置的端口和地址去监听着个服务端口和日志数据。我想Flume应该是监听,但是为什么我去监听指定的端口却连启动都不行。 我现在是特地一脸懵逼,被卡在这好难受,大神们快快出现,小弟多谢!
flume采集数据到hdfs性能问题
本人目前遇到flume采集写入hdfs性能等各种问题,大致如下。在10上的xx/xx目录下的数据进行读取 sink到08上的flume 由08上的flume写到07的hdfs上 30多m的文件写了好久。有时候会内存溢出等问题![图片说明](https://img-ask.csdn.net/upload/201503/12/1426162664_624860.jpg) # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = r09n08 a1.sources.r1.port = 55555 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp #hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://r09n07:8020/project/dame/input/%Y%m%d/%H a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.filePrefix = hdfs- a1.sinks.k1.hdfs.rollInterval = 0 #a1.sinks.k1.hdfs.fileSuffix = .log #a1.sinks.k1.hdfs.round = true #a1.sinks.k1.hdfs.roundValue = 1 #a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollSize = 67108864 a1.sinks.k1.hdfs.rollCount = 0 #a1.sinks.k1.hdfs.writeFormat = Text # Use a channel which buffers events in file a1.channels = c1 a1.channels.c1.type = memory #a1.channels.c1.checkpointDir=/home/nids/wg/apache-flume-1.5.2-bin/checkpoint #a1.channels.c1.dataDirs=/home/nids/wg/apache-flume-1.5.2-bin/datadir a1.sinks.k1.hdfs.batchSize = 10000 #a1.sinks.k1.hdfs.callTimeout = 6000 #a1.sinks.k1.hdfs.appendTimeout = 6000 #a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 10000 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 上面是08机器上的配置文件 ``` 下面是10机器上的配置文件 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe the sink a1.sinks.k1.type = logger #### a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/nids/wg/apache-flume-1.5.2-bin/ceshi12 a1.sources.r1.fileHeader =false a1.sources.r1.channels = c1 #### # Describe/configure the source #a1.sources.r1.type = avro a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # avro sink a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = r09n08 a1.sinks.k1.port = 55555 # Use a channel which buffers events in file a1.channels = c1 a1.channels.c1.type = memory #a1.channels.c1.checkpointDir = /home/nids/wg/apache-flume-1.5.2-bin/checkpoint #a1.channels.c1.dataDirs = /home/nids/wg/apache-flume-1.5.2-bin/datadir a1.sinks.k1.hdfs.batchSize = 10000 #a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 10000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 求各位高手解答。有时候只写了一部分数据就不再继续了,对单个文件执行时没有问题就是对目录扫描 channel是 memory类型时性能极差。不知道问题出在哪里 ```
做一个flume收集到另一个flume,再传给hdfs,但是现在flume连接hdfs出现如下错误
![![图片说明](https://img-ask.csdn.net/upload/201512/29/1451378748_777711.png) ![图片说明](https://img-ask.csdn.net/upload/201512/29/1451379043_209254.png) 错误主要是这个:Failed to start agent because dependencies were not found in classpath.上图是报错,麻烦大神解决 下面是配置文件 #master_agent master_agent.channels = c2 master_agent.sources = s2 master_agent.sinks = k2 #master_agent avrosources master_agent.sources.s2.type = avro master_agent.sources.s2.bind = master1 master_agent.sources.s2.port = 41415 master_agent.sources.s2.channels = c2 #master_agent filechannels master_agent.channels.c2.type = file master_agent.channels.c2.capacity = 100000 master_agent.channels.c2.transactionCapacity = 1000 #master_agent hdfssinks master_agent.sinks.k2.type = hdfs master_agent.sinks.k2.channel = c2 master_agent.sinks.k2.hdfs.path = hdfs://master1:9000/hdfs master_agent.sinks.k2.hdfs.filePrefix = test- master_agent.sinks.k2.hdfs.inUsePrefix = _ master_agent.sinks.k2.hdfs.inUseSuffix = .tmp master_agent.sinks.k2.hdfs.fileType = DataStream master_agent.sinks.k2.hdfs.writeFormat = Text master_agent.sinks.k2.hdfs.batchSize = 1000 master_agent.sinks.k2.hdfs.callTimeout = 6000
如何处理JDBC批量插入sql不支持多表的情况下的入库速率不稳定的问题?
是这样的,现在我要从来自Kafka的20多个topic中消费出数据,每个topic对应Clickhouse里面的一个表。 但是Clickhouse的JDBC批量插入只支持预编译SQL,即每个 PrepareStatement对象只能批量插入一个表的数据。如下: ``` Connection connection = getConnection(); PrepareStatement ps = connection.prepareStatement("insert into xxx values (?, ?, ?, ?)"); ps.setObject(1, xxx); ps.setObject(2, xxx); ps.setObject(3, xxx); ps.addBatch(); ps.executeBatch(); ps.clearBatch(); // ...... ``` 所以,我在入库程序把每个表的入库分为不同的线程,分别维护不同的PrepareStatement对象, 入库不同的表。比如现在有20个表,我设定每个表3个线程,那么总共就有60个入库线程。 但是这样子做的话,我无法保证入库的速率稳定,因为有的表数据量大,有的因为业务开启有时较大,而分配的入库线程是固定的。各位盆友有什么解决办法吗? ps:入库程序用的flume,用的官方的KafkaSource,然后写了一个Clickhouse的Sink,每个sink就是对应一个入库clickhouse的线程。Channel用的文件内存通道。当Kafka数据量大时,入库速率远远小于消费速率,可能导致Channel通道满,堆积大量磁盘文件,读写磁盘操作又进一步影响sink取数据,然后越来越慢。。最后Kafka都报一堆问题。 再ps:Clickhouse是6个节点的集群,三个分片,一个副本的配置。我批量插入设置150000条一次批量插入。
flume1.5.2希望将log4j的日志写入hdfs报错Unexpected exception from downstream.
1、conf文件如下 ``` agent1.sources = source1 agent1.channels = channel1 agent1.sinks = snik1 # source agent1.sources.source1.type = avro agent1.sources.source1.bind = nnode agent1.sources.source1.port = 44446 agent1.sources.source1.threads = 5 # channel agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 100000 agent1.channels.channel1.transactionCapacity = 1000 agent1.channels.channel1.keep-alive = 30 agent1.channels.channel1.byteCapacityBufferPercentage = 20 # agent1.channels.channel1.byteCapacity = 200M # sink agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path = /flume/ agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S agent1.sinks.sink1.hdfs.fileSuffix = .log agent1.sinks.sink1.hdfs.writeFormat = Text agent1.sinks.sink1.hdfs.rollInterval = 30 agent1.sinks.sink1.hdfs.rollSize = 1024 agent1.sinks.sink1.hdfs.rollCount = 0 agent1.sinks.sink1.hdfs.idleTimeout = 20 agent1.sinks.sink1.hdfs.batchSize = 100 # agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 ``` 2、hdfs集群为hdfs://cluster,两个namenode节点分别为:nnode、dnode1 3、java代码 ``` package com.invic.hdfs; import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.log4j.Logger; /** * * @author lucl * */ public class MyHdfs { public static void main(String[] args) throws IOException { System.setProperty("hadoop.home.dir", "E:\\Hadoop\\hadoop-2.6.0\\hadoop-2.6.0\\"); Logger logger = Logger.getLogger(MyHdfs.class); Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://cluster"); conf.set("dfs.nameservices", "cluster"); conf.set("dfs.ha.namenodes.cluster", "nn1,nn2"); conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020"); conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020"); conf.set("dfs.client.failover.proxy.provider.cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); for (int i = 0; i < 500; i++) { String str = "the sequence is " + i; logger.info(str); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); } } ``` 4、log4j ``` log4j.rootLogger=info,stdout,flume ### stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n ### flume log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.layout=org.apache.log4j.PatternLayout log4j.appender.flume.Hostname=nnode log4j.appender.flume.Port=44446 log4j.appender.flume.UnsafeMode=true ``` 5、执行结果 ![后台报错](https://img-ask.csdn.net/upload/201505/26/1432569606_584763.png)
Flume和kafka连接的问题
本人使用flume1.60版本和kafka0.8.2.2版本进行连接,配置如下: a0.sources.r1.type = xiaomu.flume.source.TailFileSource a0.sources.r1.filePath = /root/access2.txt a0.sources.r1.posiFile = /root/posi2.txt a0.sources.r1.interval = 2000 a0.sources.r1.charset = UTF-8 a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a0.channels.c1.capacity = 1000 a0.channels.c1.transactionCapacity = 1000 a0.channels.c1.brokerList=slave1:9092,slave4:9092,slave3:9092 a0.channels.c1.topic=usertest3 a0.channels.c1.zookeeperConnect=slave2:2181,slave5:2181,slave6:2181 a0.channels.c1.parseAsFlumeEvent = false 但是我echo "xxx" >> access2.txt之后,在kafka那边就不一样了,比如我打xiaomu,就会出来两行,一行是xiaomum,第二行是空白,还有时候是一行但是开头有一个方框,如图所示: ![图片说明](https://img-ask.csdn.net/upload/201807/21/1532184292_313303.png) 这个怎么解决呀?求助各位大神了!
flume怎么从redis读取数据,sink的时候怎么按照我自己的规则格式保存日志
如题 flume要从redis读取数据要自定义source吗?自定义source应该怎么写?
flume配置了kakfaChannel后,启动报错!求大神帮忙
是在虚拟机上配置搭建的服务,flume部署在172.235.10.10上, kafka部署在172.235.10.11上。配置文件如下: flume: a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 8000 a1.sinks.k1.type = file_roll a1.sinks.k1.channel=c1 a1.sinks.k1.directory=/usr/log/flume a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.brokerList=172.235.10.11:9002 a1.channels.c1.topic=test2 a1.channels.c1.zookeeperConnect=172.235.10.11:2181 zookeeper是使用了kafka自带的,配置文件没做修改 kafka配置server.properties修改: host.name=172.235.10.11 zookeeper.connect=172.235.10.11:2181 kafka启动正常,生产者和消费者也是可以通话的 但在启动flume的时候却报这样的一个错误: Error while getting events from Kafka. This is usually caused by trying to read a non-flume event. Ensure the setting for parseAsFlumeEvent is correct 求大神帮忙
log4j向flume发送数据乱码
flume部署在linux上,log4J2目部署在windows上。用log4j的flumeAppender把数据发送到kafka上,但通过logsink和filesink打印出来的都是乱码,log4J在Windows本地打印出来是正常的。请问这个怎么解决。。。。 我在linux上用flume的avro client把一个UTF-8编码的文本发送到flume,依然是乱码。但用GBK的文本就是正常的。 但是我在win上吧log4j的编码都改过,flume打印出来的还是乱码。
相见恨晚的超实用网站
相见恨晚的超实用网站 持续更新中。。。
爬虫福利二 之 妹子图网MM批量下载
爬虫福利一:27报网MM批量下载 点击 看了本文,相信大家对爬虫一定会产生强烈的兴趣,激励自己去学习爬虫,在这里提前祝:大家学有所成! 目标网站:妹子图网 环境:Python3.x 相关第三方模块:requests、beautifulsoup4 Re:各位在测试时只需要将代码里的变量path 指定为你当前系统要保存的路径,使用 python xxx.py 或IDE运行即可。 ...
字节跳动视频编解码面经
三四月份投了字节跳动的实习(图形图像岗位),然后hr打电话过来问了一下会不会opengl,c++,shador,当时只会一点c++,其他两个都不会,也就直接被拒了。 七月初内推了字节跳动的提前批,因为内推没有具体的岗位,hr又打电话问要不要考虑一下图形图像岗,我说实习投过这个岗位不合适,不会opengl和shador,然后hr就说秋招更看重基础。我当时想着能进去就不错了,管他哪个岗呢,就同意了面试...
开源一个功能完整的SpringBoot项目框架
福利来了,给大家带来一个福利。 最近想了解一下有关Spring Boot的开源项目,看了很多开源的框架,大多是一些demo或者是一个未成形的项目,基本功能都不完整,尤其是用户权限和菜单方面几乎没有完整的。 想到我之前做的框架,里面通用模块有:用户模块,权限模块,菜单模块,功能模块也齐全了,每一个功能都是完整的。 打算把这个框架分享出来,供大家使用和学习。 为什么用框架? 框架可以学习整体...
源码阅读(19):Java中主要的Map结构——HashMap容器(下1)
HashMap容器从字面的理解就是,基于Hash算法构造的Map容器。从数据结构的知识体系来说,HashMap容器是散列表在Java中的具体表达(并非线性表结构)。具体来说就是,利用K-V键值对中键对象的某个属性(默认使用该对象的“内存起始位置”这一属性)作为计算依据进行哈希计算(调用hashCode方法),然后再以计算后的返回值为依据,将当前K-V键值对在符合HashMap容器构造原则的基础上,放置到HashMap容器的某个位置上,且这个位置和之前添加的K-V键值对的存储位置完全独立,不一定构成连续的存储
c++制作的植物大战僵尸,开源,一代二代结合游戏
此游戏全部由本人自己制作完成。游戏大部分的素材来源于原版游戏素材,少部分搜集于网络,以及自己制作。 此游戏为同人游戏而且仅供学习交流使用,任何人未经授权,不得对本游戏进行更改、盗用等,否则后果自负。目前有六种僵尸和六种植物,植物和僵尸的动画都是本人做的。qq:2117610943 开源代码下载 提取码:3vzm 点击下载--&gt; 11月28日 新增四种植物 统一植物画风,全部修...
Java学习的正确打开方式
在博主认为,对于入门级学习java的最佳学习方法莫过于视频+博客+书籍+总结,前三者博主将淋漓尽致地挥毫于这篇博客文章中,至于总结在于个人,实际上越到后面你会发现学习的最好方式就是阅读参考官方文档其次就是国内的书籍,博客次之,这又是一个层次了,这里暂时不提后面再谈。博主将为各位入门java保驾护航,各位只管冲鸭!!!上天是公平的,只要不辜负时间,时间自然不会辜负你。 何谓学习?博主所理解的学习,它是一个过程,是一个不断累积、不断沉淀、不断总结、善于传达自己的个人见解以及乐于分享的过程。
程序员必须掌握的核心算法有哪些?
由于我之前一直强调数据结构以及算法学习的重要性,所以就有一些读者经常问我,数据结构与算法应该要学习到哪个程度呢?,说实话,这个问题我不知道要怎么回答你,主要取决于你想学习到哪些程度,不过针对这个问题,我稍微总结一下我学过的算法知识点,以及我觉得值得学习的算法。这些算法与数据结构的学习大多数是零散的,并没有一本把他们全部覆盖的书籍。下面是我觉得值得学习的一些算法以及数据结构,当然,我也会整理一些看过...
Python——画一棵漂亮的樱花树(不同种樱花+玫瑰+圣诞树喔)
最近翻到一篇知乎,上面有不少用Python(大多是turtle库)绘制的树图,感觉很漂亮,我整理了一下,挑了一些我觉得不错的代码分享给大家(这些我都测试过,确实可以生成) one 樱花树 动态生成樱花 效果图(这个是动态的): 实现代码 import turtle as T import random import time # 画樱花的躯干(60,t) def Tree(branch, ...
linux系列之常用运维命令整理笔录
本博客记录工作中需要的linux运维命令,大学时候开始接触linux,会一些基本操作,可是都没有整理起来,加上是做开发,不做运维,有些命令忘记了,所以现在整理成博客,当然vi,文件操作等就不介绍了,慢慢积累一些其它拓展的命令,博客不定时更新 free -m 其中:m表示兆,也可以用g,注意都要小写 Men:表示物理内存统计 total:表示物理内存总数(total=used+free) use...
Python 基础(一):入门必备知识
Python 入门必备知识,你都掌握了吗?
深度学习图像算法在内容安全领域的应用
互联网给人们生活带来便利的同时也隐含了大量不良信息,防范互联网平台有害内容传播引起了多方面的高度关注。本次演讲从技术层面分享网易易盾在内容安全领域的算法实践经验,包括深度...
程序员接私活怎样防止做完了不给钱?
首先跟大家说明一点,我们做 IT 类的外包开发,是非标品开发,所以很有可能在开发过程中会有这样那样的需求修改,而这种需求修改很容易造成扯皮,进而影响到费用支付,甚至出现做完了项目收不到钱的情况。 那么,怎么保证自己的薪酬安全呢? 我们在开工前,一定要做好一些证据方面的准备(也就是“讨薪”的理论依据),这其中最重要的就是需求文档和验收标准。一定要让需求方提供这两个文档资料作为开发的基础。之后开发...
网页实现一个简单的音乐播放器(大佬别看。(⊙﹏⊙))
今天闲着无事,就想写点东西。然后听了下歌,就打算写个播放器。 于是乎用h5 audio的加上js简单的播放器完工了。 演示地点演示 html代码如下` music 这个年纪 七月的风 音乐 ` 然后就是css`*{ margin: 0; padding: 0; text-decoration: none; list-...
Python十大装B语法
Python 是一种代表简单思想的语言,其语法相对简单,很容易上手。不过,如果就此小视 Python 语法的精妙和深邃,那就大错特错了。本文精心筛选了最能展现 Python 语法之精妙的十个知识点,并附上详细的实例代码。如能在实战中融会贯通、灵活使用,必将使代码更为精炼、高效,同时也会极大提升代码B格,使之看上去更老练,读起来更优雅。
数据库优化 - SQL优化
以实际SQL入手,带你一步一步走上SQL优化之路!
2019年11月中国大陆编程语言排行榜
2019年11月2日,我统计了某招聘网站,获得有效程序员招聘数据9万条。针对招聘信息,提取编程语言关键字,并统计如下: 编程语言比例 rank pl_ percentage 1 java 33.62% 2 cpp 16.42% 3 c_sharp 12.82% 4 javascript 12.31% 5 python 7.93% 6 go 7.25% 7 p...
通俗易懂地给女朋友讲:线程池的内部原理
餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:“经常听你说线程池,到底线程池到底是个什么原理?”
经典算法(5)杨辉三角
写在前面: 我是 扬帆向海,这个昵称来源于我的名字以及女朋友的名字。我热爱技术、热爱开源、热爱编程。技术是开源的、知识是共享的。 这博客是对自己学习的一点点总结及记录,如果您对 Java、算法 感兴趣,可以关注我的动态,我们一起学习。 用知识改变命运,让我们的家人过上更好的生活。 目录一、杨辉三角的介绍二、杨辉三角的算法思想三、代码实现1.第一种写法2.第二种写法 一、杨辉三角的介绍 百度
腾讯算法面试题:64匹马8个跑道需要多少轮才能选出最快的四匹?
昨天,有网友私信我,说去阿里面试,彻底的被打击到了。问了为什么网上大量使用ThreadLocal的源码都会加上private static?他被难住了,因为他从来都没有考虑过这个问题。无独有偶,今天笔者又发现有网友吐槽了一道腾讯的面试题,我们一起来看看。 腾讯算法面试题:64匹马8个跑道需要多少轮才能选出最快的四匹? 在互联网职场论坛,一名程序员发帖求助到。二面腾讯,其中一个算法题:64匹...
面试官:你连RESTful都不知道我怎么敢要你?
干货,2019 RESTful最贱实践
为啥国人偏爱Mybatis,而老外喜欢Hibernate/JPA呢?
关于SQL和ORM的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行了一番讨论,感触还是有一些,于是就有了今天这篇文。 声明:本文不会下关于Mybatis和JPA两个持久层框架哪个更好这样的结论。只是摆事实,讲道理,所以,请各位看官勿喷。 一、事件起因 关于Mybatis和JPA孰优孰劣的问题,争论已经很多年了。一直也没有结论,毕竟每个人的喜好和习惯是大不相同的。我也看...
项目中的if else太多了,该怎么重构?
介绍 最近跟着公司的大佬开发了一款IM系统,类似QQ和微信哈,就是聊天软件。我们有一部分业务逻辑是这样的 if (msgType = "文本") { // dosomething } else if(msgType = "图片") { // doshomething } else if(msgType = "视频") { // doshomething } else { // doshom...
致 Python 初学者
欢迎来到“Python进阶”专栏!来到这里的每一位同学,应该大致上学习了很多 Python 的基础知识,正在努力成长的过程中。在此期间,一定遇到了很多的困惑,对未来的学习方向感到迷茫。我非常理解你们所面临的处境。我从2007年开始接触 python 这门编程语言,从2009年开始单一使用 python 应对所有的开发工作,直至今天。回顾自己的学习过程,也曾经遇到过无数的困难,也曾经迷茫过、困惑过。开办这个专栏,正是为了帮助像我当年一样困惑的 Python 初学者走出困境、快速成长。希望我的经验能真正帮到你
Python 编程实用技巧
Python是一门很灵活的语言,也有很多实用的方法,有时候实现一个功能可以用多种方法实现,我这里总结了一些常用的方法,并会持续更新。
“狗屁不通文章生成器”登顶GitHub热榜,分分钟写出万字形式主义大作
一、垃圾文字生成器介绍 最近在浏览GitHub的时候,发现了这样一个骨骼清奇的雷人项目,而且热度还特别高。 项目中文名:狗屁不通文章生成器 项目英文名:BullshitGenerator 根据作者的介绍,他是偶尔需要一些中文文字用于GUI开发时测试文本渲染,因此开发了这个废话生成器。但由于生成的废话实在是太过富于哲理,所以最近已经被小伙伴们给玩坏了。 他的文风可能是这样的: 你发现,
程序员:我终于知道post和get的区别
IT界知名的程序员曾说:对于那些月薪三万以下,自称IT工程师的码农们,其实我们从来没有把他们归为我们IT工程师的队伍。他们虽然总是以IT工程师自居,但只是他们一厢情愿罢了。 此话一出,不知激起了多少(码农)程序员的愤怒,却又无可奈何,于是码农问程序员。 码农:你知道get和post请求到底有什么区别? 程序员:你看这篇就知道了。 码农:你月薪三万了? 程序员:嗯。 码农:你是怎么做到的? 程序员:
"狗屁不通文章生成器"登顶GitHub热榜,分分钟写出万字形式主义大作
前言 GitHub 被誉为全球最大的同性交友网站,……,陪伴我们已经走过 10+ 年时间,它托管了大量的软件代码,同时也承载了程序员无尽的欢乐。 上周给大家分享了一篇10个让你笑的合不拢嘴的Github项目,而且还拿了7万+个Star哦,有兴趣的朋友,可以看看, 印象最深刻的是 “ 呼吸不止,码字不停 ”: 老实交代,你是不是经常准备写个技术博客,打开word后瞬间灵感便秘,码不出字? 有什么
推荐几款比较实用的工具,网站
1.盘百度PanDownload 这个云盘工具是免费的,可以进行资源搜索,提速(偶尔会抽风????) 不要去某站买付费的???? PanDownload下载地址 2.BeJSON 这是一款拥有各种在线工具的网站,推荐它的主要原因是网站简洁,功能齐全,广告相比其他广告好太多了 bejson网站 3.二维码美化 这个网站的二维码美化很好看,网站界面也很...
《程序人生》系列-这个程序员只用了20行代码就拿了冠军
你知道的越多,你不知道的越多 点赞再看,养成习惯GitHub上已经开源https://github.com/JavaFamily,有一线大厂面试点脑图,欢迎Star和完善 前言 这一期不算《吊打面试官》系列的,所有没前言我直接开始。 絮叨 本来应该是没有这期的,看过我上期的小伙伴应该是知道的嘛,双十一比较忙嘛,要值班又要去帮忙拍摄年会的视频素材,还得搞个程序员一天的Vlog,还要写BU
相关热词 c#处理浮点数 c# 生成字母数字随机数 c# 动态曲线 控件 c# oracle 开发 c#选择字体大小的控件 c# usb 批量传输 c#10进制转8进制 c#转base64 c# 科学计算 c#下拉列表获取串口
立即提问