donglu8344812 2018-12-07 23:27
浏览 206

正确的Go / RabbitMQ方法可以从队列中“弹出”一条消息?

The first question I have is a design question really. This is my first time writing a service that uses a Queue and I am also new to Go. I am trying to determine whether I should write my worker in such a way that it simply pops one message off the queue, processes it, and the dies off. With things like Kubernetes this seems fairly trivial.

Or should I have a long-lived worker constantly waiting for new messages but that is relaunched if it dies (by a bug or accident)?

The reason I ask this question is that in order to implement the former it feels a little more "hacked up" because I have to write the following using the common go AMQP library from streadway/amqp (read the comments):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
    msgs, err := v.Channel.Consume(
        v.QueueName, // queue
        v.ConsmerID, // consumer
        true,        // auto-ack
        false,       // exclusive
        false,       // no-local
        false,       // no-wait
        nil,         // args
    )
    if err != nil {
        return nil, err
    }

    // We have to use for .. range because Consume returns
    // "<-chan Delivery" but if we only want ONE message popped off
    // we return on the first one
    for data := range msgs {
        return data.Body, nil
    }

    // We should never get this far...
    return nil, errors.New("Something went wrong")
}

Furthermore what is <-chan Delivery in this case? It seems like some sort of "stream" or object that you can plug into. Is there a way to not have to write a for-loop for these data types?

EDIT: I have also discovered that it appears that this code will dequeue the ENTIRE queue even though it only does a for-loop iteration once (as show in the code above). I am not sure why this happens either?

Relevant links to code:

  • 写回答

1条回答 默认 最新

  • dongzun9958 2018-12-08 20:10
    关注

    To simply take a single object from a <-chan Delivery, don't use a range loop, but the channel operator <-:

    data := <- msgs
    return data.Body, nil
    

    As to why your entire queue is emptied as soon as you fetch one message: This is most likely due to the Consumer prefetch. When consuming messages, the client will actually not pop them from the broker one-by-one, but in batches of configurable size (if I recall correctly, around the order of 32 or 64 messages by default). As soon as the broker has published this batch of messages to your consumer, they'll be in your msgs channel; and if you don't read from that channel any more after getting the first message, the rest of them will be gone (at least, with auto-ack enabled -- otherwise, they'll be requeued after the channel is closed).

    To only fetch one message at a time, use the channel's QoS function (with the first parameter being the prefetch count):

    err := v.Channel.Qos(1, 0, false)
    
    评论

报告相同问题?

悬赏问题

  • ¥15 无线电能传输系统MATLAB仿真问题
  • ¥50 如何用脚本实现输入法的热键设置
  • ¥20 我想使用一些网络协议或者部分协议也行,主要想实现类似于traceroute的一定步长内的路由拓扑功能
  • ¥30 深度学习,前后端连接
  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题