weixin_45840746 2023-03-08 10:58 采纳率: 36.4%
浏览 41
已结题

python:读串口拿数据websocket发送

之前我在前端读串口拿数据,最近有个需求,需要将读串口的功能放到python里本地测试,然后发给前端,我查到python里用到的是websocket,好像python里读串口要开新线程有阻塞的啥问题,但因为我是web端开发,不太懂python,磕磕绊绊给写出来的,功能是实现了,但是总感觉代码不对劲, 担心长时间开着会不会卡顿哈,我是用vscode写的,我ctrl+c关不掉,必须清掉,清掉后再开启报串口被占用,再重新开启才能跑程序,请教一下python方面的佬帮忙优化一下

img

  • 写回答

2条回答 默认 最新

  • MarkHan_ 2023-03-08 13:41
    关注

    该回答引用GPTᴼᴾᴱᴺᴬᴵ
    你好,根据你提供的代码,我发现有一些问题需要修复。首先,你在主函数main中开启了一个新线程t1来执行readLoop函数,但是这个线程是没有被关闭的,也就是说当你按下Ctrl + C结束程序时,这个线程还在运行,所以需要在程序结束时手动关闭这个线程。其次,asyncio.sleep(random.random() * 1)这句代码中的sleep函数应该是异步的,也就是应该使用await asyncio.sleep(random.random() * 1)来等待一段时间。最后,由于读取串口数据的函数readLine是阻塞的,所以应该使用异步的方式读取串口数据。

    下面是我根据你的代码做的修改,建议你可以参考一下:

    import asyncio
    import random
    import serial
    import websockets
    from threading import Thread
    
    global newValue, oldValue
    newValue = 'a'
    oldValue = ''
    
    # 读串口
    async def readLoop():
        global newValue
        print('开始读串口')
        port = "COM3" # 修改为自己的串口
        bps = 9600
        bytesize = 8
        ser = serial.Serial(port, int(bps), int(bytesize), timeout=-1, parity=serial.PARITY_NONE, stopbits=1)
        while True:
            buffer = await asyncio.to_thread(ser.readline) # 异步读取串口数据
            buffer = buffer.decode().strip()
            print(buffer)
            if buffer == '':
                continue
            if buffer == 'a':
                newValue = 'a'
            elif buffer == 'b':
                newValue = 'b'
    
    # WebSocket 服务器
    async def time(websocket, path):
        global newValue, oldValue
        while True:
            if newValue != oldValue:
                await websocket.send(newValue)
                oldValue = newValue
            else:
                await asyncio.sleep(random.random() * 1)
    
    # 主函数
    def main():
        t1 = Thread(target=lambda: asyncio.run(readLoop())) # 在新线程中异步执行readLoop函数
        t1.start()
        start_server = websockets.serve(time, "127.0.0.1", 5678)
        print('Websocket 服务器已启动')
        try:
            asyncio.get_event_loop().run_until_complete(start_server)
            asyncio.get_event_loop().run_forever()
        except KeyboardInterrupt:
            print('程序已结束')
            t1.join() # 关闭线程
    
    if __name__ == '__main__':
        main()
    
    
    

    你可以将上面的代码保存为一个.py文件,然后在命令行中运行该文件。当你想要结束程序时,按下Ctrl + C即可。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 3月17日
  • 已采纳回答 3月9日
  • 修改了问题 3月8日
  • 创建了问题 3月8日

悬赏问题

  • ¥50 导入文件到网吧的电脑并且在重启之后不会被恢复
  • ¥15 (希望可以解决问题)ma和mb文件无法正常打开,打开后是空白,但是有正常内存占用,但可以在打开Maya应用程序后打开场景ma和mb格式。
  • ¥20 ML307A在使用AT命令连接EMQX平台的MQTT时被拒绝
  • ¥20 腾讯企业邮箱邮件可以恢复么
  • ¥15 有人知道怎么将自己的迁移策略布到edgecloudsim上使用吗?
  • ¥15 错误 LNK2001 无法解析的外部符号
  • ¥50 安装pyaudiokits失败
  • ¥15 计组这些题应该咋做呀
  • ¥60 更换迈创SOL6M4AE卡的时候,驱动要重新装才能使用,怎么解决?
  • ¥15 让node服务器有自动加载文件的功能