predawnlove 2021-11-05 10:20 采纳率: 100%
浏览 40
已结题

kafka在PLAINTEXT模式下,客户端想创建生产者需要在properties里面set哪些字段

目前是确定要用 PLAIN 那种模式,连SSL都不用。然后网上我一直找不到具体要set的字段有哪些……麻烦各位帮忙看看,多谢了

这个里面ssl相关的键是不是可以删掉了。还需要新增哪些键呢……




    @Override
    public void generalDataDeliverBySouthKafka(JSONObject kafkaQueryResult, JSONObject dataModel) {
        JSONObject results = kafkaQueryResult.getJSONObject("results");

        String topic = results.getString("topic");
        if (!StringUtils.isNotEmpty(topic) || null == dataModel) {
            log.error("topic or JSONObject needed to send is empty.");
            return;
        }

        String userName = results.getString("user_name");
        String password = results.getString("password");
        String[] ipAndPorts = results.getString("cluster").split(",");
        String suites = results.getString("communication_encryption_Suite");
        String protocols = results.getString("security_protocol");
        String authorizedEncryptionMode = results.getString("authorized_encryption_mode");

        Properties pro = new Properties();
        // 配置 Kafka 信息。
        String sasl = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\"  password=\"%s\";", userName, password);
        pro.setProperty("sasl.jaas.config", sasl);
//        pro.setProperty("ssl.cipher.suites", suites);   // 加密套件,通信过程中的加密方式。
//        pro.setProperty("ssl.enabled.protocols", authorizedEncryptionMode);
        pro.setProperty("security.protocol", protocols);    // 这个决定了需要什么样的证书,原始需求里支持什么协议。  SASL-SSL等,现在是死的 PLAIN,但是应该是什么PLAIN? PLAIN 还是 PLAINTEXT
        pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        for (String ipAndPort : ipAndPorts) {
            // 这里配置 Kafka 连接地址
            pro.setProperty("bootstrap.servers", ipAndPort);
            try(Producer<String, String> producer = new KafkaProducer<>(pro)) {
                // Key不管是什么都能发进去,相当于用标签进行分组,所以我们采用 dataId 进行分组。
                producer.send(new ProducerRecord<>(topic, null, System.currentTimeMillis(),
                        dataModel.getString("dataId"), new Record(dataModel).toString()));
            } catch (Exception e) {
                log.info("**************** failed....", e.getMessage());
            }
        }
    }
  • 写回答

1条回答 默认 最新

  • weixin_41973264 2021-11-05 12:32
    关注

    1)参考链接:

    中的
    4.Configure the following properties in producer.properties or consumer.properties:的相关配置进行设置kafka-ssl相关配置。
    2)bootstrap.servers这项配置配置kafka-broker集群地址就行,不用进行多次连接(如果有就其他设计除外)。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 11月19日
  • 已采纳回答 11月11日
  • 修改了问题 11月5日
  • 修改了问题 11月5日
  • 展开全部

悬赏问题

  • ¥15 filenotfounderror:文件是存在的,权限也给了,但还一直报错
  • ¥15 YOLOv5在进行trainpy训练后为什么会出现这种情况(语言-python)
  • ¥15 关于远程桌面的鼠标位置转换
  • ¥15 MATLAB和mosek的求解问题
  • ¥20 修改中兴光猫sn的时候提示失败
  • ¥15 java大作业爬取网页
  • ¥15 怎么获取欧易的btc永续合约和交割合约的5m级的历史数据用来回测套利策略?
  • ¥15 有没有办法利用libusb读取usb设备数据
  • ¥15 为什么openeluer里面按不了python3呢?
  • ¥15 关于#matlab#的问题:训练序列与输入层维度不一样