SparkStream与flume的整合问题[急,在线等!!!]

各个版本信息:
spark2.0.2
flume1.7
sbt部分依赖 libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2"

拉模式代码和简单的输出语句
val flumeStream = FlumeUtils.createPollingStream(ssc,host,port,StorageLevel.MEMORY_ONLY_SER_2)
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()

已经在各个节点添加依赖

flume简单配置
# 指定Agent的组件名称

a1.sources = r1
a1.sinks = k1
a1.channels = c1

指定Flume source(要监听的路径)

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/weixf_kafka/testflume

指定Flume sink

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel =c1
a1.sinks.k1.hostname=172.28.41.196
a1.sinks.k1.port = 19999

指定Flume channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

绑定source和sink到channel上

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume,再启动SparkStreaming程序发现如下信息(部分)
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610), which has no missing parents
17/09/15 17:44:53 INFO scheduler.ReceiverTracker: Receiver 0 started
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 70.6 KB, free 413.8 MB)
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.1 KB, free 413.8 MB)
17/09/15 17:44:53 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.193:41571 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:53 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610)
17/09/15 17:44:53 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/09/15 17:44:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 70, 172.28.41.196, partition 0, PROCESS_LOCAL, 6736 bytes)
17/09/15 17:44:54 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 70 on executor id: 0 hostname: 172.28.41.196.
17/09/15 17:44:54 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.196:33364 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:54 INFO util.RecurringTimer: Started timer for JobGenerator at time 1505468700000
17/09/15 17:44:54 INFO scheduler.JobGenerator: Started JobGenerator at 1505468700000 ms
17/09/15 17:44:54 INFO scheduler.JobScheduler: Started JobScheduler
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@534e58b6{/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b495d4{/streaming/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@12fe1f28{/streaming/batch,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26fb4d06{/streaming/batch/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2d38edfd{/static/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO streaming.StreamingContext: StreamingContext started
17/09/15 17:44:55 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 172.28.41.196:45983
17/09/15 17:45:01 INFO scheduler.JobScheduler: Added jobs for time 1505468700000 ms
17/09/15 17:45:01 INFO scheduler.JobScheduler: Starting job streaming job 1505468700000 ms.0 from job set of time 1505468700000 ms
17/09/15 17:45:01 INFO spark.SparkContext: Starting job: print at FlumeLogPull.scala:44
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.196:33364 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Registering RDD 7 (union at DStream.scala:605)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Got job 2 (print at FlumeLogPull.scala:44) with 1 output partitions
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at FlumeLogPull.scala:44)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605), which has no missing parents
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.193:41571 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 413.8 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.0 KB, free 413.8 MB)
17/09/15 17:45:02 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.28.41.193:41571 (size: 2.0 KB, free: 413.9 MB)
17/09/15 17:45:02 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1012
17/09/15 17:45:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605)
17/09/15 17:45:02 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
17/09/15 17:46:00 INFO scheduler.JobScheduler: Added jobs for time 1505468760000 ms
17/09/15 17:46:30 INFO scheduler.JobScheduler: Added jobs for time 1505468790000 ms
17/09/15 17:47:00 INFO scheduler.JobScheduler: Added jobs for time 1505468820000 ms
17/09/15 17:47:30 INFO scheduler.JobScheduler: Added jobs for time 1505468850000 ms
17/09/15 17:48:00 INFO scheduler.JobScheduler: Added jobs for time 1505468880000 ms
17/09/15 17:48:30 INFO scheduler.JobScheduler: Added jobs for time 1505468910000 ms
17/09/15 17:49:00 INFO scheduler.JobScheduler: Added jobs for time 1505468940000 ms
17/09/15 17:49:30 INFO scheduler.JobScheduler: Added jobs for time 1505468970000 ms
17/09/15 17:50:00 INFO scheduler.JobScheduler: Added jobs for time 1505469000000 ms
17/09/15 17:50:30 INFO scheduler.JobScheduler: Added jobs for time 1505469030000 ms
17/09/15 17:51:00 INFO scheduler.JobScheduler: Added jobs for time 1505469060000 ms
17/09/15 17:51:30 INFO scheduler.JobScheduler: Added jobs for time 1505469090000 ms
17/09/15 17:52:00 INFO scheduler.JobScheduler: Added jobs for time 1505469120000 ms
17/09/15 17:52:30 INFO scheduler.JobScheduler: Added jobs for time 1505469150000 ms
17/09/15 17:53:00 INFO scheduler.JobScheduler: Added jobs for time 1505469180000 ms
17/09/15 17:53:30 INFO scheduler.JobScheduler: Added jobs for time 1505469210000 ms
17/09/15 17:54:00 INFO scheduler.JobScheduler: Added jobs for time 1505469240000 ms
17/09/15 17:54:30 INFO scheduler.JobScheduler: Added jobs for time 1505469270000 ms
17/09/15 17:55:00 INFO scheduler.JobScheduler: Added jobs for time 1505469300000 ms
17/09/15 17:55:30 INFO scheduler.JobScheduler: Added jobs for time 1505469330000 ms
17/09/15 17:56:00 INFO scheduler.JobScheduler: Added jobs for time 1505469360000 ms
17/09/15 17:56:30 INFO scheduler.JobScheduler: Added jobs for time 1505469390000 ms
17/09/15 17:57:00 INFO scheduler.JobScheduler: Added jobs for time 1505469420000 ms
17/09/15 17:57:30 INFO scheduler.JobScheduler: Added jobs for time 1505469450000 ms
17/09/15 17:58:00 INFO scheduler.JobScheduler: Added jobs for time 1505469480000 ms
17/09/15 17:58:30 INFO scheduler.JobScheduler: Added jobs for time 1505469510000 ms
17/09/15 17:59:00 INFO scheduler.JobScheduler: Added jobs for time 1505469540000 ms
17/09/15 17:59:30 INFO scheduler.JobScheduler: Added jobs for time 1505469570000 ms
17/09/15 18:00:00 INFO scheduler.JobScheduler: Added jobs for time 1505469600000 ms
17/09/15 18:00:30 INFO scheduler.JobScheduler: Added jobs for time 1505469630000 ms
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)
17/09/15 18:01:00 INFO scheduler.JobScheduler: Added jobs for time 1505469660000 ms
17/09/15 18:01:00 INFO storage.BlockManagerInfo: Added input-0-1505469659800 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.9 MB)
17/09/15 18:01:03 INFO storage.BlockManagerInfo: Added input-0-1505469662800 in memory on 172.28.41.196:33364 (size: 7.3 KB, free: 413.9 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469684800 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.8 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469685000 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.8 MB)

其中没有我想要的输出信息而是一直有类似
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
这样的信息,如果向监控的文件夹下copy文件得到这样的输出信息
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)

想要的效果是输出类似这样的正常结果

Time: 1505468700000 ms

Received .. flume events.

实在是找不出来什么原因,求大神解惑,不胜感激

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
flume日志收集,拒绝链接什么问题呀!!!!!!!!!!
-
flume 的hdfs sink效率低的问题
-
flume+kafka+hdfs 整合问题
-
如何处理JDBC批量插入sql不支持多表的情况下的入库速率不稳定的问题?
-
关于Flume-ng的netcat配置问题
-
关于flume和kafka结合效率的问题
-
关于flume-ng输出文件名的问题
-
flume采集数据到hdfs性能问题
-
kafka连接flume因为hostname的配置报错?
-
Flume和kafka连接的问题
-
flume的spooldir source只能监听本机目录吗
-
flume开启报错java.lang.SecurityException: sealing violation: package org.apache.flume.conf is sealed
-
运行flume的agent,出现如下错误
-
spark+flume运行jar包报错
-
flume配置了kakfaChannel后,启动报错!求大神帮忙
-
(已解决)flume安装在windows上,试运行报错,求解决方案!
-
flume-ng能否自定义数据读取完成标识?
-
log4j向flume发送数据乱码
-
flume-ng 1.4 elasticsearch sink 报错
-
程序员实用工具网站
目录 1、搜索引擎 2、PPT 3、图片操作 4、文件共享 5、应届生招聘 6、程序员面试题库 7、办公、开发软件 8、高清图片、视频素材网站 9、项目开源 10、在线工具宝典大全 程序员开发需要具备良好的信息检索能力,为了备忘(收藏夹真是满了),将开发过程中常用的网站进行整理。 1、搜索引擎 1.1、秘迹搜索 一款无敌有良心、无敌安全的搜索引擎,不会收集私人信息,保...
我花了一夜用数据结构给女朋友写个H5走迷宫游戏
起因 又到深夜了,我按照以往在csdn和公众号写着数据结构!这占用了我大量的时间!我的超越妹妹严重缺乏陪伴而 怨气满满! 而女朋友时常埋怨,认为数据结构这么抽象难懂的东西没啥作用,常会问道:天天写这玩意,有啥作用。而我答道:能干事情多了,比如写个迷宫小游戏啥的! 当我码完字准备睡觉时:写不好别睡觉! 分析 如果用数据结构与算法造出东西来呢? ...
别再翻了,面试二叉树看这 11 个就够了~
写在前边 数据结构与算法: 不知道你有没有这种困惑,虽然刷了很多算法题,当我去面试的时候,面试官让你手写一个算法,可能你对此算法很熟悉,知道实现思路,但是总是不知道该在什么地方写,而且很多边界条件想不全面,一紧张,代码写的乱七八糟。如果遇到没有做过的算法题,思路也不知道从何寻找。面试吃了亏之后,我就慢慢的做出总结,开始分类的把数据结构所有的题型和解题思路每周刷题做出的系统性总结写在了 Github...
让程序员崩溃的瞬间(非程序员勿入)
今天给大家带来点快乐,程序员才能看懂。 来源:https://zhuanlan.zhihu.com/p/47066521 1. 公司实习生找 Bug 2.在调试时,将断点设置在错误的位置 3.当我有一个很棒的调试想法时 4.偶然间看到自己多年前写的代码 5.当我第一次启动我的单元测试时 ...
七个开源的 Spring Boot 前后端分离项目,一定要收藏!
前后端分离已经在慢慢走进各公司的技术栈,根据松哥了解到的消息,不少公司都已经切换到这个技术栈上面了。即使贵司目前没有切换到这个技术栈上面,松哥也非常建议大家学习一下前后端分离开发,以免在公司干了两三年,SSH 框架用的滚瓜烂熟,出来却发现自己依然没有任何优势! 其实前后端分离本身并不难,后段提供接口,前端做数据展示,关键是这种思想。很多人做惯了前后端不分的开发,在做前后端分离的时候,很容易带进来一...
用Python分析2000款避孕套,得出这些有趣的结论
到现在为止,我们的淘宝教程已经写到了第四篇,前三篇分别是: 第一篇:Python模拟登录淘宝,详细讲解如何使用requests库登录淘宝pc端。 第二篇:淘宝自动登录2.0,新增Cookies序列化,教大家如何将cookies保存起来。 第三篇:Python爬取淘宝商品避孕套,教大家如何爬取淘宝pc端商品信息。 今天,我们来看看淘宝系列的第四篇 我们在上一篇的时候已经将淘宝数据爬取下来了,...
接私活必备的 10 个开源项目!
点击蓝色“GitHubDaily”关注我加个“星标”,每天下午 18:35,带你逛 GitHub!作者 | SevDot来源 | http://1t.click/VE8W...
阿里资深工程师教你如何优化 Java 代码!
作者 | 王超 责编 | 伍杏玲 明代王阳明先生在《传习录》谈为学之道时说: 私欲日生,如地上尘,一日不扫,便又有一层。着实用功,便见道无终穷,愈探愈深,必使精白无一毫不彻方可。 代码中的"坏味道",如"私欲"如"灰尘",每天都在增加,一日不去清除,便会越累越多。如果用功去清除这些"坏味道",不仅能提高自己的编码水平,也能使代码变得"精白无一毫不彻"。这里,整理了日常工作中的一...
GitHub开源的10个超棒后台管理面板
目录 1、AdminLTE 2、vue-Element-Admin 3、tabler 4、Gentelella 5、ng2-admin 6、ant-design-pro 7、blur-admin 8、iview-admin 9、material-dashboard 10、layui 项目开发中后台管理平台必不可少,但是从零搭建一套多样化后台管理并不容易,目前有许多开源、免费、...
100 个网络基础知识普及,看完成半个网络高手
欢迎添加华为云小助手微信(微信号:HWCloud002或HWCloud003),输入关键字“加群”,加入华为云线上技术讨论群;输入关键字“最新活动”,获取华为云最新特惠促销。华为云诸多技术大咖、特惠活动等你来撩! 1)什么是链接? 链接是指两个设备之间的连接。它包括用于一个设备能够与另一个设备通信的电缆类型和协议。 2)OSI 参考模型的层次是什么? 有 7 个 OSI 层:物理...
Google离开我们快十年了
2010年1月13日,Google离开中国。掐指算来,Google已经离开我们快十年了。2010年是个特殊的年份,这一年还发生了3Q大战。为什么诸多大事都发生在2010年...
面试官的HTTP五连问法?我竟然回答不上来...
作者丨松若章来源丨http://1t.click/ataf曾经有这么一道经典面试题:从 URL 在浏览器被被输入到页面展现的过程中发生了什么?相信大多数准备过的同学都能回...
中国最顶级的一批程序员,从首富到首负!
过去的20年是程序员快意恩仇的江湖时代通过代码,实现梦想和财富有人痴迷于技术,做出一夜成名的产品有人将技术变现,创办企业成功上市这些早一代的程序员们创造的奇迹引发了一浪高...
为什么面向对象糟透了?
又是周末,编程语言“三巨头”Java, Lisp 和C语言在Hello World咖啡馆聚会。服务员送来咖啡的同时还带来了一张今天的报纸, 三人寒暄了几句, C语言翻开了...
分享靠写代码赚钱的一些门路
作者 mezod,译者 josephchang10如今,通过自己的代码去赚钱变得越来越简单,不过对很多人来说依然还是很难,因为他们不知道有哪些门路。今天给大家分享一个精彩...
对计算机专业来说学历真的重要吗?
我本科学校是渣渣二本,研究生学校是985,现在毕业五年,校招笔试、面试,社招面试参加了两年了,就我个人的经历来说下这个问题。 这篇文章很长,但绝对是精华,相信我,读完以后,你会知道学历不好的解决方案,记得帮我点赞哦。 先说结论,无论赞不赞同,它本质就是这样:对于技术类工作而言,学历五年以内非常重要,但有办法弥补。五年以后,不重要。 目录: 张雪峰讲述的事实 我看到的事实 为什么会这样 ...
在线就能用的Linux我给你找好了
来源:公众号【编程珠玑】 作者:守望先生 网站:https://www.yanbinghu.com 前言 是不是不想装虚拟机,还想体验一下Linux?是不是自己的电脑不在,又想搞事情?今天给大家推荐几个在线就可以玩的Linux环境以及学习Shell的地方。 在线Linux环境 如果你不想安装虚拟机,这里提供几个在线就能把玩Linux的网站,他们不需要注册用户,可以直接使用。 Unix...
世界上最好的学习法:费曼学习法
你是否曾幻想读一遍书就记住所有的内容?是否想学习完一项技能就马上达到巅峰水平?除非你是天才,不然这是不可能的。对于大多数的普通人来说,可以通过笨办法(死记硬背)来达到学习的目的,但效率低下。当然,也可以通过优秀的学习法来进行学习,比如今天讲的“费曼学习法”,可以将你的学习效率极大的提高。 费曼学习法是由加拿大物理学家费曼所发明的一种高效的学习方法,费曼本身是一个天才,13岁自学微积分,24岁加入曼...
学Linux到底学什么
来源:公众号【编程珠玑】 作者:守望先生 网站:https://www.yanbinghu.com/2019/09/25/14472.html 前言 ​我们常常听到很多人说要学学Linux或者被人告知说应该学学Linux,那么学Linux到底要学什么? 为什么要学Linux 在回答学什么之前,我们先看看为什么要学。首先我们需要认识到的是,很多服务器使用的是Linux系统,而作为服务器应...
深入理解C语言指针
一、指针的概念 要知道指针的概念,要先了解变量在内存中如何存储的。在存储时,内存被分为一块一块的。每一块都有一个特有的编号。而这个编号可以暂时理解为指针,就像酒店的门牌号一样。 1.1、变量和地址 先写一段简单的代码: void main(){ int x = 10, int y = 20; } 这段代码非常简单,就是两个变量的声明,分别赋值了 10、20。我们把内存当做一个酒店,而每个房间就...
C语言实现推箱子游戏
很早就想过做点小游戏了,但是一直没有机会动手。今天闲来无事,动起手来。过程还是蛮顺利的,代码也不是非常难。今天给大家分享一下~ 一、介绍 开发语言:C语言 开发工具:Dev-C++ 5.11 日期:2019年9月28日 作者:ZackSock 也不说太多多余的话了,先看一下效果图: 游戏中的人物、箱子、墙壁、球都是字符构成的。通过wasd键移动,规则的话就是推箱子的规则,也就不多说了。 二、代...
相关热词 对文件aes加密vc# c#读取栈中所有的值 c# rsa256加密 好 学c# 还是c++ c# 和java的差距 c# curl网络框架 c# https证书请求 c# 中崎 c#窗体自动弹出子窗体 c# 连接sqlite