可以通过多线程并发处理请求队列，并控制处理线程的数量。以下是代码示例（未包含重复 Uri 的判断，请自行缓存已处理的 Uri，并在入队前据此去重）。
/// <summary>
/// A bounded request queue whose items are processed concurrently by background
/// worker threads. Workers are started on demand (at most <c>maxThreadCount</c>)
/// and an idle worker retires after about 30 seconds without work, as long as more
/// than <c>minThreadCount</c> workers are alive.
/// </summary>
/// <remarks>
/// Duplicate Uris are not filtered. Callers that need de-duplication should track
/// already-processed Uris themselves before calling
/// <see cref="ScheduleIncomingRequest"/>.
/// </remarks>
public class RequestQueue : IDisposable
{
    #region Constructors
    /// <summary>
    /// Creates a queue whose workers dequeue requests without processing them
    /// (placeholder behavior; use the overload taking a handler to do real work).
    /// </summary>
    /// <param name="minThreadCount">Workers are not retired for idleness while the worker count is at or below this value. Must be &gt;= 0.</param>
    /// <param name="maxThreadCount">Maximum number of worker threads. Must be &gt;= 1 and &gt;= <paramref name="minThreadCount"/>.</param>
    /// <param name="maxRequestCount">Maximum number of pending (not yet dequeued) requests. Must be &gt;= 0.</param>
    public RequestQueue(int minThreadCount, int maxThreadCount, int maxRequestCount)
        : this(minThreadCount, maxThreadCount, maxRequestCount, null)
    {
    }

    /// <summary>
    /// Creates a queue whose workers pass each dequeued Uri to <paramref name="processRequest"/>.
    /// </summary>
    /// <param name="minThreadCount">Workers are not retired for idleness while the worker count is at or below this value. Must be &gt;= 0.</param>
    /// <param name="maxThreadCount">Maximum number of worker threads. Must be &gt;= 1 and &gt;= <paramref name="minThreadCount"/>.</param>
    /// <param name="maxRequestCount">Maximum number of pending (not yet dequeued) requests. Must be &gt;= 0.</param>
    /// <param name="processRequest">
    /// Invoked on a worker thread for every request (e.g. the crawler's link extraction).
    /// Exceptions it throws are logged and do not stop the worker. May be null, in which
    /// case requests are dequeued and discarded.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">A count argument is out of range.</exception>
    public RequestQueue(int minThreadCount, int maxThreadCount, int maxRequestCount, Action<Uri> processRequest)
    {
        if (minThreadCount < 0)
        {
            throw new ArgumentOutOfRangeException("minThreadCount", minThreadCount, "The minimum thread count cannot be negative.");
        }
        if (maxThreadCount < 1 || maxThreadCount < minThreadCount)
        {
            // With zero workers nothing would ever be processed and requests would pile up silently.
            throw new ArgumentOutOfRangeException("maxThreadCount", maxThreadCount, "The maximum thread count must be at least 1 and not less than the minimum thread count.");
        }
        if (maxRequestCount < 0)
        {
            throw new ArgumentOutOfRangeException("maxRequestCount", maxRequestCount, "The maximum request count cannot be negative.");
        }

        m_queue = new Queue<Uri>();
        m_processRequest = processRequest;
        m_minThreadCount = minThreadCount;
        m_maxThreadCount = maxThreadCount;
        m_maxRequestCount = maxRequestCount;
        m_totalThreadCount = 0;
        m_activeThreadCount = 0;
        m_stopped = false;
    }
    #endregion

    #region IDisposable Members
    /// <summary>
    /// Stops the queue: pending requests are discarded, idle workers are woken and exit,
    /// and busy workers exit after finishing their current request. Does not wait for
    /// busy workers to finish. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases resources.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            lock (m_lock)
            {
                m_stopped = true;
                m_queue.Clear();
                // Wake every waiting worker so it observes m_stopped and exits.
                Monitor.PulseAll(m_lock);
            }
        }
    }
    #endregion

    #region Public Members
    /// <summary>
    /// Adds a request to the queue. The request is handed to an idle worker if one exists.
    /// Otherwise a new worker is started if fewer than the maximum number are running.
    /// </summary>
    /// <param name="request">The Uri to process.</param>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The queue already holds the maximum number of pending requests.</exception>
    public void ScheduleIncomingRequest(Uri request)
    {
        if (request == null)
        {
            throw new ArgumentNullException("request");
        }

        lock (m_lock)
        {
            if (m_stopped)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }
            if (m_queue.Count >= m_maxRequestCount)
            {
                throw new InvalidOperationException("请求数量过多,请稍后再提交请求");
            }

            m_queue.Enqueue(request);

            if (m_activeThreadCount < m_totalThreadCount)
            {
                // Some worker is waiting in Monitor.Wait: wake one.
                Monitor.Pulse(m_lock);
            }
            else if (m_totalThreadCount < m_maxThreadCount)
            {
                // Every worker is busy and there is room for another one.
                StartWorker();
            }
        }
    }
    #endregion

    #region Private Methods
    /// <summary>
    /// Starts one background worker and counts it as active. Caller must hold m_lock.
    /// </summary>
    private void StartWorker()
    {
        Thread thread = new Thread(OnProcessRequestQueue);
        thread.IsBackground = true;
        // The thread was built from a parameterless ThreadStart delegate, so it must be
        // started with Start(); Start(object) throws InvalidOperationException for it.
        thread.Start();
        // The new worker counts as active until it finds the queue empty. It cannot observe
        // these counters before we release m_lock, so incrementing after Start() is safe.
        m_totalThreadCount++;
        m_activeThreadCount++;
        LogHelper.Info("开启新线程: " + thread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
    }

    /// <summary>
    /// Worker thread body: repeatedly takes a request and processes it outside the lock
    /// until <see cref="TryTakeRequest"/> tells the worker to terminate.
    /// </summary>
    private void OnProcessRequestQueue()
    {
        Uri request;
        while (TryTakeRequest(out request))
        {
            try
            {
                // Call the crawler here to process the Uri, e.g. GetLink(request).
                if (m_processRequest != null)
                {
                    m_processRequest(request);
                }
            }
            catch (Exception ex)
            {
                LogHelper.Error(ex, "处理Uri发生未知错误");
            }
        }
    }

    /// <summary>
    /// Blocks until a request is available and dequeues it. Returns false, updating the
    /// thread counters, when this worker should terminate. That happens when the queue
    /// was disposed, or when the idle timeout elapsed with the queue still empty and more
    /// than the minimum number of workers alive.
    /// </summary>
    /// <param name="request">The dequeued request, or null when returning false.</param>
    private bool TryTakeRequest(out Uri request)
    {
        lock (m_lock)
        {
            while (m_queue.Count == 0)
            {
                // This worker is idle while it waits.
                m_activeThreadCount--;
                // Re-check the queue after a timeout. A request may have been enqueued after the
                // wait expired but before the lock was reacquired; its Pulse found no waiter, so
                // retiring here would leave it unprocessed.
                if (m_stopped
                    || (!Monitor.Wait(m_lock, IdleTimeoutMilliseconds)
                        && m_queue.Count == 0
                        && m_totalThreadCount > m_minThreadCount))
                {
                    m_totalThreadCount--;
                    LogHelper.Info("终止线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
                    request = null;
                    return false;
                }
                m_activeThreadCount++;
            }

            request = m_queue.Dequeue();
            return true;
        }
    }
    #endregion

    #region Private Fields
    // How long an idle worker waits for a request before it may retire.
    private const int IdleTimeoutMilliseconds = 30000;

    private readonly object m_lock = new object();
    private readonly Queue<Uri> m_queue;
    // Optional per-request callback; null means requests are only dequeued.
    private readonly Action<Uri> m_processRequest;
    private readonly int m_maxThreadCount;
    private readonly int m_minThreadCount;
    private readonly int m_maxRequestCount;
    // Number of live worker threads (guarded by m_lock).
    private int m_totalThreadCount;
    // Number of live workers not waiting in Monitor.Wait (guarded by m_lock).
    private int m_activeThreadCount;
    // Set once by Dispose; rejects new requests and makes workers exit.
    private bool m_stopped;
    #endregion
}