烛未 2019-01-23 08:08 采纳率: 100%
浏览 3154
已采纳

请大神解答一下关于netty客户端定时发送消息给服务端

1 . 只有一个客户端和服务端 连着 , 发送消息的时候用channel 的 writeAndFlush , 用线程结果Channel 一直被线程占用 , 只能发消息 , 其他的事情做不了

  1. 请问有什么解决的办法吗 , 或者netty 是有那个类可以专门处理客户端给服务端发送数据的呢
  2. 还有就是有没有关于客户端给服务端发送数据执行流程的博文呢 4 . 谢谢帮助我的大神
// workerGroup处理已经被接收的连接
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        AtsClient client = this;
        //workerGroup.scheduleAtFixedRate(command, initialDelay, period, unit);

        try {
            // Bootstrap是一个启动 NIO 服务的辅助启动类
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            // 使用NioSocketChannel类的新的channel接收进来的连接
            b.channel(NioSocketChannel.class);
            // option() 是提供给NioSocketChannel用来接收进来的连接
            b.option(ChannelOption.SO_KEEPALIVE, true);

            // 处理最近接收的channel
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 设定IdleStateHandler心跳检测每五秒进行一次写检测,如果五秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每五秒秒向服务端发送一次消息
                    ch.pipeline().addLast(
                            new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                    // 解码器
                    ch.pipeline().addLast("AtsDecoder", new AtsDecoder());
                    // 处理器
                    ch.pipeline().addLast("AtsHanler", new AtsHandler(client));
                    // 编码器
                    ch.pipeline().addLast("AtsEncoder", new AtsEncoder());


                }
            });
            // Start the client. 绑定端口然后启动服务
             ChannelFuture f = b.connect(host, port).sync();

            channel = f.channel();
            System.out.println(channel);

            channel.eventLoop().scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {

                    sendPlanTrain();


                }
            }, 5, 3, TimeUnit.SECONDS);

 这个是客户端连接服务端的代码 , 这里调用了channel的eventLoop().scheduleAtFixedRate , 启动了线程 , 里面是往服务端发送的数据方法 .  channel 执行了sendPlanTrain(); 就一直被线程征用 , 其他处理器无法使用 
class Sender implements Runnable {

        Channel channel = null;

        public Sender(Channel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {

                Map<String,String> ScadaDataMap = null;

                Jedis jedis = null;
                try {
                    ScadaDataMap = new HashMap<String,String>();
                    Thread.sleep(3 * 1000); //设置暂停的时间 3 秒
                    JedisUtilProxy.initJedisPoolUtil("127.0.0.1", 6379);
                    jedis = JedisUtilProxy.getJedis();
                    for (int i = 1 ; i <= 30 ; i++) {
                        String statu = jedis.hget("SCADA_DATA",String.valueOf(i));

                        ScadaDataMap.put(String.valueOf(i), statu);
                    }
                     sendPlanTrain();
                } catch (Exception e) {
                    e.printStackTrace();
                }
        }
@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        logger.debug("===AtsHandler.userEventTriggered.客户端循环心跳监测发送: "
                + new Date());
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                logger.debug("===AtsHandler.userEventTriggered.客户端没有信息发送给服务端  客户端每5秒发送 MESSAGE_POLLING 信息给服务端");
                ByteBuf byteBuf = null;
                byte[] frameHead = new byte[4];
                frameHead[0] = (byte) 0xff;
                frameHead[1] = 0;
                frameHead[2] = 5;
                frameHead[3] = 0;
                byte[] message = new byte[4];
                message[0] = 0;
                message[1] = (byte) 2;
                message[2] = (byte) (AtsMessageIDCodeEnum.MESSAGE_POLLING
                        .getCode() >> 8);
                message[3] = (byte) (AtsMessageIDCodeEnum.MESSAGE_POLLING
                        .getCode() & 0xFF);
                byte[] frame = ArrayUtils.addAll(frameHead, message);
                byteBuf = ctx.channel().alloc().buffer(8 * 4);
                byteBuf.writeBytes(frame);
                ctx.writeAndFlush(byteBuf);
            }
        }

展开全部

  • 写回答

2条回答 默认 最新

  • 烛未 2019-01-24 07:52
    关注

    谢谢@舍文 的关注 , 问题已经迎刃而解了 . 说来惭愧 , 用了很多方法 , 定时器 线程 试过处理器 , 绝望之下 , 都想用心跳发送我的报文了 . 具体解决方法是 , 用了thread 线程 , 实现runneble接口 , 在run方法中添加定时器 , 心跳的代码改了一下 , 就没有问题了 , 之前一直没有用socketTool 尝试给我的client发送报文 , 局限了我的思维 . 我检讨 ......

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)
编辑
预览

报告相同问题?

悬赏问题

  • ¥15 全志t113i启动qt应用程序提示internal error
  • ¥15 ensp可以看看嘛.
  • ¥80 51单片机C语言代码解决单片机为AT89C52是清翔单片机
  • ¥60 优博讯DT50高通安卓11系统刷完机自动进去fastboot模式
  • ¥15 minist数字识别
  • ¥15 在安装gym库的pygame时遇到问题,不知道如何解决
  • ¥20 uniapp中的webview 使用的是本地的vue页面,在模拟器上显示无法打开
  • ¥15 网上下载的3DMAX模型,不显示贴图怎么办
  • ¥15 关于#stm32#的问题:寻找一块开发版,作为智能化割草机的控制模块和树莓派主板相连,要求:最低可控制 3 个电机(两个驱动电机,1 个割草电机),其次可以与树莓派主板相连电机照片如下:
  • ¥15 潜在扩散模型的Unet特征提取
手机看
程序员都在用的中文IT技术交流社区

程序员都在用的中文IT技术交流社区

专业的中文 IT 技术社区,与千万技术人共成长

专业的中文 IT 技术社区,与千万技术人共成长

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

客服 返回
顶部