Pfeffer 2018-04-19 21:23 采纳率: 50%
浏览 1713
已结题

求大神跟我解释一下这个程序

 import paho.mqtt.client as mqtt
from concurrent import futures 
clients=[] 
nclients=20 
mqtt.Client.connected_flag=False #start False
#create clients
########################################
for i  in range(nclients): 
    cname="Client"+str(i)
        client= mqtt.Client(cname)
        clients.append(client)
########################################
ex = futures.ThreadPoolExecutor(max_workers=10)#这个语句的意思是10个进程并发吗?
futures object 这个对象是干嘛的
while True: #main loop
    for i in range(len(clients)):
    client.loop(0.01) 
    #if not connected then connect.Max connection attempts
    #at once is set by max_workers#既然并发,为什么还要在这里加loop循环
if not client.connected_flag:#flag set true in on_connect 
    f = ex.submit(Connect,parameter1,parameter2,etc#这个submit是什么意思,后面的参数parameter应该填什么

#!/usr/bin/env python

import sys
import paho.mqtt.client as mqtt
import serial
import time
from functools import reduce
import threading

port = "/dev/ttyACM1"
broker_adress = "10.0.2.190"
sys.path.append("/home/hu/Schreibtisch/Arduino_BA_2.0/Probe_Programmierung/Python-Arduino-Proto-API-v2/arduino")
ser = serial.Serial(port, 9600,timeout= 1)
gassensor_value = "default_value"
voltagesensor_value = "default value"
currentsensor_value = "default value"
temperaturesensor_value_1 = "default_value"
temperaturesensor_value_2 = "default_value"
humidity_value = "default_value"
air_pressure_value = "default_value"
altitude_value = "default_value"

sensor_value = [['/CBCU/CB123/inner_space_of_CB/gassensor',gassensor_value],
                ['/CBCU/CB123/battery/voltagesensor', voltagesensor_value],
                ['/CBCU/CB123/battery/currentsensor', currentsensor_value],
                ['/CBCU/CB123/wassertank/temperature', temperaturesensor_value_1],
                ['/CBCU/CB123/battery/humidity', humidity_value],
                ['/CBCU/CB123/inner_space/temperature_of_inner_space', temperaturesensor_value_2],
                ['/CBCU/CB123/inner_space/air_pressure', air_pressure_value],
                ['/CBCU/CB123/inner_space/altitude', altitude_value]]
def char2int(s):
    return {'0':0,'1':1,'2':2,'3':3,'4':4,'5':5,'6':6,'7':7,'8':8,'9':9}[s]

def mulit_int(x,y):
    return 10*x+y

def str2int(s):
    if s.find('.') == -1:  # 不是浮点数
        return reduce(mulit_int, map(char2int, s))
    else:  # 是浮点数
        s1 = s.split('.')
        s2int = reduce(mulit_int, map(char2int, s1[0]))  # 整数部分

        s2float = reduce(mulit_int, map(char2int, s1[1])) * 0.1 ** len(s1[1])  # 小数部分
        return s2int + s2float
#########################################################################
# Callback_1 for relay
#on_connect1,on_disconnect1,on_subscribe1on_message_1
#########################################################################

def on_connect1(mqttrelay, obj, flags, rc):

    if rc != 0:
        exit(rc)
    else:
        mqttrelay.subscribe("qos0/test", 0)

def on_disconnect1(mqttrelay, obj, rc):
    obj = rc

def on_subscribe1(mqttrelay, obj, mid, granted_qos):
    print(mqttrelay.subscribe("qos0/test", 0))
    print("Waiting for the subscribed messages")


def on_message1(mqttrelay,userdata, message):
   a = str(message.payload.decode("utf-8"))
   if (a == "131"):
       ser.write(b'131')
   elif(a == "130"):
       ser.write(b'130')
   #         time.sleep(1)
   #     else:
   #         ser.write(b'0')
   #         time.sleep(1)
   # else:
   print("please publish the message 1 or 0")
#########################################################################
#Callback_2 for gassensor
#on_connect2,on_publish2
#########################################################################

def on_publish2(mqttsensor, obj, mid):
    print("mid: " + str(mid))

def on_connect2(mqttsensor, userdata, flags, rc):
    print("Connected with result code " + str(rc))


def on_publish3(mqttgassensor, obj, mid):
    print("mid: " + str(mid))

def on_connect3(mqttgassensor, userdata, flags, rc):
    print("Connected with result code " + str(rc))

#create new instance to subscribe the sitution of relay
mqttrelay = mqtt.Client("relay_K_12", 1)

#create new instance to publish the situation of sensor
mqttsensor = mqtt.Client("sensor",1)

#create new instance to publish the situation of gassensor
mqttgassensor = mqtt.Client("gassensor",1)

#the events and callbacks of instance mqttrelais associate with each other:
mqttrelay.on_message = on_message1
mqttrelay.on_connect = on_connect1
mqttrelay.on_subscribe = on_subscribe1
mqttrelay.on_disconnect = on_disconnect1
mqttrelay.connect(broker_adress)

#the events and callbacks of instance sensor associate with each other:
mqttsensor.on_connect = on_connect2
mqttsensor.on_publish = on_publish2
mqttsensor.connect(broker_adress)

#the events and callbacks of instance gassensor associate with each other:
mqttgassensor.on_connect = on_connect3
mqttgassensor.on_publish = on_publish3
mqttgassensor.connect(broker_adress)


def read_Sensor1():
    i = 0
    while True:
        temp = ser.readline().decode('ascii')
        #print(type(temp))
        #print(temp)
        if temp.split(':')[0] == "Voltage":
            sensor_value[1][1] = temp
            i = i + 1
        elif temp.split(':')[0] == "amperage":
            sensor_value[2][1] = temp
            i = i + 1
        elif temp.split(':')[0] ==  "temperature1":
            sensor_value[3][1] =temp
            i = i + 1
        elif temp.split(':')[0] == "RH":
            sensor_value[4][1] =temp
            i = i + 1
        elif temp.split(':')[0] == "temperature2":
            sensor_value[5][1] =temp
            i = i + 1
        elif temp.split(':')[0] =="airpressure":
            sensor_value[6][1] =temp
            i = i + 1
        elif temp.split(':')[0] ==  "altitude":
            sensor_value[7][1] =temp
            i = i + 1
        if (i == 7):
            break

    print(sensor_value[1])
    mqttsensor.publish("/CBCU/CB123/battery/voltagesensor",
                           sensor_value[1][1], retain=1)

    print(sensor_value[2])
    mqttsensor.publish("/CBCU/CB123/battery/currentsensor",
                           sensor_value[2][1], retain=1)

    print(sensor_value[3])
    mqttsensor.publish("/CBCU/CB123/wassertank/temperature",
                           sensor_value[3][1], retain=1)

    print(sensor_value[4])
    mqttsensor.publish("/CBCU/CB123/battery/humidity",
                           sensor_value[4][1], retain=1)

    print(sensor_value[5])
    mqttsensor.publish("/CBCU/CB123/inner_space/temperature_of_inner_space",
                           sensor_value[5][1], retain=1)

    print(sensor_value[6])
    mqttsensor.publish("/CBCU/CB123/inner_space/air_pressure",
                           sensor_value[6][1], retain=1)

    print(sensor_value[7])
    mqttsensor.publish("/CBCU/CB123/inner_space/altitude",
                           sensor_value[7][1], retain=1)


def read_Sensor2():
    j = 0
    while True:
        temp = ser.readline().decode('ascii')
        # print(type(temp))
        # print(temp)
        if temp.split(':')[0] == "Gassesnsor":
            sensor_value[0][1] = temp
            j = j + 1
        if (j == 1):
            break


    print(sensor_value[0])
    mqttsensor.publish("/CBCU/CB123/inner_space_of_CB/gassensor",
                       sensor_value[0][1],retain = 1)




def loop1():
    while True:
        time.sleep(10)
        #mqttrelay.loop()

        mqttsensor.loop()
        read_Sensor1()
        time.sleep(10)

def loop2():
    while True:
        read_Sensor2()
        mqttgassensor.loop()


def main():
    added_thread1 = threading.Thread(target=loop1,name = "thread_1")
    added_thread2 = threading.Thread(target=loop2,name = "thread_2")

    added_thread2.start()
    added_thread1.start()



main()









上面是我的程序,求大佬給個點子,自己用多線程試了很多次,就是出錯
Exception in thread thread_1:
Traceback (most recent call last):
File "/home/hu/PycharmProjects/mqtt_probe/venv/lib/python3.5/site-packages/serial/serialposix.py", line 501, in read
'device reports readiness to read but returned no data '
serial.serialutil.SerialException: device reports readiness to read but returned no data (device disconnected or multiple access on port?)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/home/hu/mqtt/8_publish_subscribe_for_sensoren_threads.py", line 208, in loop1
read_Sensor1()
File "/home/hu/mqtt/8_publish_subscribe_for_sensoren_threads.py", line 126, in read_Sensor1
temp = ser.readline().decode('ascii')
File "/home/hu/PycharmProjects/mqtt_probe/venv/lib/python3.5/site-packages/serial/serialposix.py", line 509, in read
raise SerialException('read failed: {}'.format(e))
serial.serialutil.SerialException: read failed: device reports readiness to read but returned no data (device disconnected or multiple access on port?)

求大佬們指正

  • 写回答

3条回答 默认 最新

  • threenewbee 2018-04-20 02:45
    关注
     ex = futures.ThreadPoolExecutor(max_workers=10) 最大的工作线程是10个
    futures是一个库,用来做线程池和并发的。
    client.loop(0.01)工作线程并发的同时,调用者需要循环等待
    submit是提交,后面的参数根据你不同的情况可以自定义
    
    评论

报告相同问题?

悬赏问题

  • ¥15 求差集那个函数有问题,有无佬可以解决
  • ¥15 【提问】基于Invest的水源涵养
  • ¥20 微信网友居然可以通过vx号找到我绑的手机号
  • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
  • ¥15 解riccati方程组
  • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
  • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
  • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
  • ¥50 树莓派安卓APK系统签名
  • ¥65 汇编语言除法溢出问题