dongzhang6544 2018-06-21 17:26
浏览 8
已采纳

互斥锁写通道值

I have a channel of thousands of IDs that need to be processed in parallel inside goroutines. How could I implement a lock so that goroutines cannot process the same id at the same time, should it be repeated in the channel?

package main

import (
    "fmt"
    "sync"
    "strconv"
    "time"
)

var wg sync.WaitGroup

// main builds 30 repetitions of the three sample IDs, streams them through a
// channel, and hands that channel to 10 worker goroutines, then blocks until
// every worker has finished.
func main() {
    const (
        rounds  = 30
        workers = 10
    )

    sample := []string{"id1", "id2", "id3"}

    var data []string
    for round := 0; round < rounds; round++ {
        data = append(data, sample...)
    }

    chanData := createChan(data)

    for worker := 0; worker < workers; worker++ {
        // Register the worker before starting it so Wait cannot return early.
        wg.Add(1)
        process(chanData, worker)
    }

    wg.Wait()
}

// createChan returns a receive-only, unbuffered channel that yields every
// element of data in order. A background goroutine does the sending and
// closes the channel after the last element.
func createChan(data []string) <-chan string {
    out := make(chan string)
    go func() {
        // Closing on exit lets receivers' range loops terminate.
        defer close(out)
        for idx := 0; idx < len(data); idx++ {
            out <- data[idx]
        }
    }()
    return out
}

// process starts one goroutine that drains ids. For each ID it prints the ID
// together with the worker number i and then sleeps for one second to simulate
// work. The goroutine calls wg.Done once the channel is closed and drained, so
// the caller must have called wg.Add for it beforehand.
//
// No per-ID exclusion happens here: two workers may handle equal IDs at the
// same time.
func process(ids <-chan string, i int) {
    // The suffix depends only on i, so build it once.
    suffix := " (goroutine " + strconv.Itoa(i) + ")"

    worker := func() {
        defer wg.Done()
        for {
            id, ok := <-ids
            if !ok {
                return
            }
            fmt.Println(id + suffix)
            time.Sleep(1 * time.Second)
        }
    }
    go worker()
}

--edit: All values need to be processed, in any order, but "id1", "id2" & "id3" each need to block so that the same ID cannot be processed by more than one goroutine at the same time.

  • 写回答

3条回答 默认 最新

  • douyuan5600 2018-06-25 11:12
    关注

    I've found a solution. Someone has written a package (github.com/EagleChen/mapmutex) to do exactly what I needed:

    package main
    
    import (
        "fmt"
        "github.com/EagleChen/mapmutex"
        "strconv"
        "sync"
        "time"
    )
    
    var wg sync.WaitGroup
    var mutex *mapmutex.Mutex
    
    // main initialises the shared per-key mutex, builds 30 repetitions of the
    // three sample IDs, streams them through a channel, and hands that channel
    // to 10 worker goroutines, then blocks until every worker has finished.
    func main() {
        const (
            rounds  = 30
            workers = 10
        )

        mutex = mapmutex.NewMapMutex()

        // Each pass appends one "id1", "id2", "id3" triple.
        var data []string
        for len(data) < 3*rounds {
            data = append(data, "id1", "id2", "id3")
        }

        chanData := createChan(data)

        for worker := 0; worker < workers; worker++ {
            // Register the worker before starting it so Wait cannot return early.
            wg.Add(1)
            process(chanData, worker)
        }

        wg.Wait()
    }
    
    // createChan returns a receive-only, unbuffered channel that yields every
    // element of data in order. The sending runs in its own goroutine, which
    // closes the channel after the last element so range loops on it end.
    func createChan(data []string) <-chan string {
        out := make(chan string)

        feed := func() {
            for i := range data {
                out <- data[i]
            }
            close(out)
        }
        go feed()

        return out
    }
    
    // process starts one goroutine that drains ids and handles each ID while
    // holding that ID's key in the shared map mutex, so that two workers do not
    // handle the same ID at the same time. Handling means printing the ID with
    // the worker number i and sleeping for one second to simulate work.
    //
    // The goroutine calls wg.Done once the channel is closed and drained, so
    // the caller must have called wg.Add for it beforehand. The package-level
    // mutex must be initialised before the goroutine receives its first ID.
    func process(ids <-chan string, i int) {
        go func() {
            defer wg.Done()
            for id := range ids {
                // TryLock reports false when it could not obtain the key. The ID
                // has already been taken off the channel at that point, so
                // skipping it would silently drop work; keep retrying until the
                // key is acquired so every ID is processed.
                // NOTE(review): this relies on TryLock backing off internally
                // between attempts rather than returning immediately — confirm
                // against the mapmutex documentation.
                for !mutex.TryLock(id) {
                }
                fmt.Println(id + " (goroutine " + strconv.Itoa(i) + ")")
                time.Sleep(1 * time.Second)
                mutex.Unlock(id)
            }
        }()
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?