网购的USB转GPIO模块
我的代码如下:
import sys
from sys import argv,exit
from pygpio import *
IN1 = 1
def setup_gpio():
GPIO.setup(IN1, OUT)
if __name__ == '__main__':
app = QApplication(sys.argv)
setup_gpio()
sys.exit(app.exec_())
报错如下:
File "D:\lpcj\Qt-Camera\lpcj3.py", line 166, in setup_gpio
GPIO.setup(IN1, OUT)
AttributeError: module 'pygpio.GPIO' has no attribute 'setup'
打开FT232H.py,内容如下,我看不懂,要怎么写初始化代码,请专家指点。
import atexit
import logging
import math
import os
import subprocess
import sys
import time
import ftdi1 as ftdi
import Adafruit_GPIO.GPIO as GPIO
logger = logging.getLogger(__name__)
FT232H_VID = 0x0403 # Default FTDI FT232H vendor ID
FT232H_PID = 0x6014 # Default FTDI FT232H product ID
MSBFIRST = 0
LSBFIRST = 1
_REPEAT_DELAY = 4
def _check_running_as_root():
# NOTE: Checking for root with user ID 0 isn't very portable, perhaps
# there's a better alternative?
if os.geteuid() != 0:
raise RuntimeError('Expected to be run by root user! Try running with sudo.')
def disable_FTDI_driver():
"""Disable the FTDI drivers for the current platform. This is necessary
because they will conflict with libftdi and accessing the FT232H. Note you
can enable the FTDI drivers again by calling enable_FTDI_driver.
"""
logger.debug('Disabling FTDI driver.')
if sys.platform == 'darwin':
logger.debug('Detected Mac OSX')
# Mac OS commands to disable FTDI driver.
_check_running_as_root()
subprocess.call('kextunload -b com.apple.driver.AppleUSBFTDI', shell=True)
subprocess.call('kextunload /System/Library/Extensions/FTDIUSBSerialDriver.kext', shell=True)
elif sys.platform.startswith('linux'):
logger.debug('Detected Linux')
# Linux commands to disable FTDI driver.
_check_running_as_root()
subprocess.call('modprobe -r -q ftdi_sio', shell=True)
subprocess.call('modprobe -r -q usbserial', shell=True)
# Note there is no need to disable FTDI drivers on Windows!
def enable_FTDI_driver():
"""Re-enable the FTDI drivers for the current platform."""
logger.debug('Enabling FTDI driver.')
if sys.platform == 'darwin':
logger.debug('Detected Mac OSX')
# Mac OS commands to enable FTDI driver.
_check_running_as_root()
subprocess.check_call('kextload -b com.apple.driver.AppleUSBFTDI', shell=True)
subprocess.check_call('kextload /System/Library/Extensions/FTDIUSBSerialDriver.kext', shell=True)
elif sys.platform.startswith('linux'):
logger.debug('Detected Linux')
# Linux commands to enable FTDI driver.
_check_running_as_root()
subprocess.check_call('modprobe -q ftdi_sio', shell=True)
subprocess.check_call('modprobe -q usbserial', shell=True)
def use_FT232H():
"""Disable any built in FTDI drivers which will conflict and cause problems
with libftdi (which is used to communicate with the FT232H). Will register
an exit function so the drivers are re-enabled on program exit.
"""
disable_FTDI_driver()
atexit.register(enable_FTDI_driver)
def enumerate_device_serials(vid=FT232H_VID, pid=FT232H_PID):
"""Return a list of all FT232H device serial numbers connected to the
machine. You can use these serial numbers to open a specific FT232H device
by passing it to the FT232H initializer's serial parameter.
"""
try:
# Create a libftdi context.
ctx = None
ctx = ftdi.new()
# Enumerate FTDI devices.
device_list = None
count, device_list = ftdi.usb_find_all(ctx, vid, pid)
if count < 0:
raise RuntimeError('ftdi_usb_find_all returned error {0}: {1}'.format(count, ftdi.get_error_string(self._ctx)))
# Walk through list of devices and assemble list of serial numbers.
devices = []
while device_list is not None:
# Get USB device strings and add serial to list of devices.
ret, manufacturer, description, serial = ftdi.usb_get_strings(ctx, device_list.dev, 256, 256, 256)
if serial is not None:
devices.append(serial)
device_list = device_list.next
return devices
finally:
# Make sure to clean up list and context when done.
if device_list is not None:
ftdi.list_free(device_list)
if ctx is not None:
ftdi.free(ctx)
class FT232H(GPIO.BaseGPIO):
# Make GPIO constants that match main GPIO class for compatibility.
HIGH = GPIO.HIGH
LOW = GPIO.LOW
IN = GPIO.IN
OUT = GPIO.OUT
def __init__(self, vid=FT232H_VID, pid=FT232H_PID, serial=None):
"""Create a FT232H object. Will search for the first available FT232H
device with the specified USB vendor ID and product ID (defaults to
FT232H default VID & PID). Can also specify an optional serial number
string to open an explicit FT232H device given its serial number. See
the FT232H.enumerate_device_serials() function to see how to list all
connected device serial numbers.
"""
# Initialize FTDI device connection.
self._ctx = ftdi.new()
if self._ctx == 0:
raise RuntimeError('ftdi_new failed! Is libftdi1 installed?')
# Register handler to close and cleanup FTDI context on program exit.
atexit.register(self.close)
if serial is None:
# Open USB connection for specified VID and PID if no serial is specified.
self._check(ftdi.usb_open, vid, pid)
else:
# Open USB connection for VID, PID, serial.
self._check(ftdi.usb_open_string, 's:{0}:{1}:{2}'.format(vid, pid, serial))
# Reset device.
self._check(ftdi.usb_reset)
# Disable flow control. Commented out because it is unclear if this is necessary.
#self._check(ftdi.setflowctrl, ftdi.SIO_DISABLE_FLOW_CTRL)
# Change read & write buffers to maximum size, 65535 bytes.
self._check(ftdi.read_data_set_chunksize, 65535)
self._check(ftdi.write_data_set_chunksize, 65535)
# Clear pending read data & write buffers.
self._check(ftdi.usb_purge_buffers)
# Enable MPSSE and syncronize communication with device.
self._mpsse_enable()
self._mpsse_sync()
# Initialize all GPIO as inputs.
self._write('\x80\x00\x00\x82\x00\x00')
self._direction = 0x0000
self._level = 0x0000
def close(self):
"""Close the FTDI device. Will be automatically called when the program ends."""
if self._ctx is not None:
ftdi.free(self._ctx)
self._ctx = None
def _write(self, string):
"""Helper function to call write_data on the provided FTDI device and
verify it succeeds.
"""
# Get modem status. Useful to enable for debugging.
#ret, status = ftdi.poll_modem_status(self._ctx)
#if ret == 0:
# logger.debug('Modem status {0:02X}'.format(status))
#else:
# logger.debug('Modem status error {0}'.format(ret))
length = len(string)
ret = ftdi.write_data(self._ctx, string, length)
# Log the string that was written in a python hex string format using a very
# ugly one-liner list comprehension for brevity.
#logger.debug('Wrote {0}'.format(''.join(['\\x{0:02X}'.format(ord(x)) for x in string])))
if ret < 0:
raise RuntimeError('ftdi_write_data failed with error {0}: {1}'.format(ret, ftdi.get_error_string(self._ctx)))
if ret != length:
raise RuntimeError('ftdi_write_data expected to write {0} bytes but actually wrote {1}!'.format(length, ret))
def _check(self, command, *args):
"""Helper function to call the provided command on the FTDI device and
verify the response matches the expected value.
"""
ret = command(self._ctx, *args)
logger.debug('Called ftdi_{0} and got response {1}.'.format(command.__name__, ret))
if ret != 0:
raise RuntimeError('ftdi_{0} failed with error {1}: {2}'.format(command.__name__, ret, ftdi.get_error_string(self._ctx)))
def _poll_read(self, expected, timeout_s=5.0):
"""Helper function to continuously poll reads on the FTDI device until an
expected number of bytes are returned. Will throw a timeout error if no
data is received within the specified number of timeout seconds. Returns
the read data as a string if successful, otherwise raises an execption.
"""
start = time.time()
# Start with an empty response buffer.
response = bytearray(expected)
index = 0
# Loop calling read until the response buffer is full or a timeout occurs.
while time.time() - start <= timeout_s:
ret, data = ftdi.read_data(self._ctx, expected - index)
# Fail if there was an error reading data.
if ret < 0:
raise RuntimeError('ftdi_read_data failed with error code {0}.'.format(ret))
# Add returned data to the buffer.
response[index:index+ret] = data[:ret]
index += ret
# Buffer is full, return the result data.
if index >= expected:
return str(response)
time.sleep(0.01)
raise RuntimeError('Timeout while polling ftdi_read_data for {0} bytes!'.format(expected))
def _mpsse_enable(self):
"""Enable MPSSE mode on the FTDI device."""
# Reset MPSSE by sending mask = 0 and mode = 0
self._check(ftdi.set_bitmode, 0, 0)
# Enable MPSSE by sending mask = 0 and mode = 2
self._check(ftdi.set_bitmode, 0, 2)
def _mpsse_sync(self, max_retries=10):
"""Synchronize buffers with MPSSE by sending bad opcode and reading expected
error response. Should be called once after enabling MPSSE."""
# Send a bad/unknown command (0xAB), then read buffer until bad command
# response is found.
self._write('\xAB')
# Keep reading until bad command response (0xFA 0xAB) is returned.
# Fail if too many read attempts are made to prevent sticking in a loop.
tries = 0
sync = False
while not sync:
data = self._poll_read(2)
if data == '\xFA\xAB':
sync = True
tries += 1
if tries >= max_retries:
raise RuntimeError('Could not synchronize with FT232H!')
def mpsse_set_clock(self, clock_hz, adaptive=False, three_phase=False):
"""Set the clock speed of the MPSSE engine. Can be any value from 450hz
to 30mhz and will pick that speed or the closest speed below it.
"""
# Disable clock divisor by 5 to enable faster speeds on FT232H.
self._write('\x8A')
# Turn on/off adaptive clocking.
if adaptive:
self._write('\x96')
else:
self._write('\x97')
# Turn on/off three phase clock (needed for I2C).
# Also adjust the frequency for three-phase clocking as specified in section 2.2.4
# of this document:
# http://www.ftdichip.com/Support/Documents/AppNotes/AN_255_USB%20to%20I2C%20Example%20using%20the%20FT232H%20and%20FT201X%20devices.pdf
if three_phase:
self._write('\x8C')
else:
self._write('\x8D')
# Compute divisor for requested clock.
# Use equation from section 3.8.1 of:
# http://www.ftdichip.com/Support/Documents/AppNotes/AN_108_Command_Processor_for_MPSSE_and_MCU_Host_Bus_Emulation_Modes.pdf
# Note equation is using 60mhz master clock instead of 12mhz.
divisor = int(math.ceil((30000000.0-float(clock_hz))/float(clock_hz))) & 0xFFFF
if three_phase:
divisor = int(divisor*(2.0/3.0))
logger.debug('Setting clockspeed with divisor value {0}'.format(divisor))
# Send command to set divisor from low and high byte values.
self._write(str(bytearray((0x86, divisor & 0xFF, (divisor >> 8) & 0xFF))))
def mpsse_read_gpio(self):
"""Read both GPIO bus states and return a 16 bit value with their state.
D0-D7 are the lower 8 bits and C0-C7 are the upper 8 bits.
"""
# Send command to read low byte and high byte.
self._write('\x81\x83')
# Wait for 2 byte response.
data = self._poll_read(2)
# Assemble response into 16 bit value.
low_byte = ord(data[0])
high_byte = ord(data[1])
logger.debug('Read MPSSE GPIO low byte = {0:02X} and high byte = {1:02X}'.format(low_byte, high_byte))
return (high_byte << 8) | low_byte
def mpsse_gpio(self):
"""Return command to update the MPSSE GPIO state to the current direction
and level.
"""
level_low = chr(self._level & 0xFF)
level_high = chr((self._level >> 8) & 0xFF)
dir_low = chr(self._direction & 0xFF)
dir_high = chr((self._direction >> 8) & 0xFF)
return str(bytearray((0x80, level_low, dir_low, 0x82, level_high, dir_high)))
def mpsse_write_gpio(self):
"""Write the current MPSSE GPIO state to the FT232H chip."""
self._write(self.mpsse_gpio())
def get_i2c_device(self, address, **kwargs):
"""Return an I2CDevice instance using this FT232H object and the provided
I2C address. Meant to be passed as the i2c_provider parameter to objects
which use the Adafruit_Python_GPIO library for I2C.
"""
return I2CDevice(self, address, **kwargs)
# GPIO functions below:
def _setup_pin(self, pin, mode):
if pin < 0 or pin > 15:
raise ValueError('Pin must be between 0 and 15 (inclusive).')
if mode not in (GPIO.IN, GPIO.OUT):
raise ValueError('Mode must be GPIO.IN or GPIO.OUT.')
if mode == GPIO.IN:
# Set the direction and level of the pin to 0.
self._direction &= ~(1 << pin) & 0xFFFF
self._level &= ~(1 << pin) & 0xFFFF
else:
# Set the direction of the pin to 1.
self._direction |= (1 << pin) & 0xFFFF
def setup(self, pin, mode):
"""Set the input or output mode for a specified pin. Mode should be
either OUT or IN."""
self._setup_pin(pin, mode)
self.mpsse_write_gpio()
def setup_pins(self, pins, values={}, write=True):
"""Setup multiple pins as inputs or outputs at once. Pins should be a
dict of pin name to pin mode (IN or OUT). Optional starting values of
pins can be provided in the values dict (with pin name to pin value).
"""
# General implementation that can be improved by subclasses.
for pin, mode in iter(pins.items()):
self._setup_pin(pin, mode)
for pin, value in iter(values.items()):
self._output_pin(pin, value)
if write:
self.mpsse_write_gpio()
def _output_pin(self, pin, value):
if value:
self._level |= (1 << pin) & 0xFFFF
else:
self._level &= ~(1 << pin) & 0xFFFF
def output(self, pin, value):
"""Set the specified pin the provided high/low value. Value should be
either HIGH/LOW or a boolean (true = high)."""
if pin < 0 or pin > 15:
raise ValueError('Pin must be between 0 and 15 (inclusive).')
self._output_pin(pin, value)
self.mpsse_write_gpio()
def output_pins(self, pins, write=True):
"""Set multiple pins high or low at once. Pins should be a dict of pin
name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins
will be set to the given values.
"""
for pin, value in iter(pins.items()):
self._output_pin(pin, value)
if write:
self.mpsse_write_gpio()
def input(self, pin):
"""Read the specified pin and return HIGH/true if the pin is pulled high,
or LOW/false if pulled low."""
return self.input_pins([pin])[0]
def input_pins(self, pins):
"""Read multiple pins specified in the given list and return list of pin values
GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low."""
if [pin for pin in pins if pin < 0 or pin > 15]:
raise ValueError('Pin must be between 0 and 15 (inclusive).')
_pins = self.mpsse_read_gpio()
return [((_pins >> pin) & 0x0001) == 1 for pin in pins]