How to use Timer Thread with Python


I am writing a Ryu application (Python) that contains an if/else statement. The first time the condition is satisfied, a 10-second timer should start. During those 10 seconds other packets matching the same condition will arrive, but I don't want to start a new timer each time the condition is satisfied within that window. In short, the timer should run in parallel with the packet handling.

Here is the code snippet I used for the thread. Every time I run it and send multiple packets, multiple threads start, whereas I want only one thread to run for the 10 seconds.

def timeit():
         time.sleep(10)
         aggr()
         return

def aggr():
         self.no_of_data=len(self.iot_data)
         self.ip_proto=proto
         self.ip_saddr=source
         self.ip_daddr=destination
         ip_head= pack('!BBHHHBBH16s16s' , self.ip_ihl_ver, self.ip_tos, self.ip_tot_len, self.ip_id, self.ip_frag_off, self.ip_ttl,self.ip_check,self.ip_proto, self.ip_saddr, self.ip_daddr)
         total_pkts= pack('!I', self.no_of_data)
         print "TOTALLLL,,,,",self.no_of_data
         ip_head="{" + ip_head + "}"
         total_pkts="{" + total_pkts + "}"
         s='$'
         data = s.join(self.iot_data)
         data="$" + data
         pckt= ip_head + total_pkts + data
         self.iot_data = []
         print "BUFFER: ", self.iot_data
         self.iot_data_size = 0
         self.start_time = time.time()
         self.logger.info("packet-out %s" % (repr(pckt),))
         out_port = ofproto.OFPP_FLOOD
         actions = [parser.OFPActionOutput(out_port)]
         out = parser.OFPPacketOut(datapath=datapath,
               buffer_id=ofproto.OFP_NO_BUFFER,
               in_port=in_port, actions=actions,                                          
               data=pckt)
         print "out--->" , out
         datapath.send_msg(out)
thread1 = threading.Thread(target=timeit)
thread1.start()
if  proto  == 150 and total_len  < 1500:
        if not thread1.isAlive():
                thread1.run()
        print "ifff"
        data = msg.data
        #print " # stores the packet data"
        self.iot_data.append(data)
        #print "# increment size counter"
        self.iot_data_size += total_len
        #elapsed_time = time.time() - self.start_time
        print "ELAPSED: ", elapsed_time
        print "BUFFER: ", self.iot_data

After the 10 seconds are over, the timer should start again when the next packet arrives, and it should again run in parallel with the same code. I am very confused by this. Can anyone please help?

I hope this is clear. If not, I am sorry; please ask for clarification.

Thank you

There is 1 answer

campovski On BEST ANSWER

Indeed, multi-threading is the way to go (it might be achieved without it, but that would certainly be a pain). The idea is to have a thread run a function that sleeps for 10 seconds and then returns. While that function is sleeping the thread is alive, which means a timer is running. Once it returns, the thread is no longer alive, and a timer has to be started again for the next 10-second window.

Knowing that, we can write the following code. All details and explanations are given as comments for easier reference.

import time
import threading

packet_groups = [] # Groups of packets inside 10 seconds.
group = [] # Temporary group that will get stored into packet_groups.

# The function that will count 10 seconds:
def timeit():
    time.sleep(10)
    return

# Do something with packets.
def packet_handler(packet, new):
    ...

# Put all your code inside another function that does not create a
# new thread for every packet. A Thread object can be started only
# once, so a fresh one is created whenever no timer is running.
thread1 = None # The thread counting the current 10 seconds, if any.

def get_packets():
    global thread1
    ... # get packets
    if dst == 'some_address':
        # Check if thread is alive. If it is alive, a counter is running.
        # If it is not alive, then we must start the counter with a new
        # thread. Use start(), not run(): run() would execute timeit()
        # in the current thread and block it for 10 seconds.
        if thread1 is None or not thread1.is_alive():
            thread1 = threading.Thread(target=timeit)
            thread1.start()
            packet_handler(packet, True)
        else:
            packet_handler(packet, False)

if __name__ == '__main__':
    get_packets()
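
The difference between run() and start() is easy to miss with this pattern, so here is a minimal sketch of it. It assumes Python 3 and a shortened window (WINDOW is 0.2 seconds instead of 10). The demo() helper, the record list and the thread names are made up for illustration only.

```python
import threading
import time

WINDOW = 0.2  # Shortened for the demo; the question uses 10 seconds.

def timeit(record):
    # Remember which thread executed the timer, then sleep for the window.
    record.append(threading.current_thread().name)
    time.sleep(WINDOW)

def demo():
    record = []

    # run() executes timeit() in the calling thread and blocks it.
    blocking = threading.Thread(target=timeit, args=(record,), name="timer-run")
    began = time.monotonic()
    blocking.run()
    blocked_for = time.monotonic() - began

    # start() executes timeit() in a new thread and returns immediately.
    background = threading.Thread(target=timeit, args=(record,), name="timer-start")
    background.start()
    alive_while_running = background.is_alive()
    background.join()
    alive_after_finish = background.is_alive()

    # A finished thread cannot be started again.
    try:
        background.start()
        restarted = True
    except RuntimeError:
        restarted = False

    return record, blocked_for, alive_while_running, alive_after_finish, restarted
```

The first entry in record is the name of the caller's thread, not "timer-run", which shows that run() never left the calling thread. Since a finished thread refuses a second start(), each new 10-second window needs its own Thread object.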

Since you mentioned that you want to group the packets within these 10-second blocks, you can implement packet_handler() like this:

def packet_handler(packet, new):
    global group # group is reassigned below, so it must be declared global.
    # If we started a new thread, the previous 10 seconds block is
    # over. Append the finished group (if it isn't empty) to
    # packet_groups (that is all groups) and reset the group to only
    # contain the current packet.
    if new:
        if group:
            packet_groups.append(group)
        group = [packet]
        return
    # If the thread isn't new, we are still inside the 10 seconds block.
    # We just append the current packet to this block.
    group.append(packet)
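
If the buffered packets should be handed over as soon as the 10 seconds are up (rather than when the next packet arrives), the timer and the grouping can be combined. The following is only a sketch of that idea and swaps in a different technique: it uses threading.Timer from the standard library instead of the is-alive check above. The PacketAggregator class, its on_flush callback and the window parameter are hypothetical names, and it assumes plain threads are usable in your environment, which I have not verified for Ryu.

```python
import threading

class PacketAggregator(object):
    """Hypothetical helper: buffers packets and flushes them once per window."""

    def __init__(self, window, on_flush):
        self.window = window      # Window length in seconds.
        self.on_flush = on_flush  # Called with the list of buffered packets.
        self.buffer = []
        self.timer = None         # Timer of the currently open window, if any.
        self.lock = threading.Lock()

    def add(self, packet):
        with self.lock:
            self.buffer.append(packet)
            # Only the first packet of a window starts a timer.
            if self.timer is None:
                self.timer = threading.Timer(self.window, self._flush)
                self.timer.daemon = True
                self.timer.start()

    def _flush(self):
        # Runs in the timer's thread once the window has elapsed.
        with self.lock:
            packets, self.buffer = self.buffer, []
            self.timer = None     # The next packet opens a new window.
        self.on_flush(packets)
```

In the question's terms, add() would be called where the packet is appended to self.iot_data, and on_flush would play the role of aggr(). The lock is there because add() and _flush() run in different threads and both touch the buffer.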

If you want to print or otherwise display the timer, you can't sleep for the full 10 seconds in a single call, because nothing can be reported in between. In that case, change timeit() to something like this:

def timeit():
    for i in range(10):
        # Report the remaining time, then sleep for one second.
        print('Time remaining: {0}s'.format(10 - i))
        time.sleep(1)
    return
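
A variation on the same loop, in case the countdown should also be stoppable before it runs out. This is a sketch under my own assumptions, not something the question asks for: the countdown() function, the report callback, the cancelled event and the tick parameter are illustrative names. It relies on threading.Event.wait(), which sleeps up to the given timeout but returns True as soon as the event is set.

```python
import threading

def countdown(seconds, report, cancelled, tick=1.0):
    """Report the remaining time once per tick; stop early if cancelled is set."""
    for i in range(seconds):
        report(seconds - i)
        # wait() returns True immediately if the event is (or becomes) set.
        if cancelled.wait(tick):
            return False  # Stopped before the countdown finished.
    return True           # Countdown ran to the end.
```

It could be used as the thread target in place of timeit(), for example with threading.Thread(target=countdown, args=(10, print, stop_event)) in Python 3, where stop_event is a threading.Event() that another thread may set.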