北小风 2023-02-01 20:41 采纳率: 0%
浏览 20

自实现RPC项目问题

今天下午在测试RPC模块的时候,发现一个BUG,已经折磨一下午了。

问题描述

BUG主要是在自定义解码器的反序列化方法上。客户端发送请求消息时,经过编码器消息编码传送到服务端,服务端在自定义解码器正常解码获取到请求消息,接着在调用服务获取到结果,服务端把结果通过管道发送给客户端。消息首先会经过自定义编码器,客户端接收后在自定义解码器调用反序列化方法时抛出异常。令人奇怪的是,第一次服务端在解码器中能正常反序列化消息,但是客户端在接收结果时,调用反序列化方法却报错。佬们有啥思路吗?

尝试方案

1、更改序列化算法
2、更改netty版本

最后都没有解决。

编码器代码:

package com.dai.rpc.netty.codec;

import com.dai.rpc.compress.Compress;
import com.dai.rpc.constant.CompressTypeEnum;
import com.dai.rpc.constant.MyRpcConstants;
import com.dai.rpc.constant.SerializationTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;

public class MyRpcEncoder extends MessageToByteEncoder<RpcMessage> {


    private static final AtomicInteger ATOMICINTEGER = new AtomicInteger(0);

    /**
     *  每一次发送数据都会经过编码器
     *  编码器职责:按照协议自定义数据
     * @param channelHandlerContext
     * @param rpcMessage
     * @param byteBuf
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage, ByteBuf byteBuf) throws Exception {

        /**
         * 1. 4B  magic code(魔法数)
         * 2. 1B version(版本)
         * 3. 4B full length(整个报文消息长度)
         * 4. 1B messageType(消息类型)
         * 5. 1B codec(序列化类型)
         * 6. 1B compress(压缩类型)
         * 7. 4B  requestId(请求的Id)
         * 8. body(object类型数据)
         */

        byte codecType = rpcMessage.getCodec();
        byte compressType = rpcMessage.getCompress();
        byte messageType = rpcMessage.getMessageType();

        byteBuf.writeBytes(MyRpcConstants.RPC_MAGIC_TABLE);
        byteBuf.writeByte(MyRpcConstants.RPC_VERSION);
        // 报文消息长度预留
        // 无参writerIndex作用:返回当前写下标。有参writerIndex作用:更改写下标
        byteBuf.writerIndex(byteBuf.writerIndex() + 4);
        byteBuf.writeByte(messageType);
        byteBuf.writeByte(codecType);
        byteBuf.writeByte(compressType);
        // 使用原子类保证请求id唯一
        int requestId = MyRpcEncoder.ATOMICINTEGER.getAndIncrement();
        byteBuf.writeInt(requestId);
        // 写入数据
        int fullLength = MyRpcConstants.TOTAL_LENGTH;
        byte[] bodyBuff = null;
        // 先要序列化
        Serializer serializer = loadSerializer(codecType);
        bodyBuff = serializer.serialize(rpcMessage.getData());
        // 压缩
        Compress compress  = loadCompress(compressType);
        bodyBuff = compress.compress(bodyBuff);
        fullLength += bodyBuff.length;

        byteBuf.writeBytes(bodyBuff);
        int writeIndex = byteBuf.writerIndex();
        // 更改写下标将fullLength写入之前预留的位置
        byteBuf.writerIndex(writeIndex - fullLength + MyRpcConstants.RPC_MAGIC_TABLE.length + 1);
        byteBuf.writeInt(fullLength);
        // 更改写下标
        byteBuf.writerIndex(writeIndex);

    }

    private Serializer loadSerializer(byte codecType) {
        // 根据spi技术找到需要加载的实现类
        String serializerName = SerializationTypeEnum.getName(codecType);
        ServiceLoader<Serializer> load = ServiceLoader.load(Serializer.class);
        for (Serializer serializer : load) {
            if(serializer.name().equals(serializerName)) return serializer;
        }
        throw new MyRpcException("未找到对应的序列化类型");
    }

    private Compress loadCompress(byte compressType) {
        // 根据spi技术找到需要加载的实现类
        String compressName = CompressTypeEnum.getName(compressType);
        ServiceLoader<Compress> load = ServiceLoader.load(Compress.class);
        for (Compress compress : load) {
            if(compress.name().equals(compressName)) return compress;
        }
        throw new  MyRpcException("未找到对应的压缩类型");
    }


}


解码器代码:

package com.dai.rpc.netty.codec;

import com.dai.rpc.compress.Compress;
import com.dai.rpc.constant.CompressTypeEnum;
import com.dai.rpc.constant.MessageTypeEnum;
import com.dai.rpc.constant.MyRpcConstants;
import com.dai.rpc.constant.SerializationTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.message.RpcRequest;
import com.dai.rpc.message.RpcResponse;
import com.dai.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.util.ServiceLoader;

/**
 * RPC框架解码器
 * LengthFieldBasedFrameDecoder内部解决TCP粘包、拆包问题
 */
public class MyRpcDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * 1. 4B  magic code(魔法数)
     * 2. 1B version(版本)
     * 3. 4B full length(整个报文消息长度)
     * 4. 1B messageType(消息类型)
     * 5. 1B codec(序列化类型)
     * 6. 1B compress(压缩类型)
     * 7. 4B  requestId(请求的Id)
     * 8. body(object类型数据)
     */

    public MyRpcDecoder(){
        super(1024 * 1024 * 8, 5, 4, -9, 0);
    }

    /**
     *
     * @param maxFrameLength  最大帧长度。它决定可以接收的数据的最大长度。如果超过,数据将被丢弃,根据实际环境定义
     * @param lengthFieldOffset  数据长度字段开始的偏移量, magic code+version=长度为5
     * @param lengthFieldLength  消息长度的大小  full length(消息长度) 长度为4
     * @param lengthAdjustment 补偿值 lengthAdjustment+数据长度取值=长度字段之后剩下包的字节数(x + 16=7 so x = -9)
     * @param initialBytesToStrip 忽略的字节长度,如果要接收所有的header+body 则为0,如果只接收body 则为header的长度 ,我们这为0
     */
    public MyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        Object decode = super.decode(ctx, in);
        // 解码
        if(decode instanceof ByteBuf){
            ByteBuf frame = (ByteBuf)  decode;
            if(frame.readableBytes() < MyRpcConstants.TOTAL_LENGTH){
                throw new MyRpcException("消息长度不符,格式有误");
            }
            return decodeFrame(frame);
        }
        return decode;
    }

    private Object decodeFrame(ByteBuf frame) {

        // 1.检查魔法数
        checkMagicCode(frame);
        // 2.检查版本号是否在定义范围内
        checkVersion(frame);
        // 3.读取消息长度
        int fullLength = frame.readInt();
        // 4.读取消息类型
        byte messageType = frame.readByte();
        // 5.读取序列化类型
        byte codecType = frame.readByte();
        // 6.读取压缩类型
        byte compressType = frame.readByte();
        // 7.读取请求id
        int requestId = frame.readInt();
        // 8.读取具体数据
        int bodyLength = fullLength - MyRpcConstants.TOTAL_LENGTH;
        RpcMessage message = RpcMessage.builder().codec(codecType)
                .messageType(messageType)
                .compress(compressType)
                .requestId(requestId)
                .build();
        if(bodyLength > 0){
            byte[] bodyData = new byte[bodyLength];
            frame.readBytes(bodyData);
            // 发送方发过来时,先是序列化,再压缩
            // 接收方接收时,应该先解压缩再反序列化
            // 解压缩
            Compress compress  = loadCompress(compressType);
            bodyData = compress.decompress(bodyData);
            // 反序列化
            Serializer serializer = loadSerializer(codecType);
            // 根据消息类型把数据反序列化并设置到message中
            if(MessageTypeEnum.REQUEST.getCode() ==  messageType){
                RpcRequest rpcRequest = (RpcRequest) serializer.deserialize(bodyData, RpcRequest.class);
                message.setData(rpcRequest);
            }
            if(MessageTypeEnum.RESPONSE.getCode() ==  messageType){
                RpcResponse rpcResponse = (RpcResponse) serializer.deserialize(bodyData, RpcResponse.class);
                message.setData(rpcResponse);
            }
        }
        return message;
    }

    private Serializer loadSerializer(byte codecType) {
        // 根据spi技术找到需要加载的实现类
        String serializerName = SerializationTypeEnum.getName(codecType);
        ServiceLoader<Serializer> load = ServiceLoader.load(Serializer.class);
        for (Serializer serializer : load) {
            if(serializer.name().equals(serializerName)) return serializer;
        }
        throw new  MyRpcException("未找到对应的序列化类型");
    }

    private Compress loadCompress(byte compressType) {
        // 根据spi技术找到需要加载的实现类
        String compressName = CompressTypeEnum.getName(compressType);
        ServiceLoader<Compress> load = ServiceLoader.load(Compress.class);
        for (Compress compress : load) {
            if(compress.name().equals(compressName)) return compress;
        }
        throw new  MyRpcException("未找到对应的压缩类型");
    }

    private void checkVersion(ByteBuf frame) {
        byte version = frame.readByte();
        if(version != MyRpcConstants.RPC_VERSION){
            throw new MyRpcException("版本号不符,格式有误");
        }
    }

    private void checkMagicCode(ByteBuf frame) {
        // 检查魔法值,不对就抛出异常
        byte[] magics = new byte[MyRpcConstants.RPC_MAGIC_TABLE.length];
        frame.readBytes(magics);
        for(int i = 0;i < magics.length;i ++){
            if(magics[i] != MyRpcConstants.RPC_MAGIC_TABLE[i]){
                throw new MyRpcException("魔法值不符,格式有误");
            }
        }
    }

}


服务端处理代码:

package com.dai.rpc.netty.handler;

import com.dai.rpc.constant.MessageTypeEnum;
import com.dai.rpc.exceptions.MyRpcException;
import com.dai.rpc.factory.SingletonFactory;
import com.dai.rpc.message.RpcMessage;
import com.dai.rpc.message.RpcRequest;
import com.dai.rpc.message.RpcResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {

    private MyRequestHandler myRequestHandler;

    public MyNettyServerHandler() {
        this.myRequestHandler = SingletonFactory.getInstance(MyRequestHandler.class);
    }

    /**
     * 编解码之后的消息经由处理器处理
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 服务端接收消息
        // 接收客户端发来的数据,数据肯定包括要调用的服务提供者的接口和方法
        // 解析消息,去找到对应的服务提供者,然后调用得到结果,发消息给客户端即可
        try{
            if(msg instanceof RpcMessage){
                // 如果接收到的消息是RpcMessage,那么进行处理
                RpcMessage message = (RpcMessage) msg;
                byte messageType = message.getMessageType();
                if(MessageTypeEnum.REQUEST.getCode() ==  messageType){
                    // 如果是请求消息,那么请求调用服务获取结果并返回
                    RpcRequest data = (RpcRequest)message.getData();
                    Object result = myRequestHandler.handler(data);
                    // 装入结果
                    if(ctx.channel().isActive() && ctx.channel().isWritable()){
                        RpcResponse<Object> success = RpcResponse.success(result, data.getRequestId());
                        message.setData(success);
                    }else{
                        RpcResponse<Object> response = RpcResponse.fail("net fail");
                        message.setData(response);
                    }
                }
                //log.info("服务端收到数据,并处理完成:" + message);
                // 把处理结果发送给客户端
                ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        }catch (Exception e){
            e.printStackTrace();
            throw new MyRpcException("数据格式异常...");
        }finally {
            // netty自带的包进行释放
            ReferenceCountUtil.release(msg);
        }

    }
}


报错信息

com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 2
Serialization trace:
interfaceName (com.dai.rpc.message.RpcRequest)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
    at com.dai.rpc.serialize.KryoSerializer.deserialize(KryoSerializer.java:70)
    at com.dai.rpc.netty.codec.MyRpcDecoder.decodeFrame(MyRpcDecoder.java:102)
    at com.dai.rpc.netty.codec.MyRpcDecoder.decode(MyRpcDecoder.java:62)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 2
    at java.util.ArrayList.rangeCheck(ArrayList.java:659)
    at java.util.ArrayList.get(ArrayList.java:435)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:780)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
    ... 25 more

Process finished with exit code -1


  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-02-02 11:52
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 创建了问题 2月1日

悬赏问题

  • ¥15 is not in the mmseg::model registry。报错,模型注册表找不到自定义模块。
  • ¥15 安装quartus II18.1时弹出此error,怎么解决?
  • ¥15 keil官网下载psn序列号在哪
  • ¥15 想用adb命令做一个通话软件,播放录音
  • ¥30 Pytorch深度学习服务器跑不通问题解决?
  • ¥15 部分客户订单定位有误的问题
  • ¥15 如何在maya程序中利用python编写领子和褶裥的模型的方法
  • ¥15 Bug traq 数据包 大概什么价
  • ¥15 在anaconda上pytorch和paddle paddle下载报错
  • ¥25 自动填写QQ腾讯文档收集表