程许源
2021-01-21 15:52基于C#实现kafka的kerberos认证
现有需求基于C#实现kafka的kerberos认证,我在网上没有找到别的方案,目前我用的是Confluent.kafka
代码如下
```
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace GatewaySim
{
class _cls_confluent_producer : cls_disposable_base,_inf_confluent_producer
{
private string broker;
private Producer<string,string> _producer;
private Dictionary<string, object> config_produce = new Dictionary<string, object>();
public bool init_produce(string broker,string keytab)
{
try
{
this.broker = broker;
this.config_produce.Add("bootstrap.servers", broker);
this.config_produce.Add("api.version.request", "true");
this.config_produce.Add("security.protocol", "SASL_PLAINTEXT");
this.config_produce.Add("sasl.mechanisms", "GSSAPI");
this.config_produce.Add("sasl.kerberos.service.name", "kafka");
this.config_produce.Add("debug", "security,broker,protocol");
this.config_produce.Add("sasl.kerberos.principal", "xxxx@xxx.xxx.COM");
_producer = new Producer<string, string>(config_produce, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
return true;
}
catch (Exception e) {
LogHelper.Info("_cls_confluent_producer init exception:"+ e.Message);
}
return false;
}
public obj_msg_result produce(string topic, string message, string key)
{
try
{
var ret = _producer.ProduceAsync(topic, key, message).Result;
if(ret.Error.Code == ErrorCode.NoError)
{
obj_msg_result s = new obj_msg_result();
s.Offset = ret.Offset;
s.Partition = ret.Partition;
s.Topic = topic;
s.Message = ret.Value;
s.Key = ret.Key;
return s;
}
_producer.Flush(10000);
}
catch (Exception e) {
LogHelper.Info("_cls_confluent_producer exception :" + e.Message);
};
return null;
}
}
main代码------------------------------------------------------
public static _cls_confluent_producer _producer = new _cls_confluent_producer();
static void Main(string[] args)
{
_producer.init_produce(Properties.Settings.Default.kafka_broker, Properties.Settings.Default.keytab);
int i = 0;
while (i < 10)
{
i++;
_producer.produce("test", "test"+i, i+"");
LogHelper.Info("test " + i);
}
}
```
代码指定到var ret = _producer.ProduceAsync(topic, key, message).Result;这一行就会报错,报错如下

有没有大佬会解决的,或者有别的基于C#实现kafka认证kerberos的方案也可以指导一下(调用java实现的就算了),多谢!!!
- 点赞
- 回答
- 收藏
- 复制链接分享
0条回答
为你推荐
- 从kafka里面消费数据,就是连接不到kafka,在kafka里面生成和消费都没有问题,也没有报错,求大神指教哪里有问题,我也从来没遇到过这个问题,求指教?
- scala
- 1个回答
- kafka服务器:偏移的主题尚未创建
- it技术
- 互联网问答
- IT行业问题
- 计算机技术
- 编程语言问答
- 2个回答
- 在Linux上使用confluent-kafka-go构建Go应用程序
- docker
- 2个回答
- 将Kafka的murmur2实现移植到Go
- encoding
- java
- hash
- 1个回答
- Flume和kafka连接的问题
- 大数据
- flume
- kafka 消息发布与订阅
- 1个回答