I'm trying to set up consumers for topics that are published to our exchange, like an API gateway that publishes messages to one of several subscribers.
I can't find an example of multiple subscribers that aren't just put in the main function of a single file.
- Can I create a goroutine for each consumer without breaking the connection that every other consumer has?
- Is it appropriate to use a channel and start a goroutine inside a function that is itself running as a goroutine?
Here is the main file:
package main
// main launches one consumer goroutine per topic.
//
// Each Subscribe call blocks while it consumes messages, so every call is
// started with `go` to let the consumers run side by side. The goroutines only
// live as long as main does; the elided code after the launches is presumed to
// keep main from returning — TODO confirm.
func main() {
...
// NOTE(review): Subscribe is declared as Subscribe(topic string, queueName string),
// but these calls pass only the topic. A queue name argument is needed for
// them to compile — confirm which queue each topic should use.
go myMessagePackage.Subscribe("example_topic1")
go myMessagePackage.Subscribe("example_topic2")
go myMessagePackage.Subscribe("example_topic3")
...
}
And this is what each consumer looks like:
package myMessagePackage
import...
// Subscribe consumes messages routed by topic on the exchange named by
// env.RabbitMq_Exchange_Name and hands each message body to routeResponse.
//
// It opens its own connection and channel, declares the exchange and the queue
// queueName, binds the queue to the exchange using topic as the routing key,
// and then consumes from that queue. Every call owns its connection and
// channel, so several Subscribe calls can run in separate goroutines without
// sharing AMQP state.
//
// Subscribe blocks while deliveries keep arriving and returns once the
// delivery channel is closed (the AMQP client closes it when the channel or
// connection goes away). Returning lets the deferred Close calls release the
// channel and connection instead of leaving the goroutine parked forever.
// Callers are expected to run it with `go`.
//
// Every setup error is handed to failOnError together with a description of
// the step that failed (presumably failOnError aborts on a non-nil error —
// confirm against its definition).
func Subscribe(topic string, queueName string) {
	conn, err := amqp.Dial(getConnectionString())
	failOnError(err, "API Consumer failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "API Consumer failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		env.RabbitMq_Exchange_Name, // name
		env.RabbitMq_Exchange_Type, // kind
		true,                       // durable
		false,                      // auto-delete
		false,                      // internal
		false,                      // no-wait
		nil,                        // arguments
	)
	failOnError(err, "API Consumer failed to declare an exchange")

	q, err := ch.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // auto-delete
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	failOnError(err, "API Consumer failed to declare a queue")

	err = ch.QueueBind(
		q.Name,                     // queue
		topic,                      // routing key
		env.RabbitMq_Exchange_Name, // exchange
		false,                      // no-wait
		nil,                        // arguments
	)
	failOnError(err, "API Consumer failed to bind a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer tag (empty: let the library pick one)
		true,   // auto-ack: deliveries are acknowledged before routeResponse runs
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // arguments
	)
	failOnError(err, "API Consumer failed to register a consumer")

	log.Printf("API Consumer on topic %s started. Waiting for messages.", topic)

	// Subscribe already runs in its own goroutine, so drain the deliveries
	// here rather than in a nested goroutine guarded by a never-signalled
	// channel. The loop ends when msgs is closed, which lets this function
	// return and the deferred Close calls run.
	for d := range msgs {
		log.Printf("API Consumer received a message from backend: %s", d.Body)
		routeResponse(d.Body)
	}

	log.Printf("API Consumer on topic %s stopped: delivery channel closed.", topic)
}