Shao_Yang_com 2022-12-20 21:32 采纳率: 0%
浏览 92
已结题

Celery python 异步编Event loop is closed

报错信息描述:celery任务循环两次推送数据,第一次不报错,第二次提示 Event loop is closed
可开远程桌面

[2022-12-20 13:08:36,490: ERROR/MainProcess] Task exception was never retrieved
future: <Task finished name='Task-5' coro=<Connection.disconnect() done, defined at /home/shaoyang/.local/lib/python3.8/site-packages/redis/asyncio/connection.py:687> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/home/shaoyang/.local/lib/python3.8/site-packages/redis/asyncio/connection.py", line 696, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/lib/python3.8/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 692, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
  1. 环境版本:
   Python-3.8.10
   channels-3.0.5               
   channels-redis-4.0.0
   celery-5.2.7
   Django-3.2.7               
   django-cors-headers-3.8.0               
   django-cors-middleware-1.5.0

2-要实现的功能:
-创建websocket连接,利用celery组件从外部推送数据到channel组
3-具体模块代码:

websocket 代码:

# chat/consumers.py
import json
from mycelery.sms.tasks import send_sms
from mycelery.sms.tasks import realTime1
from channels.generic.websocket import AsyncWebsocketConsumer
from asgiref.sync import async_to_sync

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["group"]
        self.room_group_name = "chat_%s" % self.room_name
        # Join room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        self.result = realTime1.delay(self.room_group_name)
        await self.accept()

    # Receive message from room group
    async def chat_message(self, event):
        message = event["message"]
        print('i am chat_message:',message)
        # Send message to WebSocket
        await self.send(text_data=json.dumps({"message": message}))

celery 模块代码

@app.task
def realTime1(group_name):
    channel_layer = get_channel_layer()
    for i in range(2):
        async_to_sync(channel_layer.group_send)(group_name, {"type": "chat.message", "message": 12345678, })
    print('@@@')
  • 写回答

3条回答 默认 最新

  • |__WhoAmI__| 2022-12-20 21:40
    关注

    1、在 realTime1 任务中使用了 async_to_sync 函数。这个函数是用来在 Django 同步代码中使用 asyncio 代码的,但是任务是在 Celery 中运行的,所以不应该使用这个函数。

    2、在调用 self._writer.close() 时遇到了一个异常。这个异常表明事件循环已经关闭了。这通常是因为程序已经退出了事件循环,或者事件循环已经被意外关闭了。

    评论
    1人已打赏

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 12月20日
  • 创建了问题 12月20日

悬赏问题

  • ¥15 运动想象脑电信号数据集.vhdr
  • ¥15 三因素重复测量数据R语句编写,不存在交互作用
  • ¥15 微信会员卡等级和折扣规则
  • ¥15 微信公众平台自制会员卡可以通过收款码收款码收款进行自动积分吗
  • ¥15 随身WiFi网络灯亮但是没有网络,如何解决?
  • ¥15 gdf格式的脑电数据如何处理matlab
  • ¥20 重新写的代码替换了之后运行hbuliderx就这样了
  • ¥100 监控抖音用户作品更新可以微信公众号提醒
  • ¥15 UE5 如何可以不渲染HDRIBackdrop背景
  • ¥70 2048小游戏毕设项目