程许源 2021-01-21 15:52 采纳率: 0%
浏览 281

基于C#实现kafka的kerberos认证

现有需求基于C#实现kafka的kerberos认证,我在网上没有找到别的方案,目前我用的是Confluent.kafka
代码如下

```
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;


namespace GatewaySim
{
    class _cls_confluent_producer : cls_disposable_base,_inf_confluent_producer
    {
        private string broker;
        private Producer<string,string> _producer;
        private Dictionary<string, object> config_produce = new Dictionary<string, object>();

        public bool init_produce(string broker,string keytab)
        {
            try
            {
                this.broker = broker;
                this.config_produce.Add("bootstrap.servers", broker);
                this.config_produce.Add("api.version.request", "true");
                this.config_produce.Add("security.protocol", "SASL_PLAINTEXT");
                this.config_produce.Add("sasl.mechanisms", "GSSAPI");
                this.config_produce.Add("sasl.kerberos.service.name", "kafka");
                this.config_produce.Add("debug", "security,broker,protocol");
                this.config_produce.Add("sasl.kerberos.principal", "xxxx@xxx.xxx.COM");
                _producer = new Producer<string, string>(config_produce, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
                
                return true;
            }
            catch (Exception e) { 
                
                LogHelper.Info("_cls_confluent_producer init exception:"+ e.Message); 
            }
            return false;
        }
        public obj_msg_result produce(string topic, string message, string key)
        {
            try
            {
                var ret = _producer.ProduceAsync(topic, key, message).Result;
                if(ret.Error.Code == ErrorCode.NoError)
                {
                    obj_msg_result s = new obj_msg_result();
                    s.Offset = ret.Offset;
                    s.Partition = ret.Partition;
                    s.Topic = topic;
                    s.Message = ret.Value;
                    s.Key = ret.Key;

                    return s;
                }
                _producer.Flush(10000);
               
            }
            catch (Exception e) { 
                LogHelper.Info("_cls_confluent_producer exception :" + e.Message);
               
            };

            return null;
        }
}

main代码------------------------------------------------------


public static _cls_confluent_producer _producer = new _cls_confluent_producer();
static void Main(string[] args)
        {
            

            _producer.init_produce(Properties.Settings.Default.kafka_broker, Properties.Settings.Default.keytab);
            int i = 0;
            while (i < 10)
            {
                i++;
                _producer.produce("test", "test"+i, i+"");
                LogHelper.Info("test " + i);
            }
            }
```

代码指定到var ret = _producer.ProduceAsync(topic, key, message).Result;这一行就会报错,报错如下
![图片说明](https://img-ask.csdn.net/upload/202101/21/1611215252_272464.jpg)

有没有大佬会解决的,或者有别的基于C#实现kafka认证kerberos的方案也可以指导一下(调用java实现的就算了),多谢!!!

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2022-09-07 19:24
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 以帮助更多的人 ^-^
    评论

报告相同问题?

悬赏问题

  • ¥15 素材场景中光线烘焙后灯光失效
  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器