巫山雨_fz 2021-11-24 09:13 采纳率: 74.1%
浏览 91
已结题

Win10 系统下使用 USB 转 GPIO 模块:在 Python 中 import pygpio 之后,应该如何初始化 FTDI 公司的 FT232H 模块?

网购的USB转GPIO模块
我的代码如下:

import sys
from sys import argv,exit
from pygpio import *

# Pin number passed to GPIO.setup() in setup_gpio().
IN1 = 1
def setup_gpio():
    # Ask GPIO.setup to configure pin IN1 with mode OUT.
    # NOTE(review): GPIO and OUT are expected to come from the star import of
    # pygpio; the traceback quoted in this post shows that `GPIO` resolves to
    # the module pygpio.GPIO, which has no `setup` attribute.
    GPIO.setup(IN1, OUT)

# Script entry point: create the application object, configure the GPIO pin,
# then run the event loop and exit with its return code.
# NOTE(review): QApplication is not imported in the lines shown here --
# presumably it comes from a Qt binding import elsewhere in the full script.
if __name__ == '__main__':
    app = QApplication(sys.argv)
    setup_gpio()
    sys.exit(app.exec_())

报错如下:
File "D:\lpcj\Qt-Camera\lpcj3.py", line 166, in setup_gpio
GPIO.setup(IN1, OUT)
AttributeError: module 'pygpio.GPIO' has no attribute 'setup'

打开FT232H.py,内容如下,我看不懂,要怎么写初始化代码,请专家指点。

import atexit
import logging
import math
import os
import subprocess
import sys
import time

import ftdi1 as ftdi

import Adafruit_GPIO.GPIO as GPIO


logger = logging.getLogger(__name__)

FT232H_VID = 0x0403   # Default FTDI FT232H vendor ID
FT232H_PID = 0x6014   # Default FTDI FT232H product ID

MSBFIRST = 0
LSBFIRST = 1

_REPEAT_DELAY = 4


def _check_running_as_root():
    """Raise RuntimeError unless the process's effective user ID is 0."""
    # NOTE: Comparing the effective user ID against 0 isn't very portable,
    # perhaps there's a better alternative?
    effective_uid = os.geteuid()
    if effective_uid == 0:
        return
    raise RuntimeError('Expected to be run by root user! Try running with sudo.')

def disable_FTDI_driver():
    """Unload the platform's built-in FTDI drivers.

    The built-in drivers conflict with libftdi and with accessing the FT232H.
    They can be loaded again by calling enable_FTDI_driver.  On Mac OSX and
    Linux this requires root (RuntimeError is raised otherwise); on any other
    platform nothing is done.
    """
    logger.debug('Disabling FTDI driver.')
    if sys.platform == 'darwin':
        banner = 'Detected Mac OSX'
        # Mac OS commands to disable FTDI driver.
        unload_commands = (
            'kextunload -b com.apple.driver.AppleUSBFTDI',
            'kextunload /System/Library/Extensions/FTDIUSBSerialDriver.kext',
        )
    elif sys.platform.startswith('linux'):
        banner = 'Detected Linux'
        # Linux commands to disable FTDI driver.
        unload_commands = (
            'modprobe -r -q ftdi_sio',
            'modprobe -r -q usbserial',
        )
    else:
        # Note there is no need to disable FTDI drivers on Windows!
        return
    logger.debug(banner)
    _check_running_as_root()
    # subprocess.call ignores the exit status, so every command is attempted.
    for command in unload_commands:
        subprocess.call(command, shell=True)

def enable_FTDI_driver():
    """Load the platform's built-in FTDI drivers again.

    On Mac OSX and Linux this requires root (RuntimeError is raised
    otherwise) and a failing command raises subprocess.CalledProcessError;
    on any other platform nothing is done.
    """
    logger.debug('Enabling FTDI driver.')
    if sys.platform == 'darwin':
        banner = 'Detected Mac OSX'
        # Mac OS commands to enable FTDI driver.
        load_commands = (
            'kextload -b com.apple.driver.AppleUSBFTDI',
            'kextload /System/Library/Extensions/FTDIUSBSerialDriver.kext',
        )
    elif sys.platform.startswith('linux'):
        banner = 'Detected Linux'
        # Linux commands to enable FTDI driver.
        load_commands = (
            'modprobe -q ftdi_sio',
            'modprobe -q usbserial',
        )
    else:
        return
    logger.debug(banner)
    _check_running_as_root()
    # check_call raises on the first non-zero exit status, so later commands
    # are skipped once one fails.
    for command in load_commands:
        subprocess.check_call(command, shell=True)

def use_FT232H():
    """Disable any built in FTDI drivers which will conflict and cause problems
    with libftdi (which is used to communicate with the FT232H).  Will register
    an exit function so the drivers are re-enabled on program exit.

    The exit function is only registered when disabling the drivers did not
    raise.
    """
    # Unload the conflicting drivers first; see disable_FTDI_driver.
    disable_FTDI_driver()
    # Restore the drivers when the interpreter exits.
    atexit.register(enable_FTDI_driver)

def enumerate_device_serials(vid=FT232H_VID, pid=FT232H_PID):
    """Return a list of all FT232H device serial numbers connected to the
    machine.  You can use these serial numbers to open a specific FT232H device
    by passing it to the FT232H initializer's serial parameter.

    vid and pid are the USB vendor ID and product ID to search for; they
    default to the FT232H defaults.  Raises RuntimeError if libftdi reports an
    error while enumerating devices.
    """
    # Bind both handles before entering the try block so the finally clause
    # can always inspect them, even when ftdi.new() itself raises.
    ctx = None
    device_list = None
    try:
        # Create a libftdi context.
        ctx = ftdi.new()
        # Enumerate FTDI devices.
        count, device_list = ftdi.usb_find_all(ctx, vid, pid)
        if count < 0:
            # This is a module-level function, so the error text has to come
            # from the local context (there is no self here).
            raise RuntimeError('ftdi_usb_find_all returned error {0}: {1}'.format(count, ftdi.get_error_string(ctx)))
        # Walk through list of devices and assemble list of serial numbers.
        devices = []
        while device_list is not None:
            # Get USB device strings and add serial to list of devices.
            ret, manufacturer, description, serial = ftdi.usb_get_strings(ctx, device_list.dev, 256, 256, 256)
            if serial is not None:
                devices.append(serial)
            device_list = device_list.next
        return devices
    finally:
        # Make sure to clean up list and context when done.
        # NOTE(review): after a complete walk device_list is None, so the list
        # is only freed here when the walk was interrupted by an exception --
        # confirm against the libftdi binding whether the list head should be
        # kept and freed on the normal path as well.
        if device_list is not None:
            ftdi.list_free(device_list)
        if ctx is not None:
            ftdi.free(ctx)


class FT232H(GPIO.BaseGPIO):
    """GPIO provider that drives the 16 GPIO pins of an FT232H through libftdi.

    The constructor opens the device and switches it into MPSSE mode.  Pins
    0-7 live in the low GPIO byte (D0-D7) and pins 8-15 in the high GPIO byte
    (C0-C7).  The direction and output level of every pin are cached as one
    bit per pin in ``_direction`` and ``_level`` and are sent to the chip by
    ``mpsse_write_gpio``.

    Every MPSSE command is built as ``bytes`` so the class behaves the same on
    Python 2 (where ``bytes`` is ``str``) and on Python 3.
    """
    # Make GPIO constants that match main GPIO class for compatibility.
    HIGH = GPIO.HIGH
    LOW = GPIO.LOW
    IN = GPIO.IN
    OUT = GPIO.OUT

    def __init__(self, vid=FT232H_VID, pid=FT232H_PID, serial=None):
        """Create a FT232H object.  Will search for the first available FT232H
        device with the specified USB vendor ID and product ID (defaults to
        FT232H default VID & PID).  Can also specify an optional serial number
        string to open an explicit FT232H device given its serial number.  See
        the FT232H.enumerate_device_serials() function to see how to list all
        connected device serial numbers.

        Raises RuntimeError if the libftdi context cannot be created or any
        of the setup calls fails.
        """
        # Initialize FTDI device connection.
        self._ctx = ftdi.new()
        if self._ctx == 0:
            raise RuntimeError('ftdi_new failed! Is libftdi1 installed?')
        # Register handler to close and cleanup FTDI context on program exit.
        atexit.register(self.close)
        if serial is None:
            # Open USB connection for specified VID and PID if no serial is specified.
            self._check(ftdi.usb_open, vid, pid)
        else:
            # Open USB connection for VID, PID, serial.
            self._check(ftdi.usb_open_string, 's:{0}:{1}:{2}'.format(vid, pid, serial))
        # Reset device.
        self._check(ftdi.usb_reset)
        # Disable flow control. Commented out because it is unclear if this is necessary.
        #self._check(ftdi.setflowctrl, ftdi.SIO_DISABLE_FLOW_CTRL)
        # Change read & write buffers to maximum size, 65535 bytes.
        self._check(ftdi.read_data_set_chunksize, 65535)
        self._check(ftdi.write_data_set_chunksize, 65535)
        # Clear pending read data & write buffers.
        self._check(ftdi.usb_purge_buffers)
        # Enable MPSSE and syncronize communication with device.
        self._mpsse_enable()
        self._mpsse_sync()
        # Initialize all GPIO as inputs (level 0 and direction 0 for both the
        # low byte command 0x80 and the high byte command 0x82).
        self._write(b'\x80\x00\x00\x82\x00\x00')
        self._direction = 0x0000
        self._level = 0x0000

    def close(self):
        """Close the FTDI device.  Will be automatically called when the program ends.

        Safe to call more than once: the context is freed only the first time.
        """
        if self._ctx is not None:
            ftdi.free(self._ctx)
        self._ctx = None

    def _write(self, string):
        """Helper function to call write_data on the provided FTDI device and
        verify it succeeds.

        string is the byte string to send.  Raises RuntimeError if libftdi
        reports an error or writes fewer bytes than requested.
        """
        # Get modem status. Useful to enable for debugging.
        #ret, status = ftdi.poll_modem_status(self._ctx)
        #if ret == 0:
        #    logger.debug('Modem status {0:02X}'.format(status))
        #else:
        #    logger.debug('Modem status error {0}'.format(ret))
        length = len(string)
        ret = ftdi.write_data(self._ctx, string, length)
        if ret < 0:
            raise RuntimeError('ftdi_write_data failed with error {0}: {1}'.format(ret, ftdi.get_error_string(self._ctx)))
        if ret != length:
            raise RuntimeError('ftdi_write_data expected to write {0} bytes but actually wrote {1}!'.format(length, ret))

    def _check(self, command, *args):
        """Helper function to call the provided command on the FTDI device and
        verify the response matches the expected value.

        command is called as command(self._ctx, *args); any return value other
        than 0 raises RuntimeError.
        """
        ret = command(self._ctx, *args)
        logger.debug('Called ftdi_{0} and got response {1}.'.format(command.__name__, ret))
        if ret != 0:
            raise RuntimeError('ftdi_{0} failed with error {1}: {2}'.format(command.__name__, ret, ftdi.get_error_string(self._ctx)))

    def _poll_read(self, expected, timeout_s=5.0):
        """Helper function to continuously poll reads on the FTDI device until an
        expected number of bytes are returned.  Will throw a timeout error if
        the expected bytes are not received within the specified number of
        timeout seconds.  Returns the read data as a byte string if successful,
        otherwise raises RuntimeError.
        """
        start = time.time()
        # Start with a zero-filled response buffer of the expected size.
        response = bytearray(expected)
        index = 0
        # Loop calling read until the response buffer is full or a timeout occurs.
        while time.time() - start <= timeout_s:
            ret, data = ftdi.read_data(self._ctx, expected - index)
            # Fail if there was an error reading data.
            if ret < 0:
                raise RuntimeError('ftdi_read_data failed with error code {0}.'.format(ret))
            # Add returned data to the buffer.
            response[index:index+ret] = data[:ret]
            index += ret
            # Buffer is full, return the result data.
            if index >= expected:
                # bytes() keeps the raw content on Python 2 and 3; str() of a
                # bytearray on Python 3 would return its repr text instead.
                return bytes(response)
            time.sleep(0.01)
        raise RuntimeError('Timeout while polling ftdi_read_data for {0} bytes!'.format(expected))

    def _mpsse_enable(self):
        """Enable MPSSE mode on the FTDI device."""
        # Reset MPSSE by sending mask = 0 and mode = 0
        self._check(ftdi.set_bitmode, 0, 0)
        # Enable MPSSE by sending mask = 0 and mode = 2
        self._check(ftdi.set_bitmode, 0, 2)

    def _mpsse_sync(self, max_retries=10):
        """Synchronize buffers with MPSSE by sending bad opcode and reading expected
        error response.  Should be called once after enabling MPSSE.

        Raises RuntimeError if the bad command response is not seen within
        max_retries reads.
        """
        # Send a bad/unknown command (0xAB), then read buffer until bad command
        # response is found.
        self._write(b'\xAB')
        # Keep reading until bad command response (0xFA 0xAB) is returned.
        # Fail if too many read attempts are made to prevent sticking in a loop.
        for _ in range(max_retries):
            if self._poll_read(2) == b'\xFA\xAB':
                # Return as soon as the response matches so that a match on
                # the last permitted attempt counts as success.
                return
        raise RuntimeError('Could not synchronize with FT232H!')

    def mpsse_set_clock(self, clock_hz, adaptive=False, three_phase=False):
        """Set the clock speed of the MPSSE engine.  Can be any value from 450hz
        to 30mhz and will pick that speed or the closest speed below it.

        adaptive turns adaptive clocking on or off and three_phase turns the
        three phase clock (needed for I2C) on or off.
        """
        # Disable clock divisor by 5 to enable faster speeds on FT232H.
        self._write(b'\x8A')
        # Turn on/off adaptive clocking.
        if adaptive:
            self._write(b'\x96')
        else:
            self._write(b'\x97')
        # Turn on/off three phase clock (needed for I2C).
        # Also adjust the frequency for three-phase clocking as specified in section 2.2.4
        # of this document:
        #   http://www.ftdichip.com/Support/Documents/AppNotes/AN_255_USB%20to%20I2C%20Example%20using%20the%20FT232H%20and%20FT201X%20devices.pdf
        if three_phase:
            self._write(b'\x8C')
        else:
            self._write(b'\x8D')
        # Compute divisor for requested clock.
        # Use equation from section 3.8.1 of:
        #  http://www.ftdichip.com/Support/Documents/AppNotes/AN_108_Command_Processor_for_MPSSE_and_MCU_Host_Bus_Emulation_Modes.pdf
        # Note equation is using 60mhz master clock instead of 12mhz.
        divisor = int(math.ceil((30000000.0-float(clock_hz))/float(clock_hz))) & 0xFFFF
        if three_phase:
            divisor = int(divisor*(2.0/3.0))
        logger.debug('Setting clockspeed with divisor value {0}'.format(divisor))
        # Send command to set divisor from low and high byte values.
        self._write(bytes(bytearray((0x86, divisor & 0xFF, (divisor >> 8) & 0xFF))))

    def mpsse_read_gpio(self):
        """Read both GPIO bus states and return a 16 bit value with their state.
        D0-D7 are the lower 8 bits and C0-C7 are the upper 8 bits.
        """
        # Send command to read low byte and high byte.
        self._write(b'\x81\x83')
        # Wait for 2 byte response.  Wrapping it in a bytearray makes indexing
        # yield integers on both Python 2 and Python 3.
        data = bytearray(self._poll_read(2))
        # Assemble response into 16 bit value.
        low_byte = data[0]
        high_byte = data[1]
        logger.debug('Read MPSSE GPIO low byte = {0:02X} and high byte = {1:02X}'.format(low_byte, high_byte))
        return (high_byte << 8) | low_byte

    def mpsse_gpio(self):
        """Return command to update the MPSSE GPIO state to the current direction
        and level.

        The result is a 6 byte string: 0x80, low level byte, low direction
        byte, 0x82, high level byte, high direction byte.
        """
        # Keep the values as integers: a bytearray cannot be built from
        # one-character strings on Python 3.
        level_low = self._level & 0xFF
        level_high = (self._level >> 8) & 0xFF
        dir_low = self._direction & 0xFF
        dir_high = (self._direction >> 8) & 0xFF
        return bytes(bytearray((0x80, level_low, dir_low, 0x82, level_high, dir_high)))

    def mpsse_write_gpio(self):
        """Write the current MPSSE GPIO state to the FT232H chip."""
        self._write(self.mpsse_gpio())

    def get_i2c_device(self, address, **kwargs):
        """Return an I2CDevice instance using this FT232H object and the provided
        I2C address.  Meant to be passed as the i2c_provider parameter to objects
        which use the Adafruit_Python_GPIO library for I2C.
        """
        return I2CDevice(self, address, **kwargs)

    # GPIO functions below:

    def _setup_pin(self, pin, mode):
        """Update the cached direction (and level) bits for pin without
        writing to the chip.  Raises ValueError for a pin outside 0-15 or a
        mode other than GPIO.IN / GPIO.OUT.
        """
        if pin < 0 or pin > 15:
            raise ValueError('Pin must be between 0 and 15 (inclusive).')
        if mode not in (GPIO.IN, GPIO.OUT):
            raise ValueError('Mode must be GPIO.IN or GPIO.OUT.')
        if mode == GPIO.IN:
            # Set the direction and level of the pin to 0.
            self._direction &= ~(1 << pin) & 0xFFFF
            self._level &= ~(1 << pin) & 0xFFFF
        else:
            # Set the direction of the pin to 1.
            self._direction |= (1 << pin) & 0xFFFF

    def setup(self, pin, mode):
        """Set the input or output mode for a specified pin.  Mode should be
        either OUT or IN."""
        self._setup_pin(pin, mode)
        self.mpsse_write_gpio()

    def setup_pins(self, pins, values=None, write=True):
        """Setup multiple pins as inputs or outputs at once.  Pins should be a
        dict of pin name to pin mode (IN or OUT).  Optional starting values of
        pins can be provided in the values dict (with pin name to pin value).
        When write is false only the cached state is updated and nothing is
        sent to the chip.
        """
        # None stands for "no starting values"; this avoids sharing one
        # mutable default dict between calls.
        if values is None:
            values = {}
        # General implementation that can be improved by subclasses.
        for pin, mode in pins.items():
            self._setup_pin(pin, mode)
        for pin, value in values.items():
            self._output_pin(pin, value)
        if write:
            self.mpsse_write_gpio()

    def _output_pin(self, pin, value):
        """Set (truthy value) or clear (falsy value) the cached level bit for
        pin without writing to the chip."""
        if value:
            self._level |= (1 << pin) & 0xFFFF
        else:
            self._level &= ~(1 << pin) & 0xFFFF

    def output(self, pin, value):
        """Set the specified pin the provided high/low value.  Value should be
        either HIGH/LOW or a boolean (true = high)."""
        if pin < 0 or pin > 15:
            raise ValueError('Pin must be between 0 and 15 (inclusive).')
        self._output_pin(pin, value)
        self.mpsse_write_gpio()

    def output_pins(self, pins, write=True):
        """Set multiple pins high or low at once.  Pins should be a dict of pin
        name to pin value (HIGH/True for 1, LOW/False for 0).  All provided pins
        will be set to the given values.  When write is false only the cached
        state is updated and nothing is sent to the chip.
        """
        for pin, value in pins.items():
            self._output_pin(pin, value)
        if write:
            self.mpsse_write_gpio()

    def input(self, pin):
        """Read the specified pin and return HIGH/true if the pin is pulled high,
        or LOW/false if pulled low."""
        return self.input_pins([pin])[0]

    def input_pins(self, pins):
        """Read multiple pins specified in the given list and return list of pin values
        GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low."""
        if any(pin < 0 or pin > 15 for pin in pins):
            raise ValueError('Pin must be between 0 and 15 (inclusive).')
        _pins = self.mpsse_read_gpio()
        return [((_pins >> pin) & 0x0001) == 1 for pin in pins]



  • 写回答

3条回答 默认 最新

  • 辉煌仪奇 2021-11-24 09:26
    关注

    报错内容是

     module 'pygpio.GPIO' has no attribute 'setup'
    

    所以问题出在调用方式上:报错说明 pygpio.GPIO 这个模块里没有 setup 函数;从上面贴出的 FT232H.py 可以看到,setup 是 FT232H 类的实例方法,需要先创建 FT232H 对象再调用。出错的调用是:

    GPIO.setup(IN1, OUT)
    

    还有,在 setup_gpio函数中 IN1全局变量没有导入,out没有定义

     setup_gpio():
    

    如果你非要调用该模块,你可以试试下面方法

    from pygpio import FT232H
    
    GPIO=FT232H.FT232H()
    GPIO.setup(IN1, OUT)
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(2条)

报告相同问题?

问题事件

  • 系统已结题 12月5日
  • 已采纳回答 11月27日
  • 创建了问题 11月24日

悬赏问题

  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛
  • ¥15 请问Lammps做复合材料拉伸模拟,应力应变曲线问题
  • ¥30 python代码,帮调试
  • ¥15 #MATLAB仿真#车辆换道路径规划
  • ¥15 java 操作 elasticsearch 8.1 实现 索引的重建
  • ¥15 数据可视化Python
  • ¥15 要给毕业设计添加扫码登录的功能!!有偿
  • ¥15 kafka 分区副本增加会导致消息丢失或者不可用吗?
  • ¥15 微信公众号自制会员卡没有收款渠道啊
  • ¥100 Jenkins自动化部署—悬赏100元