doulechou0700 2016-01-13 00:47
浏览 42
已采纳

死锁所有goroutine睡着了

This is a follow up to my earlier post:

    http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825

I'm still having trouble figuring out where the channels should be closed, after reading multiple topics and articles both on and off SO.

This program will open a list of files, create an output file for each input file (with the same name),visit all the urls in each input file and get all href links from these - which are saved to the corresponding output file. However, I'm getting the following error:

    http://play.golang.org/p/8X-1rM3aXC

The linkgetter, and getHref functions are mainly for processing. Head and tail are run as separate goroutines, and worker does the processing.

    package main

    import (
    "bufio"
    "bytes"
    "fmt"
    "golang.org/x/net/html"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "regexp"
    "sync"
    )

    type Work struct {
    Link     string
    Filename string
    }

    type Output struct {
    Href     string
    Filename string
    }

    func getHref(t html.Token) (href string, ok bool) {
    // Iterate over all of the Token's attributes until we find an    "href"
    for _, a := range t.Attr {
            if a.Key == "href" {
                    href = a.Val
                    ok = true
            }
    }
    return
    }

    func linkGetter(out chan<- Output, r io.Reader, filename string) {
    z := html.NewTokenizer(r)
    for {
            tt := z.Next()
            switch {
            case tt == html.ErrorToken:
                    return
            case tt == html.StartTagToken:
                    t := z.Token()
                    isAnchor := t.Data == "a"
                    if !isAnchor {
                            continue
                    }

                    // Extract the href value, if there is one
                    url, ok := getHref(t)
                    if !ok {
                            continue
                    }

                    out <- Output{url, filename}
            }
    }
    }

    func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup)    {
    defer wg.Done()
    for work := range in {
            resp, err := http.Get(work.Link)
            if err != nil {
                    continue
            }
            body, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                    continue
            }
            if err = resp.Body.Close(); err != nil {
                    fmt.Println(err)
            }
            linkGetter(out, bytes.NewReader(body), work.Filename)
    }
    }

    func head(c chan<- Work) {
    r, _ := regexp.Compile("(.*)(?:.json)")
    files, _ := filepath.Glob("*.json")

    for _, elem := range files {
            res := r.FindStringSubmatch(elem)
            for k, v := range res {

                    if k == 0 {
                            outpath, _ :=  filepath.Abs(fmt.Sprintf("go_tester/%s", v))

                            abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
                            f, _ := os.Open(abspath)
                            scanner := bufio.NewScanner(f)

                            for scanner.Scan() {
                                    c <- Work{outpath, scanner.Text()}
                            }

                    }
            }

    }


    }

    func tail(c <-chan Output) {
    currentfile := ""
    var f *os.File
    var err error
    for out := range c {
            if out.Filename != currentfile {
                    if err = f.Close(); err != nil { // might cause an error on first run
                            fmt.Println(err)
                    }
                    f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
                    if err != nil {
                            log.Fatal(err)
                    }
                    currentfile = out.Filename
            }
            if _, err = f.WriteString(out.Href + "
"); err != nil {
                    fmt.Println(err)
            }
    }

    }

    const (
    nworkers = 80
    )

    func main() {
    //fmt.Println("hi")
    in := make(chan Work)
    out := make(chan Output)

    go head(in)
    go tail(out)

    var wg sync.WaitGroup
    for i := 0; i < 85; i++ {
            wg.Add(1)
            go worker(out, in, &wg)
    }
    close(in)   
    close(out)    
    wg.Wait()


    }

What is wrong with the way the channels are closed?

  • 写回答

1条回答 默认 最新

  • dsgdhf5674 2016-01-13 03:03
    关注

    You're not really paying attention to your pipeline design here. You have to ask yourself "When is section X done? What should happen when it is done? What happens after it is done?" for every section of the pipeline.

    You start up head, tail, and worker to range over channels. The only way these functions are going to return successfully is if these channels are closed.

    Draw it out of you need to.

    1. head(in) feeds in to in
    2. worker(out, in, &wg) ranges over in, feeds into out, and tells you it is done with wg once in is closed
    3. tail(out) ranges over out

    So what do you need to do to:

    1. Make sure all input is processed?
    2. Make sure all goroutines return?

    Like so:

    1. You need to close in from head once it is done processing all of the files.
    2. This will cause worker to actually return once all items it can get from in are processed, causing wg.Wait() to return
    3. it is now safe to close out since nothing is feeding in to it and this will cause tail to eventually return.

    But you'll probably need another sync.WaitGroup associated with tail for this particular design because the whole program will exit right when wg.Wait() returns, thus possibly not finishing all of the work that tail is doing. See here. Specifically:

    Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.

    You'll probably also want to use buffered channels referenced here to aid in not having switch execution between goroutines so much. With your current design you're wasting a lot of time with context switching.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥20 测距传感器数据手册i2c
  • ¥15 RPA正常跑,cmd输入cookies跑不出来
  • ¥15 求帮我调试一下freefem代码
  • ¥15 matlab代码解决,怎么运行
  • ¥15 R语言Rstudio突然无法启动
  • ¥15 关于#matlab#的问题:提取2个图像的变量作为另外一个图像像元的移动量,计算新的位置创建新的图像并提取第二个图像的变量到新的图像
  • ¥15 改算法,照着压缩包里边,参考其他代码封装的格式 写到main函数里
  • ¥15 用windows做服务的同志有吗
  • ¥60 求一个简单的网页(标签-安全|关键词-上传)
  • ¥35 lstm时间序列共享单车预测,loss值优化,参数优化算法