dop2144
2016-03-02 03:12 阅读 35
已采纳

golang：所有 goroutine 完成后如何关闭通道（channel）？

I would like to write a simple web scraper in Go that will:

  • get all hrefs matching a pattern from a URL
  • extract some specific fields
  • and write to a CSV file

Here's my code:

package main

import (
    "encoding/csv"
    "flag"
    "fmt"
    "github.com/PuerkitoBio/goquery"
    "log"
    "net/http"
    "net/url"
    "os"
    "strings"
    "sync"
)

// Enterprise is one scraped result row: the link text from the listing page
// (name) plus the tax code, level-2 industry name (group) and capital
// ownership (capital) read from the linked detail page.
type Enterprise struct {
    name, tax_code, group, capital string
}

var (
    // Targets of the -u (start URL) and -f (output CSV path) flags.
    u, f string

    // NOTE(review): these package-level variables are not referenced by the
    // code shown (fetch takes name as a parameter and stores the other
    // values in Enterprise fields); they look safe to remove.
    name, tax_code, group, capital string
)

// init registers the command-line flags on the default flag set:
// -u is the page to collect links from, -f is the CSV file to write.
func init() {
    defs := []struct {
        dst         *string
        name, usage string
    }{
        {&u, "u", "Which URL to download from"},
        {&f, "f", "Path to the csv file to write the output to"},
    }
    for _, d := range defs {
        flag.CommandLine.StringVar(d.dst, d.name, "", d.usage)
    }
}

// check is a no-op for a nil error; for any other error it panics with e.
// The program uses it for every error it does not try to recover from.
func check(e error) {
    if e == nil {
        return
    }
    panic(e)
}

// findHrefs downloads the page at u and collects every "td div a" link whose
// href starts with "/Thong-tin-doanh-nghiep" and whose text is non-empty.
// The returned map goes from href to link text; if the same href appears
// more than once, a later match overwrites an earlier one. Request and
// parse errors panic via check.
func findHrefs(u string) map[string]string {
    const prefix = "/Thong-tin-doanh-nghiep"

    resp, err := http.Get(u)
    check(err)
    doc, err := goquery.NewDocumentFromResponse(resp)
    check(err)

    links := map[string]string{}
    doc.Find("td div a").Each(func(_ int, a *goquery.Selection) {
        href, _ := a.Attr("href") // a missing href yields "" and is skipped below
        if !strings.HasPrefix(href, prefix) {
            return
        }
        if text := a.Text(); text != "" {
            links[href] = text
        }
    })
    return links
}

// fetch downloads the detail page at url and fills an Enterprise from the
// table cell that follows each Vietnamese label (tax code, level-2 industry
// name, capital ownership). name is the link text from the listing page.
// fetch appends one CSV row (name, tax code prefixed with "'", group,
// capital) to file, then sends the Enterprise, including name, on c.
// CSV write errors panic via check.
//
// wg.Done is deferred, so it runs only after the send on c completes. If c
// is unbuffered and no goroutine receives from it, fetch blocks forever and
// wg.Wait never returns.
func fetch(url string, name string, file *os.File, wg *sync.WaitGroup, c chan Enterprise) {
    defer wg.Done()

    log.Println("Fetching URL", url)
    resp, err := http.Get(url)
    check(err)

    doc, err := goquery.NewDocumentFromResponse(resp)
    check(err)

    // Store name in the result so receivers of c (e.g. a `for e := range c`
    // loop writing e.name) get complete rows.
    e := Enterprise{name: name}
    doc.Find("td").Each(func(_ int, s *goquery.Selection) {
        switch s.Text() {
        case "Mã số thuế:": // tax code
            e.tax_code = s.Next().Text()
        case "Tên ngành cấp 2:": // level-2 industry name
            e.group = s.Next().Text()
        case "Sở hữu vốn:": // capital ownership
            e.capital = s.Next().Text()
        }
    })

    // NOTE(review): the leading "'" presumably keeps spreadsheet apps from
    // reinterpreting the tax code as a number — confirm.
    w := csv.NewWriter(file)
    check(w.Write([]string{name, "'" + e.tax_code, e.group, e.capital}))
    w.Flush()
    check(w.Error()) // Flush reports failures only through Error

    c <- e
}

// getDoc scrapes the listing page at u into the CSV file f. It resolves each
// relative link returned by findHrefs against u's scheme and host, starts one
// fetch goroutine per link, and then waits for all of them with wg.
//
// NOTE(review): nothing in this function receives from c, and c is
// unbuffered. So every fetch blocks forever on its send (`c <- *e`), its
// deferred wg.Done never runs, and wg.Wait below never returns. This is the
// hang described in the question. Receiving from c here removes the hang,
// for example by closing c from a goroutine after wg.Wait and ranging over
// it.
func getDoc(u, f string) {
    parsedUrl, err := url.Parse(u)
    check(err)

    // One output file shared by all fetch goroutines; each writes its own row.
    file, err := os.Create(f)
    check(err)
    defer file.Close()

    var wg sync.WaitGroup
    c := make(chan Enterprise)

    e_hrefs := findHrefs(u)
    for e_href, name := range e_hrefs {
        wg.Add(1)
        // e_href is site-relative (findHrefs keeps only hrefs starting with
        // "/Thong-tin-doanh-nghiep"), so prefix the listing URL's scheme and host.
        go fetch(parsedUrl.Scheme+"://"+parsedUrl.Host+e_href, name, file, &wg, c)
    }
    wg.Wait()
}

// main parses the flags. When both -u and -f are given it scrapes u into
// the CSV file f; otherwise it prints a usage line and exits with status 1.
func main() {
    flag.Parse()
    if u != "" && f != "" {
        getDoc(u, f)
        return
    }
    fmt.Println("-u=<URL to download from> -f=<Path to the CSV file>")
    os.Exit(1)
}

The problem is that the channel is not closed after all goroutines have finished, and I have to press Ctrl+C to get my shell prompt back:

2016/03/02 09:34:05 Fetching URL ...
2016/03/02 09:34:05 Fetching URL ...
2016/03/02 09:34:05 Fetching URL ...
^Csignal: interrupt

After reading this, I changed the last line of the getDoc function to something like:

// Wait for every fetch in a separate goroutine, then close c to signal
// receivers that no more values will arrive. Because this runs in the
// background, the statement itself does not block: the enclosing function
// continues, and may return, immediately.
go func() {
    wg.Wait()
    close(c)
}()

Now I get my shell prompt back when running it, but the channel is closed before all goroutines have finished, and nothing is written to the CSV file.

Where did I go wrong?

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享

2条回答 默认 最新

  • 已采纳
    douneiben2240 douneiben2240 2016-03-02 04:04

    To me it doesn't look like you're reading from your channel anywhere. Because c is an unbuffered (synchronous) channel (you never gave it a capacity), every send on it blocks until another goroutine receives the value. So you need to read from c (for example, value := <-c), or your fetch function will just hang at c <- *e.

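    As a result, fetch never returns, so its deferred wg.Done() never runs. The sync.WaitGroup counter is never decremented, wg.Wait() never stops blocking, and your close(c) is never called. (In your second version, moving wg.Wait() into a goroutine lets getDoc return immediately: its deferred file.Close() runs and main exits while the fetch goroutines are still running, typically before they have written anything.)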

    点赞 评论 复制链接分享
  • drasebt1835 drasebt1835 2016-03-02 06:10

    My original code is something like this:

    // Sequential version: the receive `e := <-c` inside the loop waits for
    // each fetch's result before the next fetch is started, so only one
    // request is in flight at a time.
    e_hrefs := findHrefs(u)
    w := csv.NewWriter(file)
    for e_href, name := range e_hrefs {
        wg.Add(1)
        go fetch(parsedUrl.Scheme+"://"+parsedUrl.Host+e_href, name, &wg, c)
        e := <-c
        w.Write([]string{name, "'" + e.tax_code, e.group, e.capital})
        w.Flush()
    }
    wg.Wait()
    

    As you can see, it isn't really concurrent, because each iteration waits on <-c before starting the next fetch.

    I've just fixed it by using a range clause to iterate over the channel:

    // Start one fetch goroutine per link. Each is expected to send one
    // Enterprise on c and then call wg.Done (this version of fetch is not shown).
    e_hrefs := findHrefs(u)
    for e_href, name := range e_hrefs {
        wg.Add(1)
        go fetch(parsedUrl.Scheme+"://"+parsedUrl.Host+e_href, name, &wg, c)
    }
    // Close c only after every fetch has finished, so the range loop below ends.
    go func() {
        wg.Wait()
        close(c)
    }()
    
    // Receive results as they arrive. The CSV writer is used only by this
    // goroutine, so rows are written without extra locking.
    // NOTE(review): e.name must be set by fetch; the fetch in the question
    // never assigns it, which would leave the first column empty.
    w := csv.NewWriter(file)
    for e := range c {
        w.Write([]string{e.name, "'" + e.tax_code, e.group, e.capital})
        w.Flush()
    }
    
    点赞 评论 复制链接分享

相关推荐