和Java狼狈为奸 2018-11-21 06:58 采纳率: 50%
浏览 1554
已采纳

RocketMQ中消息谁发的,具体发给谁怎么确定呢?

在网上看了一些demo,都是一个Producer和一个Consumer,Producer发消息,
Consumer接收消息。但在实际业务场景中,比如一个人的操作产生了消息,要指定发给另一个人,这个怎么准确的发过去啊。是把tag指定为人的ID吗?

  • 写回答

1条回答 默认 最新

  • weixin_43758917 2018-11-21 07:14
    关注

    一、背景

        最近在学习kafka相关的知识,正好遇到一个疑问,在写demo的过程中发现,投递的数据都是字符串类型,那么就想想在实际应用中应该会有大量的需求投递自定义数据类型,那么如何才能投递自定义数据类型呢?这里面就涉及到了kafka提供的接口序列化和反序列化的功能。

    二、kafka消息序列化和反序列化

    先看个demo,写个Producer客户端,根据官方文档,需要先做一些配置,放到Properties中,这里包括啥bootstrap.servers,等参数,就不详解了,主要是用到两个参数:

    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    添加了这么两个参数,其实这个是kafka封装好的,把字符串进行序列化,这就是为什么在ProducerRecord中可以输入字符串进行传输。同时,我们在Consumer端也需要添加这样两个参数,这两个参数是进行反序列化的作用的,就是接收到kafka传递给你的数据之后,进行反序列化操作。
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    添加以上两个参数,就是为啥我们是用Producer和Consumer进行生产和接收数据了,并且是以字符串的形式,那么这两个类到底实现了什么功能呢,下面看一下...

    public class StringSerializer implements Serializer {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }
    
    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
    
    @Override
    public void close() {
        // nothing to do
    }
    

    }
    以上这个类很简单,就是实现Serializer接口,但是注意这个接口是org.apache.kafka.common.serialization包下面的,不是java下面的,这个接口提供了几个方法,其中,我们比较关注的是serialize方法,可以看到这个方法有两个参数,topic,就是我们在客户端传递过来的参数,而data就是对应的数据.

    ProducerRecord record = new ProducerRecord(TOPIC, values);
    看到这个方法其实很简单,就是把字符串转换成byte[]数组,其实到最后,kafka接收和发送的都是以字节数据从形式。不信的话,我们看下反序列化的方法StringDeserializer

    public class StringDeserializer implements Deserializer {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }
    
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
    
    @Override
    public void close() {
        // nothing to do
    }
    

    }
    看其中的deserialize方法,把对应的字节数组转换成字符串返回,这就是为啥我们能够传递字符串,而直接传递自定义类型就报错的原因了。

    那么问题来了,我们怎么传递自定义数据类型呢?

    看到上面的这个流程,其实就很明显了,我们写一个自定义的类,比如说我们要传递一个Person对象,那么我们就定义个Person对象的序列化和反序列化的类,并且实现Serializer接口,下面继续看,首先定义个Person类

    public class Person implements Serializable{

    private String id ;
    private String name ;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + "]";
    }
    

    }
    接下来,我们自定义一个序列化的类:PersonUtilSerializer
    public class PersonUtilSerializer implements Serializer{

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    
    }
    
    @Override
    public byte[] serialize(String topic, Person data) {
    
        return JSON.toJSONBytes(data);
    }
    
    @Override
    public void close() {
        // TODO Auto-generated method stub
    
    }
    

    }
    这个类的方法很简单,就是把Person对象转换成字节数据,然后在定义一个反序列化对象PersonUtilDeserializer,具体的代码如下:
    public class PersonUtilDeserializer implements Deserializer {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // TODO Auto-generated method stub
    
    }
    
    @Override
    public Person deserialize(String topic, byte[] data) {
    
            return JSON.parseObject(data, Person.class);
    }
    
    @Override
    public void close() {
        // TODO Auto-generated method stub
    
    }
    

    代码都特别简单,把对象转成byte,把byte转成对象。具体是使用过程就变成了,先看下Proceduer中

        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
        Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
        int i=0;
        while(true){
            i++;
            Person p = new Person();
            p.setId(i+"");
            p.setName("zhangsan-"+i);
            System.out.println(p.toString());
            ProducerRecord<String, Object> record = new ProducerRecord<String,Object>(TOPIC, p);
            producer.send(record);
    
        }
           producer.close();
    }
    

    是就很简单,把value.serializer换成我们刚写的这个类,然后传递Person对象进去,那么在Consumer中,同样的道理
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.java.kafka.PersonUtilDeserializer");

        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
        consumer.assign(Arrays.asList(TOPIC_PARTITION));
        consumer.seekToBeginning(Arrays.asList(TOPIC_PARTITION));
        while(true){
            ConsumerRecords<String, Object> records = consumer.poll(100);
            System.err.println("print the size of records ,size="+records.count());
            for(ConsumerRecord<String, Object> record:records){
    
                        Person p = (Person) record.value();
                System.out.printf("offset = %d, value = %s", record.offset(), p.toString());
                     System.out.println();
           }
    

    看到上面的把对应的value.deserializer换成对应的反序列化的类即可。此时在执行Consumer程序就能得到你要的输出了
    offset = 2537, value = Person [id={"id":"1"}, name={"name":"zhangsan-3"}]
    offset = 2538, value = Person [id={"id":"3"}, name={"name":"zhangsan-4"}]
    看到对应的结果了,因此要要采用自定义类型,其实只需要实现Serializer和Deserializer连个借口即可,在serialize中进行序列化好反序列化操作。

    那么最后还有一个问题,本人在之前的测试过程中,采用了多个客户端Producer进行生产数据,同时生产的数据类型也是不同的,有int,string和自定义类型,结果在使用Consumer进行接受数据的时候,始终无法得到收到数据,其实这个地方并非无法接收数据,而是在反序列化的过程失败了,通过测试可以验证的出来,本人经过下面的测试发现,
    @Override
    public Person deserialize(String topic, byte[] data) {
    // TODO Auto-generated method stub
    System.err.println(topic+"-----"+new String(data));
    String str = new String(data);
    Person p = new Person();
    p.setName(str);
    return p;
    // return JSON.parseObject(data, Person.class);
    }
    对反序列化的方法进行测试打印,因为不同的类型过来,全部要转车Person类型自然是失败的,但是如果你手动全部转成字符串类型,即可打印出来,结果如下:

    offset = 2534, value = Person [id=null, name={"id":"10","name":"zhangsan-10"}]

    offset = 2535, value = Person [id=null, name={"id":"1","name":"zhangsan-1"}]

    offset = 2536, value = Person [id=null, name={"id":"2","name":"zhangsan-2"}]

    offset = 2537, value = Person [id=null, name={"id":"3","name":"zhangsan-3"}]

    offset = 2538, value = Person [id=null, name={"id":"4","name":"zhangsan-4"}]
    这里没有设置id值,所有的值全部被赋值到names上面,因此可以说明在反序列化的时候失败了。

    因此,如果有不同生成者使用不同的类型进行传递数据,此时,可以采用自定义类型,在反序列化的时候进行判断,转车对应的 类,保证能够接收数据,或者Proceduer采用打标的方式,标记自己的类型,方便Consumer进行解析。

    作者:行走的code
    来源:CSDN
    原文:https://blog.csdn.net/fyhailin/article/details/80501390
    版权声明:本文为博主原创文章,转载请附上博文链接!

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)
  • ¥15 keil里为什么main.c定义的函数在it.c调用不了
  • ¥50 切换TabTip键盘的输入法
  • ¥15 可否在不同线程中调用封装数据库操作的类
  • ¥15 微带串馈天线阵列每个阵元宽度计算
  • ¥15 keil的map文件中Image component sizes各项意思
  • ¥20 求个正点原子stm32f407开发版的贪吃蛇游戏
  • ¥15 划分vlan后,链路不通了?
  • ¥20 求各位懂行的人,注册表能不能看到usb使用得具体信息,干了什么,传输了什么数据