duanqie5741
2012-11-04 09:49

Trouble with the Go tour web crawler exercise

I'm going through the Go tour and I feel like I have a pretty good understanding of the language, except for concurrency.

On slide 72 there is an exercise that asks the reader to parallelize a web crawler (and to avoid fetching the same URL twice, but I haven't gotten to that part yet).

Here is what I have so far:

func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- fmt.Sprintln(err)
        return
    }

    ch <- fmt.Sprintf("found: %s %q\n", url, body)
    for _, u := range urls {
        go Crawl(u, depth-1, fetcher, ch)
    }
}

func main() {
    ch := make(chan string, 100)
    go Crawl("http://golang.org/", 4, fetcher, ch)

    for i := range ch {
        fmt.Println(i)
    }
}

The issue I have is where to put the close(ch) call. If I put a defer close(ch) somewhere in the Crawl function, then I end up writing to a closed channel from one of the spawned goroutines, since the function returns before the spawned goroutines finish.

If I omit the call to close(ch), as in my example code, the program deadlocks: all the goroutines finish, but the main goroutine is still waiting to receive on the channel in the for loop, because the channel was never closed.
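One common pattern for this (a sketch, not from the answers below): pair the channel with a sync.WaitGroup, bump the counter before each go statement, and close the channel from a dedicated goroutine once wg.Wait() returns. fetchChildren here is a hypothetical stand-in for the tour's fetcher.Fetch.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fetchChildren is a hypothetical stand-in for fetcher.Fetch from
// the tour: it returns the child "URLs" of a page.
func fetchChildren(url string) []string {
	graph := map[string][]string{
		"a": {"b", "c"},
		"b": {"d"},
	}
	return graph[url]
}

// crawl sends every visited URL on ch. wg.Add(1) is called before
// each goroutine is spawned, never inside it.
func crawl(url string, depth int, ch chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	if depth <= 0 {
		return
	}
	ch <- url
	for _, u := range fetchChildren(url) {
		wg.Add(1)
		go crawl(u, depth-1, ch, wg)
	}
}

// CrawlAll runs crawl and closes the channel exactly once, after
// every goroutine has finished, so ranging over it terminates.
func CrawlAll(start string, depth int) []string {
	ch := make(chan string, 100)
	var wg sync.WaitGroup
	wg.Add(1)
	go crawl(start, depth, ch, &wg)
	go func() {
		wg.Wait() // every crawl goroutine has returned...
		close(ch) // ...so nothing can send on ch anymore
	}()
	var got []string
	for s := range ch {
		got = append(got, s)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(CrawlAll("a", 3)) // [a b c d]
}
```

Since close runs strictly after every crawl goroutine has called wg.Done(), nothing can send on a closed channel, and the range loop terminates instead of deadlocking.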


11 answers

  • doujiku1028 · 9 years ago

    A look at the Parallelization section of Effective Go suggests the solution. Essentially, you have to close the channel on every return path of the function, which is a nice use case for the defer statement:

    func Crawl(url string, depth int, fetcher Fetcher, ret chan string) {
        defer close(ret)
        if depth <= 0 {
            return
        }
    
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            ret <- err.Error()
            return
        }
    
        ret <- fmt.Sprintf("found: %s %q", url, body)
    
        result := make([]chan string, len(urls))
        for i, u := range urls {
            result[i] = make(chan string)
            go Crawl(u, depth-1, fetcher, result[i])
        }
    
        for i := range result {
            for s := range result[i] {
                ret <- s
            }
        }
    
        return
    }
    
    func main() {
        result := make(chan string)
        go Crawl("http://golang.org/", 4, fetcher, result)
    
        for s := range result {
            fmt.Println(s)
        }
    }
    

    The essential difference from your code is that every instance of Crawl gets its own return channel, and the calling function collects the results into its own return channel.

  • doumei3828 · 2 years ago

    Below is my solution. Apart from the global map, I only had to change the body of Crawl. Like other solutions, I used sync.Map and sync.WaitGroup. I've boxed off the important parts.

    var m sync.Map
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    func Crawl(url string, depth int, fetcher Fetcher) {
        if depth <= 0 {
            return
        }
        // Don't fetch the same URL twice.
        /////////////////////////////////////
        _, ok := m.LoadOrStore(url, url)   //
        if ok {                            //
            return                         //
        }                                  //
        /////////////////////////////////////
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        // Fetch URLs in parallel.
        /////////////////////////////////////
        var wg sync.WaitGroup              //
        defer wg.Wait()                    //
        for _, u := range urls {           //
            wg.Add(1)                      //
            go func(u string) {            //
                defer wg.Done()            //
                Crawl(u, depth-1, fetcher) //
            }(u)                           //
        }                                  //
        /////////////////////////////////////
        return
    }
    
  • dongxianghui3709 · 2 years ago

    Similar idea to the accepted answer, but with no duplicate URLs fetched, and printing directly to the console. defer is not used either. We use channels to signal when goroutines complete. The SafeMap idea is lifted from the SafeCounter given earlier in the tour.

    For the child goroutines, we create a slice of channels and wait until every child returns by receiving on its channel.

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // SafeMap is safe to use concurrently.
    type SafeMap struct {
        v   map[string] bool
        mux sync.Mutex
    }
    
    // SetVal sets the value for the given key.
    func (m *SafeMap) SetVal(key string, val bool) {
        m.mux.Lock()
        // Lock so only one goroutine at a time can access the map c.v.
        m.v[key] = val
        m.mux.Unlock()
    }
    
    // Value returns the current value of the counter for the given key.
    func (m *SafeMap) GetVal(key string) bool {
        m.mux.Lock()
        // Lock so only one goroutine at a time can access the map c.v.
        defer m.mux.Unlock()
        return m.v[key]
    }
    
    type Fetcher interface {
        // Fetch returns the body of URL and
        // a slice of URLs found on that page.
        Fetch(url string) (body string, urls []string, err error)
    }
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    // urlMap is a pointer: copying a SafeMap would copy its sync.Mutex,
    // so the goroutines would no longer exclude each other.
    func Crawl(url string, depth int, fetcher Fetcher, status chan bool, urlMap *SafeMap) {

        // Check if we fetched this url previously.
        if ok := urlMap.GetVal(url); ok {
            status <- true
            return
        }

        // Mark this url as fetched.
        urlMap.SetVal(url, true)

        if depth <= 0 {
            status <- false
            return
        }

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            status <- false
            return
        }

        fmt.Printf("found: %s %q\n", url, body)

        statuses := make([]chan bool, len(urls))
        for index, u := range urls {
            statuses[index] = make(chan bool)
            go Crawl(u, depth-1, fetcher, statuses[index], urlMap)
        }

        // Wait for child goroutines.
        for _, childstatus := range statuses {
            <-childstatus
        }

        // And now this goroutine can finish.
        status <- true
    }

    func main() {
        urlMap := SafeMap{v: make(map[string]bool)}
        status := make(chan bool)
        go Crawl("https://golang.org/", 4, fetcher, status, &urlMap)
        <-status
    }
    
    
  • doucong1268 · 2 years ago

    Below is a simple solution for parallelization using only a sync.WaitGroup.

    var fetchedUrlMap = make(map[string]bool)
    var mutex sync.Mutex
    
    func Crawl(url string, depth int, fetcher Fetcher) {
        if depth <= 0 {
            return
        }

        // Check and mark the url under one lock, so that two
        // goroutines cannot both decide to fetch it.
        mutex.Lock()
        if fetchedUrlMap[url] {
            mutex.Unlock()
            return
        }
        fetchedUrlMap[url] = true
        mutex.Unlock()

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
    
        var wg sync.WaitGroup
        for _, u := range urls {
            //  fmt.Println("Solving for ", u)
            wg.Add(1)
            go func(uv string) {
                Crawl(uv, depth-1, fetcher)
                wg.Done()
            }(u)
        }
        wg.Wait()
    }

    
  • douzhong3038 · 4 years ago

    Here's my solution, using sync.WaitGroup and a SafeCache of fetched urls:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type Fetcher interface {
        // Fetch returns the body of URL and
        // a slice of URLs found on that page.
        Fetch(url string) (body string, urls []string, err error)
    }
    
    // Safe to use concurrently
    type SafeCache struct {
        fetched map[string]string
        mux     sync.Mutex
    }
    
    func (c *SafeCache) Add(url, body string) {
        c.mux.Lock()
        defer c.mux.Unlock()
    
        if _, ok := c.fetched[url]; !ok {
            c.fetched[url] = body
        }
    }
    
    func (c *SafeCache) Contains(url string) bool {
        c.mux.Lock()
        defer c.mux.Unlock()
    
        _, ok := c.fetched[url]
        return ok
    }
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    // cache is a pointer: copying a SafeCache would copy its sync.Mutex.
    func Crawl(url string, depth int, fetcher Fetcher, cache *SafeCache,
        wg *sync.WaitGroup) {

        defer wg.Done()
        if depth <= 0 {
            return
        }
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        cache.Add(url, body)
        for _, u := range urls {
            if !cache.Contains(u) {
                wg.Add(1)
                go Crawl(u, depth-1, fetcher, cache, wg)
            }
        }
    }

    func main() {
        cache := SafeCache{fetched: make(map[string]string)}
        var wg sync.WaitGroup

        wg.Add(1)
        Crawl("http://golang.org/", 4, fetcher, &cache, &wg)
        wg.Wait()
    }
    
  • dongmu6578 · 5 years ago

    I use a slice to avoid crawling the same url twice. The recursive version without concurrency is fine, but I'm not sure about this concurrent version.

    func Crawl(url string, depth int, fetcher Fetcher) {
        var str_arrs []string
        var mux sync.Mutex
    
        var crawl func(string, int)
        crawl = func(url string, depth int) {
            if depth <= 0 {
                return
            }
    
            mux.Lock()
            for _, v := range str_arrs {
                if url == v {
                    mux.Unlock()
                    return
                }
            }
            str_arrs = append(str_arrs, url)
            mux.Unlock()
    
            body, urls, err := fetcher.Fetch(url)
            if err != nil {
                fmt.Println(err)
                return
            }
            fmt.Printf("found: %s %q\n", url, body)
            for _, u := range urls {
            go crawl(u, depth-1) // delete "go" to make it plain recursion
            }
        }
    
        crawl(url, depth)
        return
    }
    
    func main() {
        Crawl("http://golang.org/", 4, fetcher)
    }
    
  • dtest84004 · 5 years ago

    I have implemented it with a single channel, where all the goroutines send their messages. To ensure that it is closed when there are no more goroutines, I use a safe counter that closes the channel when the counter reaches 0.

    type Msg struct {
        url string
        body string
    }
    
    type SafeCounter struct {
        v int
        mux sync.Mutex
    }
    
    func (c *SafeCounter) inc() {
        c.mux.Lock()
        defer c.mux.Unlock()
        c.v++   
    }
    
    func (c *SafeCounter) dec(ch chan Msg) {
        c.mux.Lock()
        defer c.mux.Unlock()
        c.v--
        if c.v == 0 {
            close(ch)
        }
    }
    
    var goes SafeCounter = SafeCounter{v: 0}

    // SafeCache supplies the cache used in Crawl below: existsAndRegister
    // reports whether url was already seen, and marks it seen if not,
    // all under one lock.
    type SafeCache struct {
        seen map[string]bool
        mux  sync.Mutex
    }

    func (c *SafeCache) existsAndRegister(url string) bool {
        c.mux.Lock()
        defer c.mux.Unlock()
        if c.seen[url] {
            return true
        }
        c.seen[url] = true
        return false
    }

    var cache = SafeCache{seen: make(map[string]bool)}
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    func Crawl(url string, depth int, fetcher Fetcher, ch chan Msg) {
        defer goes.dec(ch)
        if depth <= 0 {
            return
        }
        if !cache.existsAndRegister(url) {
            body, urls, err := fetcher.Fetch(url)
            if err != nil {
                fmt.Println(err)
                return
            }
            ch <- Msg{url, body}
            for _, u := range urls {
                goes.inc()
                go Crawl(u, depth-1, fetcher, ch)
            }
        }
        return
    }
    
    func main() {
        ch := make(chan Msg, 100)
        goes.inc()
        go Crawl("http://golang.org/", 4, fetcher, ch)
        for m := range ch {
            fmt.Printf("found: %s %q\n", m.url, m.body)
        }
    }
    

    Note that the safe counter must be incremented outside of the goroutine.
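A minimal runnable sketch of that point, with illustrative names (emit, countMessages) that are not from the answer: the increment has to happen in the parent, before the go statement, or the parent's dec could drive the counter to 0 and close the channel before the child registered itself.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter mirrors the counter in the answer above.
type SafeCounter struct {
	v   int
	mux sync.Mutex
}

func (c *SafeCounter) inc() {
	c.mux.Lock()
	c.v++
	c.mux.Unlock()
}

func (c *SafeCounter) dec(ch chan string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.v--
	if c.v == 0 {
		close(ch) // the last goroutine to finish closes the channel
	}
}

// emit sends one message, then spawns one goroutine per entry in spawn.
func emit(msg string, spawn []string, goes *SafeCounter, ch chan string) {
	defer goes.dec(ch)
	ch <- msg
	for _, m := range spawn {
		// inc runs in the PARENT, before `go`. If the child called
		// inc itself, the parent's dec could see v == 0 and close ch
		// before the child had registered, and the child would then
		// send on a closed channel.
		goes.inc()
		go emit(m, nil, goes, ch)
	}
}

// countMessages drains the channel and returns how many messages arrived.
func countMessages() int {
	goes := &SafeCounter{}
	ch := make(chan string, 10)
	goes.inc()
	go emit("root", []string{"x", "y"}, goes, ch)
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	fmt.Println(countMessages()) // 3
}
```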

  • dph19153 · 2 years ago

    I think using a map (the same way we could use a set in other languages) and a mutex is the easiest approach:

    func Crawl(url string, depth int, fetcher Fetcher) {
        mux.Lock()
        defer mux.Unlock()
        if depth <= 0 || IsVisited(url) {
            return
        }
        visit[url] = true
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        for _, u := range urls {
            //
            go Crawl(u, depth-1, fetcher)
        }
        return
    }
    
    func IsVisited(s string) bool {
        _, ok := visit[s]
        return ok
    }
    
    var mux sync.Mutex
    
    var visit = make(map[string]bool)
    
    func main() {
        Crawl("https://golang.org/", 4, fetcher)
        time.Sleep(time.Second)
    }
    
  • douganbi7686 · 7 years ago

    Here's my solution. I have a "master" goroutine that listens on a channel of urls and starts a new crawling goroutine (which sends the urls it finds into the channel) whenever it sees new urls to crawl.

    Instead of explicitly closing the channel, I keep a counter of unfinished crawling goroutines; when the counter reaches 0, the loop exits because there is nothing left to wait for.

    func doCrawl(url string, fetcher Fetcher, results chan []string) {
        body, urls, err := fetcher.Fetch(url)
        results <- urls
    
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Printf("found: %s %q\n", url, body)
        }
    }
    
    
    
    func Crawl(url string, depth int, fetcher Fetcher) {
        results := make(chan []string)
        crawled := make(map[string]bool)
        go doCrawl(url, fetcher, results)
        // counter for unfinished crawling goroutines
        toWait := 1
    
        for urls := range results {
            toWait--
    
            for _, u := range urls {
                if !crawled[u] {
                    crawled[u] = true
                    go doCrawl(u, fetcher, results)
                    toWait++
                }
            }
    
            if toWait == 0 {
                break
            }
        }
    }
    
  • doucang2831 · 5 years ago

    An O(1) lookup of the url in a map, instead of an O(n) scan over a slice of all visited urls, minimizes the time spent inside the critical section. That is a trivial amount of time for this example, but it becomes relevant at scale.

    A WaitGroup prevents the top-level Crawl() function from returning until all child goroutines are complete.

    func Crawl(url string, depth int, fetcher Fetcher) {
        var str_map = make(map[string]bool)
        var mux sync.Mutex
        var wg sync.WaitGroup
    
        var crawler func(string,int)
        crawler = func(url string, depth int) {
            defer wg.Done()
    
            if depth <= 0 {
                return
            }   
    
            mux.Lock()
            if _, ok := str_map[url]; ok {
                mux.Unlock()
                return
            }
            str_map[url] = true
            mux.Unlock()
    
            body, urls, err := fetcher.Fetch(url)
            if err != nil {
                fmt.Println(err)
                return
            }
            fmt.Printf("found: %s %q %q\n", url, body, urls)
    
            for _, u := range urls {
                wg.Add(1)
                go crawler(u, depth-1)          
            }       
        }
        wg.Add(1)
        crawler(url,depth)
        wg.Wait()   
    }
    
    func main() {
        Crawl("http://golang.org/", 4, fetcher)
    }
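    The O(1)-map versus O(n)-slice point can be sketched in isolation; the helper names below are illustrative, not from the answer's code.

```go
package main

import "fmt"

// seenInSlice scans the whole slice in the worst case: the cost of
// each membership test grows with the number of visited urls.
func seenInSlice(urls []string, url string) bool {
	for _, v := range urls {
		if v == url {
			return true
		}
	}
	return false
}

// seenInMap is a single hash lookup, independent of len(urls).
func seenInMap(urls map[string]bool, url string) bool {
	return urls[url]
}

func main() {
	s := []string{"a", "b", "c"}
	m := map[string]bool{"a": true, "b": true, "c": true}
	fmt.Println(seenInSlice(s, "c"), seenInMap(m, "c")) // true true
}
```

    For the tour's tiny URL set either is fine; the map only pays off as the visited set grows.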
    
  • dshw124502 · 5 years ago

    I went in a completely different direction with this one. I might have been misled by the tip about using a map.

    // SafeUrlMap is safe to use concurrently.
    type SafeUrlMap struct {
        v   map[string]string
        mux sync.Mutex
    }
    
    func (c *SafeUrlMap) Set(key string, body string) {
        c.mux.Lock()
        // Lock so only one goroutine at a time can access the map c.v.
        c.v[key] = body
        c.mux.Unlock()
    }
    
    // Value returns mapped value for the given key.
    func (c *SafeUrlMap) Value(key string) (string, bool) {
        c.mux.Lock()
        // Lock so only one goroutine at a time can access the map c.v.
        defer c.mux.Unlock()
        val, ok := c.v[key]
        return val, ok
    }
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    // urlMap is a pointer: copying a SafeUrlMap would copy its sync.Mutex.
    func Crawl(url string, depth int, fetcher Fetcher, urlMap *SafeUrlMap) {
        defer wg.Done()

        if depth <= 0 {
            return
        }

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        // Record the body only after a successful fetch.
        urlMap.Set(url, body)

        for _, u := range urls {
            if _, ok := urlMap.Value(u); !ok {
                wg.Add(1)
                go Crawl(u, depth-1, fetcher, urlMap)
            }
        }
    }

    var wg sync.WaitGroup

    func main() {
        urlMap := SafeUrlMap{v: make(map[string]string)}

        wg.Add(1)
        go Crawl("http://golang.org/", 4, fetcher, &urlMap)
        wg.Wait()

        for url := range urlMap.v {
            body, _ := urlMap.Value(url)
            fmt.Printf("found: %s %q\n", url, body)
        }
    }
    
