I am using go as a cosumer of rabbitmq , I run 5 worker and send the data received from rabbitmq to them trough a channel, when the producer send data to rabbit the first 1000 (cap of channel) messages handled correctly but when the next batch of messages comes from rabbitmq only 5 of them (number of workers) sent to workers, when the other 1000 messages come the number of messages sent to workers reaches 1000 again.it seems some messages(in my case 995 are missing) is there anything I am misunderstanding about using worker in go? or could you please help me to solve the problem
package main
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
)
type job struct {
Receiver string `json:"receiver"`
Subject string `json:"subject"`
Content string `json:"content"`
Priority int `json:"priority"`
ScheduledAt string `json:"scheduledAt"`
status string
}
func newJob(data []byte) job {
job := job{}
json.Unmarshal(data, &job)
job.status = "pending"
return job
}
var count = 1
const url = "amqp://rabbitmquser:some_password@rabbitmq:5672/"
const queueName = "email_queue_1"
func worker(id int, jobs <-chan job, results chan<- string, errors chan<- string) {
for j := range jobs {
println("worker", id, "processed job")
sendMessage(&j)
saveJob(&j)
results <- "done"
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
con, err := amqp.Dial(url)
failOnError(err, "failed to connect to rabbitMq broker")
defer con.Close()
ch, err := con.Channel()
failOnError(err, "failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
jobs, _, _ := createWorkers(5)
go func() {
for d := range msgs {
//log.Printf("here")
job := newJob(d.Body)
//fmt.Println(job.status)
//fmt.Println(err)
jobs <- job
if len(jobs) == 1000 {
jobs, _, _ = createWorkers(5)
}
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func createWorkers(i int) (chan job, chan string, chan string) {
jobs := make(chan job, 1000)
results := make(chan string, 1000)
errors := make(chan string, 1000)
for w := 1; w <= i; w++ {
go worker(w, jobs, results, errors)
}
return jobs, results, errors
}
func sendMessage(job *job) {
//here we should send message to user
println(count)
count++
//send(*job)
}
func saveJob(job *job) {
//here we should save data with statuses in database
}