Celery/RabbitMQ unacked messages blocking queue?


I have invoked a task that fetches some information remotely with urllib2 a few thousand times. The tasks are scheduled with a random eta (within a week) so they don't all hit the server at the same time. Sometimes I get a 404, sometimes not. I handle the error when it happens.

In the RabbitMQ console I can see 16 unacknowledged messages.

I stopped celery, purged the queue and restarted it. The 16 unacknowledged messages were still there.

I have other tasks that go to the same queue, and none of them were executed either. After purging, I tried to submit another task, and its state remains ready.

Any ideas how I can find out why messages remain unacknowledged?

Versions:

celery==3.1.4
{rabbit,"RabbitMQ","3.5.3"}

celeryapp.py

CELERYBEAT_SCHEDULE = {
    'social_grabber': {
        'task': '<django app>.tasks.task_social_grabber',
        'schedule': crontab(hour=5, minute=0, day_of_week='sunday'),
    },
}

tasks.py

@app.task
def task_social_grabber():
    for user in users:
        countdown = randint(0, 60 * 60 * 24 * 7)  # up to one week, in seconds
        # args must be a tuple: (user) is just user, (user,) is a 1-tuple
        task_social_grabber_single.apply_async((user,), countdown=countdown)

There is no routing for this task defined so it goes into the default queue: celery. There is one worker processing this queue.
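One detail in the scheduling loop is easy to miss: apply_async takes the positional arguments as a tuple or list, and in Python `(user)` is simply `user`, not a one-element tuple. The following is a minimal sketch of the same random spread without Celery. The helper name build_calls and the user names are made up for illustration.

```python
from random import randint

WEEK_SECONDS = 60 * 60 * 24 * 7


def build_calls(users):
    """Return (args, countdown) pairs, one per user, spread over a week."""
    calls = []
    for user in users:
        countdown = randint(0, WEEK_SECONDS)  # delay in seconds
        args = (user,)  # the trailing comma makes this a one-element tuple
        calls.append((args, countdown))
    return calls


calls = build_calls(["alice", "bob"])  # hypothetical user names
```

Each pair could then be passed on as `task_social_grabber_single.apply_async(args, countdown=countdown)`.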

supervisord.conf:

[program:celery]
autostart = true
autorestart = true
command = celery worker -A <django app>.celeryapp:app --concurrency=3 -l INFO -n celery

There are 2 answers

Eric Workman (best answer)

RabbitMQ changed its QoS (prefetch) semantics in version 3.3, which broke older Celery releases. You need to upgrade celery to at least 3.1.11 (changelog) and kombu to at least 3.0.15 (changelog). Ideally, use the latest versions.
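To check an installed version against these minimums, compare the numbers rather than the strings, because "3.1.4" sorts after "3.1.11" as text. This is a minimal sketch that assumes plain dotted numeric versions (no suffixes such as "rc1"). The helper names are made up for illustration.

```python
def parse_version(text):
    """Turn '3.1.11' into (3, 1, 11) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


# Minimum versions quoted in this answer.
MINIMUMS = {"celery": "3.1.11", "kombu": "3.0.15"}


def needs_upgrade(package, installed):
    """Return True if the installed version is below the quoted minimum."""
    return parse_version(installed) < parse_version(MINIMUMS[package])


print(needs_upgrade("celery", "3.1.4"))  # version from the question: True
```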

I hit exactly this behavior when RabbitMQ 3.3 was released. That release changed the default behavior of the prefetch_count setting. Before this, if a consumer filled its prefetch limit (set through CELERYD_PREFETCH_MULTIPLIER) with eta'd messages, the worker would raise the limit in order to fetch more messages. The change broke this, because the new default behavior no longer allowed it.
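The effect can be illustrated with a toy model. This is not Celery's or RabbitMQ's actual code, only a sketch of the idea: eta'd messages stay unacknowledged while they wait, so they use up the prefetch window unless the worker can widen it. The function name, the message names, and the limit of 4 are made up for illustration.

```python
from collections import deque


def run_worker(messages, prefetch_limit, can_raise_limit):
    """Toy model: return the names of tasks the worker manages to execute.

    messages is a list of (name, has_eta) pairs in queue order. A message
    with has_eta=True is held unacknowledged until its time comes, which in
    this model never happens during the run.
    """
    queue = deque(messages)
    unacked = 0  # messages delivered but not yet acknowledged
    executed = []
    while queue and unacked < prefetch_limit:
        name, has_eta = queue.popleft()
        unacked += 1
        if has_eta:
            if can_raise_limit:
                prefetch_limit += 1  # make room for one more delivery
        else:
            executed.append(name)
            unacked -= 1  # acknowledged after running
    return executed


# Six eta'd messages queued ahead of one task that is ready to run now.
messages = [("eta-%d" % i, True) for i in range(6)] + [("ready-task", False)]

blocked = run_worker(messages, prefetch_limit=4, can_raise_limit=False)
working = run_worker(messages, prefetch_limit=4, can_raise_limit=True)
```

With a fixed limit, the window fills with waiting eta'd messages and `blocked` stays empty. When the limit can grow, `working` contains the ready task.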

Janusz Skonieczny

I had similar symptoms. Messages were getting to the MQ (visible in the charts) but were not picked up by the worker.

This led me to assume that my Django app had set up the Celery app correctly, but I was missing an import (in the project package's __init__.py) that ensures Celery is configured during Django startup:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

It is a silly mistake, but the messages reaching the broker, and the calls returning an AsyncResult, threw me off track and made me look in the wrong places. Then I noticed that setting CELERY_ALWAYS_EAGER = True had no effect either: even then, tasks weren't executed at all.

PS: This may not be an answer to @kev's question, but since I ended up here a couple of times while looking for a solution to my problem, I am posting it for anyone in a similar situation.