红尘丶世界 2020-04-23 19:33 采纳率: 66.7%
浏览 2541

求大牛指点,kafka 怎么设置 batch.size 合适?

我这里有一批测试数据,数据过滤之前的大小为65.2M 左右, 数据有 359021 行, 把这些发送到kafka中,一开始 batch.size 的大小设置为16K,发现每次插入数据的行数都不一样,差距特别大,后来我试着改变这个值,发现一直在发生变化很不稳定,最后在 70K 的时候发现是最接近 359021 行的, 行数稳定 358751 行‬,但是距离总行还是有所差别,所以想请教一下是什么原因导致的,batch.size 怎么设置才好详细代码如下:

object Producer {
  def main(args: Array[String]): Unit = {
    //创建 sparkContext
    val conf: SparkConf = new SparkConf().setAppName("sparkStream").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //读取文件并过滤空行
    val lines: RDD[String] = sc.textFile("D:\\dev\\大数据\\大数据资料\\spark\\rng_comment.txt").filter(_.trim != "")
    //指定连接的节点
    val sink: KafkaProducerConf = new KafkaProducerConf("rng_comment", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    //发送数据到kafka
    sink.save(lines)
    //关闭连接
    sink.kafkaProducer.close()
    sc.stop()
  }
}


class KafkaProducerConf(topic: String, severs: String) extends Serializable {

  def createKafkaConnection(): KafkaProducer[String, String] = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", severs)
    props.put("acks", "all");// acks=0 配置适用于实现非常高的吞吐量 , acks=all 这是最安全的模式
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    props.put("retries", "0") //设置重试次数
    props.put("batch.size", "66765") //设置批量发送的数据大小
    props.put("buffer.memory", "33554432") //设置缓冲区大小
    props.put("linger.ms", "1000") //最多延迟10000毫秒
    props.put("partitioner.class", (new MyPartitioner).getClass)
    new KafkaProducer[String, String](props)
  }

  lazy val kafkaProducer: KafkaProducer[String, String] = createKafkaConnection()


  //把数据保存到kafka
  def save(vs: RDD[String]): Unit = {
    try {
      vs.foreach(x => {
        println(x)
        val record: ProducerRecord[String, String] = new ProducerRecord[String, String](topic, x.split("\t")(0), x.toString)
        kafkaProducer.send(record)
      })
    } catch {
      case e: Exception => println("发送数据出错:" + e)
    }
  }
}


部分数据如下:

0   11381   2018/10/20 21:07    今天你还相信网恋吗:希望越大失望越大,技能省得莫名其妙,小规模团战打得又乱,最后一句,骄兵必败!        1   219517      3873092044  今天你还相信网恋吗 5   1540040820

1   683 2018/10/20 21:07    趣游电竞:这几把打的什么呀,          0   36213       3784336787  趣游电竞    2   1540040820

2   0   2018/10/20 21:08    阴暗感i:既然选择了RNG,那么我会一直支持RNG!        0   0       6019401575  阴暗感i  0   1540040880

3   0   2018/10/20 21:08    买桔子的先森:孙大勇和heart亲妈今晚必死,我不想看到他们        0   0       5440806717  买桔子的先森  0   1540040880

4   0   2018/10/20 21:08    还能改三次名别浪费了:菜逼 0   1       5288544674  还能改三次名别浪费了  5   1540040880

5   0   2018/10/20 21:08    弓长威0:退役         0   0       5567911915  弓长威0  0   1540040880

6   0   2018/10/20 21:08    春眠不觉晓丫:滚        0   0       3295234971  春眠不觉晓丫  0   1540040880

7   0   2018/10/20 21:08    面朝大海觉得好冷:滚出lpl        0   0       6023555412  面朝大海觉得好冷    0   1540040880

8   0   2018/10/20 21:08    Whiteiiiiii:你们今天这种态度真的不配赢           0   0       1846699503  Whiteiiiiii 0   1540040880

9   0   2018/10/20 21:08    万年熊队长:天天明年吗,有没有一点新意,我真的没办法再等了。毕业了         0   0       2670160850  万年熊队长 0   1540040880

10  0   2018/10/20 21:08    Airw10:不用明年了,解散了吧         0   0       3169314130  Airw10  0   1540040880

11  0   2018/10/20 21:08    Reck1e55:还能赢回来吗??         0   1       6337686460  Reck1e55    0   1540040880

12  0   2018/10/20 21:08    SayWanli:四年了本科四年全部都喜欢rng战队 心如死灰           0   0       2356759803  SayWanli    0   1540040880

13  0   2018/10/20 21:08    佳旸牌咖喱牛肉面:买尼玛的冠军皮肤,老子钱都存好了         0   0       6227448758  佳旸牌咖喱牛肉面    2   1540040880

14  0   2018/10/20 21:08    你是我年少时最冒险的梦-:我想问问你们??打的是个什么?努力了一整年??到最后打成这样??真的失望         1   0       3972972440  你是我年少时最冒险的梦-  0   1540040880

15  0   2018/10/20 21:08    深思76863:脱了        0   0       6264488668  深思76863 0   1540040880

16  0   2018/10/20 21:08    石榴哥w:失望透顶失望透顶,不再关注RNG         0   0       6731579474  石榴哥w  0   1540040880

17  0   2018/10/20 21:08    许向暖呐:滚          0   0       5670089757  许向暖呐    0   1540040880

18  0   2018/10/20 21:08    曹大老实人:你知道我在网吧看比赛我周围的人一直说rng回家的时候我心里多难受吗         0   0       6384972437  曹大老实人 3   1540040880

19  0   2018/10/20 21:08    錯過_0927:滾           0   0       2430243037  錯過_0927 0   1540040880

20  0   2018/10/20 21:08    沉着的肉夹馍馍:明年再见        0   0       2128400513  沉着的肉夹馍馍   5   1540040880
  • 写回答

1条回答 默认 最新

  • zyt_jam 2020-05-13 20:04
    关注

    batch.size 是kafka设置RecordBatch的大小,单位是字节,你上面的部分数据,每行的大小也不同,如何能够用行来监控kafka传输的数据大小呢?
    另外kafka在发送消息时候也会有超时的判断,如果距离上一次发送的时间到了,不管RecordBatch满了没有,都会发送的。
    我理解是这些,楼主看下有没有到你的点上,互相交流下~

    评论

报告相同问题?

悬赏问题

  • ¥50 永磁型步进电机PID算法
  • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
  • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
  • ¥15 如何处理复杂数据表格的除法运算
  • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥200 uniapp长期运行卡死问题解决
  • ¥15 latex怎么处理论文引理引用参考文献
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?