Reactive event loop in Python


I'm trying to build a system that collects data from several sources using I/O (HDD, network, ...).

For this, I have a class (controller) that launches the collectors.

Each collector is an infinite loop with a classic ETL process (extract, transform and load).

I want to send some commands to the collectors (stop, reload settings, ...) from an interface (CLI, web, ...), and I'm not sure how to do it.

For example, this is the skeleton for a collector:

class Collector(object):
    def __init__(self):
        self.reload_settings()

    def reload_settings(self):
        # Get the settings
        # Set the settings as attributes
        pass

    def process_data(self, data):
        # Do something
        pass

    def run(self):
        # Infinite ETL loop: extract, then transform and load
        while True:
            data = retrieve_data()  # placeholder for the actual I/O
            self.process_data(data)

And this is the skeleton for the controller:

class Controller(object):
    def __init__(self, collectors):
        self.collectors = collectors

    def run(self):
        for collector in self.collectors:
            collector.run()  # never returns, so only the first collector runs

    def reload_settings(self):
        pass  # ??

    def stop(self):
        pass  # ??

Is there a classic design pattern that solves this problem (Publish–subscribe, event loop, reactor...)? What is the best way to solve this problem?

PS: Obviously, this will be a multiprocess application and will run on a single machine.


Answer by Jarek.D:

I'd look at whether your case can be adapted to a socket-based client-server architecture, where the Controller instantiates the required number of Collectors, each listening on its own port and handling received data more elegantly through the server's handle() method. The fact that the data comes from various I/O sources speaks even more for this solution: you could use the client part of this architecture to standardize the DataSource -> Collector protocol.

https://docs.python.org/2/library/socketserver.html
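A minimal sketch of that idea, assuming Python 3's `socketserver` module (the Python 2 page linked above documents the same classes under the module name `SocketServer`). Each collector is a `TCPServer` on its own port; a data source connects as a client and sends one record per line, which `handle()` feeds to `process_data()`. To cover the question's stop/reload commands, lines starting with `!` are treated as control messages. That convention, the names `SocketCollector`, `CollectorHandler` and `send_lines()`, and the placeholder transform are all hypothetical.

```python
import socket
import socketserver
import threading


class CollectorHandler(socketserver.StreamRequestHandler):
    """handle() runs once per connection; each line is a record or a command."""

    def handle(self):
        collector = self.server.collector
        count = 0
        for raw in self.rfile:  # iterates until the client closes its side
            line = raw.decode("utf-8").rstrip("\n")
            if line == "!reload":  # hypothetical control convention
                collector.reload_settings()
            elif line == "!stop":
                # Only a flag: calling server.shutdown() here would deadlock,
                # because handle() runs on the serve_forever() thread.
                collector.stop_requested = True
            else:
                collector.process_data(line)
            count += 1
        # Acknowledge, so the client knows every line has been handled.
        self.wfile.write(("ok %d\n" % count).encode("utf-8"))


class SocketCollector:
    """One collector = one TCP server on its own port (port 0 = any free port)."""

    def __init__(self, host="127.0.0.1", port=0):
        self.records = []
        self.reloads = 0
        self.stop_requested = False
        self.server = socketserver.TCPServer((host, port), CollectorHandler)
        self.server.collector = self  # lets the handler reach this object
        self._thread = threading.Thread(target=self.server.serve_forever, daemon=True)

    @property
    def address(self):
        return self.server.server_address  # holds the real port after bind

    def reload_settings(self):
        self.reloads += 1  # placeholder for re-reading real settings

    def process_data(self, data):
        self.records.append(data.upper())  # placeholder transform + load

    def start(self):
        self._thread.start()

    def shutdown(self):
        self.server.shutdown()      # makes serve_forever() return
        self.server.server_close()  # closes the listening socket
        self._thread.join()


def send_lines(address, lines):
    """Client side: a data source (or the controller) pushing lines to a collector."""
    with socket.create_connection(address) as sock:
        sock.sendall("".join(line + "\n" for line in lines).encode("utf-8"))
        sock.shutdown(socket.SHUT_WR)  # signal end of input
        with sock.makefile("r", encoding="utf-8") as reader:
            return reader.readline().strip()
```

For example, after `collector = SocketCollector()` and `collector.start()`, `send_lines(collector.address, ["a", "!reload"])` returns `"ok 2"` once both lines have been handled. A plain `TCPServer` handles one connection at a time; `ThreadingTCPServer` would be the usual swap if several sources must be served at once.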

Answer by abarnert:

There are multiple choices here, but they boil down to two major kinds: cooperative (event loop/reactor/coroutine/explicit greenlet), or preemptive (implicit greenlet/thread/multiprocess).

The first requires a lot more restructuring of your collectors. It can be a nice way to make the nondeterminism explicit, or to achieve massive concurrency, but neither of those seems relevant here. The second just requires sticking the collectors on threads, and using some synchronization mechanism for both communication and shared data. It seems like you have no shared data, and your communication is trivial and not highly time-sensitive. So, I'd go with threads.
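For contrast, here is a minimal sketch of the cooperative option, assuming Python 3.7+ and `asyncio` (the answer itself does not use it). The collector becomes a coroutine and every I/O call must be awaitable, which is the restructuring mentioned above; in exchange, plain boolean flags replace synchronization primitives. `AsyncCollector`, `fake_source` and `demo` are hypothetical names.

```python
import asyncio


class AsyncCollector:
    """Cooperative collector: runs as a coroutine on a single event loop."""

    def __init__(self, source):
        self.source = source  # hypothetical: an async function returning data
        self.processed = []
        self.reloads = 0
        # Plain flags are enough here: on one event loop, no other code
        # runs between two awaits, so there are no data races.
        self._need_reload = False
        self._need_stop = False

    def reload_settings(self):
        self._need_reload = True

    def stop(self):
        self._need_stop = True

    async def run(self):
        while not self._need_stop:
            if self._need_reload:
                self._need_reload = False
                self.reloads += 1            # re-read settings here
            data = await self.source()       # extract: yields to the loop while waiting
            self.processed.append(data * 2)  # transform + load placeholder


async def demo():
    counter = 0

    async def fake_source():
        nonlocal counter
        await asyncio.sleep(0.01)  # simulated I/O wait
        counter += 1
        return counter

    collector = AsyncCollector(fake_source)
    task = asyncio.create_task(collector.run())
    await asyncio.sleep(0.05)  # a CLI or web handler coroutine would do this
    collector.reload_settings()
    await asyncio.sleep(0.05)
    collector.stop()
    await task  # run() returns after finishing its current iteration
    return collector
```

`asyncio.run(demo())` returns the collector after it has stopped, with `reloads` equal to 1. The catch is that a single blocking call inside `run()` (for example a synchronous file read) would freeze every other collector on the loop.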

Assuming you want threads in the general sense, and given that your collectors are I/O-bound and you don't have dozens of them, I'd go with actual threads.

So, here's one way you can write it:

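import threading

class Collector(threading.Thread):
    def __init__(self):
        super(Collector, self).__init__()  # Thread.__init__ must run before start()
        # Create the events before _reload_settings(), which uses _need_reload
        self._need_reload = threading.Event()
        self._need_stop = threading.Event()
        self._reload_settings()

    def _reload_settings(self):
        # Clear first, so a reload requested while we read settings isn't lost
        self._need_reload.clear()
        # Get the settings
        # Set the settings as attributes

    def reload_settings(self):
        # Called from another thread: just raise the flag
        self._need_reload.set()

    def stop(self):
        self._need_stop.set()

    def process_data(self, data):
        # Do something
        pass

    def run(self):
        # Executed on the collector's own thread after start()
        while not self._need_stop.is_set():
            if self._need_reload.is_set():
                self._reload_settings()
            data = retrieve_data()  # placeholder; flags are checked only between calls
            self.process_data(data)

class Controller(object):
    def __init__(self, collectors):
        self.collectors = collectors

    def run(self):
        for collector in self.collectors:
            collector.start()  # start(), not run(): each loop gets its own thread

    def reload_settings(self):
        for collector in self.collectors:
            collector.reload_settings()

    def stop(self):
        # Signal every collector first, then wait for all of them to finish
        for collector in self.collectors:
            collector.stop()
        for collector in self.collectors:
            collector.join()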

(Although I'd call the Controller.run method start, because that fits in better with the naming used not only by Thread but also by the stdlib server classes and other similar things.)
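A common variant of the same threaded design, sketched here as an assumption rather than as part of the answer, gives each collector its own `queue.Queue` of command strings instead of one `Event` per command. New command types then need no new attributes, commands are applied in the order they were sent, and the controller's "publish" step is just a `put()` on every queue. `QueueCollector`, `QueueController`, `broadcast` and `fake_source` are hypothetical names, and the string commands are an illustrative convention. As with the `Event` version, commands are only noticed between `retrieve_data()` calls.

```python
import queue
import threading
import time


class QueueCollector(threading.Thread):
    """Collector thread that receives commands through its own queue."""

    def __init__(self, retrieve_data):
        super().__init__(daemon=True)
        self.retrieve_data = retrieve_data  # hypothetical blocking data source
        self.commands = queue.Queue()       # thread-safe FIFO of command strings
        self.processed = []
        self.reloads = 0

    def _handle_pending_commands(self):
        """Apply queued commands; return False if a stop was requested."""
        while True:
            try:
                command = self.commands.get_nowait()
            except queue.Empty:
                return True
            if command == "stop":
                return False
            if command == "reload":
                self.reloads += 1  # re-read settings here

    def run(self):
        while self._handle_pending_commands():
            data = self.retrieve_data()
            self.processed.append(data)  # transform + load placeholder


class QueueController(object):
    def __init__(self, collectors):
        self.collectors = collectors

    def start(self):
        for collector in self.collectors:
            collector.start()

    def broadcast(self, command):
        # One queue per collector: every collector sees every command.
        for collector in self.collectors:
            collector.commands.put(command)

    def stop(self):
        self.broadcast("stop")
        for collector in self.collectors:
            collector.join()


def fake_source():
    time.sleep(0.01)  # simulated blocking I/O
    return 1
```

Because each queue is FIFO, a `broadcast("reload")` followed by `stop()` guarantees every collector applies the reload before it exits.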