我按照kafka官网部署了单节点kafka0.8.2.1,部署命令依次如下:
//启动自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
//启动kafka服务
bin/kafka-server-start.sh config/server.properties &
//创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test &
这是如果按官网的命令,启动一个生产者,写入数据,然后启动一个消费者,可以正常消费数据。
接下来我在spark1.5.2(通过官网文档说对应的kafka是0.8.2.1),所以我用的所有jar包都是0.8.2.1(至少kafka方面的都是0.8.2.1),代码如下
val sparkConf = new SparkConf().setAppName("kafka")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val properties = new Properties()
properties.put("bootstrap.servers", "100.173.249.68:2181")
properties.put("metadata.broker.list", "100.173.249.68:9092")
properties.put("group.id", "test-consumer-group")
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducerString, String
val record = new ProducerRecordString, String
producer.send(record)
ssc.start()
ssc.awaitTermination()
将该代码达成jar包到服务器上运行报错,kafka端报错如下:
INFO Accepted socket connection from /10.173.249.68:58489 (org.apache.zookeeper.server.NIOServerCnxnFactory)
WARN Exception causing close of session 0x0 due to java.io.EOFException (org.apache.zookeeper.server.NIOServerCnxn)
INFO Closed socket connection for client /10.173.249.68:58489 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
这些报错,不停的刷
同时spark处报错也不停的刷,如下,
WARN network.Selector: Error in I/O with chenxm/10.173.249.68
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
不知道什么原因,拜托,多谢