代码运行到下面这几行时就会报错:
ByteBuf in = (ByteBuf) msg;
byte[] req = new byte[in.readableBytes()];
in.readBytes(req);
哪位大神能解决?
public class ZhiNengKaiGuanServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelActive(ctx);
//与服务端建立连接后
System.out.println("链接成功");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
super.channelRead(ctx, msg);
System.out.println("server channelRead..");
System.out.println("recive data:"+msg);
ByteBuf in = (ByteBuf) msg;
//in.retain();
byte[] req = new byte[in.readableBytes()];
in.readBytes(req);
//String resultStr = new String(req);
//System.out.println("Client said:" + resultStr);
// 释放资源,这行很关键
//in.release();
//String str=new String(req,"UTF-8");
System.out.print("Receive data: { ");
for (int i = 0; i <req.length ; i++) {
String dhs = "00" + Integer.toHexString(req[i] & 0xFF);
System.out.print(dhs.substring(dhs.length() - 2, dhs.length())
.toUpperCase() +
" ");
}
System.out.println(" } ");
String ser="hellow ,I'm Server";
ctx.write(ser);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
System.out.println("server channelReadComplete..");
ctx.flush();//刷新后才将数据发出到SocketChannel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
System.out.println("server exceptionCaught..");
cause.printStackTrace();
ctx.close();
}
/**
 * Netty TCP server that accepts connections on a configurable port and hands
 * every accepted channel to a {@code ZhiNengKaiGuanServerHandler}.
 */
public class ZhiNengKaiGuanServer {

    /** Port used when no (valid) port is given on the command line. */
    private static final int DEFAULT_PORT = 9527;

    private final int port;

    /**
     * @param port the TCP port to listen on
     */
    public ZhiNengKaiGuanServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server and blocks until the server channel is closed.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        // Two thread groups: one accepts incoming connections,
        // the other handles reads/writes on the accepted SocketChannels.
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Helper class that simplifies setting up an NIO server.
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) // (3) counterpart of NIO's ServerSocketChannel
                .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Outbound: encodes the objects written by the business handler.
                        ch.pipeline().addLast("encoder", new ObjectEncoder());
                        // Inbound: no decoder is installed, the business handler gets raw buffers.
                        ch.pipeline().addLast(new ZhiNengKaiGuanServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 1024) // (5)
                .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind the port and wait until the bind has completed.
            ChannelFuture f = b.bind(port).sync(); // (7)
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown: release the thread-pool resources.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            System.out.println("服务器优雅的释放了线程资源...");
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the port to listen on. When it is missing
     *             or not a number, {@value #DEFAULT_PORT} is used.
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Deliberate best effort: report the bad argument and keep the default.
                System.err.println("Invalid port argument '" + args[0]
                        + "', falling back to " + DEFAULT_PORT);
            }
        }
        // Use the resolved port instead of a hard-coded one.
        new ZhiNengKaiGuanServer(port).run();
    }
}
/**
 * Client-side connector: opens a connection to the address of the configured
 * {@code ZhiNengKaiGuan}, lets a {@code ZhiNengKaiGuanCommuncationHander} send
 * one command, and blocks until the connection is closed.
 *
 * <p>Each {@link #connect(byte[])} call creates its own event-loop group and shuts it
 * down afterwards. No group is created eagerly per connector instance, because such a
 * group would never be shut down.
 */
public class ZhiNengKaiGuanCommuccationConnector {

    /** Upper-case hex digits used by {@link #toHex(byte[])}. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    private ZhiNengKaiGuan zhiNengKaiGuan;

    /**
     * @param zhiNengKaiGuan the device whose IP and port are used by {@link #connect(byte[])}
     */
    public ZhiNengKaiGuanCommuccationConnector(ZhiNengKaiGuan zhiNengKaiGuan) {
        this.zhiNengKaiGuan = zhiNengKaiGuan;
    }

    /** @return the device this connector talks to */
    public ZhiNengKaiGuan getZhiNengKaiGuan() {
        return zhiNengKaiGuan;
    }

    /** @param zhiNengKaiGuan the device this connector should talk to */
    public void setZhiNengKaiGuan(ZhiNengKaiGuan zhiNengKaiGuan) {
        this.zhiNengKaiGuan = zhiNengKaiGuan;
    }

    /**
     * Connects to the device, sends {@code req} once the channel is active and blocks
     * until the channel has been closed.
     *
     * @param req the raw command bytes to send
     * @throws Exception if connecting fails or the waiting thread is interrupted
     */
    public void connect(final byte[] req) throws Exception {
        final int port = zhiNengKaiGuan.getPort();
        final String ip = zhiNengKaiGuan.getIp();

        System.out.println("this zhinengchazuo ip :" + ip + " port:" + port);
        System.out.println("Send Command: { " + toHex(req) + " } ");

        // Client NIO thread group, scoped to this single connection.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Helper class used to configure the client.
            Bootstrap b = new Bootstrap(); // (1)
            b.group(group); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("encoder", new ObjectEncoder());
                    // NOTE(review): ObjectDecoder deserializes Java objects with an effectively
                    // unlimited size (Integer.MAX_VALUE) -- only acceptable for trusted peers; confirm.
                    ch.pipeline().addLast("decoder",
                            new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new ZhiNengKaiGuanCommuncationHander(req));
                }
            });

            // Connect asynchronously, then wait until the connection is established.
            ChannelFuture f = b.connect(ip, port).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
            System.out.println("客户端优雅的释放了线程资源...");
        }
    }

    /**
     * Formats bytes as two-digit upper-case hex values, each followed by one space.
     *
     * @param data the bytes to format
     * @return for example {@code "0A FF "} for {@code {0x0A, (byte) 0xFF}}; empty for no bytes
     */
    private static String toHex(byte[] data) {
        StringBuilder hex = new StringBuilder(data.length * 3);
        for (byte b : data) {
            hex.append(HEX_DIGITS[(b >> 4) & 0x0F])
               .append(HEX_DIGITS[b & 0x0F])
               .append(' ');
        }
        return hex.toString();
    }
}
public class ZhiNengKaiGuanCommuncationHander extends ChannelInboundHandlerAdapter {
private static final Logger logger=Logger.getLogger(ZhiNengKaiGuanCommuncationHander.class.getName());
private ByteBuf sendMessage;
public ZhiNengKaiGuanCommuncationHander(byte[] reqs){
sendMessage=Unpooled.buffer(reqs.length);
sendMessage.writeBytes(reqs);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(sendMessage);
ctx.writeAndFlush(sendMessage);
System.out.println("客户端active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("客户端收到服务器响应数据");
/*ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
byte[] req=new byte[m.readableBytes()];
CommonUtil.remsg=req;
m.readBytes(req);
String body=new String(req,"UTF-8");
System.out.println("Now is:"+body);
ctx.close();
} finally {
m.release();
}*/
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
System.out.println("客户端收到服务器响应数据处理完成");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
logger.warning("Unexpected exception from downstream:"+cause.getMessage());
ctx.close();
System.out.println("客户端异常退出");
}
}
/**
 * Manual smoke test: builds a {@code ZhiNengKaiGuan} for 127.0.1.1:9527 and
 * invokes {@code query()} on a {@code KaiGuanCmd} created for it.
 */
public class Test2 {

    /**
     * Entry point.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Address and port of the device to talk to.
        ZhiNengKaiGuan target = new ZhiNengKaiGuan("127.0.1.1", 9527);
        // Presumably sends a query command to the device -- KaiGuanCmd is not shown here.
        new KaiGuanCmd(target).query();
    }
}