写代码的伟哥 2021-10-09 19:26 采纳率: 50%
浏览 586
已结题

C# 多线程怎么分配任务?

网页爬虫,用户输入一批网址加入未爬取队列,多线程进行爬取。提取网页里的链接又加入到未爬取队列。循环往复。那我的运行逻辑是使用并发集合ConcurrentQueue,开启线程循环取任务,访问网址提取源码里的A标签链接再加入到未爬取队列。 以下是我的代码框架,我想知道有没有更好的方法。

class Spider
    {
        //未爬取队列
        private static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>() ;

        /// <summary>
        /// 设置初始化未爬取的链接
        /// </summary>
        public void SetLink(string[] m_link)
        {
            foreach (var item in m_link)
            {
                _queue.Enqueue(item);
            }
        }

        /// <summary>
        /// 启动爬行
        /// </summary>
        public void Start()
        {
            //启动50个线程循环取任务
            for (int i = 0; i < 50; i++)
            {
                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        string link;

                        if (_queue.TryDequeue(out link))
                        {
                            GetLink(link);
                        }

                        Thread.Sleep(1);
                    }

                },TaskCreationOptions.LongRunning);
            }
        }

        //访问链接进行提取
        private void GetLink(string link)
        {
            string html = httpRequest(link);//获取源码后进行提取
            

            //...然后把提取到链接又加回到 _queue里
        }
    }

  • 写回答

5条回答 默认 最新

  • yayaxxww 2021-10-12 11:29
    关注

    这里可以通过多线程并发来处理队列,且可以控制处理线程的数量,以下是代码实例(缺少重复Uri判断,请自行通过缓存已处理的Uri判断处理)

    
        /// <summary>
        /// 请求队列
        /// </summary>
        public class RequestQueue
        {
            #region Constructors
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="minThreadCount">最小线程数量</param>
            /// <param name="maxThreadCount">最大线程数量</param>
            /// <param name="maxRequestCount">最大请求数量</param>
            public RequestQueue(int minThreadCount, int maxThreadCount, int maxRequestCount)
            {
                m_queue = new Queue<Uri>();
                m_minThreadCount = minThreadCount;
                m_maxThreadCount = maxThreadCount;
                m_maxRequestCount = maxRequestCount;
                m_totalThreadCount = 0;
                m_activeThreadCount = 0;
                m_stopped = false;
            }
            #endregion
    
            #region IDisposable Members
            /// <summary>
            /// 释放资源
            /// </summary>
            public void Dispose()
            {
                Dispose(true);
            }
    
            /// <summary>
            /// 释放资源
            /// </summary>
            protected virtual void Dispose(bool disposing)
            {
                if (disposing)
                {
                    lock (m_lock)
                    {
                        m_stopped = true;
    
                        Monitor.PulseAll(m_lock);
    
                        m_queue.Clear();
                    }
                }
            }
            #endregion
    
            #region Public Members
            /// <summary>
            /// 将处理请求加入计划队列
            /// </summary>
            /// <param name="request">请求处理的Uri</param>
            public void ScheduleIncomingRequest(Uri request)
            {
                bool tooManyRequests = false;
    
                // 加入队列
                lock (m_lock)
                {
                    // 检查请求数量
                    if (m_stopped || m_queue.Count >= m_maxRequestCount)
                    {
                        tooManyRequests = true;
                    }
                    else
                    {
                        m_queue.Enqueue(request);
    
                        // 如果有空闲线程,则唤醒空闲线程去处理
                        if (m_activeThreadCount < m_totalThreadCount)
                        {
                            Monitor.Pulse(m_lock);
                        }
                        // 如果没有空闲线程,且线程总数没有超过最大线程数量,则开启一个新线程去处理
                        else if (m_totalThreadCount < m_maxThreadCount)
                        {
                            Thread thread = new Thread(OnProcessRequestQueue);
                            thread.IsBackground = true;
                            thread.Start(null);
                            m_totalThreadCount++;
                            m_activeThreadCount++;  // 增加一个活动线程
    
                            LogHelper.Info("开启新线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
                        }
                    }
                }
    
                if (tooManyRequests)
                {
                    throw new Exception("请求数量过多,请稍后再提交请求");
                }
            }
            #endregion
    
            #region Private Methods
            /// <summary>
            /// 处理请求队列
            /// </summary>
            private void OnProcessRequestQueue()
            {
                lock (m_lock)
                {
                    while (true)
                    {
                        // 检查队列是否为空
                        while (m_queue.Count == 0)
                        {
                            m_activeThreadCount--;
    
                            // 等待请求, 如果超时没有获取到新的请求,且线程数量大于最小线程数量,就释放该线程
                            if (m_stopped || (!Monitor.Wait(m_lock, 30000) && m_totalThreadCount > m_minThreadCount))
                            {
                                m_totalThreadCount--;
    
                                LogHelper.Info("终止线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
    
                                return; // 跳出,终止线程
                            }
    
                            m_activeThreadCount++;
                        }
    
                        Uri request = m_queue.Dequeue();
    
                        Monitor.Exit(m_lock);
    
                        try
                        {
                            // 这里调用爬虫方法,对Uri进行处理
                            // GetLink(Uri)
                        }
                        catch (Exception ex)
                        {
                            LogHelper.Error(ex, "处理Uri发生未知错误");
                        }
                        finally
                        {
                            Monitor.Enter(m_lock);
                        }
                    }
                }
            }
            #endregion
    
            #region Private Fields
            private object m_lock = new object();
            private Queue<Uri> m_queue;
            private int m_totalThreadCount;
            private int m_activeThreadCount;
            private int m_maxThreadCount;
            private int m_minThreadCount;
            private int m_maxRequestCount;
            private bool m_stopped;
            #endregion
        }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(4条)

报告相同问题?

问题事件

  • 系统已结题 10月21日
  • 已采纳回答 10月13日
  • 创建了问题 10月9日

悬赏问题

  • ¥15 素材场景中光线烘焙后灯光失效
  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器