qq_15309293 2021-07-18 00:18 采纳率: 0%
浏览 37



我想把pi_backend.py 里的 humid ,temp ,light 获得的变量传到 pi_upload.py文件里,给pi_upload.py文件的变量赋值为pi_backend.py 里的humid ,temp ,light 数据,然后从而运行 pi_upload.py传数据到云端,但是我不会怎么跨文件调用变量传递参数。目的就是想把从pi_backend.py 里收集的湿度温度亮度数据传到 pi_upload.py上传到云端,这里不知道怎么改代码,可以帮助我吗?


from flask import Flask, request, render_template, Response
#import client           #uncomment when integrating with the client.py API

# Create the flask app
app = Flask(__name__)

# This is for the callback, which should be defined to
# with the following signature:
# def callback(temp, humid, light)

_callback_ = None

# Create just a single route to read data from the Arduino
@app.route('/data', methods = ['GET'])
def addData():
    ''' The one and only route. It extracts the
    data from the request, converts to float if the
    data is not None, then calls the callback if it is set

    global _callback_

    tempStr = request.args.get('temp')
    humidStr = request.args.get('humid')
    lightStr = request.args.get('light')

    temp = float(tempStr) if tempStr else None
    humid = float(humidStr) if humidStr else None
    light = float(lightStr) if lightStr else None

    if _callback_:
        _callback_(temp = temp, humid = humid, light = light)
    return "OK", 200
def temp():
    global _callback_

    tempStr = request.args.get('temp')

    temp = float(tempStr) if tempStr else None

    if _callback_:
        _callback_(temp = temp)
    return temp
def humid():
    global _callback_

    tempStr = request.args.get('humid')

    temp = float(humidStr) if humidStr else None

    if _callback_:
        _callback_(humid = humid)
    return humid  
def light():
    global _callback_

    tempStr = request.args.get('light')

    temp = float(lightStr) if lightStr else None

    if _callback_:
        _callback_(light = light)
    return light   
def set_callback(callback):
    ''' Sets the callback.

    @param callback The callback with signature def callback(temp, humid,

    if temp is not None:
        print("\nTemperature = %3.2f" % temp)

    if humid is not None:
        print("Humidity = %3.2f" % humid)

    if light is not None:
        print("Light = %3.2f" % light)

    if temp is None and humid is None and light is None:
        print("\n*** Called without any parameters. ***")


def main():
    app.run(host = "", port = '3009')

if __name__ == '__main__':
'''! Example client code for depositing data on the backend server.
import pi_backend
import client
import datetime
import codes
import random

# My latitude and longitude
LON = 39.53
LAT = 40.09

# Username and password
USER = 'sws12'
PASSWORD = '3iksndkgnssf'

def print_result(result):
    '''! Prints the result from the server

    if result.status_code == 200:
        print("Operation OK.\n")
    elif result.status_code == 500:
        print("Technical error.")
        outcome = result.json()
        print("Error code: %d, details: %s\n" % (outcome['result'],

def print_data(data):
    '''! Prints data returned by the server.

    if data == {}:
        print("No data returned.\n")

    print("Data from server:")

    print("# of items found; %d\n" % data['count'])

    for datum in data['data']:
        print("Location in [lon, lat]: [%f, %f]." % (datum['loc'][0],
        print("Time in dd/mm/yy HH:MM:SS: ",
        '%d/%m/%y %H:%M:%S'))
        print("Temperature: ", datum['temp'])
        print("Humidity: ", datum['humid'])
        print("Light: ", datum['light'])
        print("Size of binary attachment: ", len(datum['raw']))


def main():

    # Create a new user. We will get an error message if the user exists.


    # Log in:
    client.login(USER, PASSWORD)

    # Enter random temperature and humidity data
    if __name__ == "__main__"
    temp  = str(pi_backend.temp)
    humid = str(pi_backend.humid)
    light = str(pi_backend.light)
    print("Adding new data.")
    resp = client.add_data(lon = LON, lat = LAT, temp = temp, humid = humid , light = light)

    #Pull the data we have
    print("Searching for data:")
    data  = client.find_data(username = 'sws12', sort =

    # If we have a valid data response, print it.
    if 'count' in data:

if __name__ == '__main__':

"""! @brief Client library for accessing the SWS3009 Base System Backend. """

# imports

import requests
import codes
import uri
import json
import datetime
import base64

headers = {'Content-type':'application/json'}

userid = ''
passwd = ''

# Print response messages

def print_resp(resp):

        result = resp.json()
        print("Response code: %d. Result: %d. Result Details: %s." %
        (resp.status_code, result['result'], result['details']))
    except Exception as e:
        print("Error printing response: ", e)

# @section: Helper functions
# This part of the library contains internal functions used later on.

def send_request(endpoint, request):
    """! Forms JSON POST requests to the backend.

    @param endpoint The endpoint to send to, e.g. '/user/add'
    @param request  The POST data in the form of a Python dictionary.

    @return The result from the server


    return requests.post(uri.URI + endpoint, headers=headers,
    data = json.dumps(request))

# User related calls

def login(user, password):
    '''! Records the user name and password in a global variable so that we
    don't need to include it in all calls.

    @param user User name
    @param password Password

    global userid, passwd

    userid = user
    passwd = password

def create_user(user, password):
    """! Create a new user.

    @param user New user ID.
    @param password Password for the new user.

    @return The result from the server.

    The result string can be obtained from result.text

    req  = {'userid':user, 'passwd':password}
    return send_request('/user/newuser', req)

def change_pw(new_password):
    """! Change existing user's password.

    @param new_password New password to change to.

    @return Result from server.

    global userid, passwd

    req = {'userid':userid, 'passwd':passwd, 'new_passwd':new_password}
    return send_request('/user/newpassword', req)

# Helper functions for data related calls

def load_binary_file(filename):
    ''' Load up a binary file.

    @param filename Name of file to load

    @return Binary data from file.


        with open(filename, "rb") as f:
            file_data = f.read()
        print("Unable to open file.")
        file_data = None

    return file_data

def add_data_file(lon, lat, timestamp = None, temp=-1.0, humid=-1.0,
light=-1.0, rain = -1.0, filename = None):
    ''' Add data to backend, optionlly reading binary from a file.

    Except for lon and lat, all other parameters are optional.

    @param lon, lat  Longitude and latitude
    @param temp     Temperature
    @param humid    Humitiy
    @param light    Light level
    @param filename Name of binary file to load

    @return Result from server.

    global userid, passwd

    if filename is None:
        add_data(lon, lat, timestamp, temp, humid, light, rain)
        file_data = load_binary_file(filename)
        return add_data(lon, lat, timestamp, temp, humid, light, rain, file_data)

def add_data(lon, lat, timestamp=None, temp=-1.0, humid=-1.0, light=-1.0, rain = -1.0,
    """! Add new IoT sensor data to the database.

    @param lat  Current location's latitude
    @param lon  Current location's longitude
    @param timestamp Date/time in DD:MM:YYYY:HH:MM:SS format
    @param temp Temperature (optional)
    @param humid    Humidity (optional)
    @param light    Light level (optional)
    @param rain     Whether it is raining
    @param raw  Raw data (optional). 

    @returns    Response from server

    global userid, passwd

    if timestamp is None:
        thetime = datetime.datetime.utcnow()

        timestr = datetime.datetime.strftime(thetime, codes.DATE_TIME_STR)
        timestr = timestamp

    req = {'userid':userid, 'passwd':passwd, 'lat':lat, 'lon':lon,
    'timestamp':timestr, 'temp':temp, 'humid':humid, 
    'light':light, 'rain':rain}

    if raw is None:
        req['raw'] = ''
            if len(raw) <= codes.MAX_BINARY_SIZE:
                req['raw'] = base64.b64encode(raw).decode('utf-8')
                print("Raw data too big. Ignored.")
                req['raw'] = ''
            print("Unable to encode raw data.")
            req['raw'] = ''

    if len(req) > codes.MAX_UPLOAD_SIZE:
        print("Upload size too large. Exiting.")
        return None
        return send_request('/data/add', req)

def find_data(lon = None, lat = None, radius = None,
timestamp = None, interval_width = None, username = None,
sort = codes.SORT_NONE):
    """! Search for data. All parameters are optional.

    @param lat  Latitude for geo search. Lat and Lon must be specified
    @param lon  Longitude for geo search. Lat and Lon must be specified
    @param radius   Maximum search radius in meters. Optional.
    @param timestamp    Timestamp search 
    @param interval_width   Timestamp search radius in minutes. Optional.
    @param username Search for data for a particular user.
    @param sort - Sort by user ID or timestamp, ascending or descending.

    @returns    Data from server in this format:
    {'count':<# of records found>,
     'data': {
        'username':<Name of user who deposited record>,
        'timestamp':<Time stamp of record as datetime.datetime object>,
        'temp': <Temperature>,
        'humid': <Humidity>,
        'light': <Light level>,
        'temp': <Temperature>,
        'rain':<Rain: 0.1 = No, 0.9 = Yes>,
        'raw': <Binary data>,
        'loc': <Location in [longitude, latitude]>}


    global userid, passwd

    req = {"userid":userid, "passwd":passwd, 'sort':sort}

    if lat is not None:
        if lon is not None:
            req['lat'] = lat;
            req['lon'] = lon;

            if radius is not None:
                req['maxdist'] = radius
            print("Longitude must be specified.")
            return None
    elif lon is not None:
        print("Latitude must be specified.")
        return None

    if timestamp is not None:
        if type(timestamp) is not datetime.datetime:
            print("Timestamp must be of type datetime.")
            return None

        req['timestamp'] = datetime.datetime.strftime(timestamp, codes.DATE_TIME_STR)
        if interval_width is not None:
            req['maxtimediff'] = interval_width

    if username is not None:
        req['username'] = username
    resp = send_request('/data/search', req)

    resp_dict = json.loads(resp.text)

    if resp_dict != {} and 'data' in resp_dict:

        for x in resp_dict['data']:
            if 'timestamp' in x:
                x['timestamp'] = datetime.datetime.strptime(
                x['timestamp'], codes.DATE_TIME_STR)

            if x['raw'] != '':
                    x['raw'] = base64.b64decode(x['raw'].encode('utf-8'))
                    print("Error, unable to process raw data.")

    return resp_dict


  • 写回答

2条回答 默认 最新

  • 天元浪子 Python领域优质创作者 2021-07-18 10:55





  • 创建了问题 7月18日


  • ¥30 这是哪个作者做的宝宝起名网站
  • ¥60 版本过低apk如何修改可以兼容新的安卓系统
  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!