douxiawei9318 2015-10-25 07:17

Go: select statement fails to receive a sent value

I'm new to Go and trying to implement a simple load balancer as seen in these slides: http://concur.rspace.googlecode.com/hg/talk/concur.html#slide-42

The complete code:

package main

import (
    "fmt"
    "time"
    "container/heap"
)

type Request struct {
    fn func(*Worker) int
    c  chan int
}

func requester(work chan <-Request) {
    c := make(chan int)
    work <- Request{workFn, c}
    result := <-c
    furtherProcess(result)
}

func workFn(w *Worker) int {
    time.Sleep(1000 * time.Millisecond)
    return w.index
}

func furtherProcess(result int) {
    fmt.Println(result)
}

type Worker struct {
    request chan Request
    pending int
    index   int
}

func (w *Worker) work(done chan *Worker) {
    for req := range w.request {
        req.c <- req.fn(w)
        fmt.Println("sending to done:", done)
        done <- w
        fmt.Println("sended to done")
    }
}

type Pool []*Worker

type Balancer struct {
    pool Pool
    done chan *Worker
}

func (b *Balancer) balance(work chan Request) {
    for {
        fmt.Println("selecting, done:", b.done)
        select {
        case req := <-work:
            b.dispatch(req)
        case w := <-b.done:
            fmt.Println("completed")
            b.completed(w)
        }
    }
}

func (p Pool) Len() int {
    return len(p)
}

func (p Pool) Less(i, j int) bool {
    return p[i].pending < p[j].pending
}

func (p Pool) Swap(i, j int) {
    p[i], p[j] = p[j], p[i]
}

func (p *Pool) Push(x interface{}) {
    *p = append(*p, x.(*Worker))
}

func (p *Pool) Pop() interface{} {
    old := *p
    n := len(old)
    x := old[n - 1]
    *p = old[0 : n - 1]
    return x
}

func (b *Balancer) dispatch(req Request) {
    w := heap.Pop(&b.pool).(*Worker)
    w.request <- req
    w.pending++
    heap.Push(&b.pool, w)
    fmt.Println("dispatched to worker", w.index)
}

func (b *Balancer) completed(w *Worker) {
    w.pending--
    heap.Remove(&b.pool, w.index)
    heap.Push(&b.pool, w)
}

func Run() {
    NumWorkers := 4
    req := make(chan Request)
    done := make(chan *Worker)
    b := Balancer{make([]*Worker, NumWorkers), done}
    for i := 0; i < NumWorkers; i++ {
        w := Worker{make(chan Request), 0, i}
        b.pool[i] = &w
        go w.work(done)
    }
    go b.balance(req)
    for i := 0; i < NumWorkers * 4; i++ {
        go requester(req)
    }
    time.Sleep(200000 * time.Millisecond)
}

func main() {
    Run()
}

When I ran it, I got the following output:

selecting, done: 0xc0820082a0
dispatched to worker 0
selecting, done: 0xc0820082a0
dispatched to worker 3
selecting, done: 0xc0820082a0
dispatched to worker 2
selecting, done: 0xc0820082a0
dispatched to worker 1
selecting, done: 0xc0820082a0
sending to done: 0xc0820082a0
sending to done: 0xc0820082a0
3
sending to done: 0xc0820082a0
2
1
0
sending to done: 0xc0820082a0

As you can see, it was selecting on and sending to the same channel (done: 0xc0820082a0), but the select never received the sent value and blocked forever. How can this happen? What's wrong with the above code? Thanks!

1 answer

  • dongxia5394 2015-10-25 09:02

    Sending SIGABRT with kill -ABRT <PID> makes the Go runtime exit with a stack dump of every goroutine. The dump shows that all your workers are blocked on done <- w while your balancer is blocked on w.request <- req. That is a deadlock: both channels are unbuffered, so the workers can't proceed until the balancer receives their "done" signals, and the balancer can't proceed until the selected worker takes the request.
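The same kind of dump can also be taken from inside the process. The sketch below is only an illustration, not part of the code in the question: dumpGoroutines and blockedSendVisible are hypothetical helper names, and the 64 KiB buffer size is an assumption that is enough for a small program. It parks one goroutine on an unbuffered send and then looks for the "chan send" wait state in the output of runtime.Stack.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"time"
)

// dumpGoroutines returns the stack traces of all goroutines as text.
// (Hypothetical helper; the buffer size is an assumption.)
func dumpGoroutines() string {
	buf := make([]byte, 1<<16)
	n := runtime.Stack(buf, true) // true: include every goroutine, not just the caller
	return string(buf[:n])
}

// blockedSendVisible parks a goroutine on an unbuffered send and reports
// whether the dump shows a goroutine waiting in the "chan send" state.
func blockedSendVisible() bool {
	blocked := make(chan int) // unbuffered and never received from
	go func() { blocked <- 1 }()

	// Poll briefly so the goroutine has time to reach the send.
	for i := 0; i < 100; i++ {
		if strings.Contains(dumpGoroutines(), "chan send") {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

func main() {
	fmt.Println("blocked sender visible in dump:", blockedSendVisible())
}
```

In the program from the question, such a dump would be expected to list the workers under "chan send" at the done <- w line.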

    If you replace done <- w with go func() { done <- w }(), the send no longer blocks the worker's loop, and your program will process all 16 requests without hanging.
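A stripped-down sketch of that fix, assuming a single worker and plain int requests instead of the Request and Worker types from the question (runPool and its channel names are made up for this illustration). The dispatcher mirrors balance(): it selects on incoming work and on done, and its send to the worker blocks just like w.request <- req in dispatch().

```go
package main

import "fmt"

// runPool pushes numRequests through one worker and returns how many
// completions the dispatcher received. (Hypothetical, simplified model.)
func runPool(numRequests int) int {
	work := make(chan int)     // incoming requests, like the req channel in Run()
	requests := make(chan int) // unbuffered, like Worker.request
	done := make(chan int)     // unbuffered, like Balancer.done

	// Producer: stands in for the requester goroutines.
	go func() {
		for i := 0; i < numRequests; i++ {
			work <- i
		}
	}()

	// Worker: handle a request, then report completion.
	go func() {
		for id := range requests {
			// A plain `done <- id` here can deadlock: the dispatcher may be
			// blocked in `requests <- id` and unable to receive from done.
			go func(id int) { done <- id }(id)
		}
	}()

	// Dispatcher: same shape as balance() in the question.
	completed := 0
	for completed < numRequests {
		select {
		case id := <-work:
			requests <- id // blocking send, like w.request <- req
		case <-done:
			completed++
		}
	}
	close(requests) // lets the worker's range loop end
	return completed
}

func main() {
	fmt.Printf("completed %d of 16 requests\n", runPool(16))
}
```

The trade-off is one short-lived goroutine per completed request. Changing the inner goroutine back to a direct done <- id makes this sketch hang in the same way as the original program.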

    Side note: instead of time.Sleep(200000 * time.Millisecond), look into sync.WaitGroup to wait for the requesters to finish.
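A minimal sketch of the sync.WaitGroup approach, assuming each requester is wrapped so that it signals when it is finished. runRequesters is a hypothetical stand-in for the loop in Run(), and the counter replaces the real call to requester(req) so that the example stays self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// runRequesters starts n goroutines and blocks until all of them have
// finished, instead of sleeping for a fixed amount of time.
func runRequesters(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	for i := 0; i < n; i++ {
		wg.Add(1) // register the goroutine before starting it
		go func() {
			defer wg.Done() // signal completion even on an early return
			// In the question's code, requester(req) would run here.
			mu.Lock()
			finished++
			mu.Unlock()
		}()
	}

	wg.Wait() // returns once every goroutine has called Done
	return finished
}

func main() {
	fmt.Println("finished requesters:", runRequesters(16))
}
```

With this pattern the program exits as soon as the last requester is done, rather than after the fixed 200-second sleep.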

    This answer was selected by the asker as the best answer.
