普通网友 2018-08-03 11:02
浏览 65
已采纳

对UDP消息启动多个计时器并取消

I'm listening for messages on UDP. We've got devices that announce themselves this way. They also say when they will send the next announcement. If this does not happen we assume a device is gone.

I'd like to make a list of devices that are currently in the network. I'd like to add new devices and remove those that I haven't heard from.

Here is what I've got so far.

1) I've got an in memory db which holds all the devices.

func NewDB() *DB {
    return &DB{
        table: make(map[string]Announcement),
    }
}

type DB struct {
    mutex sync.Mutex
    table map[string]Announcement
}

func (db *DB) Set(ip string, ann Announcement) {
    db.mutex.Lock()
    defer db.mutex.Unlock()
    db.table[ip] = ann
}

func (db *DB) Delete(ip string) {
    db.mutex.Lock()
    defer db.mutex.Unlock()
    delete(db.table, ip)
}

func (db *DB) Snapshot() map[string]Announcement {
    db.mutex.Lock()
    defer db.mutex.Unlock()
    return db.table
}

2) I've got web server that serves this db to my JavaScript frontend

http.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(db.Snapshot())
})

// start server
go func() {
    log.Fatal(http.ListenAndServe(":8085", nil))
}()

3) And finally I'm listening for UDP messages. Whenever a new device is added to the db I also create a new timer with the provided timeout (here I just set it to 10 seconds). When a new messages arrives I check for an existing timer, stop it when it exists and start it again to clear the device if it doesn't send messages anymore.

However I doesn't really work. The AfterFunc is called way to often. Although the device is still in the network it is removed from my db. Any ideas?

// some global variable
var (
    timers = map[string]*time.Timer{}
)

for {
    // create new buffer
    b := make([]byte, 1500)

    // read message from udp into buffer
    n, src, err := conn.ReadFromUDP(b)
    if err != nil {
        panic(err)
    }

    // convert raw json bytes to struct
    var ann Announcement
    if err := json.Unmarshal(b[:n], &ann); err != nil {
        panic(err)
    }

    // add announcement to db
    ip := src.IP.String()
    db.Set(ip, ann)

    // check for existing timer
    timer, ok := timers[ip]
    if ok {
        log.Println("stopping timer", ip)
        // stop existing timer
        timer.Stop()
    }

    // start new timer for device
    timer = time.AfterFunc(time.Second*10, func() {
        log.Println("time after func", ip)
        delete(timers, ip)
        db.Delete(ip)
    })

    // store timer in timers db
    timers[ip] = timer

    time.Sleep(250 * time.Millisecond)
}
  • 写回答

1条回答 默认 最新

  • douchongzhang9267 2018-08-03 11:38
    关注

    I think the problem you are having may be connected with the value of ip variable you are capturing in AfterFunc func.

    timer = time.AfterFunc(time.Second*10, func() {
            log.Println("time after func", ip)
            delete(timers, ip)
            db.Delete(ip)
        })
    

    From this code, delete on ip will be invoked with the value of ip variable at the moment this timer expires. So if in the meanwhile you received a packet from another device with different IP, its this one that will be deleted.

    Example of what's happening:

    • second 1: Device with IP 1.2.3.4 sends UDP packet. ip = 1.2.3.4, AfterFunc is invoked, 10 second timer is started
    • second 3: Device with IP 4.5.6.7 sends UDP packet. Now ip = 4.5.6.7, AfterFunc is invoked, 10 second timer is started
    • second 10: function that deletes current value of ip variable is invoked, device 4.5.6.7 is deleted
    • second 13: second timer times out, and we try to delete 4.5.6.7 again

    As a result, device with IP 1.2.3.4 never gets deleted.

    You can fix that by creating a function that takes an argument and returns func() with current value of argument.

    timer = time.AfterFunc(time.Second*10, func(ip string) func() {
        return func() {
            log.Println("time after func", ip)
            delete(timers, ip)
            db.Delete(ip)
        }   
    }(ip))
    

    Simpler working example:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        fmt.Println("Capturing value of i at the moment of execution of func()")
        for i := 0; i < 5; i++ {
    
            afterFuncTimer := time.AfterFunc(time.Second*2, func() {
                fmt.Printf("AfterFunc() with %v
    ", i)
            })
    
            defer afterFuncTimer.Stop()
        }
    
        time.Sleep(5 * time.Second)
    
        fmt.Println("Capturing value of i from the loop")
        for i := 0; i < 5; i++ {
    
            afterFuncTimer := time.AfterFunc(time.Second*2, func(i int) func() {
                return func() {
                    fmt.Printf("AfterFunc() with %v
    ", i)
                }
            }(i))
    
            defer afterFuncTimer.Stop()
        }
    
        time.Sleep(5 * time.Second)
    
    }
    

    Run it on Go Playground: https://play.golang.org/p/bGWzTaWe3ZU

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 MCNP里如何定义多个源?
  • ¥20 双层网络上信息-疾病传播
  • ¥50 paddlepaddle pinn
  • ¥20 idea运行测试代码报错问题
  • ¥15 网络监控:网络故障告警通知
  • ¥15 django项目运行报编码错误
  • ¥15 请问这个是什么意思?
  • ¥15 STM32驱动继电器
  • ¥15 Windows server update services
  • ¥15 关于#c语言#的问题:我现在在做一个墨水屏设计,2.9英寸的小屏怎么换4.2英寸大屏