密使v华佗 2017-03-18 02:54 采纳率: 0%
浏览 3284

SparkStreming向kafka写数据,报 Error in I/O......

我按照kafka官网部署了单节点kafka0.8.2.1,部署命令依次如下:
//启动自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
//启动kafka服务
bin/kafka-server-start.sh config/server.properties &
//创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test &

这是如果按官网的命令,启动一个生产者,写入数据,然后启动一个消费者,可以正常消费数据。

接下来我在spark1.5.2(通过官网文档说对应的kafka是0.8.2.1),所以我用的所有jar包都是0.8.2.1(至少kafka方面的都是0.8.2.1),代码如下
val sparkConf = new SparkConf().setAppName("kafka")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val properties = new Properties()
properties.put("bootstrap.servers", "100.173.249.68:2181")
properties.put("metadata.broker.list", "100.173.249.68:9092")
properties.put("group.id", "test-consumer-group")
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducerString, String
val record = new ProducerRecordString, String
producer.send(record)
ssc.start()
ssc.awaitTermination()

将该代码达成jar包到服务器上运行报错,kafka端报错如下:
INFO Accepted socket connection from /10.173.249.68:58489 (org.apache.zookeeper.server.NIOServerCnxnFactory)
WARN Exception causing close of session 0x0 due to java.io.EOFException (org.apache.zookeeper.server.NIOServerCnxn)
INFO Closed socket connection for client /10.173.249.68:58489 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
这些报错,不停的刷

同时spark处报错也不停的刷,如下,
WARN network.Selector: Error in I/O with chenxm/10.173.249.68
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)

不知道什么原因,拜托,多谢

  • 写回答

2条回答 默认 最新

  • 阳光_shenke 2019-03-05 18:00
    关注

    请问下解决了吗?消费遇到同样的问题

    评论

报告相同问题?

悬赏问题

  • ¥15 基于卷积神经网络的声纹识别
  • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 CSAPPattacklab
  • ¥15 一直显示正在等待HID—ISP
  • ¥15 Python turtle 画图
  • ¥15 stm32开发clion时遇到的编译问题