70308 [Thread-29-spout-read-kafka] INFO s.k.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=pamshost02:9092, partition=9}, Partition{host=pamshost02:9092, partition=8}, Partition{host=pamshost02:9092, partition=7}, Partition{host=pamshost02:9092, partition=6}, Partition{host=pamshost02:9092, partition=5}, Partition{host=pamshost02:9092, partition=4}, Partition{host=pamshost02:9092, partition=3}, Partition{host=pamshost02:9092, partition=0}, Partition{host=pamshost02:9092, partition=1}, Partition{host=pamshost02:9092, partition=2}]
70599 [Thread-29-spout-read-kafka] INFO s.k.PartitionManager - Read partition information from: /detect/readKafka/partition_9 --> null
91326 [Thread-29-spout-read-kafka] INFO k.c.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException
91327 [Thread-29-spout-read-kafka] ERROR b.s.util - Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$fn__5624$fn__5639$fn__5670.invoke(executor.clj:607) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.9.0.0.jar:?]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.10.0.jar:0.10.0]
... 6 more
91329 [Thread-29-spout-read-kafka] ERROR b.s.d.executor -
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$fn__5624$fn__5639$fn__5670.invoke(executor.clj:607) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.9.0.0.jar:?]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.10.0.jar:0.10.0]
... 6 more
91535 [Thread-29-spout-read-kafka] ERROR b.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:336) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.6.0.jar:?]
at backtype.storm.daemon.worker$fn__7184$fn__7185.invoke(worker.clj:532) [storm-core-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$mk_executor_data$fn__5523$fn__5524.invoke(executor.clj:261) [storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:489) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
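My Storm-Kafka integration fails with the error above. Could someone with experience take a look and tell me what is going on?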