UDP传输文件,通过网线直连,低速传输正常,高速传输丢包的问题

用UDP传输视频文件,VC发送数据到安卓端,安卓端监听端口接收数据,采用网线直连,排除网络原因造成的丢包。当VC发送数据延时50ms(大概几百K每秒的速度发包),JAVA端能正确接收。可是不延时50ms发送时(大概2M/S的速度发包)JAVA端接收就出现丢包。JAVA开了两个线程,一个线程接收并放到缓冲队列,另一个线程从缓冲队列取出数据写入U盘。缓冲队列大小也比发送的文件要大,也不可能是缓冲队列不够大的问题。希望得到大神指点,代码如下:
void CSendudpDlg::OnBUTTONSendfile()
{
SOCKET sockClient=socket(AF_INET,SOCK_DGRAM,0);
SOCKADDR_IN addSrv;

CString str;
CTypeChange myChang;
unsigned char xbuf[4096];
int len;

m_portdest=8899;
m_serverIP="192.168.1.4";
HANDLE hfile;
hfile=CreateFile("D:\\amy.mp4",
                GENERIC_READ,
                FILE_SHARE_READ|FILE_SHARE_WRITE, 
                NULL,
                OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);

addSrv.sin_addr.S_un.S_addr = inet_addr(m_serverIP);
addSrv.sin_family   = AF_INET;
addSrv.sin_port     = htons(m_portdest);
DWORD dwstart,dwend,dwrate;
DWORD file_len=19025412;
int npack=file_len/4063;
DWORD ret,lret;
int num=0;

while(npack--)
{   
    memset(&xbuf,0,sizeof(xbuf));
    ret=ReadFile(hfile,xbuf,4063,&lret,NULL);
    if( lret == 0)
    {   
        SetFilePointer(hfile,0,0,FILE_BEGIN);
        continue;
    }
    xbuf[1]=4077/256; //包头信息
    xbuf[2]=4077%256;
    xbuf[3]=(num/256)/256;
    xbuf[4]=(num/256)%256;
    xbuf[7]=num%256;
    ::Sleep(50);
    sendto(sockClient,(char*)xbuf,4096,0,(SOCKADDR*)&addSrv,sizeof(SOCKADDR));
    num++;

}
cprintf("end!!!!!!!\n");
closesocket(sockClient);    

}

安卓端:
package com.example.util;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.Arrays;

import android.os.Environment;
import android.util.Log;

import com.example.net.Invoker;

/**

  • 封装了数据生产工厂,该工厂中提供了生产和消费方法
  • @author Jry
    *
    */
    public class Factory {

    private int read_loc;
    private int write_loc;
    private int buffer_len;
    private int content_len;
    private int buffers_count;
    private byte[] buffers;
    private int[] flags;

    private File file = null;
    private String filePath = null;
    private RandomAccessFile raf = null;
    private static final String CRLF = "\r\n";

    private DatagramSocket datagramSocket = null;
    private DatagramPacket dataPacket = null;

    private Object read_lock = new Object();
    private Object write_lock = new Object();

    private static Factory instance = null;

    private int last_pos;
    private int[][] loc_map = null;

    private Invoker invoker = null;

    public static Factory newInstance() {
    if (null == instance) {
    synchronized (Factory.class) {
    if (null == instance) {
    instance = new Factory();
    }
    }
    }
    return instance;
    }

    private Factory() {
    initialize();
    invoker = Invoker.getInstance();
    }

    private void initialize() {
    last_pos = 0;
    read_loc = 0;
    write_loc = 0;
    buffer_len = 4096;
    content_len = 4063;
    buffers_count = 1024 * 10;
    flags = new int[buffers_count];
    loc_map = new int[4063][256];
    buffers = new byte[buffer_len * buffers_count];

    Arrays.fill(flags, 0);
    for (int[] temp : loc_map) {
        Arrays.fill(temp, 0);
    }
    // 设置输出流
    filePath = Environment.getExternalStorageDirectory().getPath() + "/video/test.mp4";
    file = new File(filePath);
    if (file.exists()) {
        Log.i("123", "file has been existed !!!" + CRLF);
        if (file.delete()) {
            Log.i("123", "file has benn deleted!!!" + CRLF);
        }
    }
    try {
        if (file.createNewFile()) {
            Log.i("123", "成功创建新文件!!!" + CRLF);
        }
    } catch (IOException e2) {
        e2.printStackTrace();
    }
    
    // 预分配文件所占的磁盘空间,磁盘中会创建一个指定大小的文件
    try {
        raf = new RandomAccessFile(filePath, "rw");
        raf.setLength(19025412); // 预分配 19025412 的文件空间
        raf.close();
    } catch (FileNotFoundException e1) {
        e1.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    
    // 设置socket的监听端口
    try {
        datagramSocket = new DatagramSocket(8899);
        // datagramSocket.setSoTimeout(10000);
        Log.i("123", "datagramsocket start successed!!!");
    } catch (SocketException e) {
        Log.i("123", "datagramsocket start failed!!!");
        e.printStackTrace();
    }
    

    }

    //计数当前总共收到了多少个数据包
    private int j = 0;

    public void create() {
    while (flags[write_loc] == 1) {
    synchronized (read_lock) {
    try {
    read_lock.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    byte[] buffer = new byte[buffer_len];
    byte[] receiveByte = new byte[buffer_len];
    dataPacket = new DatagramPacket(receiveByte, buffer_len);
    try {
    datagramSocket.receive(dataPacket);
    Log.i("123", "收到了数据包 !!!" + " dataPacket.getLength() ==" + dataPacket.getLength());
    int len = dataPacket.getLength();
    if (len == buffer_len) {
    buffer = dataPacket.getData();
    System.arraycopy(buffer, 0, buffers, write_loc * buffer_len, buffer_len);
    Log.i("123", "写入缓存数据的长度------" + len + "------" + write_loc + CRLF);
    Log.i("123", "当前受到包的数据包的数量是------" + (++j) + CRLF);
    flags[write_loc] = 1;
    write_loc = (write_loc + 1) % buffers_count;
    synchronized (read_lock) {
    read_lock.notifyAll();
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }

    }

    public void consume() {
    while (flags[read_loc] == 1) {
    try {
    raf = new RandomAccessFile(filePath, "rw");
    int start_loc = buffer_len * read_loc;
    /*
    int offset = i*buffer_len;
    raf.seek(offset);
    raf.write(buffers, start_loc, buffer_len);
    Log.i("123", "写入文件数据的长度*****************" + buffer_len +"read_loc == " + read_loc+CRLF);
    */
    int section = (buffers[start_loc + 7] & 0x0FF);
    int module = (buffers[start_loc + 3] & 0x0FF) * 256 + (buffers[start_loc + 4] & 0x0FF);
    int len = (buffers[start_loc + 1] & 0x0FF) * 256 + (buffers[start_loc + 2] & 0x0FF);
    int cur_pos = module * 256 + section;

            Log.i("123", "module == " + module + ",       section == " + section + ",    len == " + len + CRLF);
    
            if (loc_map[module][section] == 0) {
                // 补包的代码
                // if (!((last_pos + 1) == cur_pos)) {
                // int gap = cur_pos - last_pos - 1;
                // for (int i = 0; i < gap; i++) {
                // Command command = new
                // ConcreteCommand(UDPSender.getInstance());
                // command.setOrderNum(++last_pos);
                // invoker.addCommand(command);
                // }
                // invoker.executeCommand();
                // }
                loc_map[module][section] = 1;   // 表示该包号的包已经接收
                raf.seek(cur_pos * content_len);
                raf.write(buffers, start_loc + 13, len - 14);
                last_pos = cur_pos;
                Log.i("123", "写入文件数据的长度*****************" + (len - 14) + "*******" + read_loc + CRLF);
            }
            flags[read_loc] = 0;
            read_loc = (read_loc + 1) % buffers_count;
            synchronized (write_lock) {
                write_lock.notifyAll();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                raf.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    synchronized (read_lock) {
        try {
            read_lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

    }
    }

查看全部
tank1781053
tank1781053
2015/08/22 04:15
  • udp
  • 线程
  • android
  • 点赞
  • 收藏
  • 回答
    私信

3个回复