今天下午在测试RPC模块的时候,发现一个BUG,已经折磨一下午了。
问题描述
BUG主要出现在自定义解码器的反序列化方法上。客户端发送请求消息时,消息经过编码器编码后传送到服务端;服务端在自定义解码器中正常解码得到请求消息,接着调用服务获取结果,并把结果通过管道发送给客户端。响应消息首先会经过服务端的自定义编码器,客户端接收后在自定义解码器中调用反序列化方法时抛出异常。令人奇怪的是,服务端第一次在解码器中能正常反序列化消息,但客户端在接收结果时调用反序列化方法却报错。佬们有啥思路吗?
尝试方案
1、更改序列化算法
2、更改netty版本
最后都没有解决。
编码器代码:
package com.dai.rpc.netty.codec;
import com.dai.rpc.compress.Compress;
import com.dai.rpc.constant.CompressTypeEnum;
import com.dai.rpc.constant.MyRpcConstants;
import com.dai.rpc.constant.SerializationTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Outbound encoder that turns an {@link RpcMessage} into the binary RPC frame.
 *
 * <p>Frame layout, in write order:
 * <ol>
 *   <li>magic code ({@code MyRpcConstants.RPC_MAGIC_TABLE})</li>
 *   <li>1 byte version</li>
 *   <li>4 byte full length (header plus body)</li>
 *   <li>1 byte message type</li>
 *   <li>1 byte codec (serialization type)</li>
 *   <li>1 byte compress type</li>
 *   <li>4 byte request id</li>
 *   <li>body: the message data, serialized and then compressed</li>
 * </ol>
 */
public class MyRpcEncoder extends MessageToByteEncoder<RpcMessage> {

    /** Shared static counter; every encoded frame takes the next value as its request id. */
    private static final AtomicInteger ATOMICINTEGER = new AtomicInteger(0);

    /**
     * Writes one message into {@code byteBuf} following the frame layout described on the class.
     *
     * @param channelHandlerContext the handler context (not used by this method)
     * @param rpcMessage            the message to encode
     * @param byteBuf               the buffer the frame is written into
     * @throws Exception if serialization or compression fails
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage, ByteBuf byteBuf) throws Exception {
        final byte codecType = rpcMessage.getCodec();
        final byte compressType = rpcMessage.getCompress();
        final byte messageType = rpcMessage.getMessageType();

        byteBuf.writeBytes(MyRpcConstants.RPC_MAGIC_TABLE);
        byteBuf.writeByte(MyRpcConstants.RPC_VERSION);
        // Skip 4 bytes: the full length is only known once the body has been produced,
        // so the slot is reserved here and filled in at the end.
        byteBuf.writerIndex(byteBuf.writerIndex() + 4);
        byteBuf.writeByte(messageType);
        byteBuf.writeByte(codecType);
        byteBuf.writeByte(compressType);
        // The request id written on the wire comes from the shared counter.
        byteBuf.writeInt(MyRpcEncoder.ATOMICINTEGER.getAndIncrement());

        // Body: serialize first, then compress the serialized bytes.
        byte[] serialized = loadSerializer(codecType).serialize(rpcMessage.getData());
        byte[] body = loadCompress(compressType).compress(serialized);
        byteBuf.writeBytes(body);

        int fullLength = MyRpcConstants.TOTAL_LENGTH + body.length;
        int endOfFrame = byteBuf.writerIndex();
        // Jump back to the reserved slot (frame start + magic + version) and store the length,
        // then restore the writer index to the end of the frame.
        byteBuf.writerIndex(endOfFrame - fullLength + MyRpcConstants.RPC_MAGIC_TABLE.length + 1);
        byteBuf.writeInt(fullLength);
        byteBuf.writerIndex(endOfFrame);
    }

    /**
     * Looks up, via {@link ServiceLoader}, the serializer whose name matches the codec type.
     *
     * @param codecType codec byte carried in the message
     * @return the matching serializer
     * @throws MyRpcException if no registered serializer has the expected name
     */
    private Serializer loadSerializer(byte codecType) {
        String wantedName = SerializationTypeEnum.getName(codecType);
        ServiceLoader<Serializer> candidates = ServiceLoader.load(Serializer.class);
        for (Serializer candidate : candidates) {
            if (candidate.name().equals(wantedName)) {
                return candidate;
            }
        }
        throw new MyRpcException("未找到对应的序列化类型");
    }

    /**
     * Looks up, via {@link ServiceLoader}, the compressor whose name matches the compress type.
     *
     * @param compressType compress byte carried in the message
     * @return the matching compressor
     * @throws MyRpcException if no registered compressor has the expected name
     */
    private Compress loadCompress(byte compressType) {
        String wantedName = CompressTypeEnum.getName(compressType);
        ServiceLoader<Compress> candidates = ServiceLoader.load(Compress.class);
        for (Compress candidate : candidates) {
            if (candidate.name().equals(wantedName)) {
                return candidate;
            }
        }
        throw new MyRpcException("未找到对应的压缩类型");
    }
}
解码器代码:
package com.dai.rpc.netty.codec;
import com.dai.rpc.compress.Compress;
import com.dai.rpc.constant.CompressTypeEnum;
import com.dai.rpc.constant.MessageTypeEnum;
import com.dai.rpc.constant.MyRpcConstants;
import com.dai.rpc.constant.SerializationTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.message.RpcRequest;
import com.dai.rpc.message.RpcResponse;
import com.dai.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.ServiceLoader;
/**
 * Inbound decoder for the RPC frame format.
 *
 * <p>The parent {@link LengthFieldBasedFrameDecoder} cuts the byte stream into complete frames
 * (handling sticky and split TCP packets); this class then parses each frame into an
 * {@link RpcMessage}.
 *
 * <p>Frame layout, in read order:
 * <ol>
 *   <li>magic code ({@code MyRpcConstants.RPC_MAGIC_TABLE})</li>
 *   <li>1 byte version</li>
 *   <li>4 byte full length (header plus body)</li>
 *   <li>1 byte message type</li>
 *   <li>1 byte codec (serialization type)</li>
 *   <li>1 byte compress type</li>
 *   <li>4 byte request id</li>
 *   <li>body: compressed, serialized payload</li>
 * </ol>
 */
public class MyRpcDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * Creates a decoder with the settings matching the frame layout above: 8 MiB maximum frame,
     * length field at offset 5, 4 bytes wide, adjustment -9, nothing stripped.
     */
    public MyRpcDecoder() {
        super(1024 * 1024 * 8, 5, 4, -9, 0);
    }

    /**
     * Creates a decoder with explicit framing parameters.
     *
     * @param maxFrameLength      maximum accepted frame length; choose according to the deployment
     * @param lengthFieldOffset   offset of the length field; magic code + version = 5 here
     * @param lengthFieldLength   width of the length field; 4 bytes here
     * @param lengthAdjustment    value added to the length field to obtain the number of bytes that
     *                            follow the length field. The length field stores the length of the
     *                            whole frame, so the 9 bytes up to and including the length field
     *                            must be subtracted (-9).
     * @param initialBytesToStrip number of leading bytes to drop from the decoded frame; 0 keeps
     *                            the header so that it can be parsed here
     */
    public MyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

    /**
     * Extracts one frame through the parent decoder and parses it.
     *
     * @param ctx the handler context
     * @param in  the cumulated inbound bytes
     * @return the parsed {@link RpcMessage}, or whatever the parent returned when it is not a
     *         {@link ByteBuf} (e.g. {@code null} while the frame is still incomplete)
     * @throws MyRpcException if the frame is shorter than the fixed header or fails validation
     * @throws Exception      if the parent decoder fails
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        Object decoded = super.decode(ctx, in);
        if (!(decoded instanceof ByteBuf)) {
            return decoded;
        }
        ByteBuf frame = (ByteBuf) decoded;
        try {
            if (frame.readableBytes() < MyRpcConstants.TOTAL_LENGTH) {
                throw new MyRpcException("消息长度不符,格式有误");
            }
            return decodeFrame(frame);
        } finally {
            // The frame handed back by the parent is a retained buffer owned by this method.
            // Everything needed has been copied out of it, so it must be released here,
            // on success and on failure, otherwise every message leaks a buffer.
            frame.release();
        }
    }

    /**
     * Parses a complete frame: validates the header, then decompresses and deserializes the body.
     *
     * @param frame a complete frame positioned at the magic code
     * @return the message; its data is left unset when the body is empty or the message type is
     *         neither REQUEST nor RESPONSE
     */
    private Object decodeFrame(ByteBuf frame) {
        // Header fields must be read in exactly the order they were written.
        checkMagicCode(frame);
        checkVersion(frame);
        int fullLength = frame.readInt();
        byte messageType = frame.readByte();
        byte codecType = frame.readByte();
        byte compressType = frame.readByte();
        int requestId = frame.readInt();

        RpcMessage message = RpcMessage.builder().codec(codecType)
                .messageType(messageType)
                .compress(compressType)
                .requestId(requestId)
                .build();

        int bodyLength = fullLength - MyRpcConstants.TOTAL_LENGTH;
        if (bodyLength > 0) {
            byte[] bodyData = new byte[bodyLength];
            frame.readBytes(bodyData);
            // The sender serializes and then compresses, so the receiver decompresses first
            // and deserializes afterwards.
            Compress compress = loadCompress(compressType);
            bodyData = compress.decompress(bodyData);
            Serializer serializer = loadSerializer(codecType);
            // The message type in the header selects the class the body is deserialized into,
            // so the sender must label requests and responses correctly.
            if (MessageTypeEnum.REQUEST.getCode() == messageType) {
                RpcRequest rpcRequest = (RpcRequest) serializer.deserialize(bodyData, RpcRequest.class);
                message.setData(rpcRequest);
            }
            if (MessageTypeEnum.RESPONSE.getCode() == messageType) {
                RpcResponse rpcResponse = (RpcResponse) serializer.deserialize(bodyData, RpcResponse.class);
                message.setData(rpcResponse);
            }
        }
        return message;
    }

    /**
     * Looks up, via {@link ServiceLoader}, the serializer whose name matches the codec type.
     *
     * @param codecType codec byte read from the frame header
     * @return the matching serializer
     * @throws MyRpcException if no registered serializer has the expected name
     */
    private Serializer loadSerializer(byte codecType) {
        String serializerName = SerializationTypeEnum.getName(codecType);
        ServiceLoader<Serializer> load = ServiceLoader.load(Serializer.class);
        for (Serializer serializer : load) {
            if (serializer.name().equals(serializerName)) {
                return serializer;
            }
        }
        throw new MyRpcException("未找到对应的序列化类型");
    }

    /**
     * Looks up, via {@link ServiceLoader}, the compressor whose name matches the compress type.
     *
     * @param compressType compress byte read from the frame header
     * @return the matching compressor
     * @throws MyRpcException if no registered compressor has the expected name
     */
    private Compress loadCompress(byte compressType) {
        String compressName = CompressTypeEnum.getName(compressType);
        ServiceLoader<Compress> load = ServiceLoader.load(Compress.class);
        for (Compress compress : load) {
            if (compress.name().equals(compressName)) {
                return compress;
            }
        }
        throw new MyRpcException("未找到对应的压缩类型");
    }

    /**
     * Reads the version byte and rejects frames whose version differs from the supported one.
     *
     * @param frame frame positioned at the version byte
     * @throws MyRpcException if the version does not equal {@code MyRpcConstants.RPC_VERSION}
     */
    private void checkVersion(ByteBuf frame) {
        byte version = frame.readByte();
        if (version != MyRpcConstants.RPC_VERSION) {
            throw new MyRpcException("版本号不符,格式有误");
        }
    }

    /**
     * Reads the magic code and rejects frames that do not start with the expected bytes.
     *
     * @param frame frame positioned at the magic code
     * @throws MyRpcException if any magic byte differs from {@code MyRpcConstants.RPC_MAGIC_TABLE}
     */
    private void checkMagicCode(ByteBuf frame) {
        byte[] magics = new byte[MyRpcConstants.RPC_MAGIC_TABLE.length];
        frame.readBytes(magics);
        for (int i = 0; i < magics.length; i++) {
            if (magics[i] != MyRpcConstants.RPC_MAGIC_TABLE[i]) {
                throw new MyRpcException("魔法值不符,格式有误");
            }
        }
    }
}
服务端处理代码:
package com.dai.rpc.netty.handler;
import com.dai.rpc.constant.MessageTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.factory.SingletonFactory;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.message.RpcRequest;
import com.dai.rpc.message.RpcResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {
private MyRequestHandler myRequestHandler;
public MyNettyServerHandler() {
this.myRequestHandler = SingletonFactory.getInstance(MyRequestHandler.class);
}
/**
* 编解码之后的消息经由处理器处理
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 服务端接收消息
// 接收客户端发来的数据,数据肯定包括要调用的服务提供者的接口和方法
// 解析消息,去找到对应的服务提供者,然后调用得到结果,发消息给客户端即可
try{
if(msg instanceof RpcMessage){
// 如果接收到的消息是RpcMessage,那么进行处理
RpcMessage message = (RpcMessage) msg;
byte messageType = message.getMessageType();
if(MessageTypeEnum.REQUEST.getCode() == messageType){
// 如果是请求消息,那么请求调用服务获取结果并返回
RpcRequest data = (RpcRequest)message.getData();
Object result = myRequestHandler.handler(data);
// 装入结果
if(ctx.channel().isActive() && ctx.channel().isWritable()){
RpcResponse<Object> success = RpcResponse.success(result, data.getRequestId());
message.setData(success);
}else{
RpcResponse<Object> response = RpcResponse.fail("net fail");
message.setData(response);
}
}
//log.info("服务端收到数据,并处理完成:" + message);
// 把处理结果发送给客户端
ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}catch (Exception e){
e.printStackTrace();
throw new MyRpcException("数据格式异常...");
}finally {
// netty自带的包进行释放
ReferenceCountUtil.release(msg);
}
}
}
报错信息
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 2
Serialization trace:
interfaceName (com.dai.rpc.message.RpcRequest)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
at com.dai.rpc.serialize.KryoSerializer.deserialize(KryoSerializer.java:70)
at com.dai.rpc.netty.codec.MyRpcDecoder.decodeFrame(MyRpcDecoder.java:102)
at com.dai.rpc.netty.codec.MyRpcDecoder.decode(MyRpcDecoder.java:62)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 2
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:780)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
... 25 more
Process finished with exit code -1