dhtrabbit asked on 2016.02.22 16:15

netty4: packet loss on the server side

Goal: a server that receives and parses data from 100,000 vehicles.
Problem: once the number of simulated terminals exceeds 3,000, the server only receives a little over 2,000 data packets.

Note: below 2,000 terminals, packet decoding and parsing work without problems.
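
To see where the count diverges, it helps to count decoded messages on the server itself rather than trust downstream logs. A minimal sketch of a counting handler placed right after the decoder in the server pipeline (the class name and placement are assumptions, not part of the project):

import java.util.concurrent.atomic.AtomicLong;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Minimal sketch: count every decoded message so the receive total can be
// compared against what the simulator sent.
public class CountingHandler extends ChannelInboundHandlerAdapter {
    private static final AtomicLong received = new AtomicLong();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        received.incrementAndGet(); // one decoded frame per call
        ctx.fireChannelRead(msg);   // pass the message downstream unchanged
    }

    public static long total() {
        return received.get();
    }
}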

The Netty configuration for the simulated client and for the server is shown below.

package org.dht.vehicle.data.com;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.dht.vehicle.com.deCoder.BJVehicleDeviceDataDecoder;
import org.dht.vehicle.com.message.MessageManager;
import org.dht.vehicle.com.message.MessageSendVehicleRegister;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class BJTCPComService {
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    private static ChannelFuture f = null;
    private static EventLoopGroup group = new NioEventLoopGroup();
    private static Bootstrap b = new Bootstrap();
    private static Map<String, DeviceConInfo> map = new ConcurrentHashMap<String, DeviceConInfo>();

    public static void start() {
        b.group(group)
                .channel(NioSocketChannel.class)
                // SO_BACKLOG applies only to server sockets, so it is dropped here
                .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                .option(ChannelOption.SO_SNDBUF, 10 * 1024 * 1024)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch)
                            throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new BJVehicleDeviceDataDecoder());
                        p.addLast(new DeviceClientHandler());
                    }
                });

    }

    public static Map<String, DeviceConInfo> getMap() {
        return map;
    }

    public static Channel getChannel() {
        if (null != f)
            return f.channel();
        return null;
    }

    public static void connect(String ip, String port)
            throws NumberFormatException, InterruptedException {
        // keep the class-level future so getChannel() can return this connection
        f = b.connect(ip, Integer.parseInt(port)).sync();
        DeviceConInfo d = new DeviceConInfo();
        d.socketChannel = (SocketChannel) f.channel();
        map.put(String.valueOf(1), d);
    }

    public static void connect(int num, int oneNums, String ip, String port,
            int beginID) {
        for (int i = 0; i < num; i++) {

            try {
                ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();

                System.out.println("====" + MessageManager.getMessageManger()
                        + "====" + f.channel());
                DeviceConInfo d = new DeviceConInfo();
                String strID = String.format("%07d", i + 1 + beginID);
                String identiCode = "abcdefghij" + strID;
                d.socketChannel = (SocketChannel) f.channel();
                d.identiCode = identiCode;
                d.onState = BJProtocolConst.CONNECTED;
                map.put(identiCode, d);
                MessageSendVehicleRegister messagePack = new MessageSendVehicleRegister(
                        null, 0, d);

                MessageManager.getMessageManger().addSocketMessage(messagePack);

                // pace the connects so the server is not hit with one burst
                Thread.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void connectNums() {
        System.out.println("======client nums:" + map.size() + "=====");
    }

    public static int getOnLineDevices() {
        int nums = 0;
        for (Map.Entry<String, DeviceConInfo> entry : map.entrySet()) {
            if (BJProtocolConst.LOGINED == entry.getValue().onState) {
                nums++;
            }
        }
        return nums;
    }

    public static void disconnect() {

        for (Map.Entry<String, DeviceConInfo> entry : map.entrySet()) {
            DeviceConInfo devConInfo = entry.getValue();
            if (null != devConInfo.socketChannel) {
                devConInfo.socketChannel.close();
                try {
                    devConInfo.socketChannel.closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // ConcurrentHashMap iterators tolerate concurrent removal
            map.remove(entry.getKey());
        }

    }

    public static void stop() {
        disconnect();
        group.shutdownGracefully();
    }

    public static DeviceConInfo update(String identiCode,
            DeviceConInfo deviceConInfo) {

        return map.put(identiCode, deviceConInfo);

    }

    public static DeviceConInfo get(String identiCode) {

        return map.get(identiCode);

    }

    public static void remove(SocketChannel socketChannel) {

        for (Map.Entry<String, DeviceConInfo> entry : map.entrySet()) {
            if (entry.getValue().socketChannel == socketChannel) {
                map.remove(entry.getKey());
            }
        }
    }
}
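
With thousands of simulated terminals sharing one client event loop group, writes can be queued faster than the sockets drain them, and messages stuck in the outbound buffer look like server-side loss. A minimal sketch of guarding each send on channel writability (GuardedSender and sendFrame are illustrative names, not part of the project above):

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

// Minimal sketch: only write while the channel's outbound buffer has room.
public final class GuardedSender {

    public static void sendFrame(Channel ch, ByteBuf payload) {
        if (!ch.isWritable()) {
            // outbound buffer is past its high-water mark; drop loudly so the
            // loss is visible on the client instead of being blamed on the server
            System.err.println("backpressure: channel not writable, frame skipped");
            payload.release();
            return;
        }
        ch.writeAndFlush(payload).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                System.err.println("write failed: " + future.cause());
            }
        });
    }
}

Writability is governed by ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK and WRITE_BUFFER_LOW_WATER_MARK, so those are worth tuning alongside SO_SNDBUF.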


package org.dht.vehicle.com.socketfactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class BasicSocketServer implements SocketServer {

    protected ChannelHandler serverChannel; 
    protected Channel acceptorChannel;
    protected ServerBootstrap b ;   
    protected EventLoopGroup bossGroup ;
    protected EventLoopGroup workerGroup ;
    protected List<Integer> port;
    protected List<ChannelFuture> channelFuture;    

    public BasicSocketServer(){
        this.channelFuture = new ArrayList<ChannelFuture>();
    }
    public void setServerChannel(ChannelHandler serverChannel){
        this.serverChannel = serverChannel;
    }
    public ChannelHandler getServerChannel(){
        return this.serverChannel ;
    }
    public void setPort(List<Integer> port){
        this.port = port;
    }

    public void Start() throws Exception {
        try {
            createServerBootstrap();
        } finally {
            Stop();
        }
    }

    public void Stop() throws Exception {
        closeFuture();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    public void Restart() throws Exception {
        Stop();
        Start();
    }

    public void createServerBootstrap() throws Exception {
        try {
            b           = new ServerBootstrap();
            bossGroup   = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();

            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             // set per-connection buffers as childOptions so they apply to accepted channels
             .childOption(ChannelOption.SO_RCVBUF, 10 * 1024 * 1024)
             .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
             .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(serverChannel);

            bindPort();
            // Wait until the server sockets are closed.
            closeFuture();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


    public void bindPort() throws InterruptedException {
        for (Integer nPort : port) {
            if (nPort > 0) {
                ChannelFuture f = b.bind(nPort).sync();
                channelFuture.add(f);
            }
        }
    }

    /**
     * Close all ChannelFutures.
     */
    public void closeFuture() throws InterruptedException {
        for (ChannelFuture f : channelFuture) {
            if (null != f) {
                f.channel().closeFuture().sync();
            }
        }
    }

}
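
The custom decoder BJVehicleDeviceDataDecoder is not posted, and over TCP nothing is truly "lost" on an established connection: under load, reads arrive coalesced or split, and a decoder that assumes one message per read silently swallows the rest. A minimal sketch of frame reassembly with Netty's stock LengthFieldBasedFrameDecoder, assuming a hypothetical 2-byte big-endian length prefix (the offsets must be adapted to the real BJ protocol):

import org.dht.vehicle.com.deCoder.BJVehicleDeviceDataDecoder;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// Minimal sketch: let Netty reassemble complete frames before the business decoder runs.
public class FramedServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        // maxFrameLength = 64 KiB, lengthFieldOffset = 0, lengthFieldLength = 2,
        // lengthAdjustment = 0, initialBytesToStrip = 2 (drop the prefix itself)
        p.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 2, 0, 2));
        // downstream handlers now always see exactly one complete frame per read
        p.addLast(new BJVehicleDeviceDataDecoder());
    }
}

Wired in as the childHandler of the ServerBootstrap above, this removes sticky/split-packet effects from the picture before any buffer-size tuning.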

1 answer

psc928624742   2016.06.27 20:58

This isn't the complete code, is it? Are you asking people to spot the problem by eyeballing it?
