Flykos 2018-04-09 07:32 采纳率: 100%
浏览 2187
已采纳

netty消息转发中client端循环推送的问题

接触netty不久,开发时遇到一个问题,求帮忙

流程:page——> java ——> 第三方server ——>java——>page

单写client向第三方循环发送没有问题,该处打印ctx.channel [id: 0x5916337a, L:/xxx.xxx.x.xxx:55041 - R:/xxx.xxx.x.xxx:4002]

 public void run(String host, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class);
            // 有数据立即发送
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            // 保持连接
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 自定义解码工具(含拆包粘包处理)
                    ch.pipeline().addLast(new SelfDefineEncodeHandler());
                    // 自定义方法处理
                    ch.pipeline().addLast(new HttpClientHandler());
                }
            });
            Channel channel = bootstrap.connect(host, port).sync().channel();
            System.out.println(channel.toString());
            while (true) {
                ByteBuf bytebuf = Unpooled.buffer(1);
                bytebuf.writeByte(0xff);
                channel.writeAndFlush(bytebuf);
                Thread.sleep(10000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }

我先写一个server端接收page传过来的请求

 public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            pipeline.addLast("handler", new DataServerHandler());
                        }
                    });
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

然后再 DataServerHandler中的channelRead0中嵌入了netty的client端

 // 把外层channel的eventLoop挂接在内层上
        Bootstrap bootstrap = new Bootstrap().group(ctx.channel().eventLoop()).channel(NioSocketChannel.class);
        // 有数据立即发送
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        // 保持连接
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 自定义解码工具(含拆包粘包处理)
                ch.pipeline().addLast(new SelfDefineEncodeHandler());
                // 自定义方法处理
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // 内层建立的连接,从这里接收内层的应答,在这里是服务端的应答
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        //...
                    }


                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {

                        super.channelActive(ctx);
                    }
                });
            }
        });
        connectFuture = bootstrap.connect("xxx.xxx.x.xxx", 7789);
        System.out.println(connectFuture.channel().toString());
        while(true){
            ByteBuf bytebuf = Unpooled.buffer(1);
            bytebuf.writeByte(0xff);
            connectFuture.channel().writeAndFlush(bytebuf);
            System.out.println("发送请求");
            Thread.sleep(10000);
        }

因为要向connectFuture连接的路径发送,所以调用connectFuture.channel(),但是该处拿到的connectFuture.channel()为[id: 0x71e6e0dc],因此在测试工具中并没有收到ff的消息

将循环写到channelActive以后,测试工具可以不停的接收传过来的ff,但是java无法接收返回的消息

 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // outerCnl = ctx.channel();
        System.out.println(ctx.channel().toString());
        while (true) {
            ByteBuf bytebuf = Unpooled.buffer(1);
            bytebuf.writeByte(0xff);
            ctx.channel().writeAndFlush(bytebuf);
            Thread.sleep(10000);
        }
        // super.channelActive(ctx);
    }

如果未使用循环,返回的信息可以正确的接收解析并传送给page

刚接触netty,有些东西不太懂,有人有过类似的错误么,望解答

  • 写回答

1条回答 默认 最新

  • SimonDW 2018-04-10 07:05
    关注

    所有的 netty 状态处理不能过长时间啊。

    Thread.sleep(10000); 这个会把链接处理干掉的 ,
    因为,没有把处理权返回给 netty的工作线程。

    应该新建一个线程 做循环,把channelContext 传进去,用来发消息。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 MPLS/VPN实验中MPLS的配置问题
  • ¥15 materialstudio氢键计算问题
  • ¥15 已知隐函数其中一个变量的,求另外一个变量
  • ¥15 echarts图表制作
  • ¥15 halcon根据玻璃面板纹路取区域
  • ¥15 HFSS设计小型化180度耦合器
  • ¥15 使用CInternetSession,CHttpFile读取网页文件时有些电脑上会卡住怎么办?
  • ¥15 水下机器人的半物理仿真研究
  • ¥15 微服务假死,一段时间后自动恢复,如何排查处理
  • ¥50 webrtc-streamer TCP rtsp