2 qq 29244317 qq_29244317 于 2017.09.15 18:25 提问

SparkStream与flume的整合问题[急,在线等!!!]

各个版本信息:
spark2.0.2
flume1.7
sbt部分依赖 libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2"

拉模式代码和简单的输出语句
val flumeStream = FlumeUtils.createPollingStream(ssc,host,port,StorageLevel.MEMORY_ONLY_SER_2)
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()

已经在各个节点添加依赖

flume简单配置
# 指定Agent的组件名称

a1.sources = r1
a1.sinks = k1
a1.channels = c1

指定Flume source(要监听的路径)

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/weixf_kafka/testflume

指定Flume sink

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel =c1
a1.sinks.k1.hostname=172.28.41.196
a1.sinks.k1.port = 19999

指定Flume channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

绑定source和sink到channel上

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume,再启动SparkStreaming程序发现如下信息(部分)
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610), which has no missing parents
17/09/15 17:44:53 INFO scheduler.ReceiverTracker: Receiver 0 started
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 70.6 KB, free 413.8 MB)
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.1 KB, free 413.8 MB)
17/09/15 17:44:53 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.193:41571 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:53 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610)
17/09/15 17:44:53 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/09/15 17:44:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 70, 172.28.41.196, partition 0, PROCESS_LOCAL, 6736 bytes)
17/09/15 17:44:54 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 70 on executor id: 0 hostname: 172.28.41.196.
17/09/15 17:44:54 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.196:33364 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:54 INFO util.RecurringTimer: Started timer for JobGenerator at time 1505468700000
17/09/15 17:44:54 INFO scheduler.JobGenerator: Started JobGenerator at 1505468700000 ms
17/09/15 17:44:54 INFO scheduler.JobScheduler: Started JobScheduler
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@534e58b6{/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b495d4{/streaming/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@12fe1f28{/streaming/batch,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26fb4d06{/streaming/batch/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2d38edfd{/static/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO streaming.StreamingContext: StreamingContext started
17/09/15 17:44:55 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 172.28.41.196:45983
17/09/15 17:45:01 INFO scheduler.JobScheduler: Added jobs for time 1505468700000 ms
17/09/15 17:45:01 INFO scheduler.JobScheduler: Starting job streaming job 1505468700000 ms.0 from job set of time 1505468700000 ms
17/09/15 17:45:01 INFO spark.SparkContext: Starting job: print at FlumeLogPull.scala:44
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.196:33364 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Registering RDD 7 (union at DStream.scala:605)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Got job 2 (print at FlumeLogPull.scala:44) with 1 output partitions
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at FlumeLogPull.scala:44)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605), which has no missing parents
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.193:41571 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 413.8 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.0 KB, free 413.8 MB)
17/09/15 17:45:02 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.28.41.193:41571 (size: 2.0 KB, free: 413.9 MB)
17/09/15 17:45:02 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1012
17/09/15 17:45:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605)
17/09/15 17:45:02 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
17/09/15 17:46:00 INFO scheduler.JobScheduler: Added jobs for time 1505468760000 ms
17/09/15 17:46:30 INFO scheduler.JobScheduler: Added jobs for time 1505468790000 ms
17/09/15 17:47:00 INFO scheduler.JobScheduler: Added jobs for time 1505468820000 ms
17/09/15 17:47:30 INFO scheduler.JobScheduler: Added jobs for time 1505468850000 ms
17/09/15 17:48:00 INFO scheduler.JobScheduler: Added jobs for time 1505468880000 ms
17/09/15 17:48:30 INFO scheduler.JobScheduler: Added jobs for time 1505468910000 ms
17/09/15 17:49:00 INFO scheduler.JobScheduler: Added jobs for time 1505468940000 ms
17/09/15 17:49:30 INFO scheduler.JobScheduler: Added jobs for time 1505468970000 ms
17/09/15 17:50:00 INFO scheduler.JobScheduler: Added jobs for time 1505469000000 ms
17/09/15 17:50:30 INFO scheduler.JobScheduler: Added jobs for time 1505469030000 ms
17/09/15 17:51:00 INFO scheduler.JobScheduler: Added jobs for time 1505469060000 ms
17/09/15 17:51:30 INFO scheduler.JobScheduler: Added jobs for time 1505469090000 ms
17/09/15 17:52:00 INFO scheduler.JobScheduler: Added jobs for time 1505469120000 ms
17/09/15 17:52:30 INFO scheduler.JobScheduler: Added jobs for time 1505469150000 ms
17/09/15 17:53:00 INFO scheduler.JobScheduler: Added jobs for time 1505469180000 ms
17/09/15 17:53:30 INFO scheduler.JobScheduler: Added jobs for time 1505469210000 ms
17/09/15 17:54:00 INFO scheduler.JobScheduler: Added jobs for time 1505469240000 ms
17/09/15 17:54:30 INFO scheduler.JobScheduler: Added jobs for time 1505469270000 ms
17/09/15 17:55:00 INFO scheduler.JobScheduler: Added jobs for time 1505469300000 ms
17/09/15 17:55:30 INFO scheduler.JobScheduler: Added jobs for time 1505469330000 ms
17/09/15 17:56:00 INFO scheduler.JobScheduler: Added jobs for time 1505469360000 ms
17/09/15 17:56:30 INFO scheduler.JobScheduler: Added jobs for time 1505469390000 ms
17/09/15 17:57:00 INFO scheduler.JobScheduler: Added jobs for time 1505469420000 ms
17/09/15 17:57:30 INFO scheduler.JobScheduler: Added jobs for time 1505469450000 ms
17/09/15 17:58:00 INFO scheduler.JobScheduler: Added jobs for time 1505469480000 ms
17/09/15 17:58:30 INFO scheduler.JobScheduler: Added jobs for time 1505469510000 ms
17/09/15 17:59:00 INFO scheduler.JobScheduler: Added jobs for time 1505469540000 ms
17/09/15 17:59:30 INFO scheduler.JobScheduler: Added jobs for time 1505469570000 ms
17/09/15 18:00:00 INFO scheduler.JobScheduler: Added jobs for time 1505469600000 ms
17/09/15 18:00:30 INFO scheduler.JobScheduler: Added jobs for time 1505469630000 ms
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)
17/09/15 18:01:00 INFO scheduler.JobScheduler: Added jobs for time 1505469660000 ms
17/09/15 18:01:00 INFO storage.BlockManagerInfo: Added input-0-1505469659800 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.9 MB)
17/09/15 18:01:03 INFO storage.BlockManagerInfo: Added input-0-1505469662800 in memory on 172.28.41.196:33364 (size: 7.3 KB, free: 413.9 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469684800 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.8 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469685000 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.8 MB)

其中没有我想要的输出信息而是一直有类似
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
这样的信息,如果向监控的文件夹下copy文件得到这样的输出信息
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)

想要的效果是输出类似这样的正常结果

Time: 1505468700000 ms

Received .. flume events.

实在是找不出来什么原因,求大神解惑,不胜感激

Csdn user default icon
上传中...
上传图片
插入图片