Reading data from multiple USB devices using threads in Python

Hi everyone, I have written Python code (using PyUSB) to read data from a USB device. The logic I follow is:

  1. Find the USB device with the correct VID and PID (dev = usb.core.find(idVendor=0xABCD, idProduct=0xWXYZ))
  2. Set the active configuration (dev.set_active_configuration())
  3. Get the active configuration (cfg = dev.get_active_configuration())
  4. Get the read (IN) and write (OUT) endpoints.
  5. Write the startup packet and read the corresponding response.
  6. Write the startup-complete packet and read the corresponding response.
  7. From this point on, data arrives continuously and I read it in an infinite loop.
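
For reference, the steps above can be sketched roughly as follows. This is only an illustration under assumptions: the helper names (find_endpoints, handshake, stream) and the size and max_reads parameters are made up for this sketch, the first OUT and IN endpoints of interface (0, 0) are assumed to be the right ones, and the packet contents are device-specific and not shown.

```python
def find_endpoints(dev):
    """Steps 2-4: configure the device and return its (OUT, IN) endpoints."""
    import usb.util  # PyUSB; imported lazily so the helpers below run without it

    dev.set_active_configuration()
    cfg = dev.get_active_configuration()
    intf = cfg[(0, 0)]  # first interface, first alternate setting (assumption)

    def first_endpoint(direction):
        # First endpoint of this interface whose address has the given direction.
        return usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == direction,
        )

    return first_endpoint(usb.util.ENDPOINT_OUT), first_endpoint(usb.util.ENDPOINT_IN)


def handshake(ep_out, ep_in, startup_packet, startup_complete_packet, size=64):
    """Steps 5-6: two write/read exchanges; returns both responses."""
    ep_out.write(startup_packet)
    first = ep_in.read(size)
    ep_out.write(startup_complete_packet)
    second = ep_in.read(size)
    return first, second


def stream(ep_in, handle, size=64, max_reads=None):
    """Step 7: read repeatedly and pass each packet to handle().

    max_reads=None loops forever; a number stops after that many reads.
    """
    count = 0
    while max_reads is None or count < max_reads:
        handle(ep_in.read(size))
        count += 1
```

Because handshake and stream only need objects with write() and read() methods, they can be tried with stand-in endpoints before a real device is attached.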

I am able to read data from a single USB device when everything is kept in a single .py file.

I have many USB devices with the same vendor ID but different PIDs, and I want to write generic code that reads data from all of them using threads. With threads, however, nothing works from step 5 onwards: the code below may be writing the startup packet, but it is not able to read the response. I created separate files, CreateLoopObject.py and USBOjbect.py.

Please help.

My approach is:

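import threading

import usb.core
import usb.util


class CreateLoopObject(object):
    def __init__(self, dev):
        self.connection = USBOjbect(dev)
        self.mythread = threading.Thread(name='Mythread', target=self.beginTransaction)

    def begin(self):
        # Open the device and finish the handshake before the reader thread starts.
        self.connection.open()
        self.connection.DoInitialTransactions()
        self.mythread.start()

    def beginTransaction(self):
        # Read indefinitely and process each packet.
        while True:
            data = self.read()
            # process data.....

    def read(self):
        return self.connection.read()


class USBOjbect(object):
    def __init__(self, dev):
        self.device = dev
        self.outPoint = None
        self.inPoint = None

    def open(self):
        self.device.set_active_configuration()
        cfg = self.device.get_active_configuration()
        intf = cfg[(0, 0)]
        # Assumption: use the first OUT and the first IN endpoint of this interface.
        self.outPoint = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == usb.util.ENDPOINT_OUT)
        self.inPoint = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == usb.util.ENDPOINT_IN)

    def read(self, size=64):
        # Accept a size so that calls such as self.read(64) are valid.
        return self.inPoint.read(size)

    def write(self, msg):
        self.outPoint.write(msg)

    def DoInitialTransactions(self):
        # startup_packet and startup_complete_packet are device-specific
        # and must be defined elsewhere.
        self.write(startup_packet)
        while True:
            try:
                data = self.read(64)
                break
            except usb.core.USBError as e:
                # Retry only on USB errors (e.g. timeouts); other exceptions
                # are programming errors and should not be hidden.
                print('not able to read:', e)
        self.write(startup_complete_packet)
        data = self.read(64)
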

if __name__ == '__main__':
    # One CreateLoopObject (and one reader thread) per distinct product ID.
    devices = usb.core.find(find_all=True, idVendor=0xABCD)
    Threaddict = {}
    for dev in devices:
        if dev.idProduct not in Threaddict:
            Threaddict[dev.idProduct] = CreateLoopObject(dev)
            Threaddict[dev.idProduct].begin()
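
A variation on the one-thread-per-device idea, as a minimal sketch that can be exercised without hardware. Everything here is an assumption for illustration: reader_loop and start_readers are made-up names, and each connection is just any object with a read() method (for example a USBOjbect that has already been opened). The point of the sketch is that each worker records its exception instead of swallowing it, and that all workers share a threading.Event so they can be stopped.

```python
import threading


def reader_loop(connection, handle, stop, errors):
    """Read from one connection until `stop` is set; record any failure."""
    try:
        while not stop.is_set():
            handle(connection.read())
    except Exception as exc:
        # Keep the exception so the main thread can see why a reader ended.
        errors.append(exc)


def start_readers(connections, handle):
    """Start one daemon thread per entry of the `connections` dict.

    Returns (threads, stop, errors): the threads keyed like `connections`,
    the shared stop event, and the list that collects worker exceptions.
    """
    stop = threading.Event()
    errors = []
    threads = {}
    for key, conn in connections.items():
        thread = threading.Thread(
            name=f"reader-{key}",
            target=reader_loop,
            args=(conn, handle, stop, errors),
            daemon=True,
        )
        thread.start()
        threads[key] = thread
    return threads, stop, errors
```

With real devices, connections could be a dict keyed by dev.idProduct; calling stop.set() asks every reader to exit after its current read() returns, so a read timeout is needed for that to happen promptly.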