doudengjin8251 2016-05-22 06:19
浏览 308

Golang在Goroutine之间共享大量数据

I need to read structure fields that were set by another goroutine. AFAIK, doing so directly may yield stale data even when I know for sure that there is no concurrent access (the write finished before the read started, signaled via a chan struct{}).

Will sending a pointer to the structure (created in the 1st goroutine, modified in the 2nd, read by the 3rd) resolve the possible staleness issue, given that I can guarantee there is no concurrent access?

I would like to avoid copying, because the structure is big and contains a huge bytes.Buffer that is filled in the 2nd goroutine and that I need to read from the 3rd.

Locking is an option, but it seems like overkill considering I know that there will be no concurrent access.

  • 写回答

2条回答 默认 最新

  • dongshuo1257 2016-05-22 10:41
    关注

    There are many answers to this, and it depends on your data structure and program logic.

    see: How to lock/synchronize access to a variable in Go during concurrent goroutines?
    and: How to use RWMutex in Golang?

    1- using Stateful Goroutines and channels
    2- using sync.Mutex
    3- using sync/atomic
    4- using WaitGroup
    5- using program logic(Semaphore)
    ...


    1: Stateful Goroutines and channels:
    I simulated a very similar case (imagine you want to read from one SSD and write to another SSD at a different speed):
    In this sample code, one goroutine (named write) does some work, prepares data and fills the big struct, and another goroutine (named read) reads data from the big struct and then does some work with it. The manager goroutine guarantees that there is no concurrent access to the same data. Communication between the three goroutines is done with channels. In your case you can send pointers as the channel data, or use a global struct as in this sample.
    output will be like this:
    mean= 36.6920166015625 stdev= 6.068973186592054

    I hope this helps you to get the idea.
    Working sample code:

    package main
    
    import (
        "fmt"
        "math"
        "math/rand"
        "runtime"
        "sync"
        "time"
    )
    
    // BigStruct is a ring buffer of uint16 values together with the cursors
    // and status flags that the goroutines in this program consult.
    type BigStruct struct {
        // big is the backing storage, indexed by rpos and wpos.
        big []uint16
        // rpos is the index of the next element to read; wpos is the index of
        // the next slot to write.
        rpos, wpos int
        // full and empty are recomputed by manage() after every cursor move
        // (full: next(wpos) == rpos, empty: rpos == wpos). stopped is set by
        // write() once it has received the stop signal.
        full, empty, stopped bool
    }
    
    // main starts the writer, reader and manager goroutines, lets them run for
    // five seconds, signals the writer to stop, waits for the manager to finish
    // and finally prints the mean and standard deviation of the histogram.
    func main() {
        wg.Add(1) // released by manage() when it returns
        go write()
        go read()
        go manage()
        runtime.Gosched()
        // Block for five seconds, then pass the timer tick to write() as the
        // stop signal.
        tick := <-time.After(5 * time.Second)
        stopCh <- tick
        wg.Wait()
        avg := Mean(hist)
        dev := stdDev(hist, avg)
        fmt.Println("mean=", avg, "stdev=", dev)
    }
    
    // N is the capacity of the ring buffer, in uint16 elements.
    const N = 1024 * 1024 * 1024
    
    var (
        // wg is released by manage() when it returns; main waits on it.
        wg sync.WaitGroup
        // stopCh carries the stop signal from main to write().
        stopCh = make(chan time.Time)
        // hist counts how many times each uint16 value was received by read().
        hist = make([]int, 65536)
        // s is the shared ring buffer; N uint16 elements occupy 2 GiB.
        s = &BigStruct{
            empty: true,
            big:   make([]uint16, N),
        }
        // rc carries values from manage() to read(); wc carries values from
        // write() to manage(). Both are unbuffered.
        rc = make(chan uint16)
        wc = make(chan uint16)
    )
    
    // next returns the ring-buffer index that follows pos, wrapping to 0 when
    // the incremented index reaches N.
    func next(pos int) int {
        if succ := pos + 1; succ < N {
            return succ
        }
        return 0
    }
    
    // manage is the only goroutine that moves s.rpos and s.wpos. On every pass
    // of its loop it:
    //
    //   - pops one value from the ring buffer if no value is already pending,
    //   - tries to hand the pending value to read() over rc without blocking,
    //   - tries to accept one value from write() over wc without blocking,
    //   - returns (after calling wg.Done) once the writer has stopped, the
    //     buffer is empty and no popped value is still waiting to be sent.
    //
    // NOTE(review): s.stopped is written by write() and read here without any
    // synchronization; consider an atomic flag or a closed channel instead.
    func manage() {
        pending := false // true while data holds a value not yet sent on rc
        var data uint16
        for {
            if !pending && !s.empty {
                pending = true
                data = s.big[s.rpos]
                s.rpos = next(s.rpos)
                s.empty = s.rpos == s.wpos
                s.full = next(s.wpos) == s.rpos
            }
            if pending {
                select {
                case rc <- data:
                    pending = false
                default:
                    // The reader is not ready; retry on the next pass.
                    runtime.Gosched()
                }
            }
            if !s.full {
                select {
                case d := <-wc:
                    s.big[s.wpos] = d
                    s.wpos = next(s.wpos)
                    s.empty = s.rpos == s.wpos
                    s.full = next(s.wpos) == s.rpos
                default:
                    // The writer has nothing to offer right now.
                    runtime.Gosched()
                }
            }
            // Exiting while a value is still pending would silently drop the
            // last element popped from the buffer, so wait until it has been
            // delivered to the reader.
            if s.stopped && s.empty && !pending {
                wg.Done()
                return
            }
        }
    }
    
    func read() {
        for {
            d := <-rc
            hist[d]++
        }
    }
    
    // write sends random uint16 values on wc. After every send it polls stopCh
    // without blocking; when a stop signal is present it sets s.stopped and
    // returns, otherwise it yields the processor and continues.
    func write() {
        for {
            sample := uint16(rand.Intn(65536))
            wc <- sample
            select {
            case <-stopCh:
                s.stopped = true
                return
            default:
            }
            runtime.Gosched()
        }
    }
    
    // stdDev returns the sample standard deviation (n-1 denominator) of data
    // around the supplied mean. With fewer than two elements the sample
    // variance is undefined, so 0 is returned instead of NaN.
    func stdDev(data []int, mean float64) float64 {
        if len(data) < 2 {
            return 0
        }
        sum := 0.0
        for _, d := range data {
            diff := float64(d) - mean
            sum += diff * diff
        }
        return math.Sqrt(sum / float64(len(data)-1))
    }
    // Mean returns the arithmetic mean of data. It returns 0 for empty input
    // rather than the NaN that 0/0 would produce.
    func Mean(data []int) float64 {
        if len(data) == 0 {
            return 0
        }
        sum := 0.0
        for _, d := range data {
            sum += float64(d)
        }
        return sum / float64(len(data))
    }
    

    5: another way(faster) for some use cases:
    Here is another way to use a shared data structure for the read job, the write job and the processing job, which were separated in the first sample; this version does the same three jobs without channels and without a mutex.

    working sample:

    package main
    
    import (
        "fmt"
        "math"
        "math/rand"
        "time"
    )
    
    // BigStruct is a ring buffer of uint16 values together with the cursors
    // and status flags used by manage().
    type BigStruct struct {
        // big is the backing storage, indexed by rpos and wpos.
        big []uint16
        // rpos is the index of the next element to read; wpos is the index of
        // the next slot to write.
        rpos, wpos int
        // full and empty are recomputed by nextPtr() after every cursor move.
        // stopped is set by manage() once the run time has elapsed.
        full, empty, stopped bool
    }
    
    // manage runs the read job and the write job in turn on the calling
    // goroutine. It stops producing once five seconds have passed since t0 and
    // returns as soon as the buffer has been drained after that.
    func manage() {
        const runFor = 5 * time.Second
        for {
            if !s.empty {
                // Sample read job: count the value at the read cursor.
                sample := s.big[s.rpos]
                hist[sample]++
                nextPtr(&s.rpos)
            }
            if !(s.full || s.stopped) {
                // Sample write job: store a random value at the write cursor.
                s.big[s.wpos] = uint16(rand.Intn(65536))
                nextPtr(&s.wpos)
            }
            switch {
            case !s.stopped:
                s.stopped = time.Since(t0) >= runFor
            case s.empty:
                return
            }
        }
    }
    
    // main records the start time, runs manage() to completion, and then
    // prints the histogram's mean and standard deviation followed by the total
    // elapsed time.
    func main() {
        t0 = time.Now()
        manage()
        avg := Mean(hist)
        fmt.Println("mean=", avg, "stdev=", StdDev(hist, avg))
        fmt.Println(time.Since(t0)) // the original author measured 5.8523347s
    }
    
    // t0 is the program start time; main sets it and manage() reads it.
    var t0 time.Time
    
    // N is the capacity of the ring buffer, in uint16 elements.
    const N = 100 * 1024 * 1024
    
    var (
        // hist counts how many times each uint16 value was read.
        hist = make([]int, 65536)
        // s is the shared ring buffer; N uint16 elements occupy 200 MiB.
        s = &BigStruct{
            empty: true,
            big:   make([]uint16, N),
        }
    )
    
    // next returns the index after pos in the ring buffer, or 0 when the
    // incremented index would reach N.
    func next(pos int) int {
        pos++
        if pos < N {
            return pos
        }
        return 0
    }
    func nextPtr(pos *int) {
        *pos++
        if *pos >= N {
            *pos = 0
        }
    
        s.empty = s.rpos == s.wpos
        s.full = next(s.wpos) == s.rpos
    }
    
    // StdDev returns the sample standard deviation (n-1 denominator) of data
    // around the supplied mean. With fewer than two elements the sample
    // variance is undefined, so 0 is returned instead of NaN.
    func StdDev(data []int, mean float64) float64 {
        if len(data) < 2 {
            return 0
        }
        sum := 0.0
        for _, d := range data {
            diff := float64(d) - mean
            sum += diff * diff
        }
        return math.Sqrt(sum / float64(len(data)-1))
    }
    // Mean returns the arithmetic mean of data. It returns 0 for empty input
    // rather than the NaN that 0/0 would produce.
    func Mean(data []int) float64 {
        if len(data) == 0 {
            return 0
        }
        sum := 0.0
        for _, d := range data {
            sum += float64(d)
        }
        return sum / float64(len(data))
    }
    
    评论

报告相同问题?

悬赏问题

  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)
  • ¥15 keil里为什么main.c定义的函数在it.c调用不了
  • ¥50 切换TabTip键盘的输入法
  • ¥15 可否在不同线程中调用封装数据库操作的类
  • ¥15 微带串馈天线阵列每个阵元宽度计算
  • ¥15 keil的map文件中Image component sizes各项意思
  • ¥20 求个正点原子stm32f407开发版的贪吃蛇游戏
  • ¥15 划分vlan后,链路不通了?
  • ¥20 求各位懂行的人,注册表能不能看到usb使用得具体信息,干了什么,传输了什么数据