duan02143
2016-09-22 14:43 阅读 31
已采纳

Goroutines被for循环阻塞了吗?

I have the following code which implements a worker queue:

package main

import (
    "fmt"
    "net/http"
    "io"
    "time"
)

var (
    // linkQueue carries Links that are waiting to be handed to a scraper.
    linkQueue chan Link
    // scraperQueue carries the inbox channels that scrapers send when they
    // are ready to receive a Link.
    scraperQueue chan chan Link
)

func CycleDirectory(page int) {
    linkQueue <- Link{Name: "asd"}
}

// Link is one unit of work passed from the link queue to a scraper.
type Link struct {
    // Name is the text a scraper prints when it receives the Link.
    Name string
}

// Start prints "Started" and launches the scraper's worker goroutine.
//
// On every iteration the worker first publishes its inbox (s.Link) on
// s.ScraperQueue and then waits for either a Link, which it reports on
// standard output, or a value on s.QuitChan, which makes it print "Closed"
// and exit.
func (s Scraper) Start() {
    fmt.Println("Started")

    worker := func() {
        for {
            // Tell the dispatcher this scraper is ready for one more Link.
            s.ScraperQueue <- s.Link

            select {
            case <-s.QuitChan:
                fmt.Println("Closed")
                return
            case job := <-s.Link:
                fmt.Printf("%v: Received %s\n", s.Id, job.Name)
            }
        }
    }
    go worker()
}

// Stop asks the worker to shut down by sending true on s.QuitChan. The send
// happens on its own goroutine, so Stop itself returns immediately.
func (s Scraper) Stop() {
    quit := s.QuitChan
    go func() { quit <- true }()
}

// Scraper is a worker that receives Links on its own inbox channel.
type Scraper struct {
    // Id is printed in front of every "Received" message.
    Id int
    // Link is the scraper's inbox; the worker goroutine receives from it.
    Link chan Link
    // ScraperQueue is where the worker sends its inbox before waiting for work.
    ScraperQueue chan chan Link
    // QuitChan makes the worker print "Closed" and return when a value arrives.
    QuitChan chan bool
}

// InitScraper returns a Scraper with the given id that registers itself on
// scraperQueue. Its inbox and quit channels are freshly created and
// unbuffered.
func InitScraper(id int, scraperQueue chan chan Link) Scraper {
    var s Scraper
    s.Id = id
    s.ScraperQueue = scraperQueue
    s.Link = make(chan Link)
    s.QuitChan = make(chan bool)
    return s
}

// HelloServer is an HTTP handler that answers every request with the fixed
// body "hello, world!" followed by a newline. The request itself is not
// inspected.
func HelloServer(w http.ResponseWriter, req *http.Request) {
    // The newline must be written as the \n escape: an interpreted string
    // literal cannot span source lines. The write error is not checked,
    // matching the original handler.
    io.WriteString(w, "hello, world!\n")
}

// main wires up the worker queue: it creates the link and scraper queues,
// starts the scrapers, starts a dispatcher goroutine, enqueues one Link and
// then keeps the process alive.
func main() {
    // Buffered so CycleDirectory can enqueue without waiting for the dispatcher.
    linkQueue = make(chan Link, 2000)

    numScrapers := 2

    // One slot per scraper, so each can register its inbox without blocking.
    scraperQueue = make(chan chan Link, numScrapers)

    // Scraper ids start at 1.
    for i := 0; i < numScrapers; i++ {
        s := InitScraper(i+1, scraperQueue)
        s.Start()
    }

    // Dispatcher: for every queued Link, spawn a goroutine that waits for a
    // ready scraper's inbox and forwards the Link to it.
    go func() {
        for {
            select {
            case link := <-linkQueue:
                go func() {
                    scraper := <-scraperQueue
                    scraper <- link
                }()
            }
        }
    }()

    CycleDirectory(1)

    // time.Sleep(1 * time.Millisecond)

    // NOTE(review): this empty loop never blocks or yields. The accompanying
    // write-up reports that with it no "Received" message is printed, while
    // the commented-out sleep, select or ListenAndServe variants do print it.
    for {
        // select {
        // }
    }

    // http.HandleFunc("/hello", HelloServer)

    // http.ListenAndServe(":12345", nil)
}

When I block main with a for loop containing an if statement (or nothing at all), the scraper does not print a received message. When I block using the ListenAndServe function from net/http, it prints the received message. When I block using a 1 ms sleep, I receive the message. And when I put a select statement in the for loop, I also receive the message.

Why does the for loop without a select statement prevent the messages from being delivered to the workers, and how should I handle this? I need an if statement in the for loop to check whether all the work has been done, so that I can exit the loop and end the program.

Update

Amd's suggestion is a solution to this problem. Here is my updated code using sync.WaitGroup:

package main

import (
    "fmt"
    "sync"
)

var (
    // linkQueue carries Links that are waiting to be handed to a scraper.
    linkQueue chan Link
    // scraperQueue carries the inbox channels that scrapers send when they
    // are ready to receive a Link.
    scraperQueue chan chan Link
    // wg is what main waits on before printing "Done".
    wg sync.WaitGroup
)

// CycleDirectory adds one to wg and then enqueues a single placeholder Link
// named "asd" on linkQueue. The page argument is not used by the body.
func CycleDirectory(page int) {
    // Count the job before publishing it so the counter is never behind the
    // queue.
    wg.Add(1)

    placeholder := Link{Name: "asd"}
    linkQueue <- placeholder
}

// Link is one unit of work passed from the link queue to a scraper.
type Link struct {
    // Name is the text passed to Scrape when a scraper receives the Link.
    Name string
}

// Start prints "Started" and launches the scraper's worker goroutine.
//
// On every iteration the worker publishes its inbox (s.Link) on
// s.ScraperQueue and then waits for either a Link or a quit signal. Each
// received Link is passed to Scrape and then reported as finished with
// wg.Done, which pairs with the wg.Add(1) that CycleDirectory performs per
// enqueued Link. A value on s.QuitChan makes the worker print "Closed" and
// exit.
//
// The worker keeps running after a Link instead of stopping itself. The
// earlier version quit after one Link and only called wg.Done on the quit
// path; that left the dead worker's inbox registered in ScraperQueue, so any
// Link beyond the number of scrapers was sent to a channel nobody reads and
// wg.Wait never returned.
func (s Scraper) Start() {
    fmt.Println("Started")
    go func() {
        for {
            // Tell the dispatcher this scraper is ready for one more Link.
            s.ScraperQueue <- s.Link
            select {
            case link := <-s.Link:
                Scrape(s.Id, link.Name)
                // One Done per finished Link keeps wg in step with the
                // amount of queued work rather than with worker lifetime.
                wg.Done()
            case <-s.QuitChan:
                fmt.Println("Closed")
                return
            }
        }
    }()
}

// Stop asks the worker to shut down by sending true on s.QuitChan. The send
// happens on its own goroutine, so Stop itself returns immediately.
func (s Scraper) Stop() {
    quit := s.QuitChan
    go func() { quit <- true }()
}

// Scraper is a worker that receives Links on its own inbox channel.
type Scraper struct {
    // Id is passed to Scrape and appears in front of its "Received" message.
    Id int
    // Link is the scraper's inbox; the worker goroutine receives from it.
    Link chan Link
    // ScraperQueue is where the worker sends its inbox before waiting for work.
    ScraperQueue chan chan Link
    // QuitChan makes the worker print "Closed" and return when a value arrives.
    QuitChan chan bool
}

// Scrape reports on standard output that the scraper with the given id
// received the link called name, as a single line "<id>: Received <name>".
func Scrape(id int, name string) {
    fmt.Printf("%v: Received %s\n", id, name)
}

// InitScraper returns a Scraper with the given id that registers itself on
// scraperQueue. Its inbox and quit channels are freshly created and
// unbuffered.
func InitScraper(id int, scraperQueue chan chan Link) Scraper {
    var s Scraper
    s.Id = id
    s.ScraperQueue = scraperQueue
    s.Link = make(chan Link)
    s.QuitChan = make(chan bool)
    return s
}

// main creates the link and scraper queues, starts the scrapers and a
// dispatcher goroutine, enqueues one Link, waits on wg and finally prints
// "Done".
func main() {
    const numScrapers = 2

    // Buffered so CycleDirectory can enqueue without waiting for the dispatcher.
    linkQueue = make(chan Link, 2000)
    // One slot per scraper, so each can register its inbox without blocking.
    scraperQueue = make(chan chan Link, numScrapers)

    // Scraper ids start at 1.
    for id := 1; id <= numScrapers; id++ {
        InitScraper(id, scraperQueue).Start()
    }

    // dispatch waits for a ready scraper's inbox and forwards one Link to it.
    dispatch := func(job Link) {
        idle := <-scraperQueue
        idle <- job
    }

    go func() {
        for {
            // The receive runs here, in the dispatcher loop; only the
            // hand-off to a scraper happens on the new goroutine.
            go dispatch(<-linkQueue)
        }
    }()

    CycleDirectory(1)

    wg.Wait()

    fmt.Println("Done")
}
  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享

1条回答 默认 最新

  • 已采纳
    doujian3401 doujian3401 2016-09-22 15:14

    You may use sync.WaitGroup to stop the program from exiting until all the work is done.
    Try it on The Go Playground:

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var (
        // wg lets main wait for the background goroutine to finish.
        wg sync.WaitGroup
    )
    
    // main shows how sync.WaitGroup keeps the program alive until background
    // work completes: it starts one goroutine that sleeps for two seconds,
    // prints "Wait...", blocks on wg and then prints "Done.".
    func main() {
        // sleeper stands in for real work; the deferred Done releases main.
        sleeper := func() {
            defer wg.Done()
            time.Sleep(2 * time.Second)
        }

        // Add must happen before the goroutine starts.
        wg.Add(1)
        go sleeper()

        fmt.Println("Wait...")
        wg.Wait()
        fmt.Println("Done.")
    }
    
    点赞 评论 复制链接分享

相关推荐