报错信息描述:celery任务循环两次推送数据,第一次不报错,第二次提示 Event loop is closed
可开远程桌面
[2022-12-20 13:08:36,490: ERROR/MainProcess] Task exception was never retrieved
future: <Task finished name='Task-5' coro=<Connection.disconnect() done, defined at /home/shaoyang/.local/lib/python3.8/site-packages/redis/asyncio/connection.py:687> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
File "/home/shaoyang/.local/lib/python3.8/site-packages/redis/asyncio/connection.py", line 696, in disconnect
self._writer.close() # type: ignore[union-attr]
File "/usr/lib/python3.8/asyncio/streams.py", line 353, in close
return self._transport.close()
File "/usr/lib/python3.8/asyncio/selector_events.py", line 692, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
self._check_closed()
File "/usr/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
- 环境版本:
Python-3.8.10
channels-3.0.5
channels-redis-4.0.0
celery-5.2.7
Django-3.2.7
django-cors-headers-3.8.0
django-cors-middleware-1.5.0
2-要实现的功能:
-创建websocket连接,利用celery组件从外部推送数据到channel组
3-具体模块代码:
websocket 代码:
# chat/consumers.py
import json
from mycelery.sms.tasks import send_sms
from mycelery.sms.tasks import realTime1
from channels.generic.websocket import AsyncWebsocketConsumer
from asgiref.sync import async_to_sync
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope["url_route"]["kwargs"]["group"]
self.room_group_name = "chat_%s" % self.room_name
# Join room group
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
self.result = realTime1.delay(self.room_group_name)
await self.accept()
# Receive message from room group
async def chat_message(self, event):
message = event["message"]
print('i am chat_message:',message)
# Send message to WebSocket
await self.send(text_data=json.dumps({"message": message}))
celery 模块代码
@app.task
def realTime1(group_name):
channel_layer = get_channel_layer()
for i in range(2):
async_to_sync(channel_layer.group_send)(group_name, {"type": "chat.message", "message": 12345678, })
print('@@@')