Kombu/Celery messaging

4.2k views Asked by At

I have a simple application that sends & receives messages, kombu, and uses Celery to task the message. Kombu alon, I can receive the message properly. when I send "Hello", kombu receives "Hello". But when I added the task, what kombu receives is the task ID of the celery.

My purpose for this project is so that I can schedule when to send and receive messages, hence Celery.

What I would like to know is why is kombu receiving the task id instead of the sent message? I have searched and searched and have not found any related results on this matter. I am a beginner in using this applications and I would appreciate some help in fixing this matter.

My codes:

task.py

from celery import Celery

app = Celery('tasks', broker='amqp://xx:xx@localhost/xx', backend='amqp://')

@app.task(name='task.add')
def add(x, y):
    return x+y

send.py

import kombu
from task import add
#declare connection with broker connection
connection = kombu.Connection(hostname='xx',
                              userid='xx',
                              password='xx',
                              virtual_host='xx')

connection.connect()
if connection.connect() is False:
    print("not connected")
else:
    print("connected")

#checks if connection is okay


#rabbitmq connection
channel = connection.channel()

#queue & exchange for kombu
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')

#message here

x = input ("Enter first name: ")
y = input ("Enter last name: ")
result= add.delay(x,y)
print(result)



#syntax used for sending messages to queue
producer = kombu.Producer(channel, exchange)
producer.publish(result,
                 exchange = exchange,
                 routing_key='queue1')

print("Message sent: [x]")
connection.release()

receive.py

import kombu

#receive
connection = kombu.Connection(hostname='xx',
                              userid='xx',
                              password='xx',
                              virtual_host='xx')
connection.connect()

channel = connection.channel()

exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')

print("Waiting for messages...")
def callback(body, message):
    print('Got message - %s' % body)
    message.ack()

consumer = kombu.Consumer(channel,
                          queues=queue,
                          callbacks=[callback])
consumer.consume()

while True:
    connection.drain_events()

I am using:

Kombu 3.0.26
Celery 3.1.18
RabbitMQ as the broker

What I sent:

xxx
yyy

What kombu receives:

Got message - d22880c9-b22c-48d8-bc96-5d839b224f2a
2

There are 2 answers

0
predator On BEST ANSWER

I found an answer to my problem and to anyone who may come across this kind of problem, I'll share the answer that worked for me.

I found the solution here.

Or here - user jennaliu answer may probably help you if the first link didn't work.

1
tuomur On

You need to call result.get() to receive the actual value of add.delay(). What you are seeing as the message body is AsyncResult instance in string format. Which doesn't make much sense.