dtt83024 2015-04-07 12:37
63 views · Accepted

Web crawler in Go

I'm trying to build a web crawler in Go where I would like to specify the maximum number of concurrent workers. They will all be working as long as there are links to explore in the queue. When the queue has fewer elements than workers, workers should shut down, but resume in case more links are found.

The code I have tried is

package main

import (
    "fmt"
    "sync"
)

const max_workers = 6

// simulating links with int
func crawl(wg *sync.WaitGroup, queue chan int) {
    for element := range queue {   
        wg.Done() // why is defer here causing a deadlock?
        fmt.Println("adding 2 new elements ")
        if element%2 == 0 {
            wg.Add(2)
            queue <- (element*100 + 11)
            queue <- (element*100 + 33)
        }

    }
}

func main() {
    var wg sync.WaitGroup
    queue := make(chan int, 10)
    queue <- 0
    queue <- 1
    queue <- 2
    queue <- 3
    var min int
    if (len(queue) < max_workers) {
        min = len(queue)
    } else {
        min = max_workers
    }
    for i := 0; i < min; i++ {
        wg.Add(1)
        go crawl(&wg, queue)
    }
    wg.Wait()
    close(queue)
}

Link to playground

This seems to work, but there is a catch: I have to fill the queue with more than one element when I start. I would like it to start from a (single) seed page (in my example queue <- 0) and then grow/shrink the worker pool dynamically.

My questions are:

  • how can I obtain this behavior?

  • why is defer wg.Done() causing a deadlock? Is it normal to call wg.Done() only when the function has actually completed? I think that without the defer the goroutine does not wait for the other part to finish (which could take longer in a real-world example of parsing HTML).


1 Answer

  • douhao5280 2015-04-07 18:07

    If you use your favorite web search for "Go web crawler" (or "Golang web crawler") you'll find many examples, including the Go Tour exercise: Web Crawler. There are also some talks on concurrency in Go that cover this kind of thing.

    The "standard" way to do this in Go does not need to involve wait groups at all. To answer one of your questions: functions queued with defer only run when the enclosing function returns. Yours is a long-running function, so do not use defer inside such a loop.

    The "standard" way is to start up however many workers you want in their own goroutines. They all read "jobs" from the same channel, blocking if/when there is nothing to do. When fully done that channel is closed and they all exit.
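    As a minimal sketch of that shape (the names, the doubling "work", and the result-summing are mine, not from the answer; the WaitGroup here exists only so the results channel can be closed):

```go
package main

import (
	"fmt"
	"sync"
)

// process stands in for the real work (fetching and parsing a page).
func process(job int) int { return job * 2 }

// runPool starts nWorkers goroutines that all read jobs from one channel,
// closes that channel when every job is queued, and sums the results.
func runPool(jobs []int, nWorkers int) int {
	in := make(chan int)
	out := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()     // fine here: the goroutine really is finished
			for j := range in { // blocks when idle, exits when in is closed
				out <- process(j)
			}
		}()
	}

	// Feed the workers, then close the channel so they drain and exit.
	go func() {
		for _, j := range jobs {
			in <- j
		}
		close(in)
	}()

	// Close out once all workers have exited, ending the range below.
	go func() { wg.Wait(); close(out) }()

	sum := 0
	for r := range out {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runPool([]int{1, 2, 3, 4, 5}, 3)) // prints 30
}
```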

    In the case of something like a crawler the workers will discover more "jobs" to do and want to enqueue them. You don't want them writing back to the same channel since it will have some limited amount of buffering (or none!) and you'll eventually block all the workers trying to enqueue more jobs!

    A simple solution to this is to use a separate channel (e.g. each worker has in <-chan Job, out chan<- Job) and a single queue/filter goroutine. It reads these requests, appends them onto a slice that it either lets grow arbitrarily large or applies some global limit to, and also feeds the other channel from the head of the slice (i.e. a simple for-select loop reading from one channel and writing to the other). This goroutine is also usually responsible for keeping track of what has already been done (e.g. a map of visited URLs) and drops incoming requests for duplicates.

    The queue goroutine might look something like this (argument names excessively verbose here):

    type Job string
    
    func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
        var list []Job
        done := make(map[Job]bool)
        for {
            var send chan<- Job
            var item Job
            if len(list) > 0 {
                send = toWorkers
                item = list[0]
            }
            select {
            case send <- item:
                // We sent an item, remove it
                list = list[1:]
            case thing := <-fromWorkers:
                // Got a new thing
                if !done[thing] {
                    list = append(list, thing)
                    done[thing] = true
                }
            }
        }
    }
    

    A few things are glossed over in this simple example, such as termination. And if "Job" is some larger structure, you'd want to use chan *Job and []*Job instead. In that case you'd also need to change the map type to some key you extract from the job (e.g. Job.URL perhaps), and you'd want to do list[0] = nil before list = list[1:] to get rid of the reference to the *Job pointer and let the garbage collector reclaim it earlier.
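    For instance, the GC-friendly pop on a pointer slice might look like this (the Job struct and the pop helper are hypothetical names of mine):

```go
package main

import "fmt"

type Job struct{ URL string }

// pop removes and returns the head of the list, clearing the vacated
// slot so the backing array no longer pins the popped *Job and the
// garbage collector can reclaim it as soon as the caller drops it.
func pop(list []*Job) (*Job, []*Job) {
	head := list[0]
	list[0] = nil // drop the slice's reference to the popped Job
	return head, list[1:]
}

func main() {
	list := []*Job{{URL: "a"}, {URL: "b"}}
	head, rest := pop(list)
	fmt.Println(head.URL, len(rest)) // prints: a 1
}
```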

    Edit: Some notes on terminating cleanly.

    There are several ways to terminate code like the above cleanly. A wait group could be used, but the placement of the Add/Done calls needs to be done carefully and you'd probably need another goroutine to do the Wait (and then close one of the channels to start the shutdown). The workers shouldn't close their output channel since there are multiple workers and you can't close a channel more than once; the queue goroutine can't tell when to close its channel to the workers without knowing when the workers are done.

    In the past when I've used code very similar to the above, I used a local "outstanding" counter within the "queue" goroutine (which avoids any need for a mutex or any synchronization overhead that a wait group has). The count of outstanding jobs is increased when a job is sent to a worker, and decreased again when the worker says it's finished with it. My code happened to have another channel for this (my "queue" was also collecting results in addition to further nodes to enqueue). It's probably cleaner on its own channel, but a special value on the existing channel (e.g. a nil Job pointer) could be used instead. At any rate, with such a counter, the existing length check on the local list just needs to see that there is nothing outstanding when the list is empty, and then it's time to terminate: just shut down the channel to the workers and return. e.g.:

        if len(list) > 0 {
            send = toWorkers
            item = list[0]
        } else if outstandingJobs == 0 {
            close(toWorkers)
            return
        }
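    Putting the pieces together, a complete runnable version of this termination scheme might look like the following (the even/odd "link discovery" rule mimics the question's simulation; discover, crawlAll, and the numeric cutoff are my own hypothetical names/values, not from the answer):

```go
package main

import "fmt"

type Job int

// discover simulates fetching a page and returns the links found on it.
// The even/odd rule stands in for real HTML parsing, as in the question.
func discover(j Job) []Job {
	if j%2 == 0 && j < 10000 {
		return []Job{j*100 + 11, j*100 + 22}
	}
	return nil
}

func worker(in <-chan Job, out chan<- []Job) {
	for j := range in {
		out <- discover(j) // one reply per job doubles as a "done" signal
	}
}

// crawlAll runs the queue goroutine's for-select loop with an
// "outstanding" counter and returns how many jobs were processed.
func crawlAll(seed Job, nWorkers int) int {
	toWorkers := make(chan Job)
	fromWorkers := make(chan []Job)
	for i := 0; i < nWorkers; i++ {
		go worker(toWorkers, fromWorkers)
	}
	list := []Job{seed}
	done := map[Job]bool{seed: true}
	outstanding, visited := 0, 0
	for {
		var send chan<- Job // nil unless we have something to send
		var item Job
		if len(list) > 0 {
			send = toWorkers
			item = list[0]
		} else if outstanding == 0 {
			close(toWorkers) // nothing queued, nothing in flight: shut down
			return visited
		}
		select {
		case send <- item: // never fires while send is nil
			list = list[1:]
			outstanding++
		case found := <-fromWorkers:
			outstanding--
			visited++
			for _, j := range found {
				if !done[j] {
					done[j] = true
					list = append(list, j)
				}
			}
		}
	}
}

func main() {
	fmt.Println("visited", crawlAll(0, 6)) // prints: visited 7
}
```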
    
    This answer was accepted by the asker.
