Multithreading - XMLRPC server registering class objects as individual threads in Python 2.7.12

233 views Asked by At

From the server side, I have prepared a python script where the following tasks are performed.

  • Task 1 => Acquiring data from the sensors
  • Task 2 => Setting a certain register in the robot control unit to 1 or 0 depending upon the acquired data from the sensors

To do the above-mentioned tasks, i implemented two individual threads. To perform task 1, i have defined the class "SensorEvaluationBoard(Thread)" and for performing task 2 i have defined the class "UR_RTDE(Thread)". For more details, please see the following python script from the server side.

Server side:

#!/usr/bin/env python

import time
from time import sleep
import sys
import string
import traceback
import logging

import socket
import struct
import copy

import xmlrpclib
# xmlrpclib.Marshaller.dispatch[type(0L)] = lambda _, v, w: w("<value><i8>%d</i8></value>" % v)
# xmlrpclib.dumps((2**63-1,))
xmlrpclib.Binary
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler

import threading
from threading import Thread, Lock

# "https://www.universal-robots.com/articles/ur-articles/real-time-data-exchange-rtde-guide/"
import rtde.rtde as rtde 
import rtde.rtde_config as rtde_config

# urHost = "127.0.0.1" # UR robot's IP address
# "https://www.universal-robots.com/articles/ur-articles/remote-control-via-tcpip/"
urPort = 30004 # Port number dedicated for UR robot's secondary interface 
config_filename = 'control_loop_configuration.xml'
mutexFreedriveMode = Lock()


loadCycleOfSensor1, loadCycleOfSensor2, loadCycleOfSensor3 = "", "", ""
c1, c2, c3 = 0, 0, 0
cnt = 0 
data = bytearray()

sensEvalBoard_ipAddr = "" # IP address of sensor evaluation board
sensEvalBoard_port = 0 # Port number of  of sensor evaluation board
DEFAULT_TIMEOUT = 1
mutexSensVal = Lock()
sebSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # initializing the socket object
mutexSEBSocket = Lock()

def set_ipaddr(new_daemon_host): 
    global sensEvalBoard_ipAddr 
    sensEvalBoard_ipAddr = new_daemon_host 
    return sensEvalBoard_ipAddr

def get_ipaddr():
    tmp = ""
    if str(sensEvalBoard_ipAddr):
        tmp = sensEvalBoard_ipAddr
    else:
        tmp = "No ip address set"
    return tmp

def set_portnum(new_daemon_port): 
    global sensEvalBoard_port 
    sensEvalBoard_port = int (new_daemon_port) 
    return sensEvalBoard_port

def get_portnum():
    tmp = 0
    if sensEvalBoard_port != 0:
        tmp = sensEvalBoard_port
    else:
        tmp = 0
    return tmp

class ConnectionState:
    DISCONNECTED = 0
    CONNECTED = 1
    STARTED = 2
    PAUSED = 3

class SensorEvaluationBoardException(Exception):
    def __init__(self, msg):
        self.msg = msg
    def __str__(self):
        return repr(self.msg)

class SensorEvaluationBoard(Thread): 
    def __init__(self, hostname, port): 
        # Call the Thread class's init function
        Thread.__init__(self)
        self.__hostname = hostname
        self.__port = port 
        self.__conn_state = ConnectionState.DISCONNECTED
        self.__sock = None 

        logging.basicConfig(level=logging.DEBUG)
        self.__logger = logging.getLogger(self.__class__.__name__)
        # self.__streamHandler = logging.StreamHandler()
        # self.__streamHandler.setLevel(logging.DEBUG)
        # self.__formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # self.__streamHandler.setFormatter(self.__formatter)

        # self.__logger.addHandler(self.__streamHandler)
    
    def connect(self): 
        global sebSocket

        if self.__sock:
            return

        self.__buf = b''
        try:
            self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.__sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self.__sock.settimeout(DEFAULT_TIMEOUT)
            self.__sock.connect((self.__hostname, self.__port))
            self.__conn_state = ConnectionState.CONNECTED
            
            sebSocket = copy.deepcopy(self.__sock)
        except (socket.timeout, socket.error):
            self.__sock = None
            raise

    def disconnect(self):
        if self.__sock:
            self.__sock.close()
            self.__sock = None
        self.__conn_state = ConnectionState.DISCONNECTED
        
    def is_connected(self):
        return self.__conn_state is not ConnectionState.DISCONNECTED

    def get_connection_state(self):
        return self.__conn_state
    
    def send_input_data(self, data): 
        self.__sock.send(data)
    
    def receive_output_data(self, input_data): 
        self.send_input_data(input_data)
        self.__rcvData = self.__sock.recv(1024)
        return self.__rcvData
    
    def run(self): 
        global loadCycleOfSensor1, loadCycleOfSensor2, loadCycleOfSensor3
        
        self.connect()
        while True: 
            if self.is_connected: 
                # print("Socket => connection state: " + str(self.get_connection_state()) + " means CONNECTED")
                
                self.__input_data = bytearray()
                self.__input_data.append(0x1) 
                self.__rcvData = self.receive_output_data(self.__input_data)
                self.__unpacker_string = 3*'I I I I I B B B B'
                self.__unpacker = struct.Struct('<B '+ self.__unpacker_string)
                self.__unpacked = self.__unpacker.unpack(self.__rcvData) 
                # print("Slave 1: "+ repr(self.__unpacked[1:10]))
                # print("Slave 2: "+ repr(self.__unpacked[10:19]))
                # print("Slave 3: "+ repr(self.__unpacked[19:28]))
                
                mutexSensVal.acquire()
                loadCycleOfSensor1 = str(self.__unpacked[1])
                loadCycleOfSensor2 = str(self.__unpacked[12])
                loadCycleOfSensor3 = str(self.__unpacked[20]) 
                # print("Load cycle of sensors 1, 2 and 3: ["+ loadCycleOfSensor1 + ", " + loadCycleOfSensor2 + ", " + loadCycleOfSensor3 + "]")
                mutexSensVal.release() 
                sleep(0.1)
            else: 
                print("Socket => connection state: " + str(self.get_connection_state()) + " means DISCONNECTED")

class UR_RTDE(Thread): 
    def __init__(self, host, port, config_filename): 
        # Call the Thread class's init function
        Thread.__init__(self) 
        self.__host = host
        self.__port = port
        self.__config_filename = config_filename

        logging.basicConfig(level=logging.DEBUG)        
        self.__logger = logging.getLogger(self.__class__.__name__)
        # self.__streamHandler = logging.StreamHandler()
        # self.__streamHandler.setLevel(logging.INFO)
        # self.__formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # self.__streamHandler.setFormatter(self.__formatter)
        # self.__logger.addHandler(self.__streamHandler)

        self.__con = None
        self._rtde_ok = False
        self._recept_ok = False 
        self._dataAccessConfig = Lock()
        
        self.__keep_running = True        

    def init_rtde(self):
        self.__logger.debug("init_rtde")
        if not self._rtde_ok :
            if self.__con is None:
                self.connect_rtde()
            if not self._recept_ok:
                self.init_conf()
                self.send_conf_to_robot()
            self.start_communication()
            self._rtde_ok = True
    
    def connect_rtde(self): 

        self.__logger.debug("connect_rtde")
        try:
            self.__con = rtde.RTDE(self.__host, self.__port)
            self.__con.connect()
        except socket.timeout as e:
            self.__logger.error("failed to connect with robot", exc_info=True)
            self.__controller_info = None
            self.__con = None

        self.__controller_info = self.__con.get_controller_version()
        self.__logger.info("connected with UR established")
        self.__logger.info(self.__controller_info)
        return True
    
    def disconnect_rtde(self): 
        self.__con.send_pause()
        self.__con.disconnect()
        self.__logger.debug("disconnect_rtde")

    def init_conf(self):
        self.__logger.debug("init_conf")
        with self._dataAccessConfig:            
            self._conf = rtde_config.ConfigFile(self.__config_filename)
            self._state_names, self._state_types = self._conf.get_recipe('state')
            self._set_freedrive_name, self._set_freedrive_type = self._conf.get_recipe('set_freedrive')
    
    def send_conf_to_robot(self):
        self.__logger.debug("send_conf_to_robot")
        print("in send conf")
        try:
            self.__con.send_output_setup(self._state_names, self._state_types)
            self.__set_freedrive = self.__con.send_input_setup(self._set_freedrive_name, self._set_freedrive_type)

            self.__set_freedrive.__dict__[b"input_bit_register_64"] = 0
            self._recept_ok = True
        except Exception as e:
            self.__set_freedrive = None
            self.__logger.error("rtde recepts error", exc_info=True)
            self._recept_ok = False
    
    def start_communication(self):
        self.__logger.debug("start_communication")
        if not self.__con.is_connected():
            self.__logger.warning("no connection established")
        if not self.__con.send_start():
            self.__logger.warning("starting data_sync failed")
            sys.exit()
        
        self.__logger.info("started communication")
        return True

    def run(self): 
        global loadCycleOfSensor1, loadCycleOfSensor2, loadCycleOfSensor3
        global c1, c2, c3

        lowerThr_loadCycleOfSensor1, upperThr_loadCycleOfSensor1 = 400, 700        
        lowerThr_loadCycleOfSensor2, upperThr_loadCycleOfSensor2 = 100, 400
        lowerThr_loadCycleOfSensor3, upperThr_loadCycleOfSensor3 = 200, 425

        lTh_c1 = lowerThr_loadCycleOfSensor1 
        uTh_c1 = upperThr_loadCycleOfSensor1 
        
        lTh_c2 = lowerThr_loadCycleOfSensor2 
        uTh_c2 = upperThr_loadCycleOfSensor2 
        
        lTh_c3 = lowerThr_loadCycleOfSensor3 
        uTh_c3 = upperThr_loadCycleOfSensor3 

        self.init_rtde() 

        # Set the 'input_bit_register_64' to 0 by default
        self.__set_freedrive.__dict__[b"input_bit_register_64"] = 0 
        
        if self.__con.is_connected(): 
           while self.__keep_running: 
               # receive the current state
                self.__state = self.__con.receive()
                
                if self.__state is None:
                    break
                
                mutexSensVal.acquire()
                c1 = int (loadCycleOfSensor1) 
                self.__logger.info("Loading cycle of CapSen1 is " + str(c1)) 
                c2 = int (loadCycleOfSensor2) 
                self.__logger.info("Loading cycle of CapSen2 is " + str(c2)) 
                c3 = int (loadCycleOfSensor3) 
                self.__logger.info("Loading cycle of CapSen3 is " + str(c3)) 
                mutexSensVal.release() 

                mutexFreedriveMode.acquire()
                #  input_bit_register_64 refers to "general purpose input register 64" 
                # if "general purpose input register 64" variable is set to 1 => Freedrive mode is activated
                if ((lTh_c1 < c1 < uTh_c1) and (lTh_c2 < c2 < uTh_c2)):
                    self.__set_freedrive.__dict__[b"input_bit_register_64"] = 1 
                    self.__logger.info("Capacitive sensors 1 and 2 are touched by the human operator, Freedrive mode activated")
                elif ((lTh_c2 < c2 < uTh_c2) and (lTh_c3 < c3 < uTh_c3)):
                    self.__set_freedrive.__dict__[b"input_bit_register_64"] = 1 
                    self.__logger.info("Capacitive sensors 2 and 3 are touched by the human operator, Freedrive mode activated") 
                elif ((lTh_c3 < c3 < uTh_c3) and (lTh_c1 < c1 < uTh_c1)):
                    self.__set_freedrive.__dict__[b"input_bit_register_64"] = 1 
                    self.__logger.info("Capacitive sensors 1 and 3 are touched by the human operator, Freedrive mode activated")
                else: 
                    self.__set_freedrive.__dict__[b"input_bit_register_64"] = 0 
                    self.__logger.info("No two capacitive sensors are touched by the human operator, Freedrive mode deactivated")
                
                self.__con.send(self.__set_freedrive) 
                mutexFreedriveMode.release()
                sleep(0.1)
                
        self.disconnect_rtde()

def main():
    try: 
        # threadevent = threading.Event()
        # Create an object of Thread
        th_sensevalboard = SensorEvaluationBoard(sensEvalBoard_ipAddr, sensEvalBoard_port)
        # start the SensorEvaluationBoard thread
        th_sensevalboard.start()

        sleep(1)

        # Create an object of Thread
        th_rtde = UR_RTDE(urHost, urPort, config_filename)
        # start the RTDE thread
        th_rtde.start()

        return "SensorEvaluationBoard and RTDE threads has started..."

    except KeyboardInterrupt: 
        print("Terminating communication with the sensor evaluation board... ") 
        th_sensevalboard.disconnect()
        print("Socket => connection state: " + str(th_sensevalboard.get_connection_state()) + " means DISCONNECTED")
        th_sensevalboard.join()
        print ("SensorEvaluationBoard thread successfully closed")

        print("Terminating communication with the RTDE server... ") 
        th_rtde.disconnect()
        th_rtde.join()
        print ("RTDE thread successfully closed")

        return "SensorEvaluationBoard and RTDE threads has stopped..."

sys.stdout.write("CapSens daemon started")
sys.stderr.write("CapSens daemon started with caught exception")

# server_addr = ("127.0.0.1", 40405)
# server = SimpleXMLRPCServer(server_addr, SimpleXMLRPCRequestHandler, allow_none=True)
server = SimpleXMLRPCServer(("127.0.0.1", 40405), allow_none=True)

server.register_function(set_ipaddr, "set_ipaddr")
server.register_function(get_ipaddr, "get_ipaddr")
server.register_function(set_portnum, "set_portnum")
server.register_function(get_portnum, "get_portnum")

server.register_instance(UR_RTDE(urHost, urPort, config_filename), allow_dotted_names=True)
server.register_instance(SensorEvaluationBoard(sensEvalBoard_ipAddr, sensEvalBoard_port), allow_dotted_names=True)
server.register_function(main, "main")
server.serve_forever()
        

In the main() function of the above-mentioned python script, i initialize the two threads 'th_sensevalboard' and 'th_rtde' and start them one after the other. After the main() function, i tried to register the class objects which are individual threads as instances to an XMLRPC server. I am not sure, if this is the right way to do it.

Can anyone please have a look at the above-mentioned python script and tell me if the way in which i register multiple class objects which which are individual threads as instances to an XMLRPC server is proper or not? If it is not the proper way, can you please give me your suggestions.

Best Regards

0

There are 0 answers