I have a new TCP server written in Go with 100+ clients attached to it. Each client listens for radio packets over the airwaves at a different location and streams them to the server, where the data has to be analysed centrally. The code works, but I am seeing a lot of contention and increased CPU usage around the locking, and I am after some thoughts on how to avoid the locking (if possible) or optimise around it.
Because the TCP server spins up a goroutine for each packet received, the `addMessage` function needs some level of synchronisation. These packets are also analysed later on in another function, which takes an `RLock()` on the map.
It is the `cullMessages()` function, called once per second, that really gets caught up in itself and can slow right down. It sometimes takes 2-3 seconds to run, which compounds the issue because the next 2-3 invocations queue up waiting for the lock and then run straight away, one after another!
Any ideas/thoughts would be appreciated!
// Shared packet store and the lock that protects it.
var (
	// dataMessagesMutex is taken by the functions that read or modify
	// dataMessages.
	dataMessagesMutex sync.RWMutex

	// dataMessages holds the received packets, grouped under a string key
	// (addMessage uses the packet's packetID).
	dataMessages map[string][]*trackingPacket_v1
)
// addMessage appends trackingPacket to the packets stored under its packetID.
// It is called from each TCP client that needs to share this data, so the map
// is only touched while the write lock is held.
func addMessage(trackingPacket *trackingPacket_v1) {
	dataMessagesMutex.Lock()
	defer dataMessagesMutex.Unlock()

	// Writing to a nil map panics, so create the map on first use in case
	// nothing has initialised it yet.
	if dataMessages == nil {
		dataMessages = make(map[string][]*trackingPacket_v1)
	}

	id := trackingPacket.packetID
	dataMessages[id] = append(dataMessages[id], trackingPacket)
}
// cullMessages removes every stored packet whose ProcessedTime lies more than
// MODES_MAX_MESSAGE_AGE seconds in the past, and deletes any key that is left
// without packets. It is meant to be called on a loop.
//
// The whole pass runs under the write lock on dataMessages.
func cullMessages() {
	cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)

	dataMessagesMutex.Lock()
	defer dataMessagesMutex.Unlock()

	for avr, data := range dataMessages {
		// The sort is kept so the stored slices stay in PacketSorter order,
		// which readers of dataMessages may depend on.
		// NOTE(review): the age filter below does not need sorted input; if
		// nothing relies on the ordering, this sort can be dropped to shorten
		// the time spent holding the lock.
		sort.Sort(PacketSorter(data))

		// Filter in place, reusing the backing array. Order is preserved.
		kept := data[:0]
		for _, message := range data {
			if !message.ProcessedTime.Before(cullTS) {
				kept = append(kept, message)
			}
		}

		switch {
		case len(kept) == len(data):
			// Nothing expired; the stored slice is already correct.
		case len(kept) == 0:
			// Every packet expired, so drop the key altogether.
			delete(dataMessages, avr)
		default:
			// Clear the now-unused tail so the expired packets are no longer
			// referenced by the backing array, then store the shortened slice
			// back into the map (assigning to the loop variable alone would
			// leave the map untouched).
			for i := len(kept); i < len(data); i++ {
				data[i] = nil
			}
			dataMessages[avr] = kept
		}
	}
}
UPDATE: Added analysis function
func processCandidates() {
mlatMessagesMutex.RLock()
defer dataMessagesMutex.RUnlock()
for _, data := range dataMessages {
numberOfMessages := len(data)
for a := 0; a < numberOfMessages; a++ {
packetA := data[a]
applicablePackets := []*trackingPacket_v1{packetA}
for b := 0; b < numberOfMessages; b++ {
// Don't compare identical packets
if b == a {
continue
}
packetB := data[b]
// Only consider this packet if it's within an acceptable
// timestamp threshold
tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)
if tsDelta < MAX_MESSAGE_TS_DIFF {
// Finally, we need to make sure that only one message per
// station is included in our batch
stationAlreadyRepresented := false
for i := 0; i < len(applicablePackets); i++ {
if applicablePackets[i].Sharecode == packetB.Sharecode {
stationAlreadyRepresented = true
}
}
if stationAlreadyRepresented == false {
applicablePackets = append(applicablePackets, packetB)
}
}
}
// Remove any stations which are deemed too close to one another
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
applicablePackets = cullPackets(applicablePackets)
}
// Provided we still have enough packets....
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
// Generate a hash for this batch...
hash := generateHashForPackets(applicablePackets)
batchIsUnique := true
for _, packet := range applicablePackets {
if packet.containsHash(hash) {
batchIsUnique = false
break
}
}
if batchIsUnique == true {
for _, packet := range applicablePackets {
packet.addHash(hash)
}
go sendOfDataForWork(applicablePackets)
}
}
}
}
}