小明同学YYDS 2018-06-05 16:12 采纳率: 0%
浏览 1881
已结题

kafka1.0.0的client,生产者生产数据失败

配置:

 Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "39.108.61.252:9092,39.108.61.252:9093,39.108.61.252:9094");

        //请求时候需要验证
        props.put("acks", "0");

        //请求失败时候需要重试
        props.put("retries", 1);

        //生产者就会尝试将记录组合成一个batch的请求。 这有助于客户端和服务器的性能。不能大于此默认值,否则浪费内存,反而降低吞吐量
        //props.put("batch.size", 16384);

        //汇聚一定时间内的记录一起发出
//        props.put("linger.ms", 50);

        //内存缓存区大小
        props.put("buffer.memory", 33554432);

        //指定消息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //指定消息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");


        producer = new KafkaProducer<>(props);

生产数据

没有Thread.sleep()就不能成功发送数据!,有了就可已在消费者端接受到数据。若把Thread.sleep()删除,在生产末尾加上close()方法也能成功生产

 for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    kafkaProducer.send(new ProducerRecord<>("topic_user_general_info_update", "simpleKey", "value-"+i));

                }

困扰很久,不知道配置还是哪里问题。

  • 写回答

4条回答 默认 最新

  • 邪恶八进制 2018-06-06 00:49
    关注

    close() 方法存在的意义就是为了在使用完队列后释放资源,如果不释放资源 所属的线程就会直接占用一整个 核 的资源。你在末尾添加 close() 方法后能运行就证明是没问题的。

    评论

报告相同问题?

悬赏问题

  • ¥15 Stata 面板数据模型选择
  • ¥20 idea运行测试代码报错问题
  • ¥15 网络监控:网络故障告警通知
  • ¥15 django项目运行报编码错误
  • ¥15 请问这个是什么意思?
  • ¥15 STM32驱动继电器
  • ¥15 Windows server update services
  • ¥15 关于#c语言#的问题:我现在在做一个墨水屏设计,2.9英寸的小屏怎么换4.2英寸大屏
  • ¥15 模糊pid与pid仿真结果几乎一样
  • ¥15 java的GUI的运用