I have a gateway server, which can push message to client side by using websocket, A new client connected to my server, I will generate a cid
for it. And then I also subscribe a channel, which using cid
. If any message publish to this channel, My server will push it to client side. For now, all unit are working fine, but when I try to test with benchmark test by thor, it will crash, I fine the DeliverMessage
has some issue, it would never exit, since it has a die-loop. but since redis need to subscribe something, I don't know how to avoid loop.
func (h *Hub) DeliverMessage(pool *redis.Pool) {
conn := pool.Get()
defer conn.Close()
var gPubSubConn *redis.PubSubConn
gPubSubConn = &redis.PubSubConn{Conn: conn}
defer gPubSubConn.Close()
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
// fmt.Printf("Channel=%q | Data=%s
", v.Channel, string(v.Data))
h.Push(string(v.Data))
case redis.Subscription:
fmt.Printf("Subscription message: %s : %s %d
", v.Channel, v.Kind, v.Count)
case error:
fmt.Println("Error pub/sub, delivery has stopped", v)
panic("Error pub/sub")
}
}
}
In the main function, I have call the above function as:
go h.DeliverMessage(pool)
But when I test it with huge connection, it get me some error like:
ERR max number of clients reached
So, I change the redis pool size by change MaxIdle
:
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 5000,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
But it still doesn't work, so I wonder to know, if there any good way to kill a goroutine after my websocket disconnected to my server on the below selection:
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
delete(h.Connections, client.CID)
close(client.Send)
if err := gPubSubConn.Unsubscribe(client.CID); err != nil {
panic(err)
}
// TODO kill subscribe goroutine if don't client-side disconnected ...
}
But How do I identify this goroutine? How can I do it like unix
way. kill -9 <PID>
?