I'm using PubSub with two different streams: we receive messages from the first stream, run some logic, and if a message fits certain criteria, we publish it to the second stream. The second stream is also consumed in a goroutine.
Now, I have two main functions, HandleMessage and HandleRetry, where the former handles the first stream and the latter handles the second stream.
The relevant code for HandleMessage is as follows:
    if c.handler.ShouldProcess(tx) {
        err := c.handler.Process(tx)
        if err != nil {
            c.log.
                WithError(err).
                WithField("tx_hash", tx.TxHash.String()).
                Error("failed to process")
            retryMsg := RetryMessage{
                Transaction:                 tx,
                RemainingProcessingAttempts: c.config.MaxProcessingAttempts,
                LastAttempt:                 time.Now(),
            }
            data, err := pubsub.EncodeMessage(retryMsg)
            if err != nil {
                c.log.WithError(err).Error("failed to convert retry msg to byte slice")
            }
            id, err := c.retryQueue.Publish(context.Background(), &pubsub.Message{Data: data})
            if err != nil {
                c.log.WithError(err).
                    WithField("id", id).
                    Error("failed to publish message to retry queue")
            }
        }
    }
In HandleRetry, the function opens with:
    retryTx := new(RetryMessage)
    err := pubsub.DecodeMessage(msg.Data, retryTx)
    if err != nil {
        c.log.WithError(err).
            Error("failed to decode message: not a retry tx")
        msg.Ack()
        return
    }
The RetryQueue, which is handled by HandleRetry, has no other input except the messages published from HandleMessage. However, I keep getting a gob decoding error:
level=error msg="failed to decode message: not a retry tx" env=LOCAL error="gob: type mismatch: no fields matched compiling decoder for RetryMessage"
RetryMessage looks like this:
    type RetryMessage struct {
        Transaction                 *firehose.Transaction
        RemainingProcessingAttempts int
        LastAttempt                 time.Time
    }
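For context, here is a minimal sketch of one way the gob error quoted above can arise. As far as I understand encoding/gob, it reports "no fields matched" when the bytes being decoded describe a struct whose field names share nothing with the destination struct. The `Envelope` and `Transaction` types below are hypothetical stand-ins (not from my code base or from firehose), and this only illustrates the error; it does not confirm that this is what happens in my pipeline.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"strings"
	"time"
)

// Envelope is a hypothetical struct standing in for "some other type"
// that ends up gob-encoded on the wire instead of RetryMessage.
type Envelope struct {
	Payload []byte
	Kind    string
}

// Transaction is a hypothetical stand-in for firehose.Transaction.
type Transaction struct {
	TxHash string
}

// RetryMessage mirrors the struct from the question.
type RetryMessage struct {
	Transaction                 *Transaction
	RemainingProcessingAttempts int
	LastAttempt                 time.Time
}

// decodeMismatch encodes an Envelope and then tries to decode those bytes
// into a RetryMessage, which has no field names in common with Envelope.
func decodeMismatch() error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(Envelope{Payload: []byte("x"), Kind: "retry"}); err != nil {
		return err
	}
	return gob.NewDecoder(&buf).Decode(new(RetryMessage))
}

// isFieldMismatch reports whether err looks like the error from the question.
func isFieldMismatch(err error) bool {
	return err != nil && strings.Contains(err.Error(), "no fields matched")
}

func main() {
	err := decodeMismatch()
	fmt.Println("decode error:", err)
	fmt.Println("field mismatch:", isFieldMismatch(err))
}
```

If this is relevant, it would suggest checking whether the bytes arriving in msg.Data are exactly the bytes returned by EncodeMessage, or whether something between Publish and HandleRetry wraps or re-encodes them.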
The encoding and decoding functions are as follows:
    // EncodeMessage converts an arbitrary interface into a byte slice.
    func EncodeMessage(data interface{}) ([]byte, error) {
        var buf bytes.Buffer
        enc := gob.NewEncoder(&buf)
        err := enc.Encode(data)
        if err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    }

    // DecodeMessage decodes message data into the provided interface.
    func DecodeMessage(data []byte, dest interface{}) error {
        buf := bytes.NewBuffer(data)
        dec := gob.NewDecoder(buf)
        return dec.Decode(dest)
    }