predawnlove 2021-11-05 10:20 采纳率: 100%
浏览 40
已结题

kafka在PLAINTEXT模式下,客户端想创建生产者需要在properties里面set哪些字段

目前是确定要用 PLAIN 那种模式,连SSL都不用。然后网上我一直找不到具体要set的字段有哪些……麻烦各位帮忙看看,多谢了

这个里面ssl相关的键是不是可以删掉了。还需要新增哪些键呢……




    @Override
    public void generalDataDeliverBySouthKafka(JSONObject kafkaQueryResult, JSONObject dataModel) {
        JSONObject results = kafkaQueryResult.getJSONObject("results");

        String topic = results.getString("topic");
        if (!StringUtils.isNotEmpty(topic) || null == dataModel) {
            log.error("topic or JSONObject needed to send is empty.");
            return;
        }

        String userName = results.getString("user_name");
        String password = results.getString("password");
        String[] ipAndPorts = results.getString("cluster").split(",");
        String suites = results.getString("communication_encryption_Suite");
        String protocols = results.getString("security_protocol");
        String authorizedEncryptionMode = results.getString("authorized_encryption_mode");

        Properties pro = new Properties();
        // 配置 Kafka 信息。
        String sasl = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\"  password=\"%s\";", userName, password);
        pro.setProperty("sasl.jaas.config", sasl);
//        pro.setProperty("ssl.cipher.suites", suites);   // 加密套件,通信过程中的加密方式。
//        pro.setProperty("ssl.enabled.protocols", authorizedEncryptionMode);
        pro.setProperty("security.protocol", protocols);    // 这个决定了需要什么样的证书,原始需求里支持什么协议。  SASL-SSL等,现在是死的 PLAIN,但是应该是什么PLAIN? PLAIN 还是 PLAINTEXT
        pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        for (String ipAndPort : ipAndPorts) {
            // 这里配置 Kafka 连接地址
            pro.setProperty("bootstrap.servers", ipAndPort);
            try(Producer<String, String> producer = new KafkaProducer<>(pro)) {
                // Key不管是什么都能发进去,相当于用标签进行分组,所以我们采用 dataId 进行分组。
                producer.send(new ProducerRecord<>(topic, null, System.currentTimeMillis(),
                        dataModel.getString("dataId"), new Record(dataModel).toString()));
            } catch (Exception e) {
                log.info("**************** failed....", e.getMessage());
            }
        }
    }
  • 写回答

1条回答 默认 最新

  • weixin_41973264 2021-11-05 12:32
    关注

    1)参考链接:

    中的
    4.Configure the following properties in producer.properties or consumer.properties:的相关配置进行设置kafka-ssl相关配置。
    2)bootstrap.servers这项配置配置kafka-broker集群地址就行,不用进行多次连接(如果有就其他设计除外)。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 11月19日
  • 已采纳回答 11月11日
  • 修改了问题 11月5日
  • 修改了问题 11月5日
  • 展开全部

悬赏问题

  • ¥15 微信公众平台自制会员卡可以通过收款码收款码收款进行自动积分吗
  • ¥15 随身WiFi网络灯亮但是没有网络,如何解决?
  • ¥15 gdf格式的脑电数据如何处理matlab
  • ¥20 重新写的代码替换了之后运行hbuliderx就这样了
  • ¥100 监控抖音用户作品更新可以微信公众号提醒
  • ¥15 UE5 如何可以不渲染HDRIBackdrop背景
  • ¥70 2048小游戏毕设项目
  • ¥20 mysql架构,按照姓名分表
  • ¥15 MATLAB实现区间[a,b]上的Gauss-Legendre积分
  • ¥15 delphi webbrowser组件网页下拉菜单自动选择问题