小明同学YYDS 2018-06-05 16:12 采纳率: 0%
浏览 1881
已结题

kafka1.0.0的client,生产者生产数据失败

配置:

 Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "39.108.61.252:9092,39.108.61.252:9093,39.108.61.252:9094");

        //请求时候需要验证
        props.put("acks", "0");

        //请求失败时候需要重试
        props.put("retries", 1);

        //生产者就会尝试将记录组合成一个batch的请求。 这有助于客户端和服务器的性能。不能大于此默认值,否则浪费内存,反而降低吞吐量
        //props.put("batch.size", 16384);

        //汇聚一定时间内的记录一起发出
//        props.put("linger.ms", 50);

        //内存缓存区大小
        props.put("buffer.memory", 33554432);

        //指定消息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //指定消息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");


        producer = new KafkaProducer<>(props);

生产数据

没有Thread.sleep()就不能成功发送数据!,有了就可已在消费者端接受到数据。若把Thread.sleep()删除,在生产末尾加上close()方法也能成功生产

 for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    kafkaProducer.send(new ProducerRecord<>("topic_user_general_info_update", "simpleKey", "value-"+i));

                }

困扰很久,不知道配置还是哪里问题。

  • 写回答

4条回答 默认 最新

  • 邪恶八进制 2018-06-06 00:49
    关注

    close() 方法存在的意义就是为了在使用完队列后释放资源,如果不释放资源 所属的线程就会直接占用一整个 核 的资源。你在末尾添加 close() 方法后能运行就证明是没问题的。

    评论

报告相同问题?

悬赏问题

  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Eleasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?
  • ¥15 c++头文件不能识别CDialog