weixin_42563605
程许源
2021-01-21 15:52

基于C#实现kafka的kerberos认证

  • c#

现有需求基于C#实现kafka的kerberos认证,我在网上没有找到别的方案,目前我用的是Confluent.kafka
代码如下

```
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;


namespace GatewaySim
{
    class _cls_confluent_producer : cls_disposable_base,_inf_confluent_producer
    {
        private string broker;
        private Producer<string,string> _producer;
        private Dictionary<string, object> config_produce = new Dictionary<string, object>();

        public bool init_produce(string broker,string keytab)
        {
            try
            {
                this.broker = broker;
                this.config_produce.Add("bootstrap.servers", broker);
                this.config_produce.Add("api.version.request", "true");
                this.config_produce.Add("security.protocol", "SASL_PLAINTEXT");
                this.config_produce.Add("sasl.mechanisms", "GSSAPI");
                this.config_produce.Add("sasl.kerberos.service.name", "kafka");
                this.config_produce.Add("debug", "security,broker,protocol");
                this.config_produce.Add("sasl.kerberos.principal", "xxxx@xxx.xxx.COM");
                _producer = new Producer<string, string>(config_produce, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
                
                return true;
            }
            catch (Exception e) { 
                
                LogHelper.Info("_cls_confluent_producer init exception:"+ e.Message); 
            }
            return false;
        }
        public obj_msg_result produce(string topic, string message, string key)
        {
            try
            {
                var ret = _producer.ProduceAsync(topic, key, message).Result;
                if(ret.Error.Code == ErrorCode.NoError)
                {
                    obj_msg_result s = new obj_msg_result();
                    s.Offset = ret.Offset;
                    s.Partition = ret.Partition;
                    s.Topic = topic;
                    s.Message = ret.Value;
                    s.Key = ret.Key;

                    return s;
                }
                _producer.Flush(10000);
               
            }
            catch (Exception e) { 
                LogHelper.Info("_cls_confluent_producer exception :" + e.Message);
               
            };

            return null;
        }
}

main代码------------------------------------------------------


public static _cls_confluent_producer _producer = new _cls_confluent_producer();
static void Main(string[] args)
        {
            

            _producer.init_produce(Properties.Settings.Default.kafka_broker, Properties.Settings.Default.keytab);
            int i = 0;
            while (i < 10)
            {
                i++;
                _producer.produce("test", "test"+i, i+"");
                LogHelper.Info("test " + i);
            }
            }
```

代码指定到var ret = _producer.ProduceAsync(topic, key, message).Result;这一行就会报错,报错如下
![图片说明](https://img-ask.csdn.net/upload/202101/21/1611215252_272464.jpg)

有没有大佬会解决的,或者有别的基于C#实现kafka认证kerberos的方案也可以指导一下(调用java实现的就算了),多谢!!!

  • 点赞
  • 回答
  • 收藏
  • 复制链接分享

0条回答