Here is a simple concurrent map that I wrote for learning purpose
package concurrent_hashmap
import (
"hash/fnv"
"sync"
)
type ConcurrentMap struct {
buckets []ThreadSafeMap
bucketCount uint32
}
type ThreadSafeMap struct {
mapLock sync.RWMutex
hashMap map[string]interface{}
}
func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
var threadSafeMapInstance ThreadSafeMap
var bucketOfThreadSafeMap []ThreadSafeMap
for i := 0; i <= int(bucketSize); i++ {
threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
}
return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
bucketIndex := hash(key) % cMap.bucketCount
bucket := cMap.buckets[bucketIndex]
bucket.mapLock.Lock()
bucket.hashMap[key] = val
bucket.mapLock.Unlock()
}
// Helper
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
I am trying to write a simple benchmark and I find that synchronize access will work correctly but concurrent access will get
fatal error: concurrent map writes
Here is my benchmark run with go test -bench=. -race
package concurrent_hashmap
import (
"testing"
"runtime"
"math/rand"
"strconv"
"sync"
)
// Concurrent does not work
func BenchmarkMyFunc(b *testing.B) {
var wg sync.WaitGroup
runtime.GOMAXPROCS(runtime.NumCPU())
my_map := NewConcurrentMap(uint32(4))
for n := 0; n < b.N; n++ {
go insert(my_map, wg)
}
wg.Wait()
}
func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
wg.Add(1)
var rand_int int
for element_num := 0; element_num < 1000; element_num++ {
rand_int = rand.Intn(100)
my_map.Put(strconv.Itoa(rand_int), rand_int)
}
defer wg.Done()
}
// This works
func BenchmarkMyFuncSynchronize(b *testing.B) {
my_map := NewConcurrentMap(uint32(4))
for n := 0; n < b.N; n++ {
my_map.Put(strconv.Itoa(123), 123)
}
}
The WARNING: DATA RACE
is saying that bucket.hashMap[key] = val
is causing the problem, but I am confused on why that is possible, since I lock that logic whenever write is happening.
I think I am missing something basic, can someone point out my mistake?
Thanks
Edit1:
Not sure if this helps but here is what my mutex looks like if I don't lock anything
{{0 0} 0 0 0 0}
Here is what it looks like if I lock the write
{{1 0} 0 0 -1073741824 0}
Not sure why my readerCount is a low negative number
Edit:2
I think I find where the issue is at, but not sure why I have to code that way
The issue is
type ThreadSafeMap struct {
mapLock sync.RWMutex // This is causing problem
hashMap map[string]interface{}
}
it should be
type ThreadSafeMap struct {
mapLock *sync.RWMutex
hashMap map[string]interface{}
}
Another weird thing is that in Put
if I put print statement inside lock
bucket.mapLock.Lock()
fmt.Println("start")
fmt.Println(bucket)
fmt.Println(bucketIndex)
fmt.Println(bucket.mapLock)
fmt.Println(&bucket.mapLock)
bucket.hashMap[key] = val
defer bucket.mapLock.Unlock()
The following prints is possible
start
start
{0x4212861c0 map[123:123]}
{0x4212241c0 map[123:123]}
Its weird because each start
printout should be follow with 4 lines of bucket info since you cannot have start
back to back because that would indicate that multiple thread is access the line inside lock
Also for some reason each bucket.mapLock
have different address even if I make the bucketIndex static, that indicate that I am not even accessing the same lock.
But despite the above weirdness changing mutex to pointer solves my problem
I would love to find out why I need pointers for mutex and why the prints seem to indicate multiple thread is accessing the lock and why each lock has different address.