1
0
mirror of https://github.com/bkerler/edl synced 2024-11-22 11:06:23 +00:00
edl/edlclient/Library/Connection/devicehandler.py
2024-06-10 01:25:50 +08:00

147 lines
4.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import inspect
import traceback
from binascii import hexlify
try:
from edlclient.Library.utils import *
except:
from Library.utils import *
class DeviceClass(metaclass=LogBase):
    """Abstract base class for a device transport.

    The class stores the connection parameters, wires up logging, and
    implements a few read helpers (``read``, ``rdword``, ``rword``, ``rbyte``)
    plus the ``verify_data`` debug logger on top of ``usbread``.  Every method
    that would actually talk to a device raises ``NotImplementedError`` here
    and has to be overridden by a concrete subclass.
    """

    def __init__(self, loglevel=logging.INFO, portconfig=None, devclass=-1):
        """Initialise connection state and configure the logger.

        :param loglevel: ``logging`` level applied to the logger.  When it is
            exactly ``logging.DEBUG`` a file handler writing to
            ``logs/log.txt`` is attached as well.
        :param portconfig: Stored unchanged on ``self.portconfig``.
        :param devclass: Stored unchanged on ``self.devclass``.
        """
        self.connected = False
        self.timeout = 1000  # default timeout that read() hands to usbread()
        self.maxsize = 512  # default length that read() hands to usbread()
        self.vid = None
        self.pid = None
        self.stopbits = None
        self.databits = None
        self.parity = None
        self.baudrate = None
        self.configuration = None
        self.device = None
        # Honour the constructor argument (it used to be overwritten by -1).
        self.devclass = devclass
        self.loglevel = loglevel
        self.xmlread = True
        self.portconfig = portconfig
        # Name-mangled to ``_DeviceClass__logger``.  The attribute is not
        # defined in this class; presumably the LogBase metaclass installs it
        # (TODO confirm).  Re-binding it here stores it on the instance.
        self.__logger = self.__logger
        self.info = self.__logger.info
        self.error = self.__logger.error
        self.warning = self.__logger.warning
        self.debug = self.__logger.debug
        self.__logger.setLevel(loglevel)
        if loglevel == logging.DEBUG:
            logfilename = os.path.join("logs", "log.txt")
            # FileHandler opens its file right away and fails when the
            # directory is missing, so make sure it exists first.
            os.makedirs(os.path.dirname(logfilename), exist_ok=True)
            # FileHandler records the absolute path in ``baseFilename``; use
            # it to avoid stacking one more handler per created instance.
            logpath = os.path.abspath(logfilename)
            already_attached = any(
                isinstance(handler, logging.FileHandler)
                and handler.baseFilename == logpath
                for handler in self.__logger.handlers
            )
            if not already_attached:
                fh = logging.FileHandler(logfilename, encoding='utf-8')
                self.__logger.addHandler(fh)

    # ------------------------------------------------------------------
    # Transport interface: stubs that a concrete subclass must override.
    # ------------------------------------------------------------------
    def connect(self, options):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def close(self, reset=False):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def flush(self):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def detectdevices(self):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def getInterfaceCount(self):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def setLineCoding(self, baudrate=None, parity=0, databits=8, stopbits=1):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def setbreak(self):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def setcontrollinestate(self, RTS=None, DTR=None, isFTDI=False):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def write(self, command, pktsize=None):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def usbwrite(self, data, pktsize=None):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def usbread(self, resplen=None, timeout=0):
        """Abstract; not implemented in the base class.

        The read helpers of this class call it as ``usbread(length)`` or
        ``usbread(length, timeout)`` and treat the result as a bytes-like
        object (it is passed to ``len`` and ``unpack``).
        """
        raise NotImplementedError()

    def ctrl_transfer(self, bmRequestType, bRequest, wValue, wIndex, data_or_wLength):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    def usbreadwrite(self, data, resplen):
        """Abstract; not implemented in the base class."""
        raise NotImplementedError()

    # ------------------------------------------------------------------
    # Read helpers built on usbread().
    # ------------------------------------------------------------------
    def read(self, length=None, timeout=-1):
        """Call ``usbread`` with instance defaults filled in.

        :param length: Length passed to ``usbread``; ``None`` selects
            ``self.maxsize``.
        :param timeout: Timeout passed to ``usbread``; ``-1`` selects
            ``self.timeout``.
        :return: Whatever ``usbread`` returns.
        """
        if timeout == -1:
            timeout = self.timeout
        if length is None:
            length = self.maxsize
        return self.usbread(length, timeout)

    def rdword(self, count=1, little=False):
        """Read ``count`` unsigned 32-bit values with one ``usbread`` call.

        :param count: Number of 4-byte values to read.
        :param little: Decode as little-endian when true, big-endian otherwise.
        :return: A single int when ``count == 1``, otherwise the tuple
            produced by ``unpack``.
        :raises Exception: ``unpack`` (assumed to be ``struct.unpack`` from
            the utils star-import) rejects a buffer that is not exactly
            ``4 * count`` bytes long; that error is not handled here.
        """
        rev = "<" if little else ">"
        value = self.usbread(4 * count)
        data = unpack(rev + "I" * count, value)
        if count == 1:
            return data[0]
        return data

    def rword(self, count=1, little=False):
        """Read up to ``count`` unsigned 16-bit values, two bytes at a time.

        :param count: Number of 2-byte values to read.
        :param little: Decode as little-endian when true, big-endian otherwise.
        :return: A single int when ``count == 1`` and the read succeeded,
            otherwise a list of ints.  If a read comes back empty the values
            collected so far are returned as a list, which may be empty.
        """
        rev = "<" if little else ">"
        data = []
        for _ in range(count):
            v = self.usbread(2)
            if len(v) == 0:
                # Nothing more to read: hand back the partial result.
                return data
            data.append(unpack(rev + "H", v)[0])
        if count == 1:
            return data[0]
        return data

    def rbyte(self, count=1):
        """Return the raw result of ``usbread(count)``."""
        return self.usbread(count)

    def verify_data(self, data, pre="RX:"):
        """Log ``data`` at debug level and hand it back to the caller.

        ``bytes``/``bytearray`` payloads that start with ``<?xml`` are logged
        line by line as text.  Any other payload is logged as one hex string.

        :param data: Payload to log; it must be bytes-like for the hex dump.
        :param pre: Prefix prepended to every logged line.
        :return: For XML payloads, a ``bytes`` object rebuilt from the lines
            that decoded as UTF-8, each terminated with ``\\n``; lines that do
            not decode are logged as hex and left out of the result.  For
            everything else, ``data`` itself.
        """
        logger = self.__logger
        if logger.level == logging.DEBUG:
            # Log how we got here, skipping every stack entry whose text
            # mentions "verify_data" or "Port" and dropping the last one left.
            stack_trace = traceback.format_stack(inspect.currentframe())
            td = [
                trace for trace in stack_trace
                if "verify_data" not in trace and "Port" not in trace
            ]
            self.debug(td[:-1])

        if isinstance(data, (bytes, bytearray)) and data[:5] == b"<?xml":
            try:
                kept = []
                for line in data.split(b"\n"):
                    try:
                        text = line.decode('utf-8')
                    except UnicodeDecodeError:
                        # Not text: log the raw bytes as hex and skip the line.
                        self.debug(pre + hexlify(line).decode('utf-8'))
                        continue
                    self.debug(pre + text)
                    kept.append(line + b"\n")
                return b"".join(kept)
            except Exception as err:
                # Best effort: logging must not break the data path, so fall
                # through to the plain hex dump and return the input as-is.
                self.debug(str(err))

        # Only build the hex string when the logger level is DEBUG or lower.
        if logging.DEBUG >= logger.level:
            self.debug(pre + hexlify(data).decode('utf-8'))
        return data