Bandysol 2023-07-16 16:50 采纳率: 83.3%
浏览 92
已结题

C# 多线程下载的问题

在测试下面的代码时,线程0下载的文件内容正确,但后面的线程不能下载到正确的数据(我查看了下载文件的二进制数据)(也有可能是拼接的问题)。
注:Logger是一个日志类,Array是一个仿数组类(提供了数组下标访问)

public class ThreadDownload {
    private Logger log = new("Download.log");

    private string URL;     //文件URL
    private string Path;    //下载后文件保存位置
    private long   Length;  //文件大小(B)
    private int    Threads; //下载线程数

    private long    DownloadEnd = 0;     //下载完成的字节
    private bool    Stop        = false; //出错停止
    private bool    NoAccept    = false; //目标服务器是否支持多线程下载
    private Array<byte[]> DownloadBuffer;
    private bool[]  Changed;

    public ThreadDownload(string dURL,string dPath = null,int dThreads = 64) {
        URL = dURL;
        Path = dPath;
        Threads = dThreads;
        Do();
    }

    private void Do() {
        try {
            ServicePointManager.DefaultConnectionLimit = Threads + 1;
            ServicePointManager.SecurityProtocol = SecurityProtocolType.Ssl3 | SecurityProtocolType.SystemDefault | SecurityProtocolType.Tls | SecurityProtocolType.Tls11 | SecurityProtocolType.Tls12 | SecurityProtocolType.Tls13;
            Length         = WebRequest.Create(URL).GetResponse().ContentLength;
            long BlockLong = (Length - Length % Threads) / Threads;
            DownloadBuffer = new(Threads+1);
            Changed        = new bool[Threads+1];
            Console.Clear();
            Path = Path==null ? new Uri(URL).Segments[new Uri(URL).Segments.Length-1] : Path;
            if (File.Exists(Path)) {
                File.Delete(Path);
            }
            {
                var readreturntmp1 = ((HttpWebRequest)WebRequest.Create(URL));
                readreturntmp1.AddRange(0,1);
                var readreturn     = (HttpWebResponse)readreturntmp1.GetResponse();
                for (int i = 0;i < readreturn.Headers.AllKeys.Length;++i) {
                    if (readreturn.Headers.AllKeys[i].Equals("Accept-Ranges") && readreturn.Headers[i].Equals("bytes")) {
                        break;
                    } else if (readreturn.Headers.AllKeys[i].Equals("Accept-Ranges") && readreturn.Headers[i].Equals("none")) {
                        NoAccept = true;
                        break;
                    }
                    if (i == readreturn.Headers.AllKeys.Length-1) {
                        NoAccept = true;
                        break;
                    }
                }
            }
            if (!NoAccept) {
                Thread[] downThs = new Thread[Threads+1];
                Thread   pushThs = new Thread(() => {
                    try {
                        int Changeds = 0;
                        var write = new FileStream(Path,FileMode.Append,FileAccess.Write);
                        for (;Changeds < Threads+1;) {
                            if (Stop) {
                                write.Close();
                                File.Delete(Path);
                                log = log << "将在单线程重试";
                                new Download(URL,Path,ref log);
                                return;
                            } else if (Changed[Changeds]) {
                                write.Write(DownloadBuffer[Changeds],0,DownloadBuffer[Changeds].Length);
                                write.Write(new byte[1]{0},0,1);
                                write.Flush();
                                DownloadBuffer[Changeds] = new byte[0];
                                log = log << "写入" + Changeds + "号线程的数据";
                                ++Changeds;
                            } else {
                                Thread.Sleep(200);
                            }
                        }
                        write.Close();
                    } catch(Exception ex) {
                        Stop = true;
                        log = log << new Exception("在合并线程:",ex);
                    }
                });
                Thread   tjisThs = new Thread(() => {
                    try {
                        for (;;) {
                            long old = DownloadEnd;
                            Thread.Sleep(200);
                            Console.Clear();
                            Console.WriteLine("已完成下载{0}/{2},当前下载速度{1}\\s。",ByteToBytes.ByteToString(DownloadEnd),ByteToBytes.ByteToString(DownloadEnd - old),ByteToBytes.ByteToString(Length));
                        }
                    } catch (Exception ex) {
                        Stop = true;
                        log = log << new Exception("在统计线程:",ex);
                    }
                });
                pushThs.Start();
                for (int i = 0;i < downThs.Length;++i) {
                    downThs[i] = new Thread(info => {
                        const int ListenBlackLong = 1024*64;

                        try {
                            int DownloadBlock = (int)(((DownloadInfo)info).End - ((DownloadInfo)info).Start);
                            int ID            = ((DownloadInfo)info).ID;
                            DownloadBuffer[ID] = new byte[DownloadBlock];
                            int key = 0;
                            Logger log = new("DownloadLog\\" + ID + ".log");
                            log = log << "下载区间:" + ((DownloadInfo)info).Start + "~" + ((DownloadInfo)info).End + ",共" + DownloadBlock;
                            for (;key < DownloadBlock;) {
                                long nowend;
                                if (DownloadBlock - key < ListenBlackLong) {
                                    nowend = DownloadBlock;
                                } else {
                                    nowend = ListenBlackLong + key;
                                }
                                int nowrange = (int)(nowend - key);
                                log = log << "读取区间:" + key + "~" + nowend + ",共" + nowrange;
                                var readstreamt = ((HttpWebRequest)WebRequest.Create(URL));
                                readstreamt.AddRange(key,nowend);
                                var readstream = readstreamt.GetResponse().GetResponseStream();
                                for (int readkey = 0;;) {
                                    if (readkey == nowrange) {
                                        break;
                                    }
                                    lock (DownloadBuffer) {
                                        readkey += readstream.Read(DownloadBuffer[ID],key+readkey,nowrange-readkey);
                                    }
                                }
                                readstream.Close();
                                DownloadEnd += nowrange;
                                key += nowrange;
                            }
                            log.Close();
                            if (!Stop) {
                                Changed[ID] = true;
                            } else {
                                File.Delete(Path + "-" + ID.ToString());
                            }
                        } catch (Exception ex) {
                            Stop = true;
                            log = log << new Exception("在" + ((DownloadInfo)info).ID.ToString() + "号线程:",ex);
                        }
                    });
                    var info = new DownloadInfo();
                    if (i!=downThs.Length-1) {
                        info.Start = BlockLong * i;
                        info.End   = BlockLong * (i + 1);
                    } else {
                        info.Start = Length - Length % Threads;
                        info.End   = Length;
                    }
                    info.ID = i;
                    downThs[i].Start(info);
                }
                tjisThs.Start();
                pushThs.Join();
                log.Close();
                tjisThs.Abort();
            } else {
                try {
                    log = log << "无法多线程下载,开始单线程下载。";
                    new Download(URL,Path,ref log);
                } catch (Exception ex) {
                    log = log << new Exception("在单线程下载:",ex);
                }
            }
        } catch (Exception ex) {
            Stop = true;
            log = log << new Exception("在主控制线程:",ex);
            log = log << "将在单线程下载中重试";
            new Download(URL,Path,ref log);
        }
    }

    private struct DownloadInfo {
        public long Start = 0;
        public long End   = 0;
        public int  ID    = 0;

        public DownloadInfo() {

        }
    }
}
  • 写回答

2条回答 默认 最新

  • wanghui0380 2023-07-17 16:58
    关注
    /nuget Microsoft.Extensions.Http(他本身就包含Microsoft.Extensions.Logging日志)
        //nuget Microsoft.Extensions.DependencyInjection
        internal class Program
        {
            private static IHttpClientFactory httpClientFactory;
    
            static async Task Main(string[] args)
            {
    
                var serviceProvider = new ServiceCollection()
                    .AddHttpClient()
                    .BuildServiceProvider();
    
                httpClientFactory = serviceProvider.GetService<System.Net.Http.IHttpClientFactory>();
    
                await downloadfile(new Uri("http://127.0.0.1:5500/a.exe"));
    
                Console.ReadKey();
            }
    
            public static async Task downloadfile(Uri uri, int blocksize = 4 * 1024 * 1024)
            {
    
    
                var test = await CheckUriCanRange(uri);
                if (test.canRange)
                {
                    var windowcount = (test.totalsize + blocksize - 1) / blocksize;
                    System.Net.Http.Headers.RangeHeaderValue[] ranges = new System.Net.Http.Headers.RangeHeaderValue[windowcount];
                    for (int i = 0; i < windowcount - 1; i++)
                    {
                        ranges[i] = new System.Net.Http.Headers.RangeHeaderValue(i * blocksize, i * blocksize + blocksize - 1);
                    }
    
                    ranges[windowcount - 1] = new RangeHeaderValue((windowcount - 1) * blocksize, null);
    
                    var task = ranges.Select((r, index) =>
                    {
    
                        var filename = $"d_{index}.temp";
                        return downloadfile0(uri, filename, r);
                    });
    
                    await Task.WhenAll(task);
    
                    //合并部分我就随便写了,反正demo只是表明个意思
    
                    FileInfo save = new FileInfo("download.t");
                    using (FileStream fs = save.OpenWrite())
                    {
                        for (int i = 0; i < windowcount; i++)
                        {
                            using (var fs2=new FileInfo($"d_{i}.temp").OpenRead())
                            {
                                await fs2.CopyToAsync(fs);
                            }
                        }
                    }
                }
                else
                {
                    //不支持断点续传的我就不写了
                }
            }
    
            private static SemaphoreSlim slim = new SemaphoreSlim(5);
            public static async Task downloadfile0(Uri uri, string filename,
                System.Net.Http.Headers.RangeHeaderValue range = null)
            {
                await slim.WaitAsync();
                try
                {
                    using (var httpClient = httpClientFactory.CreateClient())
                    {
                        if (range != null)
                        {
                            httpClient.DefaultRequestHeaders.Range = range;
                        }
    
                        using (HttpResponseMessage response =
                               await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead))
                        {
                            response.EnsureSuccessStatusCode();
                            using (FileStream fs = new FileInfo(filename).OpenWrite())
                            {
                                await response.Content.CopyToAsync(fs);
                            }
                        }
                    }
                }
                finally
                {
                    slim.Release();
                }
    
            }
            public static async Task<(bool canRange, long totalsize)> CheckUriCanRange(Uri uri)
            {
                long totalsize = 0;
                //demo,异常处理不写了,自己完成
                using (var httpClient = httpClientFactory.CreateClient())
                {
                    httpClient.DefaultRequestHeaders.Range = new System.Net.Http.Headers.RangeHeaderValue(2, null);
    
                    //发起head请求,获取文件总大小
                    using (HttpResponseMessage headResponse =
                           await httpClient.SendAsync(new HttpRequestMessage(HttpMethod.Head, uri)))
                    {
                        // 获取文件总大小
                        totalsize = headResponse.Content.Headers.ContentLength ?? 0;
                        //发起一个get请求测试是否支持断点续传
                        using (HttpResponseMessage response =
                               await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead))
                        {
                            if (response.StatusCode == System.Net.HttpStatusCode.PartialContent)
                            {
                                return (true, totalsize);
                            }
                            else
                            {
                                return (false, totalsize);
                            }
                        }
                    }
                }
            }
    
        }
    }
    
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 7月29日
  • 已采纳回答 7月21日
  • 创建了问题 7月16日

悬赏问题

  • ¥500 把面具戴到人脸上,请大家贡献智慧
  • ¥15 任意一个散点图自己下载其js脚本文件并做成独立的案例页面,不要作在线的,要离线状态。
  • ¥15 各位 帮我看看如何写代码,打出来的图形要和如下图呈现的一样,急
  • ¥30 c#打开word开启修订并实时显示批注
  • ¥15 如何解决ldsc的这条报错/index error
  • ¥15 VS2022+WDK驱动开发环境
  • ¥30 关于#java#的问题,请各位专家解答!
  • ¥30 vue+element根据数据循环生成多个table,如何实现最后一列 平均分合并
  • ¥20 pcf8563时钟芯片不启振
  • ¥20 pip2.40更新pip2.43时报错