I'm trying to find a good method to consume asynchronously from an input queue, process the content using several workers and then publish to an output queue. So far I've tried a number of examples, most recently using the code from here and here as inspiration.
My current code doesn't appear to be doing what it should be however, increasing the number of workers doesn't increase performance (msg/s consumed or published) and the number of goroutines remains fairly static whilst running.
main:
func main() {
maxWorkers := 10
// channel for jobs
in := make(chan []byte)
out := make(chan []byte)
// start workers
wg := &sync.WaitGroup{}
wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
log.Println(i)
defer wg.Done()
go processor(in, out)
}
// add jobs
go collector(in)
go sender(out)
// wait for workers to complete
wg.Wait()
}
The collector is basically the example from the RabbitMQ site with a goroutine that collects messages from the queue and places them on the 'in' channel:
forever := make(chan bool)
go func() {
for d := range msgs {
in <- d.Body
d.Ack(false)
}
}()
log.Printf("[*] Waiting for messages. To exit press CTRL+C")
<-forever
The processor receives an 'in' and 'out' channel, unmarshals JSON, performs a series of regexes and then places the output into the 'out' channel:
func processor(in chan []byte, out chan []byte) {
var (
// list of regexes declared here
)
for {
body := <-in
jsonIn := &Data{}
err := json.Unmarshal(body, jsonIn)
if err != nil {
log.Fatalln("Failed to decode:", err)
}
content := jsonIn.Content
//process regexes using:
//jsonIn.a = r1.FindAllString(content, -1)
jsonOut, _ := json.Marshal(jsonIn)
out <- jsonOut
}
}
And finally the sender is simply the code from the RabbitMQ site, setting up a connection, reading from the 'out' channel and then publishing to a RMQ queue:
for {
jsonOut := <-out
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/json",
Body: []byte(jsonOut),
})
failOnError(err, "Failed to publish a message")
}
This is a pattern that I'll be using quite a lot, so I'm spending a lot of time trying to find something that works correctly (and well) - any advice or help would be appreciated (and in case it isn't obvious, I'm new to Go).