What causes viewflow to throw 'More than one join instance for process found'


I have a django-viewflow workflow which includes a 3-way Split() to handle email, SMS, etc. Since each of those activities may take an extended period of time to complete, I represent each one of the 3 split branches as a pair of nodes:

  • A normal Handler() node which spawns a Celery job.
  • A custom wait-for-celery node.

The custom node looks like this:

class CeleryEvent(mixins.TaskDescriptionViewMixin,
                  mixins.NextNodeMixin, mixins.DetailViewMixin,
                  mixins.UndoViewMixin,
                  mixins.CancelViewMixin, Event):
    ...
    activation_class = ...  # a subclass of AbstractJobActivation
    task_type = "somestring"

The code invoked on Celery job completion follows the model of another question, and specifically includes the lock used there. Generally, the result works fine. However, once in a while I get this exception from Viewflow 1.3.0's join.py:

tasks = flow_class.task_class._default_manager.filter(
        flow_task=flow_task,
        process=process,
        status=STATUS.STARTED)

if len(tasks) > 1:
    raise FlowRuntimeError('More than one join instance for process found')

The 3 branches join like this:

close_join = flow.Join(wait_all=True). \
    Next(this.alert_devops)
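For context, the overall topology described above might look something like the following sketch. This is not my actual flow: the class and handler names (NotifyFlow, spawn_email_job, and so on) are hypothetical, the CeleryEvent constructor arguments are omitted, and the viewflow 1.x declarative API is assumed.

```python
from viewflow import flow
from viewflow.base import Flow, this


class NotifyFlow(Flow):
    start = flow.StartFunction(this.on_start).Next(this.split_channels)

    # 3-way split: one branch per notification channel.
    split_channels = (
        flow.Split()
        .Next(this.send_email)
        .Next(this.send_sms)
        .Next(this.send_push)
    )

    # Each branch is a pair: a Handler that spawns the Celery job,
    # then a CeleryEvent node that waits for the job to call back.
    send_email = flow.Handler(this.spawn_email_job).Next(this.email_done)
    email_done = CeleryEvent().Next(this.close_join)

    send_sms = flow.Handler(this.spawn_sms_job).Next(this.sms_done)
    sms_done = CeleryEvent().Next(this.close_join)

    send_push = flow.Handler(this.spawn_push_job).Next(this.push_done)
    push_done = CeleryEvent().Next(this.close_join)

    # All three branches converge here.
    close_join = flow.Join(wait_all=True).Next(this.alert_devops)

    alert_devops = flow.Handler(this.notify_devops).Next(this.end)
    end = flow.End()
```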

I'm baffled as to the cause: inspecting the table after the error, the combination of process and flow_task for close_join does indeed occur twice in state STARTED. I wonder whether something I'm doing could be causing the issue; as far as I know, none of my code writes to this table directly.

I do note that the Task table does not have a unique_together('process', 'flow_task') constraint, which I think may be because a Viewflow loop would legitimately visit the same flow_task more than once. Since my code has no loops (yet), would it be a good idea to add such a constraint temporarily? At least then the creator of the illegal state would be the point of failure.

Is it possible that the lock being taken here is not safe across processes? Celery runs this code in several worker processes on the machine, which might then explain the problem:

    lock = self.flow_class.lock_impl(self.flow_class.instance)
    with lock(self.flow_class, task.process_id):
        #
        # Re-acquire the task under a lock (see the StackOverflow thread).
        #
        task = self.flow_class.task_class._default_manager.get(pk=task.pk)
        activation = self.activation_class()
        activation.initialize(self, task)
        activation.start()
        activation.done()
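Whatever the lock implementation turns out to be, the failure mode fits a classic check-then-act race: two branches finish at about the same time, both query for an existing STARTED join task, both see none, and both create one. Here is a minimal, viewflow-free sketch of that race; the names (simulate, finish_branch, the rows list standing in for Task rows) are all invented for illustration, and a barrier forces the worst-case interleaving so the demonstration is deterministic.

```python
import threading


def simulate(serialize):
    rows = []                       # stand-in for Task rows with status=STARTED
    guard = threading.Lock()        # stand-in for a working flow_class.lock_impl
    barrier = threading.Barrier(2)  # forces the worst-case interleaving

    def finish_branch():
        def check_then_create():
            exists = bool(rows)     # "is there already a STARTED join task?"
            if not serialize:
                barrier.wait()      # both threads pass the check before creating
            if not exists:
                rows.append("STARTED")

        if serialize:
            with guard:             # a working lock serializes check-then-create
                check_then_create()
        else:
            check_then_create()

    threads = [threading.Thread(target=finish_branch) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(rows)


print(simulate(serialize=False))  # 2 -> "More than one join instance for process found"
print(simulate(serialize=True))   # 1 -> the intended single join task
```

A lock that only exists in one worker's memory behaves like the serialize=False case across processes, which is exactly the symptom observed.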

1 Answer

Shaheed Haque

I believe the lack of locking is indeed the issue. As per the docs:

Locking is not enabled by default. You need to choose proper lock implementation and enable it.
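A minimal configuration sketch of enabling a lock on the flow class, assuming the viewflow 1.x lock module (verify the exact names against your version's documentation; MyFlow is a placeholder):

```python
from viewflow import lock
from viewflow.base import Flow


class MyFlow(Flow):
    # select_for_update_lock takes a database row lock on the process,
    # so it serializes workers across OS processes; an in-memory or
    # cache-based lock may not, depending on the backend.
    lock_impl = lock.select_for_update_lock
```

This matches the usage in the question's callback code, where the lock is obtained via self.flow_class.lock_impl(self.flow_class.instance).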