服务端代码:
package com.14.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class MultiplexerTimerServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
/**
* 创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。
* 将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位
* @param port
*/
public MultiplexerTimerServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
// 将ServerSocketChannel设置为异步非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (Exception e) {
e.printStackTrace();
}
}
public void stop(){
this.stop = true;
}
/**
* 循环遍历selector,休眠时间为1s,无论是否有读写事件发生,selector每隔1s都被唤醒一次,seletor也提供了一个无参的select方法,当有
* 处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectorKey集合,通过对就绪状态的Channel集合进行迭代,可以进行网络
* 的异步读写操作。
*/
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();;
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 处理客户端请求
* @param key
* @throws Exception
*/
private void handleInput(SelectionKey key) throws Exception {
if (key.isValid()) {
/**
* 处理新接入的请求信息:根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收
* 客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。
*/
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
/**
* 读取客户端的请求信息
* 首先创建一个ByteBuffer,然后调用SocketChannel的read方法读取请求码流。因为是异步的,需要使用返回值
* 进行判断:1.返回值大于0,读到了字节,对字节进行编解码;2.返回值等于0,没有读取到字节,属于正常场景,忽略;
* 3.返回值为-1,链路已经关闭,需要关闭SocketChannel,释放资源。
* 当读取到码流以后,进行解码,首先对readBuffer进行flip操作,将缓冲区当前的limit设置为position,position
* 设置为0,用于后续对缓冲区的读取操作,然后根据缓冲区可读的字节个数创建字节数组,调用ByteByffer的get操作将
* 缓冲区可读的字节数组复制到新创建的字节数组中。
*/
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
;
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
System.out.println("server write message is : " + response);
}
}
}
客户端代码:
package com.14.nio;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
SocketAddress address;
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (Exception e) {
e.printStackTrace();
}
}
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
// if (key.isValid() && key.isReadable()) {
// key.cancel();
// if (key.channel() != null) {
// key.channel().close();
// }
// }
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void doConnect() throws Exception {
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel socketChannel) throws Exception {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(req);
writeBuffer.flip();
socketChannel.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Send order 2 server succeed.");
}
}
private void handleInput(SelectionKey key) throws Exception {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
// sc.configureBlocking(false);
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else {
System.exit(1);
}
if (key.isReadable()) {
System.out.println("==========key is readable");
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
this.stop = true;
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
;
}
}
}
}
}
运行结果:
1.服务端:
The time server is start in port : 8080
The time server receive order : QUERY TIME ORDER
server write message is : Sun Oct 31 18:28:59 CST 2021
2.客户端:
Send order 2 server succeed.
debug看了一下客户端的key.isReadable始终返回false,求各位帮忙看一下,代码来自《Netty权威指南》