dongxuxian6930 2018-05-13 06:29
浏览 65
已采纳

使用通道同步多个goroutine

I need to start a number of workers with single task queue and single result queue. Each worker should be started in different goroutine. And I need to wait till all workers will be finished and task queue will be empty before exiting from program. I have prepare small example for goroutine synchronization. The main idea was that we count tasks in queue and waiting for all workers to finish jobs. But current implementation sometime miss values. Why this happends and how to solve the problem? The sample code:

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
)

const num_workers = 5

type workerChannel chan uint64

// Make channel for tasks
var workCh workerChannel
// Make channel for task counter
var cntChannel chan int

// Task counter
var tskCnt int64

// Worker function
func InitWorker(input workerChannel, result chan string, num int) {
    for {
        select {
        case inp := <-input:
            getTask()
            result <- ("Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10))
        }
    }
}

// Function to manage task counter
// should be in uniq goroutine
func taskCounter(inp chan int) {
    for {
        val := <-inp
        tskCnt += int64(val)
    }
}

// Put pask to the queue
func putTask(val uint64) {
    func() {
        fmt.Println("Put ", val)
        cntChannel <- int(1)
        workCh <- val
    }()
}

// Get task from queue
func getTask() {
    func() {
        cntChannel <- int(-1)
    }()
}

func main() {
// Init service channels
    abort := make(chan os.Signal)
    done := make(chan bool)

// init queue for results
    result := make(chan string)

// init task queue
    workCh = make(workerChannel)

// start some workers
    for i := uint(0); i < num_workers; i++ {
        go InitWorker(workCh, result, int(i))
    }

// init counter for synchro
    cntChannel = make(chan int)
    go taskCounter(cntChannel)

// goroutine that put some tasks into queue
    go func() {
        for i := uint(0); i < 21; i++ {
            putTask(uint64(i))
        }

        // wait for processing all tasks and close application
        for len(cntChannel) != 0 {}
        for tskCnt != 0 {}
        for len(workCh) != 0 {}
        for len(result) != 0 {}

        // send signal for close
        done <- true
    }()

    signal.Notify(abort, os.Interrupt)
    for {
        select {
        case <-abort:
            fmt.Println("Aborted.")
            os.Exit(0)

        // print results
        case res := <-result:
            fmt.Println(res)

        case <-done:
            fmt.Println("Done")
            os.Exit(0)
        }
    }
}
  • 写回答

2条回答 默认 最新

  • dtpxi88884 2018-05-13 07:05
    关注

    Use sync.WaitGroup to wait for goroutines to complete. Close channels to cause loops reading on channels to exit.

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type workerChannel chan uint64
    
    const num_workers = 5
    
    func main() {
    
        results := make(chan string)
        workCh := make(workerChannel)
    
        // Start workers
        var wg sync.WaitGroup
        wg.Add(num_workers)
        for i := 0; i < num_workers; i++ {
            go func(num int) {
                defer wg.Done()
                // Loop processing work until workCh is closed
                for w := range workCh {
                    results <- fmt.Sprintf("worker %d, task %d", num, w)
                }
    
            }(i)
        }
    
        // Close result channel when workers are done
        go func() {
            wg.Wait()
            close(results)
        }()
    
        // Send work to be done
        go func() {
            for i := 0; i < 21; i++ {
                workCh <- uint64(i)
            }
            // Closing the channel causes workers to break out of loop
            close(workCh)
        }()
    
        // Process results. Loop exits when result channel is closed.
        for r := range results {
            fmt.Println(r)
        }
    }
    

    https://play.golang.org/p/ZifpzsP6fNv

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 #MATLAB仿真#车辆换道路径规划
  • ¥15 java 操作 elasticsearch 8.1 实现 索引的重建
  • ¥15 数据可视化Python
  • ¥15 要给毕业设计添加扫码登录的功能!!有偿
  • ¥15 kafka 分区副本增加会导致消息丢失或者不可用吗?
  • ¥15 微信公众号自制会员卡没有收款渠道啊
  • ¥15 stable diffusion
  • ¥100 Jenkins自动化部署—悬赏100元
  • ¥15 关于#python#的问题:求帮写python代码
  • ¥20 MATLAB画图图形出现上下震荡的线条