我这里有一批测试数据,数据过滤之前的大小为65.2M 左右, 数据有 359021 行, 把这些发送到kafka中,一开始 batch.size 的大小设置为16K,发现每次插入数据的行数都不一样,差距特别大,后来我试着改变这个值,发现一直在发生变化很不稳定,最后在 70K 的时候发现是最接近 359021 行的, 行数稳定 358751 行,但是距离总行还是有所差别,所以想请教一下是什么原因导致的,batch.size 怎么设置才好详细代码如下:
object Producer {
def main(args: Array[String]): Unit = {
//创建 sparkContext
val conf: SparkConf = new SparkConf().setAppName("sparkStream").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//读取文件并过滤空行
val lines: RDD[String] = sc.textFile("D:\\dev\\大数据\\大数据资料\\spark\\rng_comment.txt").filter(_.trim != "")
//指定连接的节点
val sink: KafkaProducerConf = new KafkaProducerConf("rng_comment", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
//发送数据到kafka
sink.save(lines)
//关闭连接
sink.kafkaProducer.close()
sc.stop()
}
}
class KafkaProducerConf(topic: String, severs: String) extends Serializable {
def createKafkaConnection(): KafkaProducer[String, String] = {
val props: Properties = new Properties()
props.put("bootstrap.servers", severs)
props.put("acks", "all");// acks=0 配置适用于实现非常高的吞吐量 , acks=all 这是最安全的模式
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
props.put("retries", "0") //设置重试次数
props.put("batch.size", "66765") //设置批量发送的数据大小
props.put("buffer.memory", "33554432") //设置缓冲区大小
props.put("linger.ms", "1000") //最多延迟10000毫秒
props.put("partitioner.class", (new MyPartitioner).getClass)
new KafkaProducer[String, String](props)
}
lazy val kafkaProducer: KafkaProducer[String, String] = createKafkaConnection()
//把数据保存到kafka
def save(vs: RDD[String]): Unit = {
try {
vs.foreach(x => {
println(x)
val record: ProducerRecord[String, String] = new ProducerRecord[String, String](topic, x.split("\t")(0), x.toString)
kafkaProducer.send(record)
})
} catch {
case e: Exception => println("发送数据出错:" + e)
}
}
}
部分数据如下:
0 11381 2018/10/20 21:07 今天你还相信网恋吗:希望越大失望越大,技能省得莫名其妙,小规模团战打得又乱,最后一句,骄兵必败! 1 219517 3873092044 今天你还相信网恋吗 5 1540040820
1 683 2018/10/20 21:07 趣游电竞:这几把打的什么呀, 0 36213 3784336787 趣游电竞 2 1540040820
2 0 2018/10/20 21:08 阴暗感i:既然选择了RNG,那么我会一直支持RNG! 0 0 6019401575 阴暗感i 0 1540040880
3 0 2018/10/20 21:08 买桔子的先森:孙大勇和heart亲妈今晚必死,我不想看到他们 0 0 5440806717 买桔子的先森 0 1540040880
4 0 2018/10/20 21:08 还能改三次名别浪费了:菜逼 0 1 5288544674 还能改三次名别浪费了 5 1540040880
5 0 2018/10/20 21:08 弓长威0:退役 0 0 5567911915 弓长威0 0 1540040880
6 0 2018/10/20 21:08 春眠不觉晓丫:滚 0 0 3295234971 春眠不觉晓丫 0 1540040880
7 0 2018/10/20 21:08 面朝大海觉得好冷:滚出lpl 0 0 6023555412 面朝大海觉得好冷 0 1540040880
8 0 2018/10/20 21:08 Whiteiiiiii:你们今天这种态度真的不配赢 0 0 1846699503 Whiteiiiiii 0 1540040880
9 0 2018/10/20 21:08 万年熊队长:天天明年吗,有没有一点新意,我真的没办法再等了。毕业了 0 0 2670160850 万年熊队长 0 1540040880
10 0 2018/10/20 21:08 Airw10:不用明年了,解散了吧 0 0 3169314130 Airw10 0 1540040880
11 0 2018/10/20 21:08 Reck1e55:还能赢回来吗?? 0 1 6337686460 Reck1e55 0 1540040880
12 0 2018/10/20 21:08 SayWanli:四年了本科四年全部都喜欢rng战队 心如死灰 0 0 2356759803 SayWanli 0 1540040880
13 0 2018/10/20 21:08 佳旸牌咖喱牛肉面:买尼玛的冠军皮肤,老子钱都存好了 0 0 6227448758 佳旸牌咖喱牛肉面 2 1540040880
14 0 2018/10/20 21:08 你是我年少时最冒险的梦-:我想问问你们??打的是个什么?努力了一整年??到最后打成这样??真的失望 1 0 3972972440 你是我年少时最冒险的梦- 0 1540040880
15 0 2018/10/20 21:08 深思76863:脱了 0 0 6264488668 深思76863 0 1540040880
16 0 2018/10/20 21:08 石榴哥w:失望透顶失望透顶,不再关注RNG 0 0 6731579474 石榴哥w 0 1540040880
17 0 2018/10/20 21:08 许向暖呐:滚 0 0 5670089757 许向暖呐 0 1540040880
18 0 2018/10/20 21:08 曹大老实人:你知道我在网吧看比赛我周围的人一直说rng回家的时候我心里多难受吗 0 0 6384972437 曹大老实人 3 1540040880
19 0 2018/10/20 21:08 錯過_0927:滾 0 0 2430243037 錯過_0927 0 1540040880
20 0 2018/10/20 21:08 沉着的肉夹馍馍:明年再见 0 0 2128400513 沉着的肉夹馍馍 5 1540040880