在网上看了一些demo,都是一个Producer和一个Consumer,Producer发消息,
Consumer接收消息。但在实际业务场景中,比如一个人的操作产生了消息,要指定发给另一个人,这个怎么准确的发过去啊。是把tag指定为人的ID吗?
RocketMQ中消息谁发的,具体发给谁怎么确定呢?
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
1条回答 默认 最新
- weixin_43758917 2018-11-21 07:14关注
一、背景
最近在学习kafka相关的知识,正好遇到一个疑问,在写demo的过程中发现,投递的数据都是字符串类型,那么就想想在实际应用中应该会有大量的需求投递自定义数据类型,那么如何才能投递自定义数据类型呢?这里面就涉及到了kafka提供的接口序列化和反序列化的功能。
二、kafka消息序列化和反序列化
先看个demo,写个Producer客户端,根据官方文档,需要先做一些配置,放到Properties中,这里包括啥bootstrap.servers,等参数,就不详解了,主要是用到两个参数:
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
添加了这么两个参数,其实这个是kafka封装好的,把字符串进行序列化,这就是为什么在ProducerRecord中可以输入字符串进行传输。同时,我们在Consumer端也需要添加这样两个参数,这两个参数是进行反序列化的作用的,就是接收到kafka传递给你的数据之后,进行反序列化操作。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
添加以上两个参数,就是为啥我们是用Producer和Consumer进行生产和接收数据了,并且是以字符串的形式,那么这两个类到底实现了什么功能呢,下面看一下...public class StringSerializer implements Serializer {
private String encoding = "UTF8";@Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); if (encodingValue != null && encodingValue instanceof String) encoding = (String) encodingValue; } @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } @Override public void close() { // nothing to do }
}
以上这个类很简单,就是实现Serializer接口,但是注意这个接口是org.apache.kafka.common.serialization包下面的,不是java下面的,这个接口提供了几个方法,其中,我们比较关注的是serialize方法,可以看到这个方法有两个参数,topic,就是我们在客户端传递过来的参数,而data就是对应的数据.ProducerRecord record = new ProducerRecord(TOPIC, values);
看到这个方法其实很简单,就是把字符串转换成byte[]数组,其实到最后,kafka接收和发送的都是以字节数据从形式。不信的话,我们看下反序列化的方法StringDeserializerpublic class StringDeserializer implements Deserializer {
private String encoding = "UTF8";@Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("deserializer.encoding"); if (encodingValue != null && encodingValue instanceof String) encoding = (String) encodingValue; } @Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } @Override public void close() { // nothing to do }
}
看其中的deserialize方法,把对应的字节数组转换成字符串返回,这就是为啥我们能够传递字符串,而直接传递自定义类型就报错的原因了。那么问题来了,我们怎么传递自定义数据类型呢?
看到上面的这个流程,其实就很明显了,我们写一个自定义的类,比如说我们要传递一个Person对象,那么我们就定义个Person对象的序列化和反序列化的类,并且实现Serializer接口,下面继续看,首先定义个Person类
public class Person implements Serializable{
private String id ; private String name ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + "]"; }
}
接下来,我们自定义一个序列化的类:PersonUtilSerializer
public class PersonUtilSerializer implements Serializer{@Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public byte[] serialize(String topic, Person data) { return JSON.toJSONBytes(data); } @Override public void close() { // TODO Auto-generated method stub }
}
这个类的方法很简单,就是把Person对象转换成字节数据,然后在定义一个反序列化对象PersonUtilDeserializer,具体的代码如下:
public class PersonUtilDeserializer implements Deserializer {@Override public void configure(Map<String, ?> configs, boolean isKey) { // TODO Auto-generated method stub } @Override public Person deserialize(String topic, byte[] data) { return JSON.parseObject(data, Person.class); } @Override public void close() { // TODO Auto-generated method stub }
代码都特别简单,把对象转成byte,把byte转成对象。具体是使用过程就变成了,先看下Proceduer中
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "com.java.kafka.PersonUtilSerializer"); Producer<String, Object> producer = new KafkaProducer<String, Object>(props); int i=0; while(true){ i++; Person p = new Person(); p.setId(i+""); p.setName("zhangsan-"+i); System.out.println(p.toString()); ProducerRecord<String, Object> record = new ProducerRecord<String,Object>(TOPIC, p); producer.send(record); } producer.close(); }
是就很简单,把value.serializer换成我们刚写的这个类,然后传递Person对象进去,那么在Consumer中,同样的道理
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.java.kafka.PersonUtilDeserializer");KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props); consumer.assign(Arrays.asList(TOPIC_PARTITION)); consumer.seekToBeginning(Arrays.asList(TOPIC_PARTITION)); while(true){ ConsumerRecords<String, Object> records = consumer.poll(100); System.err.println("print the size of records ,size="+records.count()); for(ConsumerRecord<String, Object> record:records){ Person p = (Person) record.value(); System.out.printf("offset = %d, value = %s", record.offset(), p.toString()); System.out.println(); }
看到上面的把对应的value.deserializer换成对应的反序列化的类即可。此时在执行Consumer程序就能得到你要的输出了
offset = 2537, value = Person [id={"id":"1"}, name={"name":"zhangsan-3"}]
offset = 2538, value = Person [id={"id":"3"}, name={"name":"zhangsan-4"}]
看到对应的结果了,因此要要采用自定义类型,其实只需要实现Serializer和Deserializer连个借口即可,在serialize中进行序列化好反序列化操作。那么最后还有一个问题,本人在之前的测试过程中,采用了多个客户端Producer进行生产数据,同时生产的数据类型也是不同的,有int,string和自定义类型,结果在使用Consumer进行接受数据的时候,始终无法得到收到数据,其实这个地方并非无法接收数据,而是在反序列化的过程失败了,通过测试可以验证的出来,本人经过下面的测试发现,
@Override
public Person deserialize(String topic, byte[] data) {
// TODO Auto-generated method stub
System.err.println(topic+"-----"+new String(data));
String str = new String(data);
Person p = new Person();
p.setName(str);
return p;
// return JSON.parseObject(data, Person.class);
}
对反序列化的方法进行测试打印,因为不同的类型过来,全部要转车Person类型自然是失败的,但是如果你手动全部转成字符串类型,即可打印出来,结果如下:offset = 2534, value = Person [id=null, name={"id":"10","name":"zhangsan-10"}]
offset = 2535, value = Person [id=null, name={"id":"1","name":"zhangsan-1"}]
offset = 2536, value = Person [id=null, name={"id":"2","name":"zhangsan-2"}]
offset = 2537, value = Person [id=null, name={"id":"3","name":"zhangsan-3"}]
offset = 2538, value = Person [id=null, name={"id":"4","name":"zhangsan-4"}]
这里没有设置id值,所有的值全部被赋值到names上面,因此可以说明在反序列化的时候失败了。因此,如果有不同生成者使用不同的类型进行传递数据,此时,可以采用自定义类型,在反序列化的时候进行判断,转车对应的 类,保证能够接收数据,或者Proceduer采用打标的方式,标记自己的类型,方便Consumer进行解析。
作者:行走的code
来源:CSDN
原文:https://blog.csdn.net/fyhailin/article/details/80501390
版权声明:本文为博主原创文章,转载请附上博文链接!本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报
悬赏问题
- ¥15 关于#hadoop#的问题
- ¥15 (标签-Python|关键词-socket)
- ¥15 keil里为什么main.c定义的函数在it.c调用不了
- ¥50 切换TabTip键盘的输入法
- ¥15 可否在不同线程中调用封装数据库操作的类
- ¥15 微带串馈天线阵列每个阵元宽度计算
- ¥15 keil的map文件中Image component sizes各项意思
- ¥20 求个正点原子stm32f407开发版的贪吃蛇游戏
- ¥15 划分vlan后,链路不通了?
- ¥20 求各位懂行的人,注册表能不能看到usb使用得具体信息,干了什么,传输了什么数据