douti6740 2019-02-28 13:36
浏览 22

如何在多个Goroutine循环内测量/测量延迟

For reproducing an error (404) Reason: "NOT_FOUND - no queue when subscribing to rabbitmq queues, I am using the following code to concurrently declare and consume queues:

package main

import (
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    uuid "github.com/satori/go.uuid"
    "github.com/streadway/amqp"
)

func exit1(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    rUSER := "bunny"
    rPASS := "test"
    rHOST := "my-rabbit"
    rPORT := "5672"
    rVHOST := "hole"

    // read from ENV
    if e := os.Getenv("RABBITMQ_USER"); e != "" {
        rUSER = e
    }
    if e := os.Getenv("RABBITMQ_PASS"); e != "" {
        rPASS = e
    }
    if e := os.Getenv("RABBITMQ_HOST"); e != "" {
        rHOST = e
    }
    if e := os.Getenv("RABBITMQ_PORT"); e != "" {
        rPORT = e
    }
    if e := os.Getenv("RABBITMQ_VHOST"); e != "" {
        rVHOST = e
    }

    conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/%s",
        rUSER, rPASS, rHOST, rPORT, rVHOST))
    exit1(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    exit1(err, "Failed to open a channel")
    defer ch.Close()

    // buggy part
    args := map[string]interface{}{
        "x-message-ttl": int32(3000),
        "x-expires":     int32(8000), // <-- culprit
    }

    concurrent := 500

    wg := sync.WaitGroup{}
    semaphore := make(chan struct{}, concurrent)

    for i := 0; i < 1000; i++ {
        semaphore <- struct{}{}
        wg.Add(1)
        go func() {
            queueName := fmt.Sprintf("carrot-%s-%s", time.Now().Format("2006-01-02"), uuid.Must(uuid.NewV4()))
            fmt.Printf("Creating queue: %s
", queueName)
            defer func() {
                <-semaphore
                wg.Done()
            }()
            q, err := ch.QueueDeclare(
                queueName,
                false, // durable
                false, // delete when usused
                false, // exclusive
                false, // no-wait
                args,  // arguments
            )
            exit1(err, "Failed to declare a queue")

            // how to measure here time elapsed between ch.Consume is called

            _, err = ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
            )
            exit1(err, "Failed to register a consumer")
        }()
    }
    wg.Wait()
}

Giving more context, the code will work fine and subscribe to all the 1000 queues when having few concurrent clients < 100, but when adding more concurrent clients a kind of "race condition" occurs and clients start to receive an error 404, this happens because of declaring a TTL for a queue, in this case, 8 seconds:

 "x-expires":     int32(8000),

A better solution would be to use exclusive queues otherwise, the queue is being deleted before the client can consume it, but within this "buggy" code, I would like to measure the delay is taking between the ch.QueueDeclare and the ch.Consume.

A client basically is doing:

  q, err := ch.QueueDeclare(
            queueName,
            false, // durable
            false, // delete when usused
            false, // exclusive
            false, // no-wait
            args,  // arguments
        )

And then:

 _, err = ch.Consume(
          q.Name, // queue
          "",     // consumer
          true,   // auto-ack
          false,  // exclusive
          false,  // no-local
          false,  // no-wait
          nil,    // args
        )

When having multiple concurrent clients, after doing the QueueDeclare there seems to be a delay that reaches up to 8s seconds before the Consume is called and because of this the error 404 but what and how could I instrument, adapt to code to measure this delays?

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 改算法,照着压缩包里边,参考其他代码封装的格式 写到main函数里
    • ¥15 用windows做服务的同志有吗
    • ¥60 求一个简单的网页(标签-安全|关键词-上传)
    • ¥35 lstm时间序列共享单车预测,loss值优化,参数优化算法
    • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
    • ¥100 为什么这个恒流源电路不能恒流?
    • ¥15 有偿求跨组件数据流路径图
    • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
    • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
    • ¥15 一直显示正在等待HID—ISP