predawnlove 2021-11-05 10:20 采纳率: 100%
浏览 40
已结题

kafka在PLAINTEXT模式下,客户端想创建生产者需要在properties里面set哪些字段

目前是确定要用 PLAIN 那种模式,连SSL都不用。然后网上我一直找不到具体要set的字段有哪些……麻烦各位帮忙看看,多谢了

这个里面ssl相关的键是不是可以删掉了。还需要新增哪些键呢……




    @Override
    public void generalDataDeliverBySouthKafka(JSONObject kafkaQueryResult, JSONObject dataModel) {
        JSONObject results = kafkaQueryResult.getJSONObject("results");

        String topic = results.getString("topic");
        if (!StringUtils.isNotEmpty(topic) || null == dataModel) {
            log.error("topic or JSONObject needed to send is empty.");
            return;
        }

        String userName = results.getString("user_name");
        String password = results.getString("password");
        String[] ipAndPorts = results.getString("cluster").split(",");
        String suites = results.getString("communication_encryption_Suite");
        String protocols = results.getString("security_protocol");
        String authorizedEncryptionMode = results.getString("authorized_encryption_mode");

        Properties pro = new Properties();
        // 配置 Kafka 信息。
        String sasl = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\"  password=\"%s\";", userName, password);
        pro.setProperty("sasl.jaas.config", sasl);
//        pro.setProperty("ssl.cipher.suites", suites);   // 加密套件,通信过程中的加密方式。
//        pro.setProperty("ssl.enabled.protocols", authorizedEncryptionMode);
        pro.setProperty("security.protocol", protocols);    // 这个决定了需要什么样的证书,原始需求里支持什么协议。  SASL-SSL等,现在是死的 PLAIN,但是应该是什么PLAIN? PLAIN 还是 PLAINTEXT
        pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        for (String ipAndPort : ipAndPorts) {
            // 这里配置 Kafka 连接地址
            pro.setProperty("bootstrap.servers", ipAndPort);
            try(Producer<String, String> producer = new KafkaProducer<>(pro)) {
                // Key不管是什么都能发进去,相当于用标签进行分组,所以我们采用 dataId 进行分组。
                producer.send(new ProducerRecord<>(topic, null, System.currentTimeMillis(),
                        dataModel.getString("dataId"), new Record(dataModel).toString()));
            } catch (Exception e) {
                log.info("**************** failed....", e.getMessage());
            }
        }
    }
  • 写回答

1条回答 默认 最新

  • weixin_41973264 2021-11-05 12:32
    关注

    1)参考链接:

    中的
    4.Configure the following properties in producer.properties or consumer.properties:的相关配置进行设置kafka-ssl相关配置。
    2)bootstrap.servers这项配置配置kafka-broker集群地址就行,不用进行多次连接(如果有就其他设计除外)。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 11月19日
  • 已采纳回答 11月11日
  • 修改了问题 11月5日
  • 修改了问题 11月5日
  • 展开全部

悬赏问题

  • ¥20 西门子S7-Graph,S7-300,梯形图
  • ¥50 用易语言http 访问不了网页
  • ¥50 safari浏览器fetch提交数据后数据丢失问题
  • ¥15 matlab不知道怎么改,求解答!!
  • ¥15 永磁直线电机的电流环pi调不出来
  • ¥15 用stata实现聚类的代码
  • ¥15 请问paddlehub能支持移动端开发吗?在Android studio上该如何部署?
  • ¥20 docker里部署springboot项目,访问不到扬声器
  • ¥15 netty整合springboot之后自动重连失效
  • ¥15 悬赏!微信开发者工具报错,求帮改