Reading data from multiple USB devices using threads in Python
Hi everyone, I have written Python code to read data from a USB device. The logic I followed is:
- find the USB device with the correct VID and PID (dev = usb.core.find(idVendor=0xABCD, idProduct=0xWXYZ))
- set the active configuration (dev.set_active_configuration())
- get the active configuration (cfg = dev.get_active_configuration()) and its interface (intf = cfg[(0,0)])
- get the read (IN) and write (OUT) endpoints
- write the startup packet and read the corresponding response
- write the startup complete packet and read the corresponding response
- from this point on, I receive data continuously through read (infinite loop)
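The handshake in the last three steps can be sketched independently of the hardware. This is only a sketch under stated assumptions: `run_handshake`, `FakeEndpoint`, and the packet bytes are hypothetical names and values invented for illustration. The only thing assumed about the real endpoints is that they expose `write(data)` and `read(size)`, as PyUSB endpoint objects do.

```python
from collections import deque


class FakeEndpoint:
    """Hypothetical stand-in for a pair of USB endpoints; not part of PyUSB."""

    def __init__(self, responses):
        self.responses = deque(responses)  # canned replies returned by read()
        self.written = []                  # every packet passed to write()

    def write(self, data):
        self.written.append(bytes(data))
        return len(data)

    def read(self, size):
        return self.responses.popleft()[:size]


def run_handshake(ep_out, ep_in, startup_packet, startup_complete_packet, size=64):
    """Write each startup packet and read its response, in order."""
    ep_out.write(startup_packet)
    first = ep_in.read(size)
    ep_out.write(startup_complete_packet)
    second = ep_in.read(size)
    return first, second


# Placeholder packets; the real values depend on the device protocol.
ep = FakeEndpoint([b"ack-start", b"ack-done"])
replies = run_handshake(ep, ep, b"\x01", b"\x02")
```

Keeping the handshake in one function that takes the endpoints as arguments makes it possible to exercise the sequence with a fake endpoint before plugging in a real device.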
I am able to read data from a single USB device when everything is in a single .py file.
I have many USB devices with the same vendor ID but different PIDs, and I want to write generic code that reads data from all of them using threads. With threads, however, it stops working from step 5 onwards (writing the startup packet and reading its response): my code below seems to write the startup packet but is not able to read. I created separate files, CreateLoopObject.py and USBOjbect.py.
Please help.
My approach is:
import threading

from USBOjbect import USBOjbect  # assumes the class lives in USBOjbect.py


class CreateLoopObject(object):
    def __init__(self, dev):
        object.__init__(self)
        self.connection = USBOjbect(dev)
        self.mythread = threading.Thread(name='Mythread', target=self.beginTransaction)

    def begin(self):
        # Open the device and finish the handshake before the reader thread starts
        self.connection.open()
        self.connection.DoInitialTransactions()
        self.mythread.start()

    def beginTransaction(self):
        # Read indefinitely and process each packet
        while True:
            data = self.read()
            # process data.....

    def read(self):
        return self.connection.read()

import usb.util


class USBOjbect(object):
    def __init__(self, dev):
        self.device = dev
        self.outPoint = None
        self.inPoint = None

    def open(self):
        self.device.set_active_configuration()
        cfg = self.device.get_active_configuration()
        intf = cfg[(0,0)]
        self.outPoint = usb.util.find_descriptor(..)  # Get OUT endpoint
        self.inPoint = usb.util.find_descriptor(..)  # Get IN endpoint

    def read(self, size=64):
        # Accept a size so the read(64) calls below match this signature
        return self.inPoint.read(size)

    def write(self, msg):
        self.outPoint.write(msg)

    def DoInitialTransactions(self):
        # startup_packet and startup_complete_packet must be defined elsewhere (not shown)
        self.write(startup_packet)
        # Retry until the device answers the startup packet
        while True:
            try:
                data = self.read(64)
                break
            except Exception as e:
                print('not able to read:', e)
        self.write(startup_complete_packet)
        data = self.read(64)

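A note on the retry loop in DoInitialTransactions: a loop that catches every `Exception` also swallows programming errors, such as a `TypeError` from calling a method with the wrong number of arguments, and that then looks exactly like a device that never answers. Below is a minimal sketch of a bounded retry that only retries on a chosen error type. `read_with_retry`, `flaky_read`, and the use of the built-in `TimeoutError` are assumptions made for illustration; with PyUSB the error to retry on would more likely be `usb.core.USBError`.

```python
import time


def read_with_retry(read_fn, retry_on, attempts=5, delay=0.0):
    """Call read_fn() until it succeeds, retrying only on `retry_on` errors."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return read_fn()
        except retry_on as exc:
            last_error = exc
            print(f"read attempt {attempt} failed: {exc!r}")
            time.sleep(delay)
    raise RuntimeError(f"no response after {attempts} attempts") from last_error


# Hypothetical flaky reader: times out twice, then returns data.
calls = {"n": 0}


def flaky_read():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("timed out")
    return b"response"


result = read_with_retry(flaky_read, retry_on=TimeoutError)


# A signature mismatch is not retried; it surfaces immediately.
def read():  # takes no size argument
    return b""


try:
    read_with_retry(lambda: read(64), retry_on=TimeoutError)
    surfaced = False
except TypeError:
    surfaced = True
```

Printing the exception itself, rather than a fixed message, also shows whether the failure is a timeout or something else.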
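import usb.core

# Assumes CreateLoopObject is defined or imported in this file
if __name__ == '__main__':
    devices = usb.core.find(find_all=True, idVendor=0xABCD)
    Threaddict = {}
    for dev in devices:
        # Start one reader per distinct product ID
        if dev.idProduct not in Threaddict:
            Threaddict[dev.idProduct] = CreateLoopObject(dev)
            Threaddict[dev.idProduct].begin()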
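The one-thread-per-device pattern above can also be tried without any hardware. The following is a minimal sketch, not a definitive implementation: `FakeDevice`, `reader_loop`, and `start_readers` are hypothetical names, the fake device raises the built-in `TimeoutError` where a real device would time out, and a shared `queue.Queue` plus a `threading.Event` are one possible way to collect data and stop the readers.

```python
import queue
import threading


class FakeDevice:
    """Hypothetical stand-in for an opened USB device; not part of PyUSB."""

    def __init__(self, pid, packets):
        self.pid = pid
        self._packets = list(packets)

    def read(self, size=64):
        if not self._packets:
            raise TimeoutError("no data")  # mimics a read timeout
        return self._packets.pop(0)[:size]


def reader_loop(device, out_queue, stop_event):
    """Read until asked to stop; timeouts are expected and simply retried."""
    while not stop_event.is_set():
        try:
            data = device.read(64)
        except TimeoutError:
            stop_event.wait(0.01)  # back off briefly instead of spinning
            continue
        out_queue.put((device.pid, data))


def start_readers(devices):
    """Start one daemon reader thread per device, keyed by product ID."""
    out_queue = queue.Queue()
    stop_event = threading.Event()
    threads = {}
    for dev in devices:
        t = threading.Thread(
            target=reader_loop,
            args=(dev, out_queue, stop_event),
            name=f"reader-{dev.pid:04x}",
            daemon=True,
        )
        t.start()
        threads[dev.pid] = t
    return threads, out_queue, stop_event


devices = [FakeDevice(0x0001, [b"a1", b"a2"]), FakeDevice(0x0002, [b"b1"])]
threads, out_queue, stop_event = start_readers(devices)
received = sorted(out_queue.get(timeout=2) for _ in range(3))
stop_event.set()
for t in threads.values():
    t.join(timeout=2)
```

Each thread owns exactly one device, so no device handle is shared between threads, and the stop event gives the main thread a way to end the otherwise infinite read loops.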