weixin_39592789
2020-12-09 14:19

"reached the maximum number of channels" raised even though channels are closed

Hello,

I've run into an issue when creating new channels: the call fails with the error "reached the maximum number of channels 65535".

After some digging, I noticed that Connection._get_next_available_channel_id() counts all channels, both open and closed. I believe counting only open channels should resolve this issue.

I tested with a quick fix:

    def _get_next_available_channel_id(self):
        channel_id = len(self._channels) + 1
        active_channels = [
            ch for ch in list(self._channels.values()) if ch and ch.is_open
        ]
        if len(active_channels) >= self.max_allowed_channels:
            raise AMQPConnectionError(
                'reached the maximum number of channels %d' %
                self.max_allowed_channels)
        return channel_id

However, it may be better to just keep a running count of active channels.

This question comes from the open-source project: eandersson/amqpstorm


12 answers

  • weixin_39695701, 5 months ago

    Thanks for reporting this.

    I'll look into this tonight.

  • weixin_39695701, 5 months ago

    The error message is a little misleading, as the real limitation here is that RabbitMQ cannot accept a channel id value higher than 65535.

    I'll look into implementing a basic way of re-using closed channels.

  • weixin_39695701, 5 months ago

    If you have time, please try the above ^ patch.

  • weixin_39592789, 5 months ago

    Thanks for taking the time on this. My current environment is a web endpoint that posts updates to a queue. I'm still a bit new to RabbitMQ, but my understanding is that each thread should have its own channel. So in my setup, I share the connection and create a new channel each time I add to the queue. This is where the large number of channels comes from.

    I did some testing, and noticed I would get amqpstorm.exception.AMQPConnectionError: socket/connection closed after a few seconds.

    In my test script, I launch 100 threads per second; each thread creates a channel, waits 1 second, and then closes it. My guess is that a channel ID is being reused too quickly after it is closed.

    A cool-down period could be added for each channel ID as it gets closed. Alternatively, an incrementing counter that wraps around and skips IDs still in use could be more efficient, although some edge cases could still produce a similar issue.

  • weixin_39592789, 5 months ago

    I took a look at the updated _get_next_available_channel_id, and I believe the problem is that it returns an ID based on the count of channels. A lower ID can be removed while a higher one is still open, so the count can point at an ID that is already in use.

    I've modified it to loop through and return the first free channel ID; if none is free, it raises the maximum-channels error. mikemrm/amqpstorm

    I've tested this a few times with the same script, and it hasn't thrown an error.
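To illustrate the collision described above, here is a minimal sketch. It assumes the ID is derived as the channel count plus one (as in the original quick fix) and that closed channels are removed from the registry. The plain dict, both helper names, and the use of RuntimeError in place of AMQPConnectionError are hypothetical stand-ins, not amqpstorm code.

```python
def next_id_by_count(channels):
    """Count-based allocation: the ID depends only on how many channels exist."""
    return len(channels) + 1


def next_id_first_free(channels, max_allowed_channels=65535):
    """Scan upward from 1 and return the first ID that is not in use."""
    for channel_id in range(1, max_allowed_channels + 1):
        if channel_id not in channels:
            return channel_id
    # Stand-in for AMQPConnectionError, to keep the sketch self-contained.
    raise RuntimeError(
        'reached the maximum number of channels %d' % max_allowed_channels)


# Hypothetical registry: channels 1..3 are open, then channel 1 is closed.
channels = {1: 'ch1', 2: 'ch2', 3: 'ch3'}
del channels[1]

collided_id = next_id_by_count(channels)  # 3, which is still in use
free_id = next_id_first_free(channels)    # 1, the slot that was freed
```

The count-based helper hands out an ID that is still open, while the scan returns the freed one. The scan is linear in the number of open channels, which is the cost discussed later in the thread.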

  • weixin_39695701, 5 months ago

    If you are publishing messages, I would just share the channel across the threads. The main reason to use individual channels is for consuming. That said, it should still work.

    Feel free to open a pull request against my branch with mikemrm/amqpstorm.
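The suggestion to share one channel among publishing threads could look roughly like the sketch below. The helper name publish_from_threads and the 'updates' routing key are hypothetical; the only library call used is channel.basic.publish(body=..., routing_key=...), and the sketch relies on the statement above that a channel can be shared across threads for publishing.

```python
import threading


def publish_from_threads(channel, routing_key, bodies):
    """Publish each body from its own thread over one shared channel."""
    def worker(body):
        # Every thread reuses the same channel instead of opening a new one.
        channel.basic.publish(body=body, routing_key=routing_key)

    threads = [threading.Thread(target=worker, args=(body,))
               for body in bodies]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


# Assumed usage against a real broker (host and credentials are placeholders):
#
#   from amqpstorm import Connection
#   connection = Connection('localhost', 'guest', 'guest')
#   channel = connection.channel()
#   publish_from_threads(channel, 'updates', ['a', 'b', 'c'])
#   channel.close()
#   connection.close()
```

With this arrangement the connection only ever allocates one channel ID for publishing, so the ID limit is not approached no matter how many threads publish.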

  • weixin_39695701, 5 months ago

    What do you think about this? I was trying to come up with something that would be fast, but still offer the features you need.

    https://github.com/eandersson/amqpstorm/commit/4bbe32ab268d75609a719e49840d90c40e91ef1e

    
    def _get_next_available_channel_id(self):
        if not self._channels:
            return 1
    
        last_channel_id = int(next(reversed(self._channels)))
        next_channel_id = last_channel_id + 1
        if next_channel_id < self.max_allowed_channels:
            return next_channel_id
    
        for index in compatibility.RANGE(1, self.max_allowed_channels):
            if index in self._channels:
                continue
            return index
    
        raise AMQPConnectionError(
            'reached the maximum number of channels %d' %
            self.max_allowed_channels)
    
  • weixin_39592789, 5 months ago

    Thanks for that information, I'll switch my stuff over to using a single channel, and see how it goes.

    That was much faster, but the range scan is still pretty slow, so I took another stab at it using one of the suggestions I made before, and it seems to be the quickest.

    mikemrm/amqpstorm

        def _get_next_available_channel_id(self):
            if self._last_channel_id == self.max_allowed_channels:
                self._last_channel_id = 0
    
            for index in range(self._last_channel_id + 1, self.max_allowed_channels):
                if index in self._channels:
                    continue
                self._last_channel_id = index
                return index
    
            if self._last_channel_id != 0:
                self._last_channel_id = 0
                return self._get_next_available_channel_id()
    
            raise AMQPConnectionError(
                'reached the maximum number of channels %d' % self.max_allowed_channels)
    

    I ran some tests, and this one takes 0.24 seconds to create 100k fake channels.

    
    [single range] Runtime: 328.03 sec
    [reversed nxt] Runtime: 106.00 sec
    [ channel id ] Runtime: 0.24 sec
    

    Let me know what you think. #56
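The rotating-cursor idea above can be tried outside the library with a small stand-in class. This is a sketch under assumptions, not the code from the pull request: ChannelIdAllocator, open(), and close() are hypothetical names, RuntimeError stands in for AMQPConnectionError, and the wrap-around uses modular arithmetic instead of recursion so that the highest ID is also considered.

```python
class ChannelIdAllocator:
    """Stand-in for the connection's channel bookkeeping (not amqpstorm code)."""

    def __init__(self, max_allowed_channels=65535):
        self.max_allowed_channels = max_allowed_channels
        self._channels = {}
        self._last_channel_id = 0  # cursor: the most recently issued ID

    def open(self):
        channel_id = self._next_id()
        self._channels[channel_id] = object()  # placeholder for a Channel
        return channel_id

    def close(self, channel_id):
        self._channels.pop(channel_id, None)

    def _next_id(self):
        # Try the IDs after the cursor first, then wrap around to 1.
        start = self._last_channel_id
        for offset in range(1, self.max_allowed_channels + 1):
            candidate = (start + offset - 1) % self.max_allowed_channels + 1
            if candidate not in self._channels:
                self._last_channel_id = candidate
                return candidate
        raise RuntimeError(
            'reached the maximum number of channels %d'
            % self.max_allowed_channels)


allocator = ChannelIdAllocator(max_allowed_channels=4)
first = allocator.open()    # 1
second = allocator.open()   # 2
allocator.close(first)
third = allocator.open()    # 3: the cursor moves on rather than reusing 1
fourth = allocator.open()   # 4
fifth = allocator.open()    # 1: wraps around to the freed ID
```

Because the cursor keeps moving forward, a freed ID is not handed out again until the cursor wraps, which also spaces out reuse of recently closed IDs. In the common case the first candidate is free, so allocation takes a single dict lookup.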

  • weixin_39695701, 5 months ago

    Looks good. I made some minor modifications. Not sure if my assumptions are correct. Tried to write some tests to make sure. I will also need to do another pass on the channel removal part.

  • weixin_39695701, 5 months ago

    I still want to fix this, but haven't had the time to finalize a solution. Is this still critical for you?

  • weixin_39592789, 5 months ago

    My local modifications are handling it fine, so I'm good.

  • weixin_39695701, 5 months ago

    The code is merged, and I am ready to release 2.6.0, but if you (or anyone else) have some time to run some basic tests, that would be great!
