duanqinjiao5244 2019-07-27 05:34
浏览 78

数据丢失通过通道发送给工作者

I am using go as a cosumer of rabbitmq , I run 5 worker and send the data received from rabbitmq to them trough a channel, when the producer send data to rabbit the first 1000 (cap of channel) messages handled correctly but when the next batch of messages comes from rabbitmq only 5 of them (number of workers) sent to workers, when the other 1000 messages come the number of messages sent to workers reaches 1000 again.it seems some messages(in my case 995 are missing) is there anything I am misunderstanding about using worker in go? or could you please help me to solve the problem

package main

import (
    "encoding/json"
    "github.com/streadway/amqp"
    "log"
)

type job struct {
    Receiver    string `json:"receiver"`
    Subject     string `json:"subject"`
    Content     string `json:"content"`
    Priority    int    `json:"priority"`
    ScheduledAt string `json:"scheduledAt"`
    status      string
}

func newJob(data []byte) job {
    job := job{}
    json.Unmarshal(data, &job)
    job.status = "pending"
    return job
}

var count = 1

const url = "amqp://rabbitmquser:some_password@rabbitmq:5672/"
const queueName = "email_queue_1"

func worker(id int, jobs <-chan job, results chan<- string, errors chan<- string) {
    for j := range jobs {
        println("worker", id, "processed job")
        sendMessage(&j)
        saveJob(&j)
        results <- "done"
    }
}

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {

    con, err := amqp.Dial(url)
    failOnError(err, "failed to  connect to rabbitMq broker")
    defer con.Close()

    ch, err := con.Channel()
    failOnError(err, "failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        queueName,
        true,
        false,
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare a queue")
    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")
    forever := make(chan bool)


    jobs, _, _ := createWorkers(5)

    go func() {
        for d := range msgs {
            //log.Printf("here")
            job := newJob(d.Body)
            //fmt.Println(job.status)
            //fmt.Println(err)
            jobs <- job
            if len(jobs) == 1000  {
                jobs, _, _ = createWorkers(5)
            }
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

func createWorkers(i int) (chan job, chan string, chan string) {
    jobs := make(chan job, 1000)
    results := make(chan string, 1000)
    errors := make(chan string, 1000)
    for w := 1; w <= i; w++ {
        go worker(w, jobs, results, errors)
    }
    return jobs, results, errors
}

func sendMessage(job *job) {
    //here we should send message to user
    println(count)
    count++
    //send(*job)

}

func saveJob(job *job) {
    //here we should save data with statuses in database
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 如何在scanpy上做差异基因和通路富集?
    • ¥20 关于#硬件工程#的问题,请各位专家解答!
    • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
    • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
    • ¥30 截图中的mathematics程序转换成matlab
    • ¥15 动力学代码报错,维度不匹配
    • ¥15 Power query添加列问题
    • ¥50 Kubernetes&Fission&Eleasticsearch
    • ¥15 報錯:Person is not mapped,如何解決?
    • ¥15 c++头文件不能识别CDialog