What I did:
I'm using the golang
Redis library from github.com/go-redis/redis
. My client listens on a PubSub channel called 'control'. Whenever a message arrives, I handle it and continue receiving the next message.
I listen endlessly and the messages can come often, or sometimes not for days.
What I expect:
I expect the redis channel to stay open endlessly and receive messages as they are sent.
What I experience:
Usually it runs well for days, but every once in a while client.Receive()
returns EOF
error. After this error, the client no longer receives messages on that channel.
Internally, the redis client prints to stdout the following message:
redis: 2019/08/29 14:18:57 pubsub.go:151: redis: discarding bad PubSub connection: EOF
Disclaimer: I am not certain that this error is what causes me to stop receiving messages, it just seems related.
Additional questions:
I'd like to understand why this happens, if this is normal and if reconnecting to the channel via client.Subscribe()
whenever I encounter the behaviour is a good remedy, or should I address the root issue, whatever it may be.
The code:
Here is the entire code that handles my client (connect to redis, subscribe to channel, endlessly receive messages):
func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {
rootLogger = log.With(zap.String("component", "redis-client"))
host := env.RedisHost
port := env.RedisPort
pass := env.RedisPass
addr := fmt.Sprintf("%s:%s", host, port)
tlsCfg := &tls.Config{}
client = redis.NewClient(&redis.Options{
Addr: addr,
Password: pass,
TLSConfig: tlsCfg,
})
if _, err := client.Ping().Result(); err != nil {
return err
}
go func() {
controlSub := client.Subscribe("control")
defer controlSub.Close()
for {
in, err := controlSub.Receive() // *** SOMETIMES RETURNS EOF ERROR ***
if err != nil {
rootLogger.Error("failed to get feedback", zap.Error(err))
break
}
switch in.(type) {
case *redis.Message:
cm := comm.ControlMessageEvent{}
payload := []byte(in.(*redis.Message).Payload)
if err := json.Unmarshal(payload, &cm); err != nil {
rootLogger.Error("failed to parse control message", zap.Error(err))
} else if err := handleIncomingEvent(&cm); err != nil {
rootLogger.Error("failed to handle control message", zap.Error(err))
}
default:
rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in))
}
}
}()
return nil
}