Thanks for the patch, I've got a commit that changes how channels interact with the IO object which allows this behavior, so I'll be closing this with the spirit of your change commited.
Use Queue.get instead of time.sleep to delay AMQPFrame._wait_for_frame iterations.
The example at https://gist.github.com/kristaps/10599120 can process ~10 msgs/s, due to the mandatory 0.1s sleep in _wait_on_frame, if there is no frame available on first iteration (happens on every message for me). I propose using the timeout param of the Queue.get call in AMQPQueue._read_from_queue, which will sleep only when there is no data and return immediately when data is available.
This change makes the example script's throughput rise to ~700 msgs/s on my machine.
- 点赞 评论 复制链接分享
How are you dealing with the fully async nature of state change if you move to fully blocking?点赞 评论 复制链接分享
- weixin_39640195 5月前
Not sure I understood the question, could you elaborate a bit, please?点赞 评论 复制链接分享
I'm refactoring the code a bit. RabbitMQ can send RPC messages to the client that should interrupt any existing behavior, such as Channel.Close, Basic.Return. Basic.Nack, etc. Moving to a fully blocking Queue.get() with the existing IO structure would break client behavior. I think I have a work around through.点赞 评论 复制链接分享