1 . 只有一个客户端和服务端 连着 , 发送消息的时候用channel 的 writeAndFlush , 用线程结果Channel 一直被线程占用 , 只能发消息 , 其他的事情做不了
- 请问有什么解决的办法吗 , 或者netty 是有那个类可以专门处理客户端给服务端发送数据的呢
- 还有就是有没有关于客户端给服务端发送数据执行流程的博文呢 4 . 谢谢帮助我的大神
// workerGroup处理已经被接收的连接
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
AtsClient client = this;
//workerGroup.scheduleAtFixedRate(command, initialDelay, period, unit);
try {
// Bootstrap是一个启动 NIO 服务的辅助启动类
Bootstrap b = new Bootstrap();
b.group(workerGroup);
// 使用NioSocketChannel类的新的channel接收进来的连接
b.channel(NioSocketChannel.class);
// option() 是提供给NioSocketChannel用来接收进来的连接
b.option(ChannelOption.SO_KEEPALIVE, true);
// 处理最近接收的channel
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 设定IdleStateHandler心跳检测每五秒进行一次写检测,如果五秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每五秒秒向服务端发送一次消息
ch.pipeline().addLast(
new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
// 解码器
ch.pipeline().addLast("AtsDecoder", new AtsDecoder());
// 处理器
ch.pipeline().addLast("AtsHanler", new AtsHandler(client));
// 编码器
ch.pipeline().addLast("AtsEncoder", new AtsEncoder());
}
});
// Start the client. 绑定端口然后启动服务
ChannelFuture f = b.connect(host, port).sync();
channel = f.channel();
System.out.println(channel);
channel.eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
sendPlanTrain();
}
}, 5, 3, TimeUnit.SECONDS);
这个是客户端连接服务端的代码 , 这里调用了channel的eventLoop().scheduleAtFixedRate , 启动了线程 , 里面是往服务端发送的数据方法 . channel 执行了sendPlanTrain(); 就一直被线程征用 , 其他处理器无法使用
class Sender implements Runnable {
Channel channel = null;
public Sender(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
Map<String,String> ScadaDataMap = null;
Jedis jedis = null;
try {
ScadaDataMap = new HashMap<String,String>();
Thread.sleep(3 * 1000); //设置暂停的时间 3 秒
JedisUtilProxy.initJedisPoolUtil("127.0.0.1", 6379);
jedis = JedisUtilProxy.getJedis();
for (int i = 1 ; i <= 30 ; i++) {
String statu = jedis.hget("SCADA_DATA",String.valueOf(i));
ScadaDataMap.put(String.valueOf(i), statu);
}
sendPlanTrain();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
logger.debug("===AtsHandler.userEventTriggered.客户端循环心跳监测发送: "
+ new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
logger.debug("===AtsHandler.userEventTriggered.客户端没有信息发送给服务端 客户端每5秒发送 MESSAGE_POLLING 信息给服务端");
ByteBuf byteBuf = null;
byte[] frameHead = new byte[4];
frameHead[0] = (byte) 0xff;
frameHead[1] = 0;
frameHead[2] = 5;
frameHead[3] = 0;
byte[] message = new byte[4];
message[0] = 0;
message[1] = (byte) 2;
message[2] = (byte) (AtsMessageIDCodeEnum.MESSAGE_POLLING
.getCode() >> 8);
message[3] = (byte) (AtsMessageIDCodeEnum.MESSAGE_POLLING
.getCode() & 0xFF);
byte[] frame = ArrayUtils.addAll(frameHead, message);
byteBuf = ctx.channel().alloc().buffer(8 * 4);
byteBuf.writeBytes(frame);
ctx.writeAndFlush(byteBuf);
}
}