Loop through changing dataset with inlineCallbacks/yield (python-twisted)

653 views Asked by At

I have a defer.inlineCallback function for incrementally updating a large (>1k) list one piece at a time. This list may change at any time, and I'm getting bugs because of that behavior.

The simplest representation of what I'm doing is:-

@defer.inlineCallbacks
def _get_details(self, dt=None):
    data = self.data
    for e in data:
        if needs_update(e):
            more_detail = yield get_more_detail(e)
            do_the_update(e, more_detail)
    schedule_future(self._get_details)

self.data is a list of dictionaries which is initially populated with basic information (e.g. a name and ID) at application start. _get_details will run whenever allowed to by the reactor to get more detailed information for each item in data, updating the item as it goes along.

This works well when self.data does not change, but once it is changed (can be at any point) the loop obviously refers to the wrong information. In fact in that situation it would be better to just stop the loop entirely.

I'm able to set a flag in my class (which the inlineCallback can then check) when the data is changed.

  1. Where should this check be conducted?
  2. How does the inlineCallback code execute compared to a normal deferred (and indeed to a normal python generator).
  3. Does code execution stop everytime it encounters yield (i.e. can I rely on this code between one yield and the next to be atomic)?
  4. In the case of unreliable large lists, should I even be looping through the data (for e in data), or is there a better way?
4

There are 4 answers

0
Ng Oon-Ee On
@defer.inlineCallback
def _get_details(self, dt=None):
    data = self.data
    i = 0
    while i < len(data):
        e = data[i]
        if needs_update(e):
            more_detail = yield get_more_detail(e)
            if i < len(data) or data[i] != e:
                break
            do_the_update(e, more_detail)
        i += 1
    schedule_future(self._get_details)

Based on more testing, the following are my observations.

  1. for e in data iterates through elements, with the element still existing even if data itself does not, both before and after the yield statement.

  2. As far as I can tell, execution is atomic between one yield and the next.

  3. Looping through the data is more transparently done by using a counter. This also allows for checking whether the data has changed. The check can be done anytime after yield because any changes must have occurred before yield returned. This results in the code shown above.

1
Dolan Murvihill On

the Twisted reactor never preempts your code while it is executing -- you have to voluntarily yield to the reactor by returning a value. This is why it is such a terrible thing to write Twisted code that blocks on I/O, because the reactor is not able to schedule any tasks while you are waiting for your disk.

So the short answer is that yes, execution is atomic between yields.

Without @inlineCallbacks, the _get_details function returns a generator. The @inlineCallbacks annotation simply wraps the generator in a Deferred that traverses the generator until it reaches a StopIteration exception or a defer.returnValue exception. When either of those conditions is reached, inlineCallbacks fires its Deferred. It's quite clever, really.

I don't know enough about your use case to help with your concurrency problem. Maybe make a copy of the list with tuple() and update that. But it seems like you really want an event-driven solution and not a state-driven one.

1
notorious.no On

self.data is a list of dictionaries...once it is changed (can be at any point) the loop obviously refers to the wrong information

If you're modifying a list while you iterate it, as Raymond Hettinger would say "You're living in the land of sin and you deserve everything that happens to you." :) Scenarios like this should be avoided or the list should be immutable. To circumvent this problem, you can use self.data.pop() or DeferredQueue object to store data. This way you can add and remove elements at anytime without causing adverse effects. Example with a list:

@defer.inlineCallbacks
def _get_details(self, dt=None):
    try:
        data = yield self.data.pop()
    except IndexError:
        schedule_future(self._get_details)
        defer.returnValue(None)         # exit function

    if needs_update(e):
        more_detail = yield get_more_detail(data)
        do_the_update(data, more_detail)

    schedule_future(self._get_details)

Take a look at DeferredQueue because a Deferred is returned when the get() function is called, which you can chain callbacks to handle each element you pop from the queue.

1
Dariusz Bączkowski On

You need to protect access to shared resource (self.data). You can do this with: twisted.internet.defer.DeferredLock.

http://twistedmatrix.com/documents/current/api/twisted.internet.defer.DeferredLock.html

Method acquire

Attempt to acquire the lock. Returns a Deferred that fires on lock acquisition with the DeferredLock as the value. If the lock is locked, then the Deferred is placed at the end of a waiting list.

Method release

Release the lock. If there is a waiting list, then the first Deferred in that waiting list will be called back.