qq_29244317 2017-09-15 10:25

Spark Streaming and Flume integration problem [urgent, waiting online!!!]

Version information:
Spark 2.0.2
Flume 1.7
Relevant sbt dependency: libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2"
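For reference, a minimal build.sbt sketch showing how this dependency typically sits next to the core Spark artifacts; the Scala version and the provided scope here are assumptions, not taken from the original project:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.2" % "provided",
  "org.apache.spark" %  "spark-streaming-flume_2.11" % "2.0.2"
)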

Pull-mode (polling receiver) code with a simple output statement:
val flumeStream = FlumeUtils.createPollingStream(ssc,host,port,StorageLevel.MEMORY_ONLY_SER_2)
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()
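For completeness, a minimal self-contained sketch of what the surrounding driver (FlumeLogPull) presumably looks like; the 30-second batch interval is inferred from the 30 s "Added jobs for time ..." cadence in the log below, and the host/port are the ones configured for the SparkSink:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeLogPull {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FlumeLogPull")
    // 30-second batches, matching the 30 s job-generation interval seen in the log
    val ssc = new StreamingContext(sparkConf, Seconds(30))

    val host = "172.28.41.196"  // host running the Flume agent with the SparkSink
    val port = 19999            // must match a1.sinks.k1.port

    // Pull events from the SparkSink (poll mode)
    val flumeStream = FlumeUtils.createPollingStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}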

The required dependencies have already been added on every node.

Simple Flume configuration:
# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Flume source (the directory to monitor)

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/weixf_kafka/testflume

# Flume sink

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.28.41.196
a1.sinks.k1.port = 19999

# Flume channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

After starting Flume and then the Spark Streaming program, the log shows the following (in part):
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610), which has no missing parents
17/09/15 17:44:53 INFO scheduler.ReceiverTracker: Receiver 0 started
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 70.6 KB, free 413.8 MB)
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.1 KB, free 413.8 MB)
17/09/15 17:44:53 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.193:41571 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:53 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610)
17/09/15 17:44:53 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/09/15 17:44:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 70, 172.28.41.196, partition 0, PROCESS_LOCAL, 6736 bytes)
17/09/15 17:44:54 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 70 on executor id: 0 hostname: 172.28.41.196.
17/09/15 17:44:54 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.196:33364 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:54 INFO util.RecurringTimer: Started timer for JobGenerator at time 1505468700000
17/09/15 17:44:54 INFO scheduler.JobGenerator: Started JobGenerator at 1505468700000 ms
17/09/15 17:44:54 INFO scheduler.JobScheduler: Started JobScheduler
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@534e58b6{/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b495d4{/streaming/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@12fe1f28{/streaming/batch,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26fb4d06{/streaming/batch/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2d38edfd{/static/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO streaming.StreamingContext: StreamingContext started
17/09/15 17:44:55 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 172.28.41.196:45983
17/09/15 17:45:01 INFO scheduler.JobScheduler: Added jobs for time 1505468700000 ms
17/09/15 17:45:01 INFO scheduler.JobScheduler: Starting job streaming job 1505468700000 ms.0 from job set of time 1505468700000 ms
17/09/15 17:45:01 INFO spark.SparkContext: Starting job: print at FlumeLogPull.scala:44
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.196:33364 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Registering RDD 7 (union at DStream.scala:605)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Got job 2 (print at FlumeLogPull.scala:44) with 1 output partitions
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at FlumeLogPull.scala:44)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605), which has no missing parents
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.193:41571 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 413.8 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.0 KB, free 413.8 MB)
17/09/15 17:45:02 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.28.41.193:41571 (size: 2.0 KB, free: 413.9 MB)
17/09/15 17:45:02 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1012
17/09/15 17:45:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605)
17/09/15 17:45:02 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
17/09/15 17:46:00 INFO scheduler.JobScheduler: Added jobs for time 1505468760000 ms
17/09/15 17:46:30 INFO scheduler.JobScheduler: Added jobs for time 1505468790000 ms
17/09/15 17:47:00 INFO scheduler.JobScheduler: Added jobs for time 1505468820000 ms
17/09/15 17:47:30 INFO scheduler.JobScheduler: Added jobs for time 1505468850000 ms
17/09/15 17:48:00 INFO scheduler.JobScheduler: Added jobs for time 1505468880000 ms
17/09/15 17:48:30 INFO scheduler.JobScheduler: Added jobs for time 1505468910000 ms
17/09/15 17:49:00 INFO scheduler.JobScheduler: Added jobs for time 1505468940000 ms
17/09/15 17:49:30 INFO scheduler.JobScheduler: Added jobs for time 1505468970000 ms
17/09/15 17:50:00 INFO scheduler.JobScheduler: Added jobs for time 1505469000000 ms
17/09/15 17:50:30 INFO scheduler.JobScheduler: Added jobs for time 1505469030000 ms
17/09/15 17:51:00 INFO scheduler.JobScheduler: Added jobs for time 1505469060000 ms
17/09/15 17:51:30 INFO scheduler.JobScheduler: Added jobs for time 1505469090000 ms
17/09/15 17:52:00 INFO scheduler.JobScheduler: Added jobs for time 1505469120000 ms
17/09/15 17:52:30 INFO scheduler.JobScheduler: Added jobs for time 1505469150000 ms
17/09/15 17:53:00 INFO scheduler.JobScheduler: Added jobs for time 1505469180000 ms
17/09/15 17:53:30 INFO scheduler.JobScheduler: Added jobs for time 1505469210000 ms
17/09/15 17:54:00 INFO scheduler.JobScheduler: Added jobs for time 1505469240000 ms
17/09/15 17:54:30 INFO scheduler.JobScheduler: Added jobs for time 1505469270000 ms
17/09/15 17:55:00 INFO scheduler.JobScheduler: Added jobs for time 1505469300000 ms
17/09/15 17:55:30 INFO scheduler.JobScheduler: Added jobs for time 1505469330000 ms
17/09/15 17:56:00 INFO scheduler.JobScheduler: Added jobs for time 1505469360000 ms
17/09/15 17:56:30 INFO scheduler.JobScheduler: Added jobs for time 1505469390000 ms
17/09/15 17:57:00 INFO scheduler.JobScheduler: Added jobs for time 1505469420000 ms
17/09/15 17:57:30 INFO scheduler.JobScheduler: Added jobs for time 1505469450000 ms
17/09/15 17:58:00 INFO scheduler.JobScheduler: Added jobs for time 1505469480000 ms
17/09/15 17:58:30 INFO scheduler.JobScheduler: Added jobs for time 1505469510000 ms
17/09/15 17:59:00 INFO scheduler.JobScheduler: Added jobs for time 1505469540000 ms
17/09/15 17:59:30 INFO scheduler.JobScheduler: Added jobs for time 1505469570000 ms
17/09/15 18:00:00 INFO scheduler.JobScheduler: Added jobs for time 1505469600000 ms
17/09/15 18:00:30 INFO scheduler.JobScheduler: Added jobs for time 1505469630000 ms
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)
17/09/15 18:01:00 INFO scheduler.JobScheduler: Added jobs for time 1505469660000 ms
17/09/15 18:01:00 INFO storage.BlockManagerInfo: Added input-0-1505469659800 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.9 MB)
17/09/15 18:01:03 INFO storage.BlockManagerInfo: Added input-0-1505469662800 in memory on 172.28.41.196:33364 (size: 7.3 KB, free: 413.9 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469684800 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.8 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469685000 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.8 MB)

The output I want never appears; instead the log keeps printing lines like
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
If I copy a file into the monitored directory, I only get output like this:
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)

The expected result is normal output like this:

Time: 1505468700000 ms

Received .. flume events.

I really can't figure out the cause. Any insight would be greatly appreciated.

