drba1172 2016-01-01 23:48
Views: 53

How to stop RethinkDB's Next (.Changes())

I am using RethinkDB in my app because of its real-time, event-driven data. I currently have this watch function that checks for users who are online:

func (c *connection) watchUsers() {
    db := common.DB()
    query := gorethink.Table("Users").Filter(map[string]interface{}{
        "online": 1,
    }).Changes(gorethink.ChangesOpts{
        IncludeInitial: true,
    })
    res, err := query.Run(db)
    if err != nil {
        log.Println(err)
        return
    }
    defer res.Close()
    var users interface{}
    for res.Next(&users) {
        if c.disconnecting {
            break
        }
        usersNewMap := users.(map[string]interface{})["new_val"]
        user := usersNewMap.(map[string]interface{})
        log.Println(user["username"])
        c.ws.WriteJSON(wsMsg{
            "user add",
            map[string]interface{}{
                "username": user["username"].(string),
                "uuid":     user["id"].(string),
            },
        })
    }
    log.Println("Ended on disconnect")
}

The problem is that I need the watchUsers function to return when the websocket disconnects. Right now I have

defer func() {
   c.disconnecting = true
}()

in the websocket code itself. When the watch loop receives the next change, it sees the flag, breaks out of the loop, and the goroutine ends. But if there is no data to broadcast for a while, the goroutine just sits blocked in res.Next, taking up resources unnecessarily. So my question is: how can I break out of this for loop in an event-driven way? It would be easier if res.Next were a channel, because then I could use a select, but that is not the case.
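To make the "if only res.Next were a channel" idea concrete, here is a minimal sketch of wrapping a blocking Next loop in a goroutine that forwards rows onto a channel, so the consumer can select on it. Everything here is an assumption for illustration: fakeCursor is a hypothetical stand-in for the real cursor (it is not the gorethink API), and rowsChan and watch are made-up helper names.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeCursor is a hypothetical stand-in for a changefeed cursor:
// Next blocks until a row arrives or the cursor is closed.
type fakeCursor struct {
	rows   chan string
	closed chan struct{}
	once   sync.Once
}

func newFakeCursor() *fakeCursor {
	return &fakeCursor{rows: make(chan string), closed: make(chan struct{})}
}

// Next blocks until a row is available (true) or the cursor is closed (false).
func (c *fakeCursor) Next(dest *string) bool {
	select {
	case row := <-c.rows:
		*dest = row
		return true
	case <-c.closed:
		return false
	}
}

// Close unblocks any pending Next call; it is safe to call more than once.
func (c *fakeCursor) Close() { c.once.Do(func() { close(c.closed) }) }

// rowsChan runs the blocking Next loop in a goroutine and forwards each row
// onto a channel, which is closed when the cursor ends.
func rowsChan(c *fakeCursor) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		var row string
		for c.Next(&row) {
			select {
			case out <- row:
			case <-c.closed: // consumer is gone; do not block on the send
				return
			}
		}
	}()
	return out
}

// watch collects rows until done is closed, then closes the cursor.
func watch(c *fakeCursor, done <-chan struct{}) []string {
	defer c.Close()
	var seen []string
	rows := rowsChan(c)
	for {
		select {
		case row, ok := <-rows:
			if !ok {
				return seen
			}
			seen = append(seen, row)
		case <-done:
			return seen
		}
	}
}

func main() {
	c := newFakeCursor()
	done := make(chan struct{})
	result := make(chan []string, 1)
	go func() { result <- watch(c, done) }()

	c.rows <- "alice"
	c.rows <- "bob"
	close(done) // simulate the websocket disconnecting
	fmt.Println(<-result)
}
```

The point of the design is that the blocking call lives in its own goroutine, and closing the cursor is what ends it; the select in watch only decides when to do that.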

1 answer

  • doutang6819 2016-01-02 02:45

    I came up with a solution that seems to work fine, using a suggestion from @CodingPickle. In a nutshell: run the for loop in its own goroutine, and call res.Close() when it is time to stop watching, so the blocked res.Next loop can end.

    func (c *connection) watchUsers() {
        db := common.DB()
        query := gorethink.Table("Users").Filter(map[string]interface{}{
            "online": 1,
        }).Changes(gorethink.ChangesOpts{
            IncludeInitial: true,
        })
        res, err := query.Run(db)
        if err != nil {
            log.Println(err)
            return // no cursor to watch
        }
    
        // Read the changefeed in its own goroutine, because res.Next blocks.
        go func(res *gorethink.Cursor, c *connection) {
            defer res.Close()
            var users interface{}
            for res.Next(&users) {
                change, _ := users.(map[string]interface{})
                user, ok := change["new_val"].(map[string]interface{})
                if !ok {
                    continue // no new_val, e.g. the user no longer matches the filter
                }
                c.send <- wsMsg{
                    "user add",
                    map[string]interface{}{
                        "username": user["username"].(string),
                        "uuid":     user["id"].(string),
                    },
                }
            }
            log.Println("Ended gracefully")
        }(res, c)
    
        // c.disconnecting and c.stopWatchingUsers are channels here, not bools.
        // Block until either one fires, then close the cursor to end the loop above.
        select {
        case <-c.disconnecting:
        case <-c.stopWatchingUsers:
        }
        res.Close()
    }
    