难ban 2024-04-22 12:02 采纳率: 50%
浏览 7
已结题

django Channels长连接 数据量大出现ChannelFull

最近刚试长连接,我使用django的channels(3.0.4)版本,数据量大的情况下会出现ChannelFull的情况,
之前找资料说是需要从InMemoryChannelLayer改成RedisChannelLayer,但我本身用的就是redischannellayer,并且调整了MAX_CONNECTIONS为None,设置CAPACITY为2048,还是会出现这样的问题。

我想问下该如何避免和解决这个问题?

或者我还应该调整服务器的哪些参数么?(服务器使用supervisor+nginx+daphne配置)

附代码如下:


@csrf_exempt
@async_to_sync
async def async_receive_live_like_msg(request):
    """
        主方法
    """
    try:
        # 校验数据正确性
        params, data = common_valid_and_get_data(request)

        # 获取当前房间通道
        room = data.get('room')
        channel_name = await get_channel_name_by_code(room)
        if not channel_name:
            logger.info('推送点赞信息给房间{}时检测到当前房间未连接'.format(room))
            return JsonResponse({'result': 1, 'msg': 'room{} is not connected!'.format(room)})


        # 处理消息
        msgs, count_sum = await async_trans_save_live_msg(params, data)

        logger.info('给房间{}推送信息{}'.format(room, like_msgs))
        await channel_layer.send(channel_name, {
            "type": "group_message",
            "message": json.dumps({'msgs': msgs}),
        })
        return JsonResponse({'result': 1, 'msg': 'success'})
    except Exception as e:
        logger.error(e)
        return JsonResponse({'result': 2, 'errorMsg': str(e)})

class AsyncConsumer(AsyncWebsocketConsumer):
        async def connect(self):
            await self.accept()
        
        async def disconnect(self, close_code=None):
            ...
        
        async def group_message(self, event):
        """
            消息推送
        """
        logger.info('room: {}推送消息到本房间: {}'.format(self.room_code, event.get('message')))
        await self.send(text_data=json.dumps({
            'result': 1,
            **json.loads(event.get('message'))
        }))
  • 写回答

4条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-04-22 12:15
    关注

    引自免费微信小程序:皆我百晓生

    针对您遇到的问题,首先确认一下是否在Daphne配置文件中启用了--max-requests选项,这个选项会限制每个worker处理请求的最大数量。如果设置了这个值,建议将其设置为一个较大的值或者移除这个限制,因为这可能会导致 ChannelFull 错误。

    另外,检查您的Nginx配置,确保它正确地将WebSocket流量转发给了Daphne。正确的配置示例如下:

    location /ws/ {
        proxy_pass http://localhost:8000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
    

    接下来,检查您的Supervisor配置,确保有足够的 worker 进程来处理 WebSocket 请求。可以通过修改 numprocs 参数来增加进程数。例如,在supervisord.conf 中:

    [program:daphne]
    command=/path/to/daphne yourproject.asgi:channel_layer -b 0.0.0.0:8000
    directory=/path/to/your/project
    autostart=true
    autorestart=true
    numprocs=4
    stdout_logfile=/var/log/daphne.log
    stderr_logfile=/var/log/daphne_error.log
    

    这里将 numprocs 设置为4,可以根据服务器性能适当调整。

    关于代码部分,您可以考虑优化发送消息的方式。目前,每次接收到点赞消息时,都发送了一条单独的消息到频道。这可能导致大量的小消息在网络中传输,从而造成拥堵。可以尝试批量处理消息,比如每秒发送一次汇总的消息,而不是立即发送每一条点赞消息。

    这是一个可能的优化方案:

    import asyncio
    from collections import deque
    
    # 在 AsyncConsumer 类中添加以下变量
    self.message_queue = deque(maxlen=20)  # 保存最多20条消息
    self.flush_interval = 1.0  # 每秒发送一次消息
    self.last_flush_time = None
    
    class AsyncConsumer(AsyncWebsocketConsumer):
    
        async def connect(self):
            await self.accept()
    
        async def disconnect(self, close_code=None):
            ...
    
        async def group_message(self, event):
            """
                消息推送
            """
            logger.info('room: {} 推送消息到本房间: {}'.format(self.room_code, event.get('message')))
            self.message_queue.append(json.loads(event.get('message')))
            if self.last_flush_time is None or (time.time() - self.last_flush_time) >= self.flush_interval:
                await self.flush_messages()
    
        async def flush_messages(self):
            if self.message_queue:
                message = {"messages": list(self.message_queue)}
                await self.send(text_data=json.dumps(message))
                self.message_queue.clear()
                self.last_flush_time = time.time()
    

    这个改动会将消息暂存到队列中,并在达到一定数量或时间间隔后一次性发送出去。这样可以减少网络开销并提高效率。

    最后,为了进一步调试,可以启用 Redis 的慢查询日志,以确定是否有其他瓶颈或操作耗时过长。在 Redis 配置文件(redis.conf)中添加以下行:

    slowlog-log-slower-than 10000
    slowlog-max-len 10000
    

    然后重启 Redis 服务。这将记录执行时间超过10毫秒的操作,可以帮助定位可能的性能瓶颈。

    经过这些调整后,如果仍然存在问题,那么可能是服务器硬件资源不足或网络带宽限制,这时可能需要升级服务器配置或寻找更高效的网络解决方案。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(3条)

报告相同问题?

问题事件

  • 系统已结题 5月2日
  • 已采纳回答 4月24日
  • 创建了问题 4月22日

悬赏问题

  • ¥66 换电脑后应用程序报错
  • ¥50 array数据同步问题
  • ¥15 pic16F877a单片机的外部触发中断程序仿真失效
  • ¥15 Matlab插值拟合差分微分规划图论
  • ¥15 keil5 target not created
  • ¥15 C/C++数据与算法请教
  • ¥15 怎么找志同道合的伙伴
  • ¥20 如何让程序ab.eXe自已删除干净硬盘里的本文件自己的ab.eXe文件
  • ¥50 爬虫预算充足,跪巨佬
  • ¥15 滑块验证码拖动问题悬赏