dongliufa6380 2019-04-12 07:06
浏览 243
已采纳

如何在Golang中释放WebSocket和Redis网关服务器资源?

I have a gateway server, which can push message to client side by using websocket, A new client connected to my server, I will generate a cid for it. And then I also subscribe a channel, which using cid. If any message publish to this channel, My server will push it to client side. For now, all unit are working fine, but when I try to test with benchmark test by thor, it will crash, I fine the DeliverMessage has some issue, it would never exit, since it has a die-loop. but since redis need to subscribe something, I don't know how to avoid loop.

func (h *Hub) DeliverMessage(pool *redis.Pool) {
    conn := pool.Get()
    defer conn.Close()
    var gPubSubConn *redis.PubSubConn
    gPubSubConn = &redis.PubSubConn{Conn: conn}
    defer gPubSubConn.Close()

    for {
        switch v := gPubSubConn.Receive().(type) {
        case redis.Message:
            // fmt.Printf("Channel=%q |  Data=%s
", v.Channel, string(v.Data))
            h.Push(string(v.Data))
        case redis.Subscription:
            fmt.Printf("Subscription message: %s : %s %d
", v.Channel, v.Kind, v.Count)
        case error:
            fmt.Println("Error pub/sub, delivery has stopped", v)
            panic("Error pub/sub")
        }
    }
}

In the main function, I have call the above function as:

go h.DeliverMessage(pool)

But when I test it with huge connection, it get me some error like:

ERR max number of clients reached

So, I change the redis pool size by change MaxIdle:

func newPool(addr string) *redis.Pool {
    return &redis.Pool{
        MaxIdle:     5000,
        IdleTimeout: 240 * time.Second,
        Dial:        func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
    }
}

But it still doesn't work, so I wonder to know, if there any good way to kill a goroutine after my websocket disconnected to my server on the below selection:

case client := <-h.Unregister:
    if _, ok := h.Clients[client]; ok {
        delete(h.Clients, client)
        delete(h.Connections, client.CID)
        close(client.Send)
        if err := gPubSubConn.Unsubscribe(client.CID); err != nil {
            panic(err)
        }
        // TODO kill subscribe goroutine if don't client-side disconnected ...

    }

But How do I identify this goroutine? How can I do it like unix way. kill -9 <PID>?

  • 写回答

2条回答 默认 最新

  • duanmao1319 2019-04-16 10:09
    关注

    Actually, I got wrong design architecture, I am going to explain what I want to do.

    1. A client can connect to my websocket server;

    2. The server have several handler of http, and the admin can post data via the handler, the structure of the data can be like:

      {
         "cid": "something",
         "body": {
          }
      }
      

    Since, I have several Nodes are running to service our client, and the Nginx can dispatch each request from admin to totally different Node, but only one Node has hold on the connection about cid with "something", so I will need to publish this data to Redis, if any Node has got the data, it's going to send this message to the client side.

    3.Looking for the NodeID, which i am going to Publish to by given an cid.

    // redis code & golang
    NodeID, err := conn.Do("HGET", "NODE_MAP", cid)
    

    4.For now, I can publish any message from the admin, and publish to the NodeID, which we have got at step 3.

    // redis code & golang
    NodeID, err := conn.Do("PUBLISH", NodeID, data)
    
    1. Time to show the core code, which related to this question. I am going to subscribe a channel, which name is NodeID. like the following.

      go func(){
        for {
          switch v := gPubSubConn.Receive().(type) {
          case redis.Message:
              fmt.Println("Got a message", v.Data)
              h.Broadcast <- v.Data
              pipeline <- v.Data
          case error:
              panic(v)
          }
        }
      }()
      

    6.To manage your websocket, you do also need a goroutine to do that. like the following way:

       go func () {
         for {
                select {
                case client := <-h.Register:
                    h.Clients[client] = true
                    cid := client.CID
                    h.Connections[cid] = client
    
                    body := "something"
                    client.Send <- msg // greeting
    
                case client := <-h.Unregister:
                    if _, ok := h.Clients[client]; ok {
                        delete(h.Clients, client)
                        delete(h.Connections, client.CID)
                        close(client.Send)
                    }
                case message := <-h.Broadcast:
                    fmt.Println("message is", message)
                }
            }
          }()
    
    1. The last thing is manage a redis pool, you don't really need a connection pool right now. since we only have two goroutine, one main process.

      func newPool(addr string) *redis.Pool {
          return &redis.Pool{
              MaxIdle:     100,
              IdleTimeout: 240 * time.Second,
              Dial:        func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
          }
      }
      
      var (
          pool        *redis.Pool
          redisServer = flag.String("redisServer", ":6379", "")
      )
      pool = newPool(*redisServer)
      conn := pool.Get()
      defer conn.Close()
      
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 metadata提取的PDF元数据,如何转换为一个Excel
  • ¥15 关于arduino编程toCharArray()函数的使用
  • ¥100 vc++混合CEF采用CLR方式编译报错
  • ¥15 coze 的插件输入飞书多维表格 app_token 后一直显示错误,如何解决?
  • ¥15 vite+vue3+plyr播放本地public文件夹下视频无法加载
  • ¥15 c#逐行读取txt文本,但是每一行里面数据之间空格数量不同
  • ¥50 如何openEuler 22.03上安装配置drbd
  • ¥20 ING91680C BLE5.3 芯片怎么实现串口收发数据
  • ¥15 无线连接树莓派,无法执行update,如何解决?(相关搜索:软件下载)
  • ¥15 Windows11, backspace, enter, space键失灵