dongyuxiao6295 2016-10-03 06:48
浏览 178
已采纳

RabbitMQ发布/订阅实现不起作用

I've converted the RabbitMQ pub/sub tutorial into the below dummy test. Somehow it is not working as expected.

amqpURL is a valid AMQP service (i.e. RabbitMQ) URL. I've tested it with the plain queue example and that works; it only fails once an exchange (pub/sub) is involved.

I'd expect TestDummy to log " [x] Hello World". Somehow it is not happening. Only the sending half is working as expected.

What did I get wrong?

import (
    "fmt"
    "log"
    "testing"

    "github.com/streadway/amqp"
)

// TestDummy starts the consumer, publishes a single message to the exchange,
// and then blocks until the consumer signals that it received a delivery.
func TestDummy(t *testing.T) {
    received := exchangeReceive()

    const payload = "Hello World"
    exchangeSend(payload)

    // No timeout is applied: if nothing is ever delivered, this receive
    // blocks for as long as the test binary is allowed to run.
    <-received
}

// exchangeSend opens its own connection to amqpURL and publishes msg once, as
// text/plain, to the durable fanout exchange "logs". Every failure is reported
// through log.Fatalf, which terminates the process.
func exchangeSend(msg string) {
    // abortOn ends the program when err is non-nil, prefixing the error with
    // a description of the step that failed.
    abortOn := func(err error, what string) {
        if err == nil {
            return
        }
        log.Fatalf("%s: %s", what, err)
        // Not reached: log.Fatalf exits the process.
        panic(fmt.Sprintf("%s: %s", what, err))
    }

    log.Printf("exchangeSend: connect %s", amqpURL)
    connection, err := amqp.Dial(amqpURL)
    abortOn(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    channel, err := connection.Channel()
    abortOn(err, "Failed to open a channel")
    defer channel.Close()

    // Declaring the exchange before publishing means the send works even if
    // no consumer has declared it yet.
    err = channel.ExchangeDeclare(
        "logs",   // exchange name
        "fanout", // exchange type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    abortOn(err, "Failed to declare an exchange")

    payload := []byte(msg)
    publishing := amqp.Publishing{
        ContentType: "text/plain",
        Body:        payload,
    }
    err = channel.Publish(
        "logs", // exchange
        "",     // routing key
        false,  // mandatory
        false,  // immediate
        publishing,
    )
    abortOn(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", payload)
}

// exchangeReceive dials amqpURL, declares the durable fanout exchange "logs",
// binds an exclusive queue declared with an empty name to it, registers an
// auto-ack consumer, and starts a goroutine that logs each delivery and then
// sends true on the returned channel.
//
// NOTE(review): conn.Close and ch.Close are deferred in this function, so both
// run as soon as it returns, while the goroutine below is still ranging over
// msgs. The connection is therefore already closed by the time the caller
// publishes, which looks like the reason nothing is ever received.
func exchangeReceive() <-chan bool {

    done := make(chan bool)

    // failOnError logs msg together with err and exits when err is non-nil.
    failOnError := func(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err)) // not reached: log.Fatalf exits the process
        }
    }

    log.Printf("exchangeReceive: connect %s", amqpURL)
    conn, err := amqp.Dial(amqpURL)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close() // fires on return from exchangeReceive, not when the goroutine finishes

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close() // same lifetime problem as conn.Close above

    // Same declaration as in exchangeSend, so either side may run first.
    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // The queue is declared with an empty name; q.Name carries the name that
    // is used for the bind and consume calls below.
    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name, // queue name
        "",     // routing key
        "logs", // exchange
        false,  // no-wait
        nil)    // arguments
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    // Forward one signal per delivery. done is unbuffered, so this goroutine
    // blocks on the send until the caller receives from the channel.
    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
            done <- true
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

    return done
}
  • 写回答

1条回答 默认 最新

  • douhushen3241 2016-10-04 03:21
    关注

    A silly mistake on my part. When exchangeReceive returns, its defer statements are triggered and close the connection and channel, so nothing is left to receive the message. That's why my rewrite failed.

    I've changed my code this way and it worked:

    import (
        "fmt"
        "os"
        "testing"
        "time"
    
        "github.com/streadway/amqp"
    )
    
    // TestDummy is an integration test for fanout delivery: it attaches two
    // consumers to the "logs" exchange, publishes one message, and checks that
    // each consumer receives that message.
    //
    // The broker address is read from the CLOUDAMQP_URL environment variable;
    // the test is skipped when it is unset. Each consumer is given a bounded
    // time to deliver, so a lost message fails the test instead of hanging it.
    func TestDummy(t *testing.T) {
        amqpURL := os.Getenv("CLOUDAMQP_URL")
        if amqpURL == "" {
            t.Skip("CLOUDAMQP_URL is not set; skipping RabbitMQ integration test")
        }
        // NOTE(review): the URL may embed broker credentials; consider
        // redacting it before logging in shared CI output.
        t.Logf("  [*] amqpURL: %s", amqpURL)

        const (
            want    = "Hello World"
            timeout = 5 * time.Second
        )

        results1 := exchangeReceive(t, "consumer 1", amqpURL)
        results2 := exchangeReceive(t, "consumer 2", amqpURL)
        // Safety margin before publishing. exchangeReceive passes no-wait=false
        // to its bind and consume calls, so this pause is presumably not
        // strictly required.
        time.Sleep(50 * time.Millisecond)

        exchangeSend(t, amqpURL, want)

        for i, results := range []<-chan string{results1, results2} {
            select {
            case have := <-results:
                if want != have {
                    t.Errorf("expected %#v, got %#v", want, have)
                }
            case <-time.After(timeout):
                t.Errorf("consumer %d: no message received within %s", i+1, timeout)
            }
        }
    }
    
    func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string {
    
        out := make(chan string)
    
        failOnError := func(err error, msg string) {
            if err != nil {
                t.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
            }
        }
    
        conn, err := amqp.Dial(amqpURL)
        failOnError(err, "Failed to connect to RabbitMQ")
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
    
        err = ch.ExchangeDeclare(
            "logs",   // name
            "fanout", // type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        q, err := ch.QueueDeclare(
            "",    // name
            false, // durable
            false, // delete when usused
            true,  // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        err = ch.QueueBind(
            q.Name, // queue name
            "",     // routing key
            "logs", // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        go func() {
            for d := range msgs {
                t.Logf("  [x] %s received: %s", name, d.Body)
                out <- string(d.Body)
            }
        }()
    
        t.Logf("  [*] %s ready to receive", name)
        return out
    }
    
    // exchangeSend opens its own connection to amqpURL and publishes msg once, as
    // text/plain, to the durable fanout exchange "logs". The connection and the
    // channel are closed when the function returns. Any failure aborts the test
    // through t.Fatalf.
    func exchangeSend(t *testing.T, amqpURL, msg string) {
        // abortOn fails the test when err is non-nil, prefixing the error with
        // a description of the step that failed.
        abortOn := func(err error, what string) {
            if err == nil {
                return
            }
            t.Fatalf("%s: %s", what, err)
            // Not reached: t.Fatalf stops the calling goroutine.
            panic(fmt.Sprintf("%s: %s", what, err))
        }

        connection, err := amqp.Dial(amqpURL)
        abortOn(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        channel, err := connection.Channel()
        abortOn(err, "Failed to open a channel")
        defer channel.Close()

        // Declaring the exchange before publishing means the send works even
        // if no consumer has declared it yet.
        err = channel.ExchangeDeclare(
            "logs",   // exchange name
            "fanout", // exchange type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        abortOn(err, "Failed to declare an exchange")

        payload := []byte(msg)
        publishing := amqp.Publishing{
            ContentType: "text/plain",
            Body:        payload,
        }
        err = channel.Publish(
            "logs", // exchange
            "",     // routing key
            false,  // mandatory
            false,  // immediate
            publishing,
        )
        abortOn(err, "Failed to publish a message")

        t.Logf(" [x] Sent %s", payload)
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作