bokiBiabu 2022-08-13 16:24 采纳率: 40%
浏览 342
已结题

netty webSocket 客户端发往服务端的消息接收异常

想学习一下 Netty,就在上班空闲时捣鼓了一下,想直接往项目方向去做,实现聊天功能。
主要是做服务端;客户端以后有空再去学 Android,这里只是启动了一个 Netty 客户端用于测试。
这里主要用到 WebSocket 和 protobuf。问题是:服务端往客户端发消息是正常的,客户端也能正常收到,但是客户端往服务端发消息时会报错:
java.lang.ClassCastException: class io.netty.buffer.PooledUnsafeDirectByteBuf cannot be cast to class io.netty.handler.codec.http.FullHttpRequest (io.netty.buffer.PooledUnsafeDirectByteBuf and io.netty.handler.codec.http.FullHttpRequest are in unnamed module of loader 'app')
看上去像是服务端把收到的消息(ByteBuf)当成 FullHttpRequest 做强制类型转换时失败了……


package com.wuhu.fantasyim.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class SocketServer {

    private static final int port = 9876;
    private static final EventLoopGroup boss = new NioEventLoopGroup();
    private static final EventLoopGroup work = new NioEventLoopGroup();
    private static final ServerBootstrap serverBootstrap = new ServerBootstrap();

    @PostConstruct
    void init(){
        run();
    }

    public void run() {
        try {
            serverBootstrap.group(boss, work);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new NettyServerFilter());
            // 服务器绑定端口监听
            ChannelFuture f = serverBootstrap.bind(port).sync();
            new SocketClient().run();
            log.info("netty Socket 服务启动成功,端口:{}",port);
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }

    }
}

package com.wuhu.fantasyim.server;

import com.wuhu.fantasyim.protobuf.SocketProto;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * Channel initializer for accepted connections: HTTP codec and WebSocket upgrade on
 * {@code /ws}, idle detection, protobuf codecs for {@link SocketProto}, and finally the
 * business handler.
 */
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {

    /**
     * Builds the pipeline for a newly accepted channel.
     *
     * <p>Inbound data traverses the handlers in the order they are added, so the HTTP
     * codec must come first. Previously it was added last, which let raw {@code ByteBuf}
     * data reach {@link WebSocketServerProtocolHandler}; its handshake step expects a
     * {@code FullHttpRequest} and failed with a {@code ClassCastException}.
     *
     * @param ch the accepted channel whose pipeline is configured
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // 1. Bytes <-> HTTP messages. Must be the first handler.
        pipeline.addLast(new HttpServerCodec());
        // 2. Merge HTTP message parts into a single FullHttpRequest (max 64 KiB).
        pipeline.addLast(new HttpObjectAggregator(1024*64));
        // 3. Support for writing chunked content; placed after the codec so outbound
        //    chunks pass through it before being HTTP-encoded.
        pipeline.addLast(new ChunkedWriteHandler());
        // 4. WebSocket handshake and control frames on "/ws".
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
        // Arguments: reader idle time, writer idle time, all idle time, time unit.
        pipeline.addLast(new IdleStateHandler(1, 1, 2, TimeUnit.SECONDS));
        // NOTE(review): after the upgrade the WebSocket handler emits WebSocketFrame
        // objects, while the protobuf codecs below operate on ByteBuf. A frame <-> ByteBuf
        // adapter between the two is presumably still required for protobuf over
        // WebSocket — TODO confirm and add.
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufDecoder(SocketProto.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new SocketServerHandler());
    }

}

package com.wuhu.fantasyim.server;

import com.alibaba.fastjson2.JSONObject;
import com.google.protobuf.Any;
import com.wuhu.fantasyim.protobuf.MessageWrapper;
import com.wuhu.fantasyim.protobuf.SocketProto;
import com.wuhu.fantasyim.protobuf.Status;
import com.wuhu.fantasyim.utils.DataFormatUtils;
import com.wuhu.fantasyim.utils.JwtUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
public class SocketServerHandler extends ChannelInboundHandlerAdapter {

    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private static final ConcurrentHashMap<Integer, List<String>> userChannels = new ConcurrentHashMap<>();

    private static final ConcurrentHashMap<String, Integer> channelUserIds = new ConcurrentHashMap<>();

    @Value("${jwt.tokenName}")
    private String tokenName;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof FullHttpRequest){
            FullHttpRequest aggregator = (FullHttpRequest) msg;
            List<Map.Entry<String,String>> headerParams = aggregator.headers().entries();
            String token = "";
            for(Map.Entry<String,String> headerParamItem : headerParams){
                if(Objects.equals(headerParamItem.getKey(), tokenName)){
                    token = headerParamItem.getValue();
                    break;
                }
            }
            Integer userId = 0;
            try {
                userId = Integer.valueOf(JwtUtils.getClaimByName(token,"userId"));
            }catch (Exception e){
                ctx.writeAndFlush("fuck");
                e.printStackTrace();
            }
        }else if (msg instanceof SocketProto) {
            SocketProto socketProto = (SocketProto)msg;
            MessageWrapper wrapper = socketProto.getPayload().unpack(MessageWrapper.class);
            log.info("消息样子"+socketProto);
            switch (socketProto.getType()){
                case KEEPALIVE:
                    break;
                case ACK:
                    break;
                case MESSAGE:
                    break;
                case SYSTEM:
                    break;
                case EVENT:
                    break;
                default:
                    log.error("未知消息类型:{}",socketProto.getType());
            }
        }else{
            log.info("无效的消息体:"+msg.toString());
            ctx.writeAndFlush("歪比巴卜"+"\n");
        }
        Channel channel = channels.find(ctx.channel().id());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {//读空闲
                //检测到读空闲不做任何的操作

                log.info("读空闲事件触发...");
                SocketProto.Builder messageBuilder = SocketProto.newBuilder()
                        .setStatus(Status.STATUS_SUCCESS)
                        .setCode(200)
                        .setVersion(1)
                        .setSequence(1L)
                        .setType(SocketProto.Type.MESSAGE)
//                        .setPayload(Any.pack(evt))
                        .setMsg("surprise mother fucker")
                        .setTimestamp(Instant.now().toEpochMilli())
                        .setAppType(SocketProto.AppType.ANDROID);
                SocketProto message = messageBuilder.build();
                ctx.writeAndFlush(message);
                ctx.writeAndFlush( evt);
            } else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {//写空闲
                //检测到写空闲不做任何的操作
                log.info("写空闲事件触发...");
                ctx.writeAndFlush("写空闲事件触发 ");
            } else if (idleStateEvent.state() == IdleState.ALL_IDLE) {//读写空闲
                log.info("读写空闲事件触发...");
                ctx.writeAndFlush("读写空闲事件触发 ");
//                ctx.channel().close();
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
//        ctx.channel().
        log.info("客户端:{},已连接",ctx.channel().remoteAddress());
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
//        Integer userId = userChannels.get(channelId);
        log.info("用户:{},主动断开连接",0);
        userChannels.remove(channelId);
        channels.remove(ctx.channel());
        ctx.channel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//        String channelId = ctx.channel().id().asLongText();
////        Integer userId = userChannels.get(channelId);
//        log.info("出现异常,用户:{},断开连接",0);
//        userChannels.remove(channelId);
//        channels.remove(ctx.channel());
//        ctx.channel().close();
        log.error("服务端出现异常");
        cause.printStackTrace();
    }
}

以下是客户端代码


package com.wuhu.fantasyim.server;

import com.wuhu.fantasyim.protobuf.SocketProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class SocketClient {

    private static final EventLoopGroup work = new NioEventLoopGroup();

    public void run() {
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(work).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
//                            pipeline.addLast(new ChunkedWriteHandler());
//                            pipeline.addLast(new HttpObjectAggregator(1024*64));
                            //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
//                            pipeline.addLast(new IdleStateHandler(5, 5, 10, TimeUnit.SECONDS));
//                            pipeline.addLast(new StringDecoder());
//                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            pipeline.addLast(new ProtobufDecoder(SocketProto.getDefaultInstance()));
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new SocketClientHandler());
                        }
                    });
            //发起异步连接操作,同步阻等待结果
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9876).sync();
            log.info("连接成功");
            //等待客户端链路关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
    }
}


package com.wuhu.fantasyim.server;

import com.wuhu.fantasyim.protobuf.SocketProto;
import com.wuhu.fantasyim.protobuf.Status;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.Instant;

@Slf4j
@Component
public class SocketClientHandler extends ChannelInboundHandlerAdapter {

    private WebSocketServerHandshaker handShaker;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SocketProto message = (SocketProto) msg;
        log.info("收到消息了捏:"+message);
        SocketProto.Builder messageBuilder = SocketProto.newBuilder()
                .setStatus(Status.STATUS_SUCCESS)
                .setCode(200)
                .setVersion(1)
                .setSequence(1L)
                .setType(SocketProto.Type.MESSAGE)
//                        .setPayload(Any.pack(evt))
                .setMsg("酸萝卜别吃")
                .setTimestamp(Instant.now().toEpochMilli())
                .setAppType(SocketProto.AppType.ANDROID);
        SocketProto backMessage = messageBuilder.build();
        ctx.writeAndFlush(backMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//        String channelId = ctx.channel().id().asLongText();
////        Integer userId = userChannels.get(channelId);
//        log.info("出现异常,用户:{},断开连接",0);
//        userChannels.remove(channelId);
//        channels.remove(ctx.channel());
//        ctx.channel().close();
        log.info("客户端出现异常");
        cause.printStackTrace();
    }
}

这是运行结果

img

不知道是不是 WebSocket 的某种机制导致的,我对这一块不是特别了解。握手应该在建立连接时就由 Netty 完成了;总之,客户端发出的消息在服务端被当作 FullHttpRequest 处理,而且异常是在 WebSocketServerProtocolHandshakeHandler 这个类里面抛出的。

  • 写回答

1条回答 默认 最新

  • bokiBiabu 2022-08-13 16:34
    关注

    哦,不对。客户端发往服务端的消息到达服务端时是 io.netty.buffer.PooledUnsafeDirectByteBuf 类型,然后在 WebSocketServerProtocolHandshakeHandler 类的 channelRead 方法中被强制转换成 FullHttpRequest 时出错。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

    报告相同问题?

    问题事件

    • 系统已结题 8月27日
    • 已采纳回答 8月19日
    • 创建了问题 8月13日

    悬赏问题

    • ¥15 微信实时共享位置修改
    • ¥100 TG的session协议号转成直登号号后客户端登录几分钟后自动退出设备
    • ¥30 共模反馈回路的小信号增益
    • ¥15 arduino ssd1306函数与tone函数放歌代码不兼容问题
    • ¥70 0.96版本hbase的row_key里含有双引号,无法deleteall
    • ¥20 Ida Pro增加插件出现问题
    • ¥15 诊断性META分析合并效能的检验
    • ¥15 请问abb根据色块判断奇偶数并根据批次号放入仓储
    • ¥66 开发PC客户端一定也要开发上位机吗?
    • ¥20 Java eclipse连接数据库