dongyang2229 2018-01-14 05:58
浏览 156
已采纳

当websocket连接关闭时,关闭redis订阅并结束go例程

I'm pushing events from a redis subscription to a client who is connected via websocket. I'm having trouble unsubscribing and exiting the redis go routine when the client disconnects the websocket.

Inspired by this post, here's what I have thus far. I'm able to receive subscription events and send messages to the client via websocket, but when the client closes the websocket and the defer close(done) code fires, my case b, ok := <-done: doesn't fire. It seems to be overloaded by the default case???

package api

import (
    ...

    "github.com/garyburd/redigo/redis"
    "github.com/gorilla/websocket"
)

func wsHandler(w http.ResponseWriter, r *http.Request) {
    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        HandleError(w, err)
        return
    }
    defer conn.Close()

    done := make(chan bool)
    defer close(done)

    for {
        var req WSRequest
        err := conn.ReadJSON(&req)
        if err != nil {
            HandleWSError(conn, err)
            return
        }
        defer conn.Close()

        go func(done chan bool, req *WSRequest, conn *websocket.Conn) {
            rc := redisPool.Get()
            defer rc.Close()

            psc := redis.PubSubConn{Conn: rc}
            if err := psc.PSubscribe(req.chanName); err != nil {
                HandleWSError(conn, err)
                return
            }
            defer psc.PUnsubscribe()

            for {
                select {
                case b, ok := <-done:
                    if !ok || b == true {
                        return
                    }
                default:
                    switch v := psc.Receive().(type) {
                    case redis.PMessage:
                        err := handler(conn, req, v)
                        if err != nil {
                            HandleWSError(conn, err)
                        }

                    case redis.Subscription:
                         log.Printf("%s: %s %d
", v.Channel, v.Kind, v.Count)

                    case error:
                         log.Printf("error in redis subscription; err:
%v
", v)
                         HandleWSError(conn, v)

                    default:
                        // do nothing...
                        log.Printf("unknown redis subscription event type; %s
", reflect.TypeOf(v))
                    }
                }
            }
        }(done, &req, conn)
    }
}
  • 写回答

1条回答 默认 最新

  • doumo1807831 2018-01-14 17:56
    关注

    Make these changes to break out of the read loop when done serving the websocket connection:

    • Maintain a slice of the Redis connections created for this websocket connection.

    • Unsubscribe all connections when done.

    • Modify the read loop to return when the subscription count is zero.

    Here's the code:

    func wsHandler(w http.ResponseWriter, r *http.Request) {
        var upgrader = websocket.Upgrader{
            ReadBufferSize:  1024,
            WriteBufferSize: 1024,
        }
    
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            HandleError(w, err)
            return
        }
        defer conn.Close()
    
        // Keep slice of all connections. Unsubscribe all connections on exit.
        var pscs []redis.PubSubConn
        defer func() {
            for _, psc := range rcs {
               psc.Unsubscribe() // unsubscribe with no args unsubs all channels
            }
        }()
    
        for {
            var req WSRequest
            err := conn.ReadJSON(&req)
            if err != nil {
                HandleWSError(conn, err)
                return
            }
    
            rc := redisPool.Get()
            psc := redis.PubSubConn{Conn: rc}
            pscs = append(pscs, psc)
    
            if err := psc.PSubscribe(req.chanName); err != nil {
                HandleWSError(conn, err)
                return
            }
    
            go func(req *WSRequest, conn *websocket.Conn) {
                defer rc.Close()
                for {
                    switch v := psc.Receive().(type) {
                    case redis.PMessage:
                        err := handler(conn, req, v)
                        if err != nil {
                            HandleWSError(conn, err)
                        }
    
                    case redis.Subscription:
                         log.Printf("%s: %s %d
    ", v.Channel, v.Kind, v.Count)
                         if v.Count == 0 {
                             return
                         }
    
                    case error:
                         log.Printf("error in redis subscription; err:
    %v
    ", v)
                         HandleWSError(conn, v)
    
                    default:
                        // do nothing...
                        log.Printf("unknown redis subscription event type; %s
    ", reflect.TypeOf(v))
                    }
                }
            }(&req, conn)
        }
    }
    

    The code in the question and this answer dial multiple Redis connections for each websocket client. A more typical and scalable approach is to share a single Redis pubsub connection across multiple clients. The typical approach may be appropriate for your application given the high-level description, but I am still unsure of what you are trying to do given the code in the question.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥20 我想使用一些网络协议或者部分协议也行,主要想实现类似于traceroute的一定步长内的路由拓扑功能
  • ¥30 深度学习,前后端连接
  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)
  • ¥15 keil里为什么main.c定义的函数在it.c调用不了