酥悲清风 2024-02-15 15:32 采纳率: 0%
浏览 5

django+celery+websocket无法正常运行

能帮忙找个bug吗,我做毕设用djagno+celery+websocket做通知功能,但是这个在task中调用consumers.py中的自定义任务怎么调用都没有反应,参数全是正确的,在consumer中直接使用self.channel_layer调用完全没有问题。

img

img

  • 写回答

3条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-02-15 15:33
    关注

    引自免费微信小程序:皆我百晓生

    由于无法查看实际代码和图片,我将提供一个概念性的解决方案,并给出一个简化的示例来说明如何在Django + Celery + WebSockets的环境中正确地从Celery任务中发送消息到WebSocket消费者。

    问题分析:

    1. 异步通信:Celery是一个分布式任务队列,它与WebSockets是两种不同的异步通信机制。Celery用于处理后台任务,而WebSockets用于前端实时通信。
    2. 消息传递:在你的场景中,Celery任务完成后需要通知前端用户,这时就需要通过某种方式让Celery任务能够与WebSocket消费者进行通信。

    解决方案概要:

    • 在Celery任务执行完成时,将其结果写入到一个中间件(例如数据库、Redis或RabbitMQ等),并且设置一个触发器或者监听器。
    • WebSocket消费者监听这个中间件的变化,当检测到新的通知时,通过Channel Layers向客户端发送消息。

    模拟代码示例

    首先,确保你已经配置了ASGI应用(如Daphne)以及Channels库,并且创建了一个通道层实例。

    # settings.py
    ...
    ASGI_APPLICATION = 'your_project.routing.application'
    
    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                "hosts": [('localhost', 6379)],
            },
        },
    }
    

    然后,在Celery任务中更新状态并触发通知:

    # tasks.py
    from celery import shared_task
    from asgiref.sync import async_to_sync
    
    @shared_task
    def background_task(user_id, notification_data):
        # 执行你的任务逻辑
        ...
        
        # 任务完成后,将通知信息写入到一个地方(这里以Redis为例)
        redis_connection.set(f'notification:{user_id}', json.dumps(notification_data))
    
        # 使用Channel Layers向指定通道发送消息
        channel_name = f'user_{user_id}_notifications'
        async_to_sync(channel_layer.send)(channel_name, {"type": "send.notification", "data": notification_data})
    

    接着,在consumers.py中接收来自Channel Layer的消息并广播给客户端:

    # consumers.py
    from channels.generic.websocket import AsyncWebsocketConsumer
    from asgiref.sync import async_to_sync
    import json
    
    class NotificationConsumer(AsyncWebsocketConsumer):
        async def connect(self):
            self.user_id = self.scope['url_route']['kwargs']['user_id']
            await self.channel_layer.group_add(
                f'user_{self.user_id}_notifications',
                self.channel_name
            )
            await self.accept()
    
        async def disconnect(self, close_code):
            await self.channel_layer.group_discard(
                f'user_{self.user_id}_notifications',
                self.channel_name
            )
    
        async def send_notification(self, event):
            notification_data = event['data']
            await self.send(text_data=json.dumps({"type": "notification", "data": notification_data}))
    
        async def receive(self, text_data):
            # 可能不需要,这取决于你的具体需求
            ...
    
        # 注册事件处理器
        async def dispatch(self, message):
            await super().dispatch(message)
            message_type = message["type"]
            if message_type == "send.notification":
                await self.send_notification(message)
    

    最后,在routing.py中配置WebSocket路由:

    # routing.py
    from django.urls import re_path
    from .consumers import NotificationConsumer
    
    websocket_urlpatterns = [
        re_path(r'^ws/notifications/(?P<user_id>\w+)/$', NotificationConsumer.as_asgi()),
    ]
    

    请根据你的实际项目结构调整上述代码,关键在于建立Celery任务和WebSocket消费者之间的通信桥梁。这样,当Celery任务完成后,就可以通过Channel Layers发送消息至对应的WebSocket消费者,进而推送给前端用户。

    评论

报告相同问题?

问题事件

  • 修改了问题 2月15日
  • 修改了问题 2月15日
  • 创建了问题 2月15日

悬赏问题

  • ¥15 汇编语言没有主程序吗?
  • ¥15 这个函数为什么会爆内存
  • ¥15 无法装系统,grub成了顽固拦路虎
  • ¥15 springboot aop 应用启动异常
  • ¥15 matlab有关债券凸性久期的代码
  • ¥15 lvgl v8.2定时器提前到来
  • ¥15 qtcp 发送数据时偶尔会遇到发送数据失败?用的MSVC编译器(标签-qt|关键词-tcp)
  • ¥15 cam_lidar_calibration报错
  • ¥15 拓扑学,凸集,紧集。。
  • ¥15 如何扩大AIS数据容量