duangan6731 2017-08-20 17:03

Golang data race in a custom concurrent map even with a mutex

Here is a simple concurrent map that I wrote for learning purposes:

    package concurrent_hashmap

    import (
        "hash/fnv"
        "sync"
    )

    type ConcurrentMap struct {
        buckets []ThreadSafeMap
        bucketCount uint32
    }

    type ThreadSafeMap struct {
        mapLock sync.RWMutex
        hashMap map[string]interface{}
    }

    func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
        var threadSafeMapInstance ThreadSafeMap
        var bucketOfThreadSafeMap []ThreadSafeMap

        for i := 0; i <= int(bucketSize); i++ {
            threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
            bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
        }

        return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
    }

    func (cMap *ConcurrentMap) Put(key string, val interface{}) {
        bucketIndex := hash(key) % cMap.bucketCount
        bucket := cMap.buckets[bucketIndex]
        bucket.mapLock.Lock()
        bucket.hashMap[key] = val
        bucket.mapLock.Unlock()
    }

    // Helper
    func hash(s string) uint32 {
        h := fnv.New32a()
        h.Write([]byte(s))
        return h.Sum32()
    }

I am trying to write a simple benchmark, and I find that synchronous access works correctly but concurrent access fails with

    fatal error: concurrent map writes

Here is my benchmark, run with go test -bench=. -race:

    package concurrent_hashmap

    import (
        "testing"
        "runtime"
        "math/rand"
        "strconv"
        "sync"
    )

    // Concurrent does not work
    func BenchmarkMyFunc(b *testing.B) {
        var wg sync.WaitGroup

        runtime.GOMAXPROCS(runtime.NumCPU())

        my_map := NewConcurrentMap(uint32(4))
        for n := 0; n < b.N; n++ {
            go insert(my_map, wg)
        }
        wg.Wait()
    }

    func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
        wg.Add(1)
        var rand_int int
        for element_num := 0; element_num < 1000; element_num++ {
            rand_int = rand.Intn(100)
            my_map.Put(strconv.Itoa(rand_int), rand_int)
        }
        defer wg.Done()
    }

    // This works
    func BenchmarkMyFuncSynchronize(b *testing.B) {
        my_map := NewConcurrentMap(uint32(4))
        for n := 0; n < b.N; n++ {
            my_map.Put(strconv.Itoa(123), 123)
        }
    }

The WARNING: DATA RACE report says that bucket.hashMap[key] = val is causing the problem, but I am confused about how that is possible, since I take the lock whenever a write happens.

I think I am missing something basic. Can someone point out my mistake?

Thanks

Edit 1:

Not sure if this helps, but here is what my mutex looks like if I don't lock anything:

    {{0 0} 0 0 0 0}

Here is what it looks like if I lock for writing:

    {{1 0} 0 0 -1073741824 0}

Not sure why my readerCount is such a large negative number.

Edit 2:

I think I found where the issue is, but I am not sure why I have to write it that way.

The issue is

    type ThreadSafeMap struct {
        mapLock sync.RWMutex // This is causing the problem
        hashMap map[string]interface{}
    }

It should be

    type ThreadSafeMap struct {
        mapLock *sync.RWMutex
        hashMap map[string]interface{}
    }

Another weird thing is that if I put print statements inside the locked section of Put

    bucket.mapLock.Lock()
    fmt.Println("start")
    fmt.Println(bucket)
    fmt.Println(bucketIndex)
    fmt.Println(bucket.mapLock)
    fmt.Println(&bucket.mapLock)
    bucket.hashMap[key] = val
    defer bucket.mapLock.Unlock()

the following output is possible:

    start
    start
    {0x4212861c0 map[123:123]}
    {0x4212241c0 map[123:123]}

This is weird because each "start" should be followed by four lines of bucket info. Two "start" lines back to back would mean that multiple threads are executing the code inside the lock at the same time.

Also, for some reason each bucket.mapLock has a different address even if I make bucketIndex static, which indicates that I am not even accessing the same lock.

Despite the above weirdness, changing the mutex to a pointer solves my problem.

I would love to find out why I need a pointer for the mutex, why the prints seem to indicate that multiple threads are inside the lock, and why each lock has a different address.

1 answer

  • douzhang8840 2017-08-24 19:16

    The problem is with the statement

    bucket := cMap.buckets[bucketIndex]
    

    bucket now contains a copy of the ThreadSafeMap at that index. Because sync.RWMutex is stored as a value, the assignment copies it too. A map value, however, holds a reference to an underlying data structure, so the copy still points to the same map. The code therefore locks a private copy of the lock while writing to a shared map, which causes the data race.
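
The copy behaviour described above can be checked in isolation. The following is a minimal sketch, not the original code: shard and copyShares are hypothetical names, and the struct only mirrors the shape of ThreadSafeMap.

```go
package main

import (
	"fmt"
	"sync"
)

// shard mirrors the question's ThreadSafeMap: the mutex is stored by value.
// The name is hypothetical and used only for this illustration.
type shard struct {
	mu   sync.RWMutex
	data map[string]int
}

// copyShares copies shards[0] the way Put does and reports whether the copy
// shares its lock and its map with the element still stored in the slice.
func copyShares(shards []shard) (sameLock, sameMap bool) {
	orig := &shards[0]  // pointer to the element inside the slice
	copied := shards[0] // value copy: new mutex, same underlying map (go vet flags this)

	sameLock = &copied.mu == &orig.mu

	copied.data["probe"] = 1        // write through the copy
	_, sameMap = orig.data["probe"] // the write is visible through the original
	return sameLock, sameMap
}

func main() {
	shards := []shard{{data: make(map[string]int)}}
	sameLock, sameMap := copyShares(shards)
	fmt.Println("same lock:", sameLock, "same map:", sameMap)
	// prints: same lock: false same map: true
}
```

Each call to Put therefore takes a lock that no other goroutine ever sees, while all of them write to the same map.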

    That's why the problem goes away when you change sync.RWMutex to *sync.RWMutex. It is better to store pointers to the ThreadSafeMap structs in the buckets slice, as shown.

    package concurrent_hashmap
    
    import (
        "hash/fnv"
        "sync"
    )
    
    type ConcurrentMap struct {
        buckets     []*ThreadSafeMap
        bucketCount uint32
    }
    
    type ThreadSafeMap struct {
        mapLock sync.RWMutex
        hashMap map[string]interface{}
    }
    
    func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
        var threadSafeMapInstance *ThreadSafeMap
        var bucketOfThreadSafeMap []*ThreadSafeMap
    
        for i := 0; i < int(bucketSize); i++ {
            threadSafeMapInstance = &ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
            bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
        }
    
        return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
    }
    
    func (cMap *ConcurrentMap) Put(key string, val interface{}) {
        bucketIndex := hash(key) % cMap.bucketCount
        bucket := cMap.buckets[bucketIndex]
        bucket.mapLock.Lock()
        bucket.hashMap[key] = val
        bucket.mapLock.Unlock()
    }
    
    // Helper
    func hash(s string) uint32 {
        h := fnv.New32a()
        h.Write([]byte(s))
        return h.Sum32()
    }
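
As a variation on the fix above, the buckets can also stay in a slice of values, as long as Put takes the address of the slice element instead of copying it. The following is a minimal sketch under that assumption. ShardedMap, bucket, bucketFor and Get are hypothetical names that are not in the original code, and the sketch assumes that the slice is never resized after construction and that n is greater than zero.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"sync"
)

// bucket keeps the mutex by value; it must only be reached through a pointer.
type bucket struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

// ShardedMap is a hypothetical stand-in for ConcurrentMap.
type ShardedMap struct {
	buckets []bucket // fixed length after NewShardedMap, so element addresses stay valid
}

// NewShardedMap creates exactly n buckets. n must be greater than zero.
func NewShardedMap(n uint32) *ShardedMap {
	m := &ShardedMap{buckets: make([]bucket, n)}
	for i := range m.buckets {
		m.buckets[i].data = make(map[string]interface{})
	}
	return m
}

// bucketFor returns a pointer to the slice element, so no mutex is copied.
func (m *ShardedMap) bucketFor(key string) *bucket {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &m.buckets[h.Sum32()%uint32(len(m.buckets))]
}

// Put stores val under key while holding the bucket's write lock.
func (m *ShardedMap) Put(key string, val interface{}) {
	b := m.bucketFor(key)
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data[key] = val
}

// Get reads key while holding the bucket's read lock.
func (m *ShardedMap) Get(key string) (interface{}, bool) {
	b := m.bucketFor(key)
	b.mu.RLock()
	defer b.mu.RUnlock()
	v, ok := b.data[key]
	return v, ok
}

func main() {
	m := NewShardedMap(4)
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1) // register the goroutine before it starts
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				m.Put(strconv.Itoa(i%100), i)
			}
		}()
	}
	wg.Wait()
	_, ok := m.Get("42")
	fmt.Println(ok) // prints: true
}
```

Whether to store pointers in the slice or take the address of each element is mostly a matter of taste. The property that matters is that every goroutine locks the same mutex instance.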
    

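    It's possible to validate this by modifying the function Put as follows. With the original value-typed buckets, the lock address typically changes from call to call while the map address stays the same; with pointer buckets, both addresses stay the same.

    func (cMap *ConcurrentMap) Put(key string, val interface{}) {
        // Requires importing "fmt". The bucket index is fixed so that every call hits the same bucket.
        bucketIndex := 1
        bucket := cMap.buckets[bucketIndex]
        // Print the address of the lock and the address of the underlying map.
        fmt.Printf("%p %p\n", &(bucket.mapLock), bucket.hashMap)
    }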
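
The benchmark in the question has the same kind of copy problem with its sync.WaitGroup: insert receives the WaitGroup by value, so Add and Done act on a copy and wg.Wait() in the benchmark does not wait for the goroutines. A minimal sketch of the usual pattern follows, passing a pointer and calling Add before the goroutine starts. worker, runWorkers and the counter are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker takes *sync.WaitGroup so that Done acts on the caller's counter.
func worker(wg *sync.WaitGroup, finished *int64) {
	defer wg.Done()
	atomic.AddInt64(finished, 1) // stand-in for the real work
}

// runWorkers starts n goroutines and returns how many had finished
// by the time Wait returned.
func runWorkers(n int) int64 {
	var wg sync.WaitGroup
	var finished int64
	for i := 0; i < n; i++ {
		wg.Add(1) // before the go statement, so Wait cannot observe zero too early
		go worker(&wg, &finished)
	}
	wg.Wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runWorkers(100)) // prints: 100
}
```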
    