2 dhtrabbit dhtrabbit 于 2016.02.22 16:15 提问

netty4 服务端丢包的问题

目标:实现服务端对100000辆车的数据接收与解析
问题:当模拟终端数量达到3000以上时,服务端只能接收到2000多个数据包

备注:2000以下数据包的解码、解析没有问题

模拟客户端与服务端netty配置如下

package org.dht.vehicle.data.com;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.dht.vehicle.com.deCoder.BJVehicleDeviceDataDecoder;
import org.dht.vehicle.com.message.MessageManager;
import org.dht.vehicle.com.message.MessageSendVehicleRegister;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Static Netty client facade used to simulate vehicle terminals.
 *
 * <p>All state is static: one shared {@link Bootstrap}, one shared
 * {@link EventLoopGroup} and a registry of open connections keyed by the
 * device identification code. The registry is a {@link ConcurrentHashMap}, so
 * entries may be removed while it is being iterated.
 *
 * <p>{@link #start()} must be called before any {@code connect} method so that
 * the bootstrap is configured.
 */
public class BJTCPComService {
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    /** Future of the most recent single {@link #connect(String, String)} call. */
    private static ChannelFuture f = null;
    private static final EventLoopGroup group = new NioEventLoopGroup();
    private static final Bootstrap b = new Bootstrap();
    private static final Map<String, DeviceConInfo> map = new ConcurrentHashMap<String, DeviceConInfo>();

    /**
     * Configures the shared client bootstrap: event loop group, channel type,
     * socket options and the per-connection pipeline (decoder followed by the
     * client handler).
     */
    public static void start() {
        // NOTE(review): SO_BACKLOG is normally a listening-socket option and is
        // probably ignored (with a warning) on a client NioSocketChannel --
        // confirm against the Netty version in use before removing it.
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                .option(ChannelOption.SO_SNDBUF, 10 * 1024 * 1024)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch)
                            throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new BJVehicleDeviceDataDecoder());
                        p.addLast(new DeviceClientHandler());
                    }
                });
    }

    /**
     * Returns the live connection registry (not a copy), keyed by
     * identification code.
     */
    public static Map<String, DeviceConInfo> getMap() {
        return map;
    }

    /**
     * Returns the channel opened by the most recent successful
     * {@link #connect(String, String)} call, or {@code null} if there is none.
     */
    public static Channel getChannel() {
        if (null != f) {
            return f.channel();
        }
        return null;
    }

    /**
     * Opens a single connection, blocks until it is established and registers
     * it under the key {@code "1"}.
     *
     * @param ip   remote host
     * @param port remote port as a decimal string
     * @throws NumberFormatException if {@code port} is not a valid integer
     * @throws InterruptedException  if interrupted while waiting for the connect
     */
    public static void connect(String ip, String port)
            throws NumberFormatException, InterruptedException {
        ChannelFuture future = b.connect(ip, Integer.parseInt(port)).sync();
        // Previously a local variable shadowed the static field, so
        // getChannel() could never return anything but null.
        f = future;
        DeviceConInfo d = new DeviceConInfo();
        d.socketChannel = (SocketChannel) future.channel();
        map.put(String.valueOf(1), d);
    }

    /**
     * Opens {@code num} connections one after another. Each connection is
     * registered under the identification code {@code "abcdefghij"} followed by
     * the zero-padded 7-digit value {@code i + 1 + beginID}, and a register
     * message for it is queued on the {@link MessageManager}. The method sleeps
     * 5 ms between connections.
     *
     * <p>If the calling thread is interrupted, its interrupt status is restored
     * and the remaining connections are not attempted.
     *
     * @param num     number of connections to open; nothing happens if it is not positive
     * @param oneNums currently unused
     * @param ip      remote host
     * @param port    remote port as a decimal string
     * @param beginID offset added to the running index when building the identification code
     */
    public static void connect(int num, int oneNums, String ip, String port,
            int beginID) {
        if (num <= 0) {
            return;
        }
        // Parse once instead of on every iteration.
        final int portNumber = Integer.parseInt(port);
        for (int i = 0; i < num; i++) {
            try {
                ChannelFuture future = b.connect(ip, portNumber).sync();

                System.out.println("====" + MessageManager.getMessageManger()
                        + "====" + future.channel());
                DeviceConInfo d = new DeviceConInfo();
                String strID = String.format("%07d", i + 1 + beginID);
                String identiCode = "abcdefghij" + strID;
                d.socketChannel = (SocketChannel) future.channel();
                d.identiCode = identiCode;
                d.onState = BJProtocolConst.CONNECTED;
                map.put(identiCode, d);
                MessageSendVehicleRegister messagePack = new MessageSendVehicleRegister(
                        null, 0, d);

                MessageManager.getMessageManger().addSocketMessage(messagePack);

                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop: every further
                // blocking call would fail immediately anyway.
                Thread.currentThread().interrupt();
                System.err.println("connect interrupted after " + i
                        + " of " + num + " connections");
                return;
            }
        }
    }

    /** Prints the number of registered connections to standard output. */
    public static void connectNums() {
        System.out.println("======client nums:" + map.size() + "=====");
    }

    /**
     * Counts the registered devices whose state is
     * {@code BJProtocolConst.LOGINED}.
     *
     * @return number of logged-in devices
     */
    public static int getOnLineDevices() {
        int nums = 0;
        for (DeviceConInfo devConInfo : map.values()) {
            if (BJProtocolConst.LOGINED == devConInfo.onState) {
                nums++;
            }
        }
        return nums;
    }

    /**
     * Closes every registered connection, waits for each close to complete and
     * removes the entry from the registry. Entries without a channel are simply
     * removed.
     *
     * <p>If the calling thread is interrupted, the remaining channels are still
     * closed but no longer waited for, and the interrupt status is restored
     * before returning.
     */
    public static void diConnect() {
        boolean interrupted = false;
        for (Map.Entry<String, DeviceConInfo> entry : map.entrySet()) {
            Channel channel = entry.getValue().socketChannel;
            // The original waited on closeFuture() even when the channel was null.
            if (null != channel) {
                channel.close();
                if (!interrupted) {
                    try {
                        channel.closeFuture().sync();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
            map.remove(entry.getKey());
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Disconnects all devices and starts a graceful shutdown of the event loop group. */
    public static void stop() {
        diConnect();
        group.shutdownGracefully();
    }

    /**
     * Stores {@code deviceConInfo} under {@code identiCode}.
     *
     * @return the value previously stored under that key, or {@code null}
     */
    public static DeviceConInfo update(String identiCode,
            DeviceConInfo deviceConInfo) {
        return map.put(identiCode, deviceConInfo);
    }

    /**
     * Looks up the connection info registered under {@code identiCode}.
     *
     * @return the registered info, or {@code null} if there is none
     */
    public static DeviceConInfo get(String identiCode) {
        return map.get(identiCode);
    }

    /**
     * Removes every registry entry whose channel is the given instance
     * (compared by identity).
     *
     * @param socketChannel channel whose entries should be dropped
     */
    public static void remove(SocketChannel socketChannel) {
        for (Map.Entry<String, DeviceConInfo> entry : map.entrySet()) {
            if (entry.getValue().socketChannel == socketChannel) {
                map.remove(entry.getKey());
            }
        }
    }
}


package org.dht.vehicle.com.socketfactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;




/**
 * Netty server that binds one {@link ServerBootstrap} to a list of ports.
 *
 * <p>{@link #Start()} is blocking: it returns only after every bound server
 * channel has been closed, which {@link #Stop()} triggers (typically from
 * another thread).
 */
public class BasicSocketServer implements SocketServer {

    /** Handler installed as the child handler of every accepted connection. */
    protected ChannelHandler serverChannel;
    protected Channel acceptorChannel;
    protected ServerBootstrap b;
    protected EventLoopGroup bossGroup;
    protected EventLoopGroup workerGroup;
    /** Ports to bind; entries that are null or not positive are skipped. */
    protected List<Integer> port;
    /** Bind futures of every server channel opened so far. */
    protected List<ChannelFuture> channelFuture;

    public BasicSocketServer() {
        this.channelFuture = new ArrayList<ChannelFuture>();
    }

    public void setServerChannel(ChannelHandler serverChannel) {
        this.serverChannel = serverChannel;
    }

    public ChannelHandler getServerChannel() {
        return this.serverChannel;
    }

    public void setPort(List<Integer> port) {
        this.port = port;
    }

    /**
     * Builds the bootstrap, binds all ports and blocks until the server
     * channels are closed; resources are released via {@link #Stop()} on the
     * way out.
     *
     * @throws Exception if configuring or binding fails, or the thread is interrupted
     */
    public void Start() throws Exception {
        try {
            createServerBootstrap();
        } finally {
            Stop();
        }
    }

    /**
     * Closes every bound server channel, waits for the closes to complete and
     * starts a graceful shutdown of both event loop groups. Safe to call
     * before {@link #Start()} or more than once.
     *
     * @throws Exception if interrupted while waiting for a channel to close
     */
    public void Stop() throws Exception {
        // The original only waited on the close futures without ever requesting
        // the close, so Stop()/Restart() could block forever.
        for (ChannelFuture future : channelFuture) {
            if (null != future) {
                future.channel().close();
            }
        }
        closeFuture();
        if (null != bossGroup) {
            bossGroup.shutdownGracefully();
        }
        if (null != workerGroup) {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Stops the server and starts it again. Because {@link #Start()} blocks,
     * this method does not return until the restarted server is stopped.
     */
    public void Restart() throws Exception {
        Stop();
        Start();
    }

    /**
     * Creates the event loop groups and the bootstrap, binds all configured
     * ports and blocks until every bound channel is closed. The groups created
     * by this call are always shut down before it returns.
     *
     * @throws Exception if binding fails or the thread is interrupted
     */
    public void createServerBootstrap() throws Exception {
        // Locals ensure the finally block shuts down the groups created by THIS
        // call, even if a concurrent restart has replaced the fields meanwhile,
        // and that a failure during construction is not masked by an NPE.
        EventLoopGroup boss = null;
        EventLoopGroup worker = null;
        try {
            b = new ServerBootstrap();

            boss = new NioEventLoopGroup(1);
            bossGroup = boss;
            worker = new NioEventLoopGroup();
            workerGroup = worker;

            // NOTE(review): SO_SNDBUF is set via option(), i.e. on the listening
            // channel; if it is meant for accepted connections it probably
            // belongs in childOption() -- confirm against the Netty docs.
            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             .option(ChannelOption.SO_RCVBUF, 10 * 1024 * 1024)
             .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
             .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(serverChannel);
            bindPort();
            // Wait until the server sockets are closed.
            closeFuture();
        } finally {
            // Shut down all event loops to terminate all threads.
            if (null != boss) {
                boss.shutdownGracefully();
            }
            if (null != worker) {
                worker.shutdownGracefully();
            }
        }
    }

    /**
     * Binds the bootstrap to every positive port in the configured list,
     * waiting for each bind to complete, and records the resulting futures.
     *
     * @throws IllegalStateException if no port list has been set
     * @throws InterruptedException  if interrupted while waiting for a bind
     */
    public void bindPort() throws InterruptedException {
        if (null == port) {
            throw new IllegalStateException("port list has not been set");
        }
        for (Integer configured : port) {
            if (null != configured && configured.intValue() > 0) {
                ChannelFuture bound = b.bind(configured.intValue()).sync();
                channelFuture.add(bound);
            }
        }
    }

    /**
     * Blocks until every recorded server channel has been closed. This method
     * only waits; it does not itself close anything.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void closeFuture() throws InterruptedException {
        for (ChannelFuture future : channelFuture) {
            if (null != future) {
                future.channel().closeFuture().sync();
            }
        }
    }

}

1个回答

psc928624742
psc928624742   2016.06.27 20:58

你这不是全部代码吧,你让别人肉眼帮你看问题?

Csdn user default icon
上传中...
上传图片
插入图片
准确详细的回答,更有利于被提问者采纳,从而获得C币。复制、灌水、广告等回答会被删除,是时候展现真正的技术了!