duanhuangyun3887 2017-01-19 18:00
Accepted

Lots of transient objects - avoiding contention

I have a new TCP server written in Go with 100+ clients attached to it. Each client streams in radio packets received over the air at various locations, and these need to be analysed centrally. The code works, but I am seeing a lot of contention and increased CPU usage around the locking, and I am after some thoughts on how to avoid the locking (if possible) or optimise around it.

As the TCP server spins up a goroutine for each packet received, the addMessage function needs a level of synchronisation. These packets also get analysed later in another function that does an RLock() on the map.

It is the cullMessages() function, called once per second, that really gets caught up in itself and can slow right down, sometimes taking 2-3 seconds to run. That compounds the issue, as the next 2-3 calls are queued waiting for the lock and then run straight away!

Any ideas/thoughts would be appreciated!

var dataMessagesMutex sync.RWMutex
var dataMessages map[string][]*trackingPacket_v1

// Function is called from each TCP client who need to share this data
func addMessage(trackingPacket *trackingPacket_v1) {
    dataMessagesMutex.Lock()
    dataMessages[trackingPacket.packetID] = append(dataMessages[trackingPacket.packetID], trackingPacket)
    dataMessagesMutex.Unlock()
}

// Function called on a loop, need to delete based on age here
func cullMessages() {
    cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)

    dataMessagesMutex.Lock()
    defer dataMessagesMutex.Unlock()

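    for avr, data := range dataMessages {
        sort.Sort(PacketSorter(data))
        // Start at -1 so that nothing is dropped when no message has expired
        highestIndex := -1

        for i, message := range data {
            if cullTS.Sub(message.ProcessedTime) > 0 {
                // Expired: clear the slot so the packet can be garbage collected
                data[i] = nil
                highestIndex = i
            }
        }

        if highestIndex+1 == len(data) {
            // Every message has expired (or the slice was empty), delete the entry
            delete(dataMessages, avr)
        } else if highestIndex >= 0 {
            // Store the trimmed slice back in the map; reassigning the loop
            // variable alone would leave the map unchanged
            dataMessages[avr] = data[highestIndex+1:]
        }
    }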
}

UPDATE: Added analysis function

func processCandidates() {
    dataMessagesMutex.RLock()
    defer dataMessagesMutex.RUnlock()

    for _, data := range dataMessages {
        numberOfMessages := len(data)
        for a := 0; a < numberOfMessages; a++ {
            packetA := data[a]
            applicablePackets := []*trackingPacket_v1{packetA}
            for b := 0; b < numberOfMessages; b++ {
                // Don't compare identical packets
                if b == a {
                    continue
                }

                packetB := data[b]

                // Only consider this packet if it's within an acceptable
                // timestamp threshold
                tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)

                if tsDelta < MAX_MESSAGE_TS_DIFF {
                    // Finally, we need to make sure that only one message per
                    // station is included in our batch
                    stationAlreadyRepresented := false
                    for i := 0; i < len(applicablePackets); i++ {
                        if applicablePackets[i].Sharecode == packetB.Sharecode {
                            stationAlreadyRepresented = true
                            break
                        }
                    }

                    if !stationAlreadyRepresented {
                        applicablePackets = append(applicablePackets, packetB)
                    }
                }
            }

            // Remove any stations which are deemed too close to one another
            if len(applicablePackets) >= MIN_STATIONS_NEEDED {
                applicablePackets = cullPackets(applicablePackets)
            }

            // Provided we still have enough packets....
            if len(applicablePackets) >= MIN_STATIONS_NEEDED {
                // Generate a hash for this batch...
                hash := generateHashForPackets(applicablePackets)
                batchIsUnique := true

                for _, packet := range applicablePackets {
                    if packet.containsHash(hash) {
                        batchIsUnique = false
                        break
                    }
                }

                if batchIsUnique {
                    for _, packet := range applicablePackets {
                        packet.addHash(hash)
                    }

                    go sendOfDataForWork(applicablePackets)
                }
            }

        }
    }
}

1 answer

  • dswg47377 2017-01-19 18:34

    Instead of having one big map, have a goroutine for each packetID. A dispatcher goroutine could hold a map[string]chan *trackingPacket_v1 and send each incoming packet on the appropriate channel. The goroutine for that packetID would then collect the packets into a local slice, and cull and analyze them at intervals.
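
A minimal sketch of that dispatcher layout, under stated assumptions: `packet` is a hypothetical stand-in for trackingPacket_v1, and `dispatcher`, `result` and `countByID` are names invented for this illustration. To keep the example deterministic, each worker only counts its packets and reports when its channel is closed; the periodic cull and analysis would sit in the same worker loop.

```go
package main

import "sync"

// packet is a hypothetical stand-in for trackingPacket_v1.
type packet struct {
	packetID string
}

// result is what a worker reports once its channel has been closed.
type result struct {
	packetID string
	count    int
}

// dispatcher owns the routing map. Only the goroutine that calls dispatch
// touches the map, so the map itself needs no mutex.
type dispatcher struct {
	workers map[string]chan *packet
	wg      sync.WaitGroup
	results chan result
}

// dispatch routes p to the worker goroutine for its packetID, starting a
// worker the first time that ID is seen.
func (d *dispatcher) dispatch(p *packet) {
	ch, ok := d.workers[p.packetID]
	if !ok {
		ch = make(chan *packet, 16)
		d.workers[p.packetID] = ch
		d.wg.Add(1)
		go d.worker(p.packetID, ch)
	}
	ch <- p
}

// worker keeps the packets for one packetID in a local slice, so no other
// goroutine contends for them. It reports a count when its channel closes.
func (d *dispatcher) worker(id string, ch <-chan *packet) {
	defer d.wg.Done()
	var local []*packet
	for p := range ch {
		local = append(local, p)
	}
	d.results <- result{packetID: id, count: len(local)}
}

// countByID feeds packets through a dispatcher and returns how many packets
// each per-ID worker collected.
func countByID(packets []*packet) map[string]int {
	d := &dispatcher{
		workers: make(map[string]chan *packet),
		results: make(chan result),
	}
	for _, p := range packets {
		d.dispatch(p)
	}
	for id, ch := range d.workers {
		close(ch) // tells the worker to finish up
		delete(d.workers, id)
	}
	// Close results once every worker has reported, so the loop below ends.
	go func() {
		d.wg.Wait()
		close(d.results)
	}()
	counts := make(map[string]int)
	for r := range d.results {
		counts[r.packetID] = r.count
	}
	return counts
}
```

Calling `countByID` with two packets for ID "a" and one for ID "b" starts two workers and returns a count of 2 for "a" and 1 for "b". In a real server the TCP handlers would presumably hand packets to the dispatcher goroutine over a single channel rather than calling `dispatch` directly, since the map is only safe while one goroutine uses it.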

    You would also need some way to terminate the goroutines that haven't received a packet within MODES_MAX_MESSAGE_AGE. Probably the dispatcher goroutine would keep track of when each packetID was most recently seen, and periodically check for ones that are too old. It would then close those channels and remove them from its map. When an analysis goroutine discovers that its channel has been closed, it exits.
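
The idle-expiry step might look roughly like the sketch below. The names `idleTracker`, `touch`, `expire`, `worker` and `expireDemo` are hypothetical, the channels carry strings instead of *trackingPacket_v1 to keep the example short, and the current time is passed in explicitly so the behaviour does not depend on the wall clock.

```go
package main

import "time"

// idleTracker is a hypothetical helper owned by the dispatcher goroutine.
// It is only used from that one goroutine, so it needs no locking.
type idleTracker struct {
	chans    map[string]chan string // stand-in for chan *trackingPacket_v1
	lastSeen map[string]time.Time
}

func newIdleTracker() *idleTracker {
	return &idleTracker{
		chans:    make(map[string]chan string),
		lastSeen: make(map[string]time.Time),
	}
}

// touch records that a packet for id arrived at time now and returns the
// channel for that id, creating it on first use.
func (t *idleTracker) touch(id string, now time.Time) chan string {
	ch, ok := t.chans[id]
	if !ok {
		ch = make(chan string, 16)
		t.chans[id] = ch
	}
	t.lastSeen[id] = now
	return ch
}

// expire closes and forgets every channel whose id has been quiet for longer
// than maxAge, and returns the ids that were dropped.
func (t *idleTracker) expire(now time.Time, maxAge time.Duration) []string {
	var expired []string
	for id, seen := range t.lastSeen {
		if now.Sub(seen) > maxAge {
			close(t.chans[id])
			delete(t.chans, id)
			delete(t.lastSeen, id)
			expired = append(expired, id)
		}
	}
	return expired
}

// worker drains its channel and returns once the channel is closed. Closing
// done signals that the goroutine has exited.
func worker(ch <-chan string, done chan<- struct{}) {
	for range ch {
		// collect, cull and analyse packets here
	}
	close(done)
}

// expireDemo registers two ids 10 seconds apart, expires with a 5 second
// limit, and reports which ids were dropped and whether the worker exited.
func expireDemo() (expired []string, workerExited bool) {
	t := newIdleTracker()
	start := time.Unix(0, 0)
	done := make(chan struct{})
	go worker(t.touch("old", start), done)
	t.touch("fresh", start.Add(10*time.Second))

	expired = t.expire(start.Add(12*time.Second), 5*time.Second)
	select {
	case <-done:
		workerExited = true
	case <-time.After(time.Second):
	}
	return expired, workerExited
}
```

Because the dispatcher is the only sender on each channel and also the only goroutine that closes it, a send on a closed channel cannot occur in this layout. If a packet for an expired packetID arrives later, `touch` simply creates a fresh channel, and the caller would start a new worker for it.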

    Selected as the best answer by the asker.