目前是确定要用 PLAIN 那种模式,连SSL都不用。然后网上我一直找不到具体要set的字段有哪些……麻烦各位帮忙看看,多谢了
这个里面ssl相关的键是不是可以删掉了。还需要新增哪些键呢……
@Override
public void generalDataDeliverBySouthKafka(JSONObject kafkaQueryResult, JSONObject dataModel) {
JSONObject results = kafkaQueryResult.getJSONObject("results");
String topic = results.getString("topic");
if (!StringUtils.isNotEmpty(topic) || null == dataModel) {
log.error("topic or JSONObject needed to send is empty.");
return;
}
String userName = results.getString("user_name");
String password = results.getString("password");
String[] ipAndPorts = results.getString("cluster").split(",");
String suites = results.getString("communication_encryption_Suite");
String protocols = results.getString("security_protocol");
String authorizedEncryptionMode = results.getString("authorized_encryption_mode");
Properties pro = new Properties();
// 配置 Kafka 信息。
String sasl = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password);
pro.setProperty("sasl.jaas.config", sasl);
// pro.setProperty("ssl.cipher.suites", suites); // 加密套件,通信过程中的加密方式。
// pro.setProperty("ssl.enabled.protocols", authorizedEncryptionMode);
pro.setProperty("security.protocol", protocols); // 这个决定了需要什么样的证书,原始需求里支持什么协议。 SASL-SSL等,现在是死的 PLAIN,但是应该是什么PLAIN? PLAIN 还是 PLAINTEXT
pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
for (String ipAndPort : ipAndPorts) {
// 这里配置 Kafka 连接地址
pro.setProperty("bootstrap.servers", ipAndPort);
try(Producer<String, String> producer = new KafkaProducer<>(pro)) {
// Key不管是什么都能发进去,相当于用标签进行分组,所以我们采用 dataId 进行分组。
producer.send(new ProducerRecord<>(topic, null, System.currentTimeMillis(),
dataModel.getString("dataId"), new Record(dataModel).toString()));
} catch (Exception e) {
log.info("**************** failed....", e.getMessage());
}
}
}