diff --git a/examples/serial_port.py b/examples/serial_port.py index d0902f6..0c2db02 100644 --- a/examples/serial_port.py +++ b/examples/serial_port.py @@ -1,3 +1,4 @@ +from __future__ import print_function # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: from ioLabs import USBBox @@ -9,4 +10,4 @@ while True: bytes=usbbox.serial.read() if bytes: - print bytes \ No newline at end of file + print(bytes) \ No newline at end of file diff --git a/examples/simonsays.py b/examples/simonsays.py index c5b56fb..f2f5030 100644 --- a/examples/simonsays.py +++ b/examples/simonsays.py @@ -1,4 +1,8 @@ +from __future__ import print_function +from builtins import input +from builtins import zip +from builtins import range from ioLabs import USBBox, REPORT import time @@ -12,7 +16,7 @@ def flash(usbbox,led_mask=0xFF,rate=1,count=1): # ensure light off first usbbox.commands.p2set(0x00) - for i in xrange(count): + for i in range(count): usbbox.commands.p2set(led_mask) time.sleep(rate/2.0) usbbox.commands.p2set(0x00) @@ -33,8 +37,8 @@ def simon_says(usbbox,num): # create a random sequence (0-7) import random - order=[random.choice(xrange(8)) for i in range(num)] - print "simon says:", " ".join([BUTTON_COLORS[i] for i in order]) + order=[random.choice(range(8)) for i in range(num)] + print("simon says:", " ".join([BUTTON_COLORS[i] for i in order])) # flash LEDs once flash(usbbox) @@ -104,25 +108,25 @@ def key_report(report): # should tell us how long the user took to press all of the # buttons last_press=keyevents[-1].rtc - print "correct (%dms)" % last_press + print("correct (%dms)" % last_press) flash(usbbox) else: - print "user said:", " ".join([BUTTON_COLORS[event.key_code] for event in keyevents]) - print "wrong" + print("user said:", " ".join([BUTTON_COLORS[event.key_code] for event in keyevents])) + print("wrong") flash(usbbox,rate=0.25,count=3) return correct usbbox=USBBox() -print "Simon Says" -print "consists of several rounds of sequences being shown:" -print "1) LEDs will flash on box" -print "2) a sequence of LEDs will be shown" -print "3) after the sequence finishes the LEDs will flash again" -print "4) enter in the sequence previously shown (press the buttons that match the LEDs)" -print "5) if you get it right the LEDs will flash once and it'll repeat the cycle (with one more item to remember)" -print "6) if you get it wrong the LEDs will flash three times and you'll be asked if you want to play a new game" +print("Simon Says") +print("consists of several rounds of sequences being shown:") +print("1) LEDs will flash on box") +print("2) a sequence of LEDs will be shown") +print("3) after the sequence finishes the LEDs will flash again") +print("4) enter in the sequence previously shown (press the buttons that match the LEDs)") +print("5) if you get it right the LEDs will flash once and it'll repeat the cycle (with one more item to remember)") +print("6) if you get it wrong the LEDs will flash three times and you'll be asked if you want to play a new game") # use logset so that 1 turns LED on and 0 turn LED off usbbox.commands.logset(0xFF,0xFF) @@ -132,7 +136,7 @@ def key_report(report): usbbox.commands.dirset(0,0,0) while True: - command=raw_input("play game y/n [y]: ").strip() + command=input("play game y/n [y]: ").strip() if command == '': command = 'y' if command.lower() != 'y': @@ -140,7 +144,7 @@ def key_report(report): for i in range(1,50): time.sleep(1) - print "%d to remember" % i + print("%d to remember" % i) if not simon_says(usbbox,i): break diff --git a/examples/simonsays2.py b/examples/simonsays2.py index a5e0a3b..762a086 100644 --- a/examples/simonsays2.py +++ b/examples/simonsays2.py @@ -1,4 +1,7 @@ +from __future__ import print_function +from builtins import input +from builtins import range from ioLabs import USBBox, REPORT import time @@ -11,7 +14,7 @@ def flash(usbbox,led_mask=0xFF,rate=1,count=1): # ensure light off first usbbox.leds.state=0x00 - for i in xrange(count): + for i in range(count): usbbox.leds.state=led_mask time.sleep(rate/2.0) usbbox.leds.state=0x00 @@ -32,8 +35,8 @@ def simon_says(usbbox,num): # create a random sequence (0-7) import random - order=[random.choice(xrange(8)) for i in range(num)] - print "simon says:", " ".join([BUTTON_COLORS[i] for i in order]) + order=[random.choice(range(8)) for i in range(num)] + print("simon says:", " ".join([BUTTON_COLORS[i] for i in order])) # flash LEDs once flash(usbbox) @@ -87,25 +90,25 @@ def simon_says(usbbox,num): # should tell us how long the user took to press all of the # buttons last_press=keyevents[-1].rtc - print "correct (%dms)" % last_press + print("correct (%dms)" % last_press) flash(usbbox) else: - print "user said:", " ".join([BUTTON_COLORS[event.key_code] for event in keyevents]) - print "wrong" + print("user said:", " ".join([BUTTON_COLORS[event.key_code] for event in keyevents])) + print("wrong") flash(usbbox,rate=0.25,count=3) return correct usbbox=USBBox() -print "Simon Says" -print "consists of several rounds of sequences being shown:" -print "1) LEDs will flash on box" -print "2) a sequence of LEDs will be shown" -print "3) after the sequence finishes the LEDs will flash again" -print "4) enter in the sequence previously shown (press the buttons that match the LEDs)" -print "5) if you get it right the LEDs will flash once and it'll repeat the cycle (with one more item to remember)" -print "6) if you get it wrong the LEDs will flash three times and you'll be asked if you want to play a new game" +print("Simon Says") +print("consists of several rounds of sequences being shown:") +print("1) LEDs will flash on box") +print("2) a sequence of LEDs will be shown") +print("3) after the sequence finishes the LEDs will flash again") +print("4) enter in the sequence previously shown (press the buttons that match the LEDs)") +print("5) if you get it right the LEDs will flash once and it'll repeat the cycle (with one more item to remember)") +print("6) if you get it wrong the LEDs will flash three times and you'll be asked if you want to play a new game") # use logset so that 1 turns LED on and 0 turn LED off usbbox.leds.logic=0xFF @@ -115,7 +118,7 @@ def simon_says(usbbox,num): usbbox.disable_loopback() while True: - command=raw_input("play game y/n [y]: ").strip() + command=input("play game y/n [y]: ").strip() if command == '': command = 'y' if command.lower() != 'y': @@ -123,7 +126,7 @@ def simon_says(usbbox,num): for i in range(1,50): time.sleep(1) - print "%d to remember" % i + print("%d to remember" % i) if not simon_says(usbbox,i): break diff --git a/hid/__init__.py b/hid/__init__.py index 22b3131..73ceffc 100644 --- a/hid/__init__.py +++ b/hid/__init__.py @@ -5,6 +5,7 @@ and interacting with those devices. ''' +from builtins import object import logging import os diff --git a/hid/cparser.py b/hid/cparser.py index 32adc45..7ae7853 100644 --- a/hid/cparser.py +++ b/hid/cparser.py @@ -1,5 +1,8 @@ # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: +from builtins import next +from past.builtins import basestring +from builtins import object from ctypes import * import re @@ -42,7 +45,7 @@ def define(name, t): def _parse_type(type_str): # see if the type is there - if _types.has_key(type_str): + if type_str in _types: return _types[type_str] if type_str.endswith('*'): type_str=type_str[:-1] @@ -71,7 +74,7 @@ def __init__(self,s): self.i=-1 - def next(self): + def __next__(self): if self.empty(): raise ValueError('no more tokens found parsing - %s' % self.s) self.i+=1 @@ -140,9 +143,9 @@ def __repr__(self): def parse_type(t): # grab variable name (including * for pointers) - base_type=t.next() + base_type=next(t) while not t.empty(): - if t.next() == '*': + if next(t) == '*': base_type += '*' else: t.push_back() @@ -150,18 +153,18 @@ def parse_type(t): return c_type(base_type) def parse_fn_name(t): - if t.next() == '(': + if next(t) == '(': # fn pointer name - assert t.next() == '*' - name=t.next() - assert t.next() == ')' + assert next(t) == '*' + name=next(t) + assert next(t) == ')' else: name=t.current() return name def parse_param(t): param_type=parse_type(t) - name=t.next() + name=next(t) if WORD.match(name): param_type.name=name else: @@ -171,12 +174,12 @@ def parse_param(t): def parse_param_list(t): - assert t.next() == '(' + assert next(t) == '(' params=[] - while t.next() != ')': + while next(t) != ')': t.push_back() params.append(parse_param(t)) - if t.next() != ',': + if next(t) != ',': break assert t.current() == ')' return params @@ -191,7 +194,7 @@ def parse(s): t=tokenizer(s) def_type = parse_type(t) if not t.empty(): - if t.next() == '(': + if next(t) == '(': # looks like we're parsing a function pointer definition # e.g. void (*my_fn)(void) t.push_back() @@ -202,9 +205,9 @@ def parse(s): return c_function(def_type, fn_name, param_list) else: t.push_back() - name = t.next() + name = next(t) if not t.empty(): - if t.next() == '(': + if next(t) == '(': # parsing function def again (regular def not pointer) # e.g. void my_fn(void) t.push_back() diff --git a/hid/osx.py b/hid/osx.py index 5fd8326..bb28dd3 100644 --- a/hid/osx.py +++ b/hid/osx.py @@ -6,6 +6,8 @@ Refer to the hid module for available functions ''' +from builtins import range +from builtins import object from ctypes import * from ctypes.util import find_library @@ -186,7 +188,7 @@ class IOHIDDeviceInterface122(Structure): ######################################################## # class to handle COM object references -class COMObjectRef: +class COMObjectRef(object): def __init__(self,ref): self.ref=ref logging.info("created: %s",self) @@ -195,7 +197,7 @@ def __del__(self): logging.info("releasing: %s",self) self.Release() - def __nonzero__(self): + def __bool__(self): return self.ref is not None def __str__(self): diff --git a/hid/win32.py b/hid/win32.py index d4b3462..94549ec 100644 --- a/hid/win32.py +++ b/hid/win32.py @@ -1,315 +1,316 @@ # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: -''' -The Windows (Win32) HID interface module. -Dynamically loaded on Windows platforms. -Refer to the hid module for available functions -''' - - -#http://permalink.gmane.org/gmane.comp.python.ctypes/2410 - -import logging -import struct - -from ctypes import * -from ctypes.wintypes import * - -from hid import HIDDevice - -DIGCF_ALLCLASSES=0x00000004 -DIGCF_DEVICEINTERFACE=0x00000010 -DIGCF_PRESENT=0x00000002 -DIGCF_PROFILE=0x00000008 - -FORMAT_MESSAGE_FROM_SYSTEM=0x00001000 -FORMAT_MESSAGE_ALLOCATE_BUFFER=0x00000100 -FORMAT_MESSAGE_IGNORE_INSERTS=0x00000200 - -GENERIC_READ=0x80000000 -GENERIC_WRITE=0x40000000 - -FILE_SHARE_READ=0x00000001 -FILE_SHARE_WRITE=0x00000002 - -OPEN_EXISTING=3 - -INVALID_HANDLE_VALUE=-1 - -FILE_FLAG_OVERLAPPED=0x40000000 # needed so we can read and write at the same time - - -WAIT_TIMEOUT=0x00000102 -WAIT_OBJECT_0=0x00000000 - - -GUID=c_uint8*16 -USHORT=c_ushort - -LPVOID=c_void_p -LPCVOID=c_void_p - - -HidGuid=GUID() -hid_dll=windll.hid -hid_dll.HidD_GetHidGuid(byref(HidGuid)) - -setupapi_dll=windll.setupapi - -Kernel32=windll.Kernel32 - -ULONG_PTR=ULONG - -class OVERLAPPED(Structure): - _fields_ = [ - ("Internal", ULONG_PTR), - ("InternalHigh", ULONG_PTR), - ("Offset", DWORD), - ("OffsetHigh", DWORD), - ("hEvent",HANDLE) - ] - def __init__(self): - self.Offset=0 - self.OffsetHigh=0 - -LPOVERLAPPED=POINTER(OVERLAPPED) - -# callback function type for ReadFileEx and WriteFileEx -LPOVERLAPPED_COMPLETION_ROUTINE=WINFUNCTYPE(None,DWORD,DWORD,LPOVERLAPPED) - - -ReadFileEx=Kernel32.ReadFileEx -ReadFileEx.argtypes = [HANDLE,LPVOID,DWORD,LPOVERLAPPED,LPOVERLAPPED_COMPLETION_ROUTINE] - -WriteFileEx=Kernel32.WriteFileEx -WriteFileEx.argtypes = [HANDLE,LPCVOID,DWORD,LPOVERLAPPED,LPOVERLAPPED_COMPLETION_ROUTINE] - -def GetLastErrorMessage(): - error=Kernel32.GetLastError() - Kernel32.SetLastError(0) - msg=c_char_p() - - Kernel32.FormatMessageA( - FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_IGNORE_INSERTS, - None,error,0,byref(msg), 0, None) - msgStr='error #%d: %s' % (error,msg.value) - Kernel32.LocalFree(msg) - return msgStr - -class SP_DEVICE_INTERFACE_DATA(Structure): - _fields_ = [ - ("cbSize", DWORD), - ("InterfaceClassGuid", GUID), - ("Flags", DWORD), - ("Reserved", POINTER(ULONG)) - ] - def __init__(self): - self.cbSize=sizeof(self) - -def SP_DEVICE_INTERFACE_DETAIL_DATA_OF_SIZE(size): - '''dynamically declare the structure, so we will have the right size - allocated for the DevicePath field - However cbSize will be the size of the _fixed_ size - ''' - - # DevicePath is normally declared as being char[1], but - # that only works because of C's lax boundary checking - # so we'll dynamically declare the size here - class SP_DEVICE_INTERFACE_DETAIL_DATA(Structure): - _fields_ = [ - ("cbSize", DWORD), - ("DevicePath", c_char * size), - ] - detailData=SP_DEVICE_INTERFACE_DETAIL_DATA() - detailData.cbSize=sizeof(DWORD)+sizeof(c_char*1) - return detailData - -class HIDD_ATTRIBUTES(Structure): - _fields_ = [ - ("Size", ULONG), - ("VendorID", USHORT), - ("ProductID", USHORT), - ("VersionNumber", USHORT) - ] - def __init__(self): - self.Size=sizeof(self) - - -class Win32HIDDevice(HIDDevice): - - def __init__(self,device_path,vendor,product): - HIDDevice.__init__(self,vendor,product) - self._device_path=device_path - - self._device_handle=None - self._CloseHandle=Kernel32.CloseHandle - - def is_open(self): - return self._device_handle is not None - - def _open_handle(self): - return Kernel32.CreateFileA( - self._device_path, - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, - None, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - None - ) - - def open(self): - self._running=False - if not self.is_open(): - logging.info("opening device") - self._device_handle=self._open_handle() - - if self._device_handle == INVALID_HANDLE_VALUE: - self._device_handle=None - raise RuntimeError("could not open device") - else: - self._write_overlapped=OVERLAPPED() - - - - - def close(self): - # make sure we stop the thread first - HIDDevice.close(self) - - if self._device_handle: - # re-import logging, as may have been deleted already - import logging - logging.info("closing _device_handle") - self._CloseHandle(self._device_handle) - self._device_handle=None - - self._write_overlapped=None - - - def set_report(self,report_data,report_id=0): - ''' - "set" a report - send the data to the device (which must have been opened previously) - ''' - HIDDevice.set_report(self,report_data,report_id) - - report_buffer=(c_ubyte*(len(report_data)+1))() - report_buffer[0]=report_id # first byte is report id - for i,c in enumerate(report_data): - report_buffer[i+1]=struct.unpack('B',c)[0] - - def completion_callback(dwErrorCode,dwNumberOfBytesTransfered,lpOverlapped): - pass - - overlap_completion=LPOVERLAPPED_COMPLETION_ROUTINE(completion_callback) - - result=WriteFileEx( - self._device_handle, - report_buffer, - len(report_buffer), - self._write_overlapped, - overlap_completion ) - - if not result: - raise RuntimeError("WriteFileEx failed") - - if Kernel32.SleepEx(100,1) == 0: - raise RuntimeError("timed out when writing to device") - - - def _run_interrupt_callback_loop(self,report_buffer_size): - ''' - run on a thread to handle reading events from the device - ''' - if not self.is_open(): - raise RuntimeError("device not open") - - logging.info("starting _run_interrupt_callback_loop") - - # +1 to allow for report id byte - report_buffer=(c_ubyte*(report_buffer_size+1))() - overlapped=OVERLAPPED() - - def completion_callback(dwErrorCode,dwNumberOfBytesTransfered,lpOverlapped): - report_data="".join([struct.pack('B',b) for b in report_buffer]) - report_data=report_data[1:] # remove first byte (report id) - self._callback(self,report_data) - - overlap_completion=LPOVERLAPPED_COMPLETION_ROUTINE(completion_callback) - - # do async reads until the thread stops - while self._running and self.is_open(): - result=ReadFileEx(self._device_handle,report_buffer,len(report_buffer),byref(overlapped),overlap_completion) - if not result: - raise RuntimeError("ReadFileEx failed") - Kernel32.SleepEx(100,1) - - # thread is stopping so make sure we won't receive any more messages - Kernel32.CancelIo(self._device_handle) - - -def find_hid_devices(): - ''' - query the host computer for all available HID devices - and returns a list of any found - ''' - devices=[] - hDevInfo=setupapi_dll.SetupDiGetClassDevsA(byref(HidGuid),None,None,DIGCF_PRESENT | DIGCF_DEVICEINTERFACE) - - try: - for memberIndex in range(0,256): # work on assumption there won't be more than 255 devices attached, just in case - deviceInterface=SP_DEVICE_INTERFACE_DATA() - - result=setupapi_dll.SetupDiEnumDeviceInterfaces(hDevInfo,0,byref(HidGuid),memberIndex,byref(deviceInterface)) - - if not result: - break # last device - - requiredSize=DWORD() - - # find the size of the structure we'll need - if not setupapi_dll.SetupDiGetDeviceInterfaceDetailA(hDevInfo,byref(deviceInterface),None,0,byref(requiredSize),None): - GetLastErrorMessage() # ignore the error, as we just want to find the size - - # then make the structure and call again - detailData=SP_DEVICE_INTERFACE_DETAIL_DATA_OF_SIZE(requiredSize.value) - - if not setupapi_dll.SetupDiGetDeviceInterfaceDetailA(hDevInfo,byref(deviceInterface),byref(detailData),requiredSize,None,None): - raise RuntimeError(GetLastErrorMessage()) - - DeviceHandle=None - try: - DeviceHandle=Kernel32.CreateFileA( - detailData.DevicePath, - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, - None, - OPEN_EXISTING, - 0, - None - ) - - # if we opened it ok - if DeviceHandle != INVALID_HANDLE_VALUE: - Attributes=HIDD_ATTRIBUTES() - - result=hid_dll.HidD_GetAttributes( - DeviceHandle, - byref(Attributes) - ) - - if result: - device=Win32HIDDevice(detailData.DevicePath,Attributes.VendorID,Attributes.ProductID) - devices.append(device) - else: - logging.info("failed to open device to read attributes") - - finally: - if DeviceHandle and DeviceHandle != INVALID_HANDLE_VALUE: - Kernel32.CloseHandle(DeviceHandle) - finally: - setupapi_dll.SetupDiDestroyDeviceInfoList(hDevInfo) - - return devices - +''' +The Windows (Win32) HID interface module. +Dynamically loaded on Windows platforms. +Refer to the hid module for available functions +''' + + +#http://permalink.gmane.org/gmane.comp.python.ctypes/2410 + +from builtins import range +import logging +import struct + +from ctypes import * +from ctypes.wintypes import * + +from hid import HIDDevice + +DIGCF_ALLCLASSES=0x00000004 +DIGCF_DEVICEINTERFACE=0x00000010 +DIGCF_PRESENT=0x00000002 +DIGCF_PROFILE=0x00000008 + +FORMAT_MESSAGE_FROM_SYSTEM=0x00001000 +FORMAT_MESSAGE_ALLOCATE_BUFFER=0x00000100 +FORMAT_MESSAGE_IGNORE_INSERTS=0x00000200 + +GENERIC_READ=0x80000000 +GENERIC_WRITE=0x40000000 + +FILE_SHARE_READ=0x00000001 +FILE_SHARE_WRITE=0x00000002 + +OPEN_EXISTING=3 + +INVALID_HANDLE_VALUE=-1 + +FILE_FLAG_OVERLAPPED=0x40000000 # needed so we can read and write at the same time + + +WAIT_TIMEOUT=0x00000102 +WAIT_OBJECT_0=0x00000000 + + +GUID=c_uint8*16 +USHORT=c_ushort + +LPVOID=c_void_p +LPCVOID=c_void_p + + +HidGuid=GUID() +hid_dll=windll.hid +hid_dll.HidD_GetHidGuid(byref(HidGuid)) + +setupapi_dll=windll.setupapi + +Kernel32=windll.Kernel32 + +ULONG_PTR=ULONG + +class OVERLAPPED(Structure): + _fields_ = [ + ("Internal", ULONG_PTR), + ("InternalHigh", ULONG_PTR), + ("Offset", DWORD), + ("OffsetHigh", DWORD), + ("hEvent",HANDLE) + ] + def __init__(self): + self.Offset=0 + self.OffsetHigh=0 + +LPOVERLAPPED=POINTER(OVERLAPPED) + +# callback function type for ReadFileEx and WriteFileEx +LPOVERLAPPED_COMPLETION_ROUTINE=WINFUNCTYPE(None,DWORD,DWORD,LPOVERLAPPED) + + +ReadFileEx=Kernel32.ReadFileEx +ReadFileEx.argtypes = [HANDLE,LPVOID,DWORD,LPOVERLAPPED,LPOVERLAPPED_COMPLETION_ROUTINE] + +WriteFileEx=Kernel32.WriteFileEx +WriteFileEx.argtypes = [HANDLE,LPCVOID,DWORD,LPOVERLAPPED,LPOVERLAPPED_COMPLETION_ROUTINE] + +def GetLastErrorMessage(): + error=Kernel32.GetLastError() + Kernel32.SetLastError(0) + msg=c_char_p() + + Kernel32.FormatMessageA( + FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_IGNORE_INSERTS, + None,error,0,byref(msg), 0, None) + msgStr='error #%d: %s' % (error,msg.value) + Kernel32.LocalFree(msg) + return msgStr + +class SP_DEVICE_INTERFACE_DATA(Structure): + _fields_ = [ + ("cbSize", DWORD), + ("InterfaceClassGuid", GUID), + ("Flags", DWORD), + ("Reserved", POINTER(ULONG)) + ] + def __init__(self): + self.cbSize=sizeof(self) + +def SP_DEVICE_INTERFACE_DETAIL_DATA_OF_SIZE(size): + '''dynamically declare the structure, so we will have the right size + allocated for the DevicePath field + However cbSize will be the size of the _fixed_ size + ''' + + # DevicePath is normally declared as being char[1], but + # that only works because of C's lax boundary checking + # so we'll dynamically declare the size here + class SP_DEVICE_INTERFACE_DETAIL_DATA(Structure): + _fields_ = [ + ("cbSize", DWORD), + ("DevicePath", c_char * size), + ] + detailData=SP_DEVICE_INTERFACE_DETAIL_DATA() + detailData.cbSize=sizeof(DWORD)+sizeof(c_char*1) + return detailData + +class HIDD_ATTRIBUTES(Structure): + _fields_ = [ + ("Size", ULONG), + ("VendorID", USHORT), + ("ProductID", USHORT), + ("VersionNumber", USHORT) + ] + def __init__(self): + self.Size=sizeof(self) + + +class Win32HIDDevice(HIDDevice): + + def __init__(self,device_path,vendor,product): + HIDDevice.__init__(self,vendor,product) + self._device_path=device_path + + self._device_handle=None + self._CloseHandle=Kernel32.CloseHandle + + def is_open(self): + return self._device_handle is not None + + def _open_handle(self): + return Kernel32.CreateFileA( + self._device_path, + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + None, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + None + ) + + def open(self): + self._running=False + if not self.is_open(): + logging.info("opening device") + self._device_handle=self._open_handle() + + if self._device_handle == INVALID_HANDLE_VALUE: + self._device_handle=None + raise RuntimeError("could not open device") + else: + self._write_overlapped=OVERLAPPED() + + + + + def close(self): + # make sure we stop the thread first + HIDDevice.close(self) + + if self._device_handle: + # re-import logging, as may have been deleted already + import logging + logging.info("closing _device_handle") + self._CloseHandle(self._device_handle) + self._device_handle=None + + self._write_overlapped=None + + + def set_report(self,report_data,report_id=0): + ''' + "set" a report - send the data to the device (which must have been opened previously) + ''' + HIDDevice.set_report(self,report_data,report_id) + + report_buffer=(c_ubyte*(len(report_data)+1))() + report_buffer[0]=report_id # first byte is report id + for i,c in enumerate(report_data): + report_buffer[i+1]=struct.unpack('B',c)[0] + + def completion_callback(dwErrorCode,dwNumberOfBytesTransfered,lpOverlapped): + pass + + overlap_completion=LPOVERLAPPED_COMPLETION_ROUTINE(completion_callback) + + result=WriteFileEx( + self._device_handle, + report_buffer, + len(report_buffer), + self._write_overlapped, + overlap_completion ) + + if not result: + raise RuntimeError("WriteFileEx failed") + + if Kernel32.SleepEx(100,1) == 0: + raise RuntimeError("timed out when writing to device") + + + def _run_interrupt_callback_loop(self,report_buffer_size): + ''' + run on a thread to handle reading events from the device + ''' + if not self.is_open(): + raise RuntimeError("device not open") + + logging.info("starting _run_interrupt_callback_loop") + + # +1 to allow for report id byte + report_buffer=(c_ubyte*(report_buffer_size+1))() + overlapped=OVERLAPPED() + + def completion_callback(dwErrorCode,dwNumberOfBytesTransfered,lpOverlapped): + report_data="".join([struct.pack('B',b) for b in report_buffer]) + report_data=report_data[1:] # remove first byte (report id) + self._callback(self,report_data) + + overlap_completion=LPOVERLAPPED_COMPLETION_ROUTINE(completion_callback) + + # do async reads until the thread stops + while self._running and self.is_open(): + result=ReadFileEx(self._device_handle,report_buffer,len(report_buffer),byref(overlapped),overlap_completion) + if not result: + raise RuntimeError("ReadFileEx failed") + Kernel32.SleepEx(100,1) + + # thread is stopping so make sure we won't receive any more messages + Kernel32.CancelIo(self._device_handle) + + +def find_hid_devices(): + ''' + query the host computer for all available HID devices + and returns a list of any found + ''' + devices=[] + hDevInfo=setupapi_dll.SetupDiGetClassDevsA(byref(HidGuid),None,None,DIGCF_PRESENT | DIGCF_DEVICEINTERFACE) + + try: + for memberIndex in range(0,256): # work on assumption there won't be more than 255 devices attached, just in case + deviceInterface=SP_DEVICE_INTERFACE_DATA() + + result=setupapi_dll.SetupDiEnumDeviceInterfaces(hDevInfo,0,byref(HidGuid),memberIndex,byref(deviceInterface)) + + if not result: + break # last device + + requiredSize=DWORD() + + # find the size of the structure we'll need + if not setupapi_dll.SetupDiGetDeviceInterfaceDetailA(hDevInfo,byref(deviceInterface),None,0,byref(requiredSize),None): + GetLastErrorMessage() # ignore the error, as we just want to find the size + + # then make the structure and call again + detailData=SP_DEVICE_INTERFACE_DETAIL_DATA_OF_SIZE(requiredSize.value) + + if not setupapi_dll.SetupDiGetDeviceInterfaceDetailA(hDevInfo,byref(deviceInterface),byref(detailData),requiredSize,None,None): + raise RuntimeError(GetLastErrorMessage()) + + DeviceHandle=None + try: + DeviceHandle=Kernel32.CreateFileA( + detailData.DevicePath, + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + None, + OPEN_EXISTING, + 0, + None + ) + + # if we opened it ok + if DeviceHandle != INVALID_HANDLE_VALUE: + Attributes=HIDD_ATTRIBUTES() + + result=hid_dll.HidD_GetAttributes( + DeviceHandle, + byref(Attributes) + ) + + if result: + device=Win32HIDDevice(detailData.DevicePath,Attributes.VendorID,Attributes.ProductID) + devices.append(device) + else: + logging.info("failed to open device to read attributes") + + finally: + if DeviceHandle and DeviceHandle != INVALID_HANDLE_VALUE: + Kernel32.CloseHandle(DeviceHandle) + finally: + setupapi_dll.SetupDiDestroyDeviceInfoList(hDevInfo) + + return devices + __all__ = ['find_hid_devices','Win32HIDDevice'] \ No newline at end of file diff --git a/ioLabs.py b/ioLabs.py index 25fc119..989d69d 100644 --- a/ioLabs.py +++ b/ioLabs.py @@ -4,7 +4,14 @@ the module for interfacing with an ioLan button box. USBBox is the main class that should be used from this module ''' +from __future__ import print_function +from future import standard_library +standard_library.install_aliases() +from builtins import input +from builtins import zip +from builtins import range +from builtins import object __version__='3.2' # turn on logging so we can see what's going on @@ -14,7 +21,7 @@ import time import struct -from Queue import Queue, Empty +from queue import Queue, Empty import hid IO_LABS_VENDOR_ID=0x19BC @@ -23,20 +30,20 @@ def is_usb_bbox(device): return device.vendor == IO_LABS_VENDOR_ID and device.product == BUTTON_BOX_PRODUCT_ID -class dict_struct: +class dict_struct(object): '''simple class that takes keyword arguments and uses them to create fields on itself''' def __init__(self,**kw): self.__dict__.update(kw) def __str__(self): attribs=[] - for key,value in self.__dict__.items(): + for key,value in list(self.__dict__.items()): attribs.append('%s=%s'%(key,value)) return ','.join(attribs) def __repr__(self): attribs=[] - for key,value in self.__dict__.items(): + for key,value in list(self.__dict__.items()): attribs.append('%s=%r'%(key,value)) return "dict_struct(%s)" % ','.join(attribs) @@ -101,11 +108,11 @@ def __repr__(self): 0x45 : ('ERROR', 'BBBBBBB', ('data1','data2','data3','data4','data5','data6','data7') ) } -class messages: +class messages(object): '''class to handle message id lookup, and packing message objects into binary''' def __init__(self,message_summaries): self.message_summaries=message_summaries - for message_id,message_summary in message_summaries.items(): + for message_id,message_summary in list(message_summaries.items()): # add field for the ID message_name=message_summary[0] self.__dict__[message_name]=message_id @@ -113,7 +120,7 @@ def __init__(self,message_summaries): self.__dict__[message_name.lower()]=self._create_packing_function(message_id,message_summary) def ALL_IDS(self): - return self.message_summaries.keys() + return list(self.message_summaries.keys()) def _create_packing_function(self,message_id,message_summary): # always big endian format @@ -137,7 +144,7 @@ def parse(self,message_data): ''' id_byte=struct.unpack('B',message_data[0])[0] # see if we know how to parse this message - if self.message_summaries.has_key(id_byte): + if id_byte in self.message_summaries: summary=self.message_summaries[id_byte] format='>B'+summary[1] @@ -161,7 +168,7 @@ def parse(self,message_data): COMMAND=messages(COMMAND_SUMMARY) REPORT=messages(REPORT_SUMMARY) -class Commands: +class Commands(object): ''' class to handle sending reports to device and parsing incoming reports. dynamically looks up/creates method for sending reports when none is @@ -978,65 +985,65 @@ def reset_box(self): usbbox=USBBox() - print "USBBox connected" - print "serial #:",usbbox.serial_num - print "version:",usbbox.version - print "voice version:", usbbox.voice_version + print("USBBox connected") + print("serial #:",usbbox.serial_num) + print("version:",usbbox.version) + print("voice version:", usbbox.voice_version) # attached a callback for every report type def report_callback(msg): - print "received:",msg + print("received:",msg) for command_id in REPORT.ALL_IDS(): usbbox.commands.add_callback(command_id,report_callback) - from StringIO import StringIO + from io import StringIO outfile=StringIO() # record all incoming reports - usbbox.start_recording(REPORT_SUMMARY.keys(),outfile) + usbbox.start_recording(list(REPORT_SUMMARY.keys()),outfile) import re while True: time.sleep(0.5) # sleep a little to let any reports get received usbbox.process_received_reports() - command=raw_input("command: ").strip() + command=input("command: ").strip() if command == 'exit': break elif command == '': continue elif command == 'help': # print list of available commands - print "commands:" - print " exit" - print " help" - for command_id in COMMAND_SUMMARY.keys(): + print("commands:") + print(" exit") + print(" help") + for command_id in list(COMMAND_SUMMARY.keys()): command_name=COMMAND_SUMMARY[command_id][0].lower() command_args=COMMAND_SUMMARY[command_id][2] command_args=['<%s>' % arg for arg in command_args] - print " %s %s" % (command_name,' '.join(command_args)) + print(" %s %s" % (command_name,' '.join(command_args))) else: try: command_parts=command.split() command_name,command_args=command_parts[0],command_parts[1:] # check command is known known=False - for command_id in COMMAND_SUMMARY.keys(): + for command_id in list(COMMAND_SUMMARY.keys()): if COMMAND_SUMMARY[command_id][0].lower() == command_name: known=True break if not known: - print "error, unknown command: " + command_name + print("error, unknown command: " + command_name) else: command_fn=getattr(usbbox.commands,command_name) # turn all arguments into int's command_args=[int(arg) for arg in command_args] command_fn(*command_args) except: - print "error running: " + command + print("error running: " + command) # make sure we process any remaining reports usbbox.process_received_reports() usbbox.stop_recording() - print "recorded reports:" - print outfile.getvalue() + print("recorded reports:") + print(outfile.getvalue()) diff --git a/psyscopex.py b/psyscopex.py index ac52a0b..78a6fa5 100644 --- a/psyscopex.py +++ b/psyscopex.py @@ -8,6 +8,7 @@ the same way as for a HID device ''' +from builtins import range import logging import struct from ctypes import * @@ -341,7 +342,7 @@ def set_report(self,report_data,report_id=0): ioret=self._devInterface.WritePipe(2, report_buffer, len(report_data)) if ioret != kIOReturnSuccess: - logging.info("error writing to device: 0x%x" % long(ioret)) + logging.info("error writing to device: 0x%x" % int(ioret)) def _run_interrupt_callback_loop(self,report_buffer_size): if not self.is_open(): @@ -365,4 +366,4 @@ def _run_interrupt_callback_loop(self,report_buffer_size): logging.info('interrupt_report_callback(%r)',report_data) self._callback(self,report_data) else: - logging.info("error reading from device: 0x%x" % long(ioret)) + logging.info("error reading from device: 0x%x" % int(ioret)) diff --git a/test/test_cparser.py b/test/test_cparser.py index e84a5d1..602496d 100644 --- a/test/test_cparser.py +++ b/test/test_cparser.py @@ -1,5 +1,7 @@ +from __future__ import print_function # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: +from builtins import next from hid.cparser import * from hid.cparser import tokenizer @@ -8,24 +10,24 @@ def test_tokenizer(): t=tokenizer('void* my_fn(void)') assert not t.empty() - assert t.next() == 'void' - assert t.next() == '*' - assert t.next() == 'my_fn' - assert t.next() == '(' - assert t.next() == 'void' - assert t.next() == ')' + assert next(t) == 'void' + assert next(t) == '*' + assert next(t) == 'my_fn' + assert next(t) == '(' + assert next(t) == 'void' + assert next(t) == ')' assert t.empty() def test_tokenizer_keywords(): define('long long', ctypes.c_longlong) t=tokenizer('void* my_fn(long long)') assert not t.empty() - assert t.next() == 'void' - assert t.next() == '*' - assert t.next() == 'my_fn' - assert t.next() == '(' - assert t.next() == 'long long' - assert t.next() == ')' + assert next(t) == 'void' + assert next(t) == '*' + assert next(t) == 'my_fn' + assert next(t) == '(' + assert next(t) == 'long long' + assert next(t) == ')' assert t.empty() @@ -75,7 +77,7 @@ def test_parse_var(): def test_parse_var_type(): var=parse('int') assert var.type_name == 'int' - print var.name + print(var.name) assert var.name == '' var=parse('void') @@ -84,7 +86,7 @@ def test_parse_var_type(): var=parse('void*') assert var.type_name == 'void*' - print var.name + print(var.name) assert var.name == '' def test_parse_void_ctype(): diff --git a/test/test_ioLabs.py b/test/test_ioLabs.py index 6dc7716..583795c 100644 --- a/test/test_ioLabs.py +++ b/test/test_ioLabs.py @@ -1,5 +1,6 @@ # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: +from builtins import range from ioLabs import * import random @@ -75,7 +76,7 @@ def _check_port(self,port,field): assert hasattr(port,field) # only test sub-set of values to speed up testing a bit - for i in random.sample(range(0,256),16): + for i in random.sample(list(range(0,256)),16): setattr(port,field,i) assert getattr(port,field) == i @@ -146,7 +147,7 @@ def test_int1_debounce_up(self): def _check_port_and_state(self,port): - for i in random.sample(range(0,256),16): + for i in random.sample(list(range(0,256)),16): port.state=i mask = random.randint(0,255) state = port.and_state(mask) @@ -161,7 +162,7 @@ def test_port2_and_state(self): def _check_port_or_state(self,port): - for i in random.sample(range(0,256),16): + for i in random.sample(list(range(0,256)),16): port.state=i mask = random.randint(0,255) state = port.or_state(mask) @@ -176,7 +177,7 @@ def test_port2_or_state(self): def _check_port_xor_state(self,port): - for i in random.sample(range(0,256),16): + for i in random.sample(list(range(0,256)),16): port.state=i mask = random.randint(0,255) state = port.xor_state(mask)