Ma_Smile 2021-10-31 18:38 采纳率: 0%
浏览 19
已结题

NIO客户端接收不到服务端消息

服务端代码:

package com.14.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimerServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private volatile boolean stop;

    /**
     * 创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。
     * 将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位
     * @param port
     */
    public MultiplexerTimerServer(int port) {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 将ServerSocketChannel设置为异步非阻塞模式
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void stop(){
        this.stop = true;
    }

    /**
     * 循环遍历selector,休眠时间为1s,无论是否有读写事件发生,selector每隔1s都被唤醒一次,seletor也提供了一个无参的select方法,当有
     * 处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectorKey集合,通过对就绪状态的Channel集合进行迭代,可以进行网络
     * 的异步读写操作。
     */
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                SelectionKey key = null;
                while (iterator.hasNext()) {
                    key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();;
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理客户端请求
     * @param key
     * @throws Exception
     */
    private void handleInput(SelectionKey key) throws Exception {
        if (key.isValid()) {
            /**
             * 处理新接入的请求信息:根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收
             * 客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。
             */
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);
            }
            /**
             * 读取客户端的请求信息
             * 首先创建一个ByteBuffer,然后调用SocketChannel的read方法读取请求码流。因为是异步的,需要使用返回值
             * 进行判断:1.返回值大于0,读到了字节,对字节进行编解码;2.返回值等于0,没有读取到字节,属于正常场景,忽略;
             * 3.返回值为-1,链路已经关闭,需要关闭SocketChannel,释放资源。
             * 当读取到码流以后,进行解码,首先对readBuffer进行flip操作,将缓冲区当前的limit设置为position,position
             * 设置为0,用于后续对缓冲区的读取操作,然后根据缓冲区可读的字节个数创建字节数组,调用ByteByffer的get操作将
             * 缓冲区可读的字节数组复制到新创建的字节数组中。
             */
            if (key.isReadable()) {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : " + body);
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                            new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    key.cancel();
                    sc.close();
                } else {
                    ;
                }
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
            System.out.println("server write message is : " + response);
        }
    }
}

客户端代码:

package com.14.nio;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            SocketAddress address;
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() {
        try {
            doConnect();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
//                            if (key.isValid() && key.isReadable()) {
//                                key.cancel();
//                                if (key.channel() != null) {
//                                    key.channel().close();
//                                }
//                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void doConnect() throws Exception {
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel socketChannel) throws Exception {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        writeBuffer.put(req);
        writeBuffer.flip();
        socketChannel.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("Send order 2 server succeed.");
        }
    }

    private void handleInput(SelectionKey key) throws Exception {
        if (key.isValid()) {
            SocketChannel sc = (SocketChannel) key.channel();
//            sc.configureBlocking(false);
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else {
                    System.exit(1);
                }
                if (key.isReadable()) {
                    System.out.println("==========key is readable");
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("Now is : " + body);
                        this.stop = true;
                    } else if (readBytes < 0) {
                        key.cancel();
                        sc.close();
                    } else {
                        ;
                    }
                }
            }
        }
    }

运行结果:
1.服务端:

The time server is start in port : 8080
The time server receive order : QUERY TIME ORDER
server write message is : Sun Oct 31 18:28:59 CST 2021

2.客户端:

Send order 2 server succeed.

debug看了一下客户端的key.isReadable始终返回false,求各位帮忙看一下,代码来自《Netty权威指南》

  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 11月8日
    • 创建了问题 10月31日

    悬赏问题

    • ¥15 c语言怎么用printf(“\b \b”)与getch()实现黑框里写入与删除?
    • ¥20 怎么用dlib库的算法识别小麦病虫害
    • ¥15 华为ensp模拟器中S5700交换机在配置过程中老是反复重启
    • ¥15 java写代码遇到问题,求帮助
    • ¥15 uniapp uview http 如何实现统一的请求异常信息提示?
    • ¥15 有了解d3和topogram.js库的吗?有偿请教
    • ¥100 任意维数的K均值聚类
    • ¥15 stamps做sbas-insar,时序沉降图怎么画
    • ¥15 买了个传感器,根据商家发的代码和步骤使用但是代码报错了不会改,有没有人可以看看
    • ¥15 关于#Java#的问题,如何解决?