想学习下netty ,就在上班空闲时捣鼓捣鼓了一下,想直接往项目方向去搞, 实现聊天的功能.
主要是搞服务端,客户端以后有空去学学android,这里只是启动了一个netty客户端.
这里主要是webSocket和protobuf,问题就是我服务端往客户端是能发消息的,且客户端能正常收到,但是客户端往服务端发消息会报错
java.lang.ClassCastException: class io.netty.buffer.PooledUnsafeDirectByteBuf cannot be cast to class io.netty.handler.codec.http.FullHttpRequest (io.netty.buffer.PooledUnsafeDirectByteBuf and io.netty.handler.codec.http.FullHttpRequest are in unnamed module of loader 'app')
,看上去消息好像被发成了FullHttpRequest ....
package com.wuhu.fantasyim.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class SocketServer {
private static final int port = 9876;
private static final EventLoopGroup boss = new NioEventLoopGroup();
private static final EventLoopGroup work = new NioEventLoopGroup();
private static final ServerBootstrap serverBootstrap = new ServerBootstrap();
@PostConstruct
void init(){
run();
}
public void run() {
try {
serverBootstrap.group(boss, work);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new NettyServerFilter());
// 服务器绑定端口监听
ChannelFuture f = serverBootstrap.bind(port).sync();
new SocketClient().run();
log.info("netty Socket 服务启动成功,端口:{}",port);
// 监听服务器关闭监听
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
package com.wuhu.fantasyim.server;
import com.wuhu.fantasyim.protobuf.SocketProto;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(1024*64));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
pipeline.addLast(new IdleStateHandler(1, 1, 2, TimeUnit.SECONDS));
// pipeline.addLast(new StringDecoder());
// pipeline.addLast(new StringEncoder());
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(SocketProto.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new SocketServerHandler());
}
}
package com.wuhu.fantasyim.server;
import com.alibaba.fastjson2.JSONObject;
import com.google.protobuf.Any;
import com.wuhu.fantasyim.protobuf.MessageWrapper;
import com.wuhu.fantasyim.protobuf.SocketProto;
import com.wuhu.fantasyim.protobuf.Status;
import com.wuhu.fantasyim.utils.DataFormatUtils;
import com.wuhu.fantasyim.utils.JwtUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class SocketServerHandler extends ChannelInboundHandlerAdapter {
private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static final ConcurrentHashMap<Integer, List<String>> userChannels = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Integer> channelUserIds = new ConcurrentHashMap<>();
@Value("${jwt.tokenName}")
private String tokenName;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof FullHttpRequest){
FullHttpRequest aggregator = (FullHttpRequest) msg;
List<Map.Entry<String,String>> headerParams = aggregator.headers().entries();
String token = "";
for(Map.Entry<String,String> headerParamItem : headerParams){
if(Objects.equals(headerParamItem.getKey(), tokenName)){
token = headerParamItem.getValue();
break;
}
}
Integer userId = 0;
try {
userId = Integer.valueOf(JwtUtils.getClaimByName(token,"userId"));
}catch (Exception e){
ctx.writeAndFlush("fuck");
e.printStackTrace();
}
}else if (msg instanceof SocketProto) {
SocketProto socketProto = (SocketProto)msg;
MessageWrapper wrapper = socketProto.getPayload().unpack(MessageWrapper.class);
log.info("消息样子"+socketProto);
switch (socketProto.getType()){
case KEEPALIVE:
break;
case ACK:
break;
case MESSAGE:
break;
case SYSTEM:
break;
case EVENT:
break;
default:
log.error("未知消息类型:{}",socketProto.getType());
}
}else{
log.info("无效的消息体:"+msg.toString());
ctx.writeAndFlush("歪比巴卜"+"\n");
}
Channel channel = channels.find(ctx.channel().id());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {//读空闲
//检测到读空闲不做任何的操作
log.info("读空闲事件触发...");
SocketProto.Builder messageBuilder = SocketProto.newBuilder()
.setStatus(Status.STATUS_SUCCESS)
.setCode(200)
.setVersion(1)
.setSequence(1L)
.setType(SocketProto.Type.MESSAGE)
// .setPayload(Any.pack(evt))
.setMsg("surprise mother fucker")
.setTimestamp(Instant.now().toEpochMilli())
.setAppType(SocketProto.AppType.ANDROID);
SocketProto message = messageBuilder.build();
ctx.writeAndFlush(message);
ctx.writeAndFlush( evt);
} else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {//写空闲
//检测到写空闲不做任何的操作
log.info("写空闲事件触发...");
ctx.writeAndFlush("写空闲事件触发 ");
} else if (idleStateEvent.state() == IdleState.ALL_IDLE) {//读写空闲
log.info("读写空闲事件触发...");
ctx.writeAndFlush("读写空闲事件触发 ");
// ctx.channel().close();
}
}else{
super.userEventTriggered(ctx, evt);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
// ctx.channel().
log.info("客户端:{},已连接",ctx.channel().remoteAddress());
channels.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
String channelId = ctx.channel().id().asLongText();
// Integer userId = userChannels.get(channelId);
log.info("用户:{},主动断开连接",0);
userChannels.remove(channelId);
channels.remove(ctx.channel());
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// String channelId = ctx.channel().id().asLongText();
//// Integer userId = userChannels.get(channelId);
// log.info("出现异常,用户:{},断开连接",0);
// userChannels.remove(channelId);
// channels.remove(ctx.channel());
// ctx.channel().close();
log.error("服务端出现异常");
cause.printStackTrace();
}
}
以下是客户端代码
package com.wuhu.fantasyim.server;
import com.wuhu.fantasyim.protobuf.SocketProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class SocketClient {
private static final EventLoopGroup work = new NioEventLoopGroup();
public void run() {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// pipeline.addLast(new ChunkedWriteHandler());
// pipeline.addLast(new HttpObjectAggregator(1024*64));
//入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
// pipeline.addLast(new IdleStateHandler(5, 5, 10, TimeUnit.SECONDS));
// pipeline.addLast(new StringDecoder());
// pipeline.addLast(new StringEncoder());
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(SocketProto.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new SocketClientHandler());
}
});
//发起异步连接操作,同步阻等待结果
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9876).sync();
log.info("连接成功");
//等待客户端链路关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
}
}
package com.wuhu.fantasyim.server;
import com.wuhu.fantasyim.protobuf.SocketProto;
import com.wuhu.fantasyim.protobuf.Status;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Instant;
@Slf4j
@Component
public class SocketClientHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handShaker;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SocketProto message = (SocketProto) msg;
log.info("收到消息了捏:"+message);
SocketProto.Builder messageBuilder = SocketProto.newBuilder()
.setStatus(Status.STATUS_SUCCESS)
.setCode(200)
.setVersion(1)
.setSequence(1L)
.setType(SocketProto.Type.MESSAGE)
// .setPayload(Any.pack(evt))
.setMsg("酸萝卜别吃")
.setTimestamp(Instant.now().toEpochMilli())
.setAppType(SocketProto.AppType.ANDROID);
SocketProto backMessage = messageBuilder.build();
ctx.writeAndFlush(backMessage);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// String channelId = ctx.channel().id().asLongText();
//// Integer userId = userChannels.get(channelId);
// log.info("出现异常,用户:{},断开连接",0);
// userChannels.remove(channelId);
// channels.remove(ctx.channel());
// ctx.channel().close();
log.info("客户端出现异常");
cause.printStackTrace();
}
}
这是运行结果
不知道是不是webSocket的什么机制,不是特别了解那一块的内容,握手应该连接的时候netty就完成了,反正就是客户端发的消息被服务端接收成FullHttpRequest,而且是在WebSocketServerProtocolHandshakeHandler这个类里面发生的异常