donglu8344812 2018-12-07 23:27

What is the proper Go / RabbitMQ way to "pop" one message off a queue?

The first question I have is really a design question. This is my first time writing a service that uses a queue, and I am also new to Go. I am trying to determine whether I should write my worker so that it simply pops one message off the queue, processes it, and then dies off. With things like Kubernetes this seems fairly trivial.

Or should I have a long-lived worker that constantly waits for new messages and is relaunched if it dies (through a bug or by accident)?

The reason I ask is that implementing the former feels a little more "hacked up", because I have to write the following using the common Go AMQP library, streadway/amqp (read the comments):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
    msgs, err := v.Channel.Consume(
        v.QueueName, // queue
        v.ConsmerID, // consumer
        true,        // auto-ack
        false,       // exclusive
        false,       // no-local
        false,       // no-wait
        nil,         // args
    )
    if err != nil {
        return nil, err
    }

    // We have to use for .. range because Consume returns
    // "<-chan Delivery" but if we only want ONE message popped off
    // we return on the first one
    for data := range msgs {
        return data.Body, nil
    }

    // We should never get this far...
    return nil, errors.New("Something went wrong")
}

Furthermore what is <-chan Delivery in this case? It seems like some sort of "stream" or object that you can plug into. Is there a way to not have to write a for-loop for these data types?

EDIT: I have also discovered that this code appears to dequeue the ENTIRE queue, even though the for-loop only iterates once (as shown in the code above). I am not sure why this happens either.



1 answer

  • dongzun9958 2018-12-08 20:10

    A <-chan Delivery is a receive-only Go channel of Delivery values. To take a single value from it, don't use a range loop; use the receive operator <- instead:

    data := <-msgs
    return data.Body, nil
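
A bare receive blocks forever if no message arrives, and returns a zero value if the channel has been closed. The following is a minimal sketch of a more defensive single receive, using the comma-ok form and a timeout. The delivery type and the popOne helper are stand-ins invented for this example so that it runs without a broker; they are not part of streadway/amqp.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// delivery is a hypothetical stand-in for amqp.Delivery.
type delivery struct {
	Body []byte
}

// popOne receives at most one delivery from msgs, giving up after timeout.
func popOne(msgs <-chan delivery, timeout time.Duration) ([]byte, error) {
	select {
	case d, ok := <-msgs:
		if !ok {
			// A closed channel yields the zero value with ok == false.
			return nil, errors.New("delivery channel closed")
		}
		return d.Body, nil
	case <-time.After(timeout):
		return nil, errors.New("timed out waiting for a message")
	}
}

func main() {
	msgs := make(chan delivery, 1)
	msgs <- delivery{Body: []byte("hello")}

	body, err := popOne(msgs, time.Second)
	fmt.Println(string(body), err) // hello <nil>

	close(msgs)
	_, err = popOne(msgs, time.Second)
	fmt.Println(err) // delivery channel closed
}
```

With the real library, the same select could presumably be applied to the channel returned by Consume, reading d.Body from the amqp.Delivery.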
    

    As to why your entire queue is emptied as soon as you fetch one message: this is most likely due to consumer prefetch. When consuming, the client does not pop messages from the broker one by one; the broker pushes them to the consumer ahead of time, up to a configurable prefetch limit. If no limit has been set with Qos, RabbitMQ applies none and will send everything in the queue. Once the broker has delivered those messages to your consumer, they sit in your msgs channel, and if you stop reading from that channel after the first message, the rest are lost. That is what happens with auto-ack enabled, because the broker considers each message acknowledged as soon as it is sent. With manual acknowledgements, the unacknowledged messages are requeued after the channel is closed.

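    To fetch only one message at a time, call the channel's Qos function before Consume, with the first parameter being the prefetch count. The prefetch limit counts unacknowledged messages, so it is ignored while auto-ack is true; pass false for auto-ack in Consume and acknowledge each delivery yourself (data.Ack(false)):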

    err := v.Channel.Qos(1, 0, false)
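
To illustrate why a prefetch limit changes how much of the queue disappears after a single receive, here is a toy in-memory model. It is only a sketch: the broker type and its consume method are hypothetical and do not use or mirror the streadway/amqp API, and the model tracks only which messages have left the queue, not acknowledgements or requeueing.

```go
package main

import "fmt"

// broker is a toy stand-in for a RabbitMQ queue (hypothetical type).
type broker struct {
	queue []string
}

// consume mimics the broker pushing messages to a consumer ahead of time.
// A prefetch of 0 means "no limit". Everything copied into the returned
// channel has left the broker's queue.
func (b *broker) consume(prefetch int) <-chan string {
	n := len(b.queue)
	if prefetch > 0 && prefetch < n {
		n = prefetch
	}
	out := make(chan string, n)
	for _, m := range b.queue[:n] {
		out <- m
	}
	b.queue = b.queue[n:]
	close(out)
	return out
}

// newBroker returns a broker holding five queued messages.
func newBroker() *broker {
	return &broker{queue: []string{"m1", "m2", "m3", "m4", "m5"}}
}

func main() {
	// No prefetch limit: one receive, yet the whole queue was delivered.
	b := newBroker()
	first := <-b.consume(0)
	fmt.Println(first, len(b.queue)) // m1 0

	// Prefetch of 1: only the message we read has left the queue.
	b = newBroker()
	first = <-b.consume(1)
	fmt.Println(first, len(b.queue)) // m1 4
}
```

In the first case the four unread messages are stranded in the consumer's channel, which matches the behaviour described in the question.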
    