import paho.mqtt.client as mqtt
from concurrent import futures
clients=[]
nclients=20
mqtt.Client.connected_flag=False #start False
#create clients
########################################
for i in range(nclients):
cname="Client"+str(i)
client= mqtt.Client(cname)
clients.append(client)
########################################
ex = futures.ThreadPoolExecutor(max_workers=10)#这个语句的意思是10个进程并发吗?
futures object 这个对象是干嘛的
while True: #main loop
for i in range(len(clients)):
client.loop(0.01)
#if not connected then connect.Max connection attempts
#at once is set by max_workers#既然并发,为什么还要在这里加loop循环
if not client.connected_flag:#flag set true in on_connect
f = ex.submit(Connect,parameter1,parameter2,etc#这个submit是什么意思,后面的参数parameter应该填什么
#!/usr/bin/env python
import sys
import paho.mqtt.client as mqtt
import serial
import time
from functools import reduce
import threading
port = "/dev/ttyACM1"
broker_adress = "10.0.2.190"
sys.path.append("/home/hu/Schreibtisch/Arduino_BA_2.0/Probe_Programmierung/Python-Arduino-Proto-API-v2/arduino")
ser = serial.Serial(port, 9600,timeout= 1)
gassensor_value = "default_value"
voltagesensor_value = "default value"
currentsensor_value = "default value"
temperaturesensor_value_1 = "default_value"
temperaturesensor_value_2 = "default_value"
humidity_value = "default_value"
air_pressure_value = "default_value"
altitude_value = "default_value"
sensor_value = [['/CBCU/CB123/inner_space_of_CB/gassensor',gassensor_value],
['/CBCU/CB123/battery/voltagesensor', voltagesensor_value],
['/CBCU/CB123/battery/currentsensor', currentsensor_value],
['/CBCU/CB123/wassertank/temperature', temperaturesensor_value_1],
['/CBCU/CB123/battery/humidity', humidity_value],
['/CBCU/CB123/inner_space/temperature_of_inner_space', temperaturesensor_value_2],
['/CBCU/CB123/inner_space/air_pressure', air_pressure_value],
['/CBCU/CB123/inner_space/altitude', altitude_value]]
def char2int(s):
return {'0':0,'1':1,'2':2,'3':3,'4':4,'5':5,'6':6,'7':7,'8':8,'9':9}[s]
def mulit_int(x,y):
return 10*x+y
def str2int(s):
if s.find('.') == -1: # 不是浮点数
return reduce(mulit_int, map(char2int, s))
else: # 是浮点数
s1 = s.split('.')
s2int = reduce(mulit_int, map(char2int, s1[0])) # 整数部分
s2float = reduce(mulit_int, map(char2int, s1[1])) * 0.1 ** len(s1[1]) # 小数部分
return s2int + s2float
#########################################################################
# Callback_1 for relay
#on_connect1,on_disconnect1,on_subscribe1on_message_1
#########################################################################
def on_connect1(mqttrelay, obj, flags, rc):
if rc != 0:
exit(rc)
else:
mqttrelay.subscribe("qos0/test", 0)
def on_disconnect1(mqttrelay, obj, rc):
obj = rc
def on_subscribe1(mqttrelay, obj, mid, granted_qos):
print(mqttrelay.subscribe("qos0/test", 0))
print("Waiting for the subscribed messages")
def on_message1(mqttrelay,userdata, message):
a = str(message.payload.decode("utf-8"))
if (a == "131"):
ser.write(b'131')
elif(a == "130"):
ser.write(b'130')
# time.sleep(1)
# else:
# ser.write(b'0')
# time.sleep(1)
# else:
print("please publish the message 1 or 0")
#########################################################################
#Callback_2 for gassensor
#on_connect2,on_publish2
#########################################################################
def on_publish2(mqttsensor, obj, mid):
print("mid: " + str(mid))
def on_connect2(mqttsensor, userdata, flags, rc):
print("Connected with result code " + str(rc))
def on_publish3(mqttgassensor, obj, mid):
print("mid: " + str(mid))
def on_connect3(mqttgassensor, userdata, flags, rc):
print("Connected with result code " + str(rc))
#create new instance to subscribe the sitution of relay
mqttrelay = mqtt.Client("relay_K_12", 1)
#create new instance to publish the situation of sensor
mqttsensor = mqtt.Client("sensor",1)
#create new instance to publish the situation of gassensor
mqttgassensor = mqtt.Client("gassensor",1)
#the events and callbacks of instance mqttrelais associate with each other:
mqttrelay.on_message = on_message1
mqttrelay.on_connect = on_connect1
mqttrelay.on_subscribe = on_subscribe1
mqttrelay.on_disconnect = on_disconnect1
mqttrelay.connect(broker_adress)
#the events and callbacks of instance sensor associate with each other:
mqttsensor.on_connect = on_connect2
mqttsensor.on_publish = on_publish2
mqttsensor.connect(broker_adress)
#the events and callbacks of instance gassensor associate with each other:
mqttgassensor.on_connect = on_connect3
mqttgassensor.on_publish = on_publish3
mqttgassensor.connect(broker_adress)
def read_Sensor1():
i = 0
while True:
temp = ser.readline().decode('ascii')
#print(type(temp))
#print(temp)
if temp.split(':')[0] == "Voltage":
sensor_value[1][1] = temp
i = i + 1
elif temp.split(':')[0] == "amperage":
sensor_value[2][1] = temp
i = i + 1
elif temp.split(':')[0] == "temperature1":
sensor_value[3][1] =temp
i = i + 1
elif temp.split(':')[0] == "RH":
sensor_value[4][1] =temp
i = i + 1
elif temp.split(':')[0] == "temperature2":
sensor_value[5][1] =temp
i = i + 1
elif temp.split(':')[0] =="airpressure":
sensor_value[6][1] =temp
i = i + 1
elif temp.split(':')[0] == "altitude":
sensor_value[7][1] =temp
i = i + 1
if (i == 7):
break
print(sensor_value[1])
mqttsensor.publish("/CBCU/CB123/battery/voltagesensor",
sensor_value[1][1], retain=1)
print(sensor_value[2])
mqttsensor.publish("/CBCU/CB123/battery/currentsensor",
sensor_value[2][1], retain=1)
print(sensor_value[3])
mqttsensor.publish("/CBCU/CB123/wassertank/temperature",
sensor_value[3][1], retain=1)
print(sensor_value[4])
mqttsensor.publish("/CBCU/CB123/battery/humidity",
sensor_value[4][1], retain=1)
print(sensor_value[5])
mqttsensor.publish("/CBCU/CB123/inner_space/temperature_of_inner_space",
sensor_value[5][1], retain=1)
print(sensor_value[6])
mqttsensor.publish("/CBCU/CB123/inner_space/air_pressure",
sensor_value[6][1], retain=1)
print(sensor_value[7])
mqttsensor.publish("/CBCU/CB123/inner_space/altitude",
sensor_value[7][1], retain=1)
def read_Sensor2():
j = 0
while True:
temp = ser.readline().decode('ascii')
# print(type(temp))
# print(temp)
if temp.split(':')[0] == "Gassesnsor":
sensor_value[0][1] = temp
j = j + 1
if (j == 1):
break
print(sensor_value[0])
mqttsensor.publish("/CBCU/CB123/inner_space_of_CB/gassensor",
sensor_value[0][1],retain = 1)
def loop1():
while True:
time.sleep(10)
#mqttrelay.loop()
mqttsensor.loop()
read_Sensor1()
time.sleep(10)
def loop2():
while True:
read_Sensor2()
mqttgassensor.loop()
def main():
added_thread1 = threading.Thread(target=loop1,name = "thread_1")
added_thread2 = threading.Thread(target=loop2,name = "thread_2")
added_thread2.start()
added_thread1.start()
main()
上面是我的程序,求大佬給個點子,自己用多線程試了很多次,就是出錯
Exception in thread thread_1:
Traceback (most recent call last):
File "/home/hu/PycharmProjects/mqtt_probe/venv/lib/python3.5/site-packages/serial/serialposix.py", line 501, in read
'device reports readiness to read but returned no data '
serial.serialutil.SerialException: device reports readiness to read but returned no data (device disconnected or multiple access on port?)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/home/hu/mqtt/8_publish_subscribe_for_sensoren_threads.py", line 208, in loop1
read_Sensor1()
File "/home/hu/mqtt/8_publish_subscribe_for_sensoren_threads.py", line 126, in read_Sensor1
temp = ser.readline().decode('ascii')
File "/home/hu/PycharmProjects/mqtt_probe/venv/lib/python3.5/site-packages/serial/serialposix.py", line 509, in read
raise SerialException('read failed: {}'.format(e))
serial.serialutil.SerialException: read failed: device reports readiness to read but returned no data (device disconnected or multiple access on port?)
求大佬們指正