现有需求基于C#实现kafka的kerberos认证,我在网上没有找到别的方案,目前我用的是Confluent.kafka
代码如下
```
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace GatewaySim
{
class _cls_confluent_producer : cls_disposable_base,_inf_confluent_producer
{
private string broker;
private Producer<string,string> _producer;
private Dictionary<string, object> config_produce = new Dictionary<string, object>();
public bool init_produce(string broker,string keytab)
{
try
{
this.broker = broker;
this.config_produce.Add("bootstrap.servers", broker);
this.config_produce.Add("api.version.request", "true");
this.config_produce.Add("security.protocol", "SASL_PLAINTEXT");
this.config_produce.Add("sasl.mechanisms", "GSSAPI");
this.config_produce.Add("sasl.kerberos.service.name", "kafka");
this.config_produce.Add("debug", "security,broker,protocol");
this.config_produce.Add("sasl.kerberos.principal", "xxxx@xxx.xxx.COM");
_producer = new Producer<string, string>(config_produce, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
return true;
}
catch (Exception e) {
LogHelper.Info("_cls_confluent_producer init exception:"+ e.Message);
}
return false;
}
public obj_msg_result produce(string topic, string message, string key)
{
try
{
var ret = _producer.ProduceAsync(topic, key, message).Result;
if(ret.Error.Code == ErrorCode.NoError)
{
obj_msg_result s = new obj_msg_result();
s.Offset = ret.Offset;
s.Partition = ret.Partition;
s.Topic = topic;
s.Message = ret.Value;
s.Key = ret.Key;
return s;
}
_producer.Flush(10000);
}
catch (Exception e) {
LogHelper.Info("_cls_confluent_producer exception :" + e.Message);
};
return null;
}
}
main代码------------------------------------------------------
public static _cls_confluent_producer _producer = new _cls_confluent_producer();
static void Main(string[] args)
{
_producer.init_produce(Properties.Settings.Default.kafka_broker, Properties.Settings.Default.keytab);
int i = 0;
while (i < 10)
{
i++;
_producer.produce("test", "test"+i, i+"");
LogHelper.Info("test " + i);
}
}
```
代码指定到var ret = _producer.ProduceAsync(topic, key, message).Result;这一行就会报错,报错如下
![图片说明](https://img-ask.csdn.net/upload/202101/21/1611215252_272464.jpg)
有没有大佬会解决的,或者有别的基于C#实现kafka认证kerberos的方案也可以指导一下(调用java实现的就算了),多谢!!!