doulechou0700
2016-01-13 00:47
浏览 42
已采纳

死锁所有goroutine睡着了

This is a follow up to my earlier post:

    http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825

I'm still having trouble figuring out where the channels should be closed, after reading multiple topics and articles both on and off SO.

This program will open a list of files, create an output file for each input file (with the same name),visit all the urls in each input file and get all href links from these - which are saved to the corresponding output file. However, I'm getting the following error:

    http://play.golang.org/p/8X-1rM3aXC

The linkgetter, and getHref functions are mainly for processing. Head and tail are run as separate goroutines, and worker does the processing.

    package main

    import (
    "bufio"
    "bytes"
    "fmt"
    "golang.org/x/net/html"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "regexp"
    "sync"
    )

    type Work struct {
    Link     string
    Filename string
    }

    type Output struct {
    Href     string
    Filename string
    }

    func getHref(t html.Token) (href string, ok bool) {
    // Iterate over all of the Token's attributes until we find an    "href"
    for _, a := range t.Attr {
            if a.Key == "href" {
                    href = a.Val
                    ok = true
            }
    }
    return
    }

    func linkGetter(out chan<- Output, r io.Reader, filename string) {
    z := html.NewTokenizer(r)
    for {
            tt := z.Next()
            switch {
            case tt == html.ErrorToken:
                    return
            case tt == html.StartTagToken:
                    t := z.Token()
                    isAnchor := t.Data == "a"
                    if !isAnchor {
                            continue
                    }

                    // Extract the href value, if there is one
                    url, ok := getHref(t)
                    if !ok {
                            continue
                    }

                    out <- Output{url, filename}
            }
    }
    }

    func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup)    {
    defer wg.Done()
    for work := range in {
            resp, err := http.Get(work.Link)
            if err != nil {
                    continue
            }
            body, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                    continue
            }
            if err = resp.Body.Close(); err != nil {
                    fmt.Println(err)
            }
            linkGetter(out, bytes.NewReader(body), work.Filename)
    }
    }

    func head(c chan<- Work) {
    r, _ := regexp.Compile("(.*)(?:.json)")
    files, _ := filepath.Glob("*.json")

    for _, elem := range files {
            res := r.FindStringSubmatch(elem)
            for k, v := range res {

                    if k == 0 {
                            outpath, _ :=  filepath.Abs(fmt.Sprintf("go_tester/%s", v))

                            abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
                            f, _ := os.Open(abspath)
                            scanner := bufio.NewScanner(f)

                            for scanner.Scan() {
                                    c <- Work{outpath, scanner.Text()}
                            }

                    }
            }

    }


    }

    func tail(c <-chan Output) {
    currentfile := ""
    var f *os.File
    var err error
    for out := range c {
            if out.Filename != currentfile {
                    if err = f.Close(); err != nil { // might cause an error on first run
                            fmt.Println(err)
                    }
                    f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
                    if err != nil {
                            log.Fatal(err)
                    }
                    currentfile = out.Filename
            }
            if _, err = f.WriteString(out.Href + "
"); err != nil {
                    fmt.Println(err)
            }
    }

    }

    const (
    nworkers = 80
    )

    func main() {
    //fmt.Println("hi")
    in := make(chan Work)
    out := make(chan Output)

    go head(in)
    go tail(out)

    var wg sync.WaitGroup
    for i := 0; i < 85; i++ {
            wg.Add(1)
            go worker(out, in, &wg)
    }
    close(in)   
    close(out)    
    wg.Wait()


    }

What is wrong with the way the channels are closed?

  • 写回答
  • 好问题 提建议
  • 追加酬金
  • 关注问题
  • 邀请回答

1条回答 默认 最新

相关推荐 更多相似问题