predawnlove 2021-11-05 10:20 采纳率: 100%
浏览 40
已结题

kafka在PLAINTEXT模式下,客户端想创建生产者需要在properties里面set哪些字段

目前是确定要用 PLAIN 那种模式,连SSL都不用。然后网上我一直找不到具体要set的字段有哪些……麻烦各位帮忙看看,多谢了

这个里面ssl相关的键是不是可以删掉了。还需要新增哪些键呢……




    @Override
    public void generalDataDeliverBySouthKafka(JSONObject kafkaQueryResult, JSONObject dataModel) {
        JSONObject results = kafkaQueryResult.getJSONObject("results");

        String topic = results.getString("topic");
        if (!StringUtils.isNotEmpty(topic) || null == dataModel) {
            log.error("topic or JSONObject needed to send is empty.");
            return;
        }

        String userName = results.getString("user_name");
        String password = results.getString("password");
        String[] ipAndPorts = results.getString("cluster").split(",");
        String suites = results.getString("communication_encryption_Suite");
        String protocols = results.getString("security_protocol");
        String authorizedEncryptionMode = results.getString("authorized_encryption_mode");

        Properties pro = new Properties();
        // 配置 Kafka 信息。
        String sasl = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\"  password=\"%s\";", userName, password);
        pro.setProperty("sasl.jaas.config", sasl);
//        pro.setProperty("ssl.cipher.suites", suites);   // 加密套件,通信过程中的加密方式。
//        pro.setProperty("ssl.enabled.protocols", authorizedEncryptionMode);
        pro.setProperty("security.protocol", protocols);    // 这个决定了需要什么样的证书,原始需求里支持什么协议。  SASL-SSL等,现在是死的 PLAIN,但是应该是什么PLAIN? PLAIN 还是 PLAINTEXT
        pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        for (String ipAndPort : ipAndPorts) {
            // 这里配置 Kafka 连接地址
            pro.setProperty("bootstrap.servers", ipAndPort);
            try(Producer<String, String> producer = new KafkaProducer<>(pro)) {
                // Key不管是什么都能发进去,相当于用标签进行分组,所以我们采用 dataId 进行分组。
                producer.send(new ProducerRecord<>(topic, null, System.currentTimeMillis(),
                        dataModel.getString("dataId"), new Record(dataModel).toString()));
            } catch (Exception e) {
                log.info("**************** failed....", e.getMessage());
            }
        }
    }
  • 写回答

1条回答 默认 最新

  • weixin_41973264 2021-11-05 12:32
    关注

    1)参考链接:

    中的
    4.Configure the following properties in producer.properties or consumer.properties:的相关配置进行设置kafka-ssl相关配置。
    2)bootstrap.servers这项配置配置kafka-broker集群地址就行,不用进行多次连接(如果有就其他设计除外)。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 11月19日
  • 已采纳回答 11月11日
  • 修改了问题 11月5日
  • 修改了问题 11月5日
  • 展开全部

悬赏问题

  • ¥15 u盘问题:盘符不显示 无媒体
  • ¥50 R语言读取nc按月均值转为tif
  • ¥30 智能车串级pid调参
  • ¥15 visual studio code翻译老是错误
  • ¥20 卫星测高数据的高程转换
  • ¥15 爬取招聘网站数据信息
  • ¥15 安装完tensorflow,import tensorflow as tf后报错,如何解决?
  • ¥15 ultralytics库导出onnx模型,模型失去预测能力
  • ¥15 linux下点对点协议连接2个USB串口的硬件流量控制问题
  • ¥15 SQL数据自动生成问题