dongsou4301 2017-12-15 05:40

How do I add objects to a channel from a goroutine that is receiving data from that same channel?

Basically, I am trying to write a concurrent sitemap crawler using goroutines. One sitemap can contain links to multiple sitemaps, which can in turn contain links to other sitemaps, and so on.

Right now, this is my design:

worker:
     - receives url from channel
     - processUrl(url)
processUrl:
     for each link in lookup(url):
         - if link is sitemap:
               channel <- link
           else:
               print(link)
main:
     - create 10 workers
     - channel <- root url

The problem is that a worker won't accept a new URL from the channel until processUrl() finishes, and processUrl() can't finish while it is sending a URL on the channel until some worker accepts that URL. Once every worker is blocked on such a send, nothing is left to receive and the crawler deadlocks. What concurrent design can I use to add the URL to a task queue without a channel, without busy-waiting, and without blocking on channel <- url?
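A minimal reproduction of the cycle described above, assuming a single worker and a hypothetical discovered URL (root.xml/child.xml). Real code would simply hang on the send; here the send is wrapped in a select with a timeout only so the program can report the stall instead of deadlocking. The names worker and demo are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// worker models the question's design: it receives a URL and, while
// processing it, tries to send a newly found sitemap URL back on the
// same channel it reads from.
func worker(urls chan string, blocked chan<- string) {
	for url := range urls {
		child := url + "/child.xml" // hypothetical discovered sitemap
		select {
		case urls <- child:
			// Only possible if some other worker is free to receive.
		case <-time.After(100 * time.Millisecond):
			// Nobody can receive: the only worker is this one, and it is
			// stuck here instead of back at the receive in the range loop.
			blocked <- child
			return
		}
	}
}

// demo starts one worker on an unbuffered channel, as in the question,
// and returns the URL whose send stalled.
func demo() string {
	urls := make(chan string)
	blocked := make(chan string)
	go worker(urls, blocked)
	urls <- "root.xml"
	return <-blocked
}

func main() {
	fmt.Println("send blocked for:", demo())
}
```

With ten workers the same thing happens once all ten are sending at the same moment; a single worker just makes the stall deterministic.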

Here is the actual code if it helps:

func (c *SitemapCrawler) worker() {
    for {
        select {
        case url := <-urlChan:
            fmt.Println(url)
            c.crawlSitemap(url)
        }
    }
}
func crawlUrl(url string) {
    defer crawlWg.Done()
    crawler := NewCrawler(url)
    for i := 0; i < MaxCrawlRate*20; i++ {
        go crawler.worker()
    }
    crawler.getSitemaps()
    pretty.Println(crawler.sitemaps)
    crawler.crawlSitemaps()
}
func (c SitemapCrawler) crawlSitemap(url string) {
    c.limiter.Take()
    resp, err := MakeRequest(url)
    if err != nil || resp.StatusCode != 200 {
        crawlWg.Done()
        return
    }
    var resp_txt []byte
    if strings.Contains(resp.Header.Get("Content-Type"), "html") {
        crawlWg.Done()
        return
    } else if strings.Contains(url, ".gz") || resp.Header.Get("Content-Encoding") == "gzip" {
        reader, err := gzip.NewReader(resp.Body)
        if err != nil {
            crawlWg.Done()
            panic(err)
        } else {
            resp_txt, err = ioutil.ReadAll(reader)
            if err != nil {
                crawlWg.Done()
                panic(err)
            }
        }
        reader.Close()
    } else {
        resp_txt, err = ioutil.ReadAll(resp.Body)
        if err != nil {
            //panic(err)
            crawlWg.Done()
            return
        }
    }
    io.Copy(ioutil.Discard, resp.Body)
    resp.Body.Close()

    d, err := libxml2.ParseString(string(resp_txt))
    if err != nil {
        crawlWg.Done()
        return
    }
    results, err := d.Find("//*[contains(local-name(), 'loc')]")
    if err != nil {
        crawlWg.Done()
        return
    }
    locs := results.NodeList()
    printLock.Lock()
    for i := 0; i < len(locs); i++ {
        newUrl := locs[i].TextContent()
        if strings.Contains(newUrl, ".xml") {
            crawlWg.Add(1)
            //go c.crawlSitemap(newUrl)
            urlChan <- newUrl
        } else {
            fmt.Println(newUrl)
        }
    }
    printLock.Unlock()

    crawlWg.Done()
}

  • duandu2159 2017-12-15 15:27

    A send on an unbuffered channel blocks until another goroutine receives the value.

    To create a buffered channel:

    urlChan := make(chan string, len(allUrls))
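A small sketch of what the buffer buys: with no receiver running, sends complete until the channel holds cap(ch) values. The fillBuffered helper is hypothetical.

```go
package main

import "fmt"

// fillBuffered sends values into a buffered channel while no receiver is
// running and reports how many sends completed and how many values the
// channel now holds.
func fillBuffered(capacity int) (sent, length int) {
	ch := make(chan string, capacity)
	for i := 0; i < capacity; i++ {
		ch <- fmt.Sprintf("url-%d", i) // does not block while len(ch) < cap(ch)
		sent++
	}
	// One more plain send here would block forever, since nothing receives.
	return sent, len(ch)
}

func main() {
	sent, length := fillBuffered(3)
	fmt.Println(sent, length) // 3 3
}
```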
    

    However, once the buffer is full, sends block again.
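Because no fixed capacity is safe when the number of sitemaps is not known in advance, one common alternative (not part of this answer) is an unbounded queue: a dispatcher goroutine that parks pending URLs in a slice. The sketch below assumes string items; unboundedQueue and collect are hypothetical names.

```go
package main

import "fmt"

// unboundedQueue is a hypothetical helper: a dispatcher goroutine moves
// values from in into a growable slice and hands them out on out in FIFO
// order, so a send on in waits only briefly for the dispatcher, never for
// a consumer. Closing in makes the dispatcher drain the backlog and then
// close out.
func unboundedQueue() (chan<- string, <-chan string) {
	in := make(chan string)
	out := make(chan string)
	go func() {
		defer close(out)
		src := in // set to nil once in is closed, disabling that case
		var pending []string
		for src != nil || len(pending) > 0 {
			var dst chan string // stays nil (send case disabled) when nothing is pending
			var next string
			if len(pending) > 0 {
				dst, next = out, pending[0]
			}
			select {
			case v, ok := <-src:
				if !ok {
					src = nil
					continue
				}
				pending = append(pending, v)
			case dst <- next:
				pending = pending[1:]
			}
		}
	}()
	return in, out
}

// collect pushes every item before reading anything back, which would
// deadlock on a plain unbuffered channel.
func collect(items []string) []string {
	in, out := unboundedQueue()
	for _, it := range items {
		in <- it
	}
	close(in)
	var got []string
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect([]string{"a.xml", "b.xml", "c.xml"})) // [a.xml b.xml c.xml]
}
```

In a crawler you would still need something like crawlWg to decide when every sitemap has been processed and in can be closed, and memory grows with the backlog.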

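    Alternatively, you can use a select statement with a default case. If the send cannot proceed immediately, the default branch runs instead of blocking; note that the URL is then not queued anywhere.

    select {
    case urlChan <- url:
        fmt.Println("sent", url)
    default:
        fmt.Println("channel full, not sent:", url)
    }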
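The same non-blocking send wrapped in a small helper, to make the trade-off visible: when the buffer is full the value is simply dropped. trySend is a hypothetical name.

```go
package main

import "fmt"

// trySend attempts a send without blocking. It returns false if no
// receiver is ready and the buffer (if any) is full; in that case the
// value is not queued anywhere and the caller must handle it.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a.xml")) // true: the buffer had room
	fmt.Println(trySend(ch, "b.xml")) // false: buffer full, b.xml was dropped
}
```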
    

    To give up on the send after a timeout, do the following:

    select {
    case urlChan <- url:
        fmt.Println("sent", url)
    case <-time.After(5 * time.Second):
        fmt.Println("timed out, not sent:", url)
    }
    

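    Finally, you can perform the send in a separate goroutine, so the worker can go back to receiving immediately. Passing url as an argument gives each goroutine its own copy:

    go func(u string) {
        urlChan <- u
    }(url)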
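Applied to the question's crawler, the goroutine-per-send approach can be sketched end to end. This is a simulation under stated assumptions: the links map is a hypothetical stand-in for MakeRequest plus the libxml2 lookup, and crawl and links are illustrative names. The WaitGroup is incremented for each child before its parent calls Done, so Wait returns only after every discovered sitemap has been processed, and only then is the channel closed.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// links is a hypothetical stand-in for fetching and parsing a sitemap:
// it maps each sitemap URL to the <loc> entries it contains.
var links = map[string][]string{
	"root.xml": {"a.xml", "b.xml", "/page1"},
	"a.xml":    {"c.xml", "/page2"},
	"b.xml":    {"/page3"},
	"c.xml":    {"/page4", "/page5"},
}

func crawl(root string, workers int) []string {
	urlChan := make(chan string) // unbuffered on purpose
	var wg sync.WaitGroup        // counts sitemaps not yet fully processed
	var mu sync.Mutex            // guards pages only, never held while sending
	var pages []string

	for i := 0; i < workers; i++ {
		go func() {
			for url := range urlChan {
				for _, link := range links[url] {
					if strings.Contains(link, ".xml") { // mirrors the question's check
						wg.Add(1) // count the child before the parent calls Done
						// Send from a new goroutine so this worker never
						// blocks and can return to receiving.
						go func(u string) { urlChan <- u }(link)
					} else {
						mu.Lock()
						pages = append(pages, link)
						mu.Unlock()
					}
				}
				wg.Done()
			}
		}()
	}

	wg.Add(1)
	urlChan <- root
	wg.Wait()      // every discovered sitemap has been received and processed
	close(urlChan) // no sends are pending now, so closing is safe
	sort.Strings(pages)
	return pages
}

func main() {
	fmt.Println(crawl("root.xml", 2)) // [/page1 /page2 /page3 /page4 /page5]
}
```

Even with a single worker this terminates, because no worker ever waits on its own send. Two related points about the question's crawlSitemap: it holds printLock while sending on urlChan, so a worker stuck on that send also stalls every other worker at printLock.Lock(); in the sketch the mutex guards only the append. Each pending URL costs one parked goroutine, which behaves like an unbounded queue.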
    
    This answer was accepted by the asker.