写代码的韦哥
2021-10-09 19:26
采纳率: 100%
浏览 548

C# 多线程怎么分配任务?

网页爬虫,用户输入一批网址加入未爬取队列,多线程进行爬取。提取网页里的链接又加入到未爬取队列。循环往复。那我的运行逻辑是使用并发集合ConcurrentQueue,开启线程循环取任务,访问网址提取源码里的A标签链接再加入到未爬取队列。 以下是我的代码框架,我想知道有没有更好的方法。

class Spider
    {
        //未爬取队列
        private static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>() ;

        /// <summary>
        /// 设置初始化未爬取的链接
        /// </summary>
        public void SetLink(string[] m_link)
        {
            foreach (var item in m_link)
            {
                _queue.Enqueue(item);
            }
        }

        /// <summary>
        /// 启动爬行
        /// </summary>
        public void Start()
        {
            //启动50个线程循环取任务
            for (int i = 0; i < 50; i++)
            {
                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        string link;

                        if (_queue.TryDequeue(out link))
                        {
                            GetLink(link);
                        }

                        Thread.Sleep(1);
                    }

                },TaskCreationOptions.LongRunning);
            }
        }

        //访问链接进行提取
        private void GetLink(string link)
        {
            string html = httpRequest(link);//获取源码后进行提取
            

            //...然后把提取到链接又加回到 _queue里
        }
    }

  • 好问题 提建议
  • 收藏

5条回答 默认 最新

  • yayaxxww 2021-10-12 11:29
    已采纳

    这里可以通过多线程并发来处理队列,且可以控制处理线程的数量,以下是代码实例(缺少重复Uri判断,请自行通过缓存已处理的Uri判断处理)

    
        /// <summary>
        /// 请求队列
        /// </summary>
        public class RequestQueue
        {
            #region Constructors
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="minThreadCount">最小线程数量</param>
            /// <param name="maxThreadCount">最大线程数量</param>
            /// <param name="maxRequestCount">最大请求数量</param>
            public RequestQueue(int minThreadCount, int maxThreadCount, int maxRequestCount)
            {
                m_queue = new Queue<Uri>();
                m_minThreadCount = minThreadCount;
                m_maxThreadCount = maxThreadCount;
                m_maxRequestCount = maxRequestCount;
                m_totalThreadCount = 0;
                m_activeThreadCount = 0;
                m_stopped = false;
            }
            #endregion
    
            #region IDisposable Members
            /// <summary>
            /// 释放资源
            /// </summary>
            public void Dispose()
            {
                Dispose(true);
            }
    
            /// <summary>
            /// 释放资源
            /// </summary>
            protected virtual void Dispose(bool disposing)
            {
                if (disposing)
                {
                    lock (m_lock)
                    {
                        m_stopped = true;
    
                        Monitor.PulseAll(m_lock);
    
                        m_queue.Clear();
                    }
                }
            }
            #endregion
    
            #region Public Members
            /// <summary>
            /// 将处理请求加入计划队列
            /// </summary>
            /// <param name="request">请求处理的Uri</param>
            public void ScheduleIncomingRequest(Uri request)
            {
                bool tooManyRequests = false;
    
                // 加入队列
                lock (m_lock)
                {
                    // 检查请求数量
                    if (m_stopped || m_queue.Count >= m_maxRequestCount)
                    {
                        tooManyRequests = true;
                    }
                    else
                    {
                        m_queue.Enqueue(request);
    
                        // 如果有空闲线程,则唤醒空闲线程去处理
                        if (m_activeThreadCount < m_totalThreadCount)
                        {
                            Monitor.Pulse(m_lock);
                        }
                        // 如果没有空闲线程,且线程总数没有超过最大线程数量,则开启一个新线程去处理
                        else if (m_totalThreadCount < m_maxThreadCount)
                        {
                            Thread thread = new Thread(OnProcessRequestQueue);
                            thread.IsBackground = true;
                            thread.Start(null);
                            m_totalThreadCount++;
                            m_activeThreadCount++;  // 增加一个活动线程
    
                            LogHelper.Info("开启新线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
                        }
                    }
                }
    
                if (tooManyRequests)
                {
                    throw new Exception("请求数量过多,请稍后再提交请求");
                }
            }
            #endregion
    
            #region Private Methods
            /// <summary>
            /// 处理请求队列
            /// </summary>
            private void OnProcessRequestQueue()
            {
                lock (m_lock)
                {
                    while (true)
                    {
                        // 检查队列是否为空
                        while (m_queue.Count == 0)
                        {
                            m_activeThreadCount--;
    
                            // 等待请求, 如果超时没有获取到新的请求,且线程数量大于最小线程数量,就释放该线程
                            if (m_stopped || (!Monitor.Wait(m_lock, 30000) && m_totalThreadCount > m_minThreadCount))
                            {
                                m_totalThreadCount--;
    
                                LogHelper.Info("终止线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
    
                                return; // 跳出,终止线程
                            }
    
                            m_activeThreadCount++;
                        }
    
                        Uri request = m_queue.Dequeue();
    
                        Monitor.Exit(m_lock);
    
                        try
                        {
                            // 这里调用爬虫方法,对Uri进行处理
                            // GetLink(Uri)
                        }
                        catch (Exception ex)
                        {
                            LogHelper.Error(ex, "处理Uri发生未知错误");
                        }
                        finally
                        {
                            Monitor.Enter(m_lock);
                        }
                    }
                }
            }
            #endregion
    
            #region Private Fields
            private object m_lock = new object();
            private Queue<Uri> m_queue;
            private int m_totalThreadCount;
            private int m_activeThreadCount;
            private int m_maxThreadCount;
            private int m_minThreadCount;
            private int m_maxRequestCount;
            private bool m_stopped;
            #endregion
        }
    
    已采纳该答案
    评论
    解决 无用
    打赏 举报
  • 肖Cat 2021-10-09 21:13

    建议使用BurpSuit进行爬网,不仅能爬取所有的链接,还能检测漏洞

    评论
    解决 无用
    打赏 举报
  • dark9spring 2021-10-10 12:45

    递归开启子线程,爬到没有链接继续时就停止。
    用task1.continuewith(task2)比较方便,爬出来方便链接分组

    评论
    解决 无用
    打赏 举报
  • wanghui0380 2021-10-10 18:00

    东西是这么个东西,你写的没有啥问题。不过在数据结构选择上。 BlockingCollection 代码更直观点

    当然这玩意目前是个粗胚,还有很多东西需要处理
    1.无论是这个队列,还是 BlockingCollection,你那50个task里需要处理try,不然出现异常后task自己就结束了
    2 你目前缺少一些已采集过的判定,这个理论上用redis更好,量不大的情况用内存级的布隆过滤器也行,量大的情况请考虑bloom过滤器+redis去过滤掉已经采集过的地址,避免重复加入相同地址的任务
    3.小心处理内存,并行开发的情况。应该使用限流手段。比如你这个50并行控制就不错

    评论
    解决 无用
    打赏 举报
  • bidisty 2021-10-13 15:35

    你可以使用生产消费模型来处理,一个生产网址,一个爬取网址。使用task来处理,不用考虑并发处理。

    评论
    解决 无用
    打赏 举报

相关推荐 更多相似问题