Messages not sent to RabbitMQ when new files are added in a directory

63 views Asked by At

I have the following code that is used in a Kubernetes pod:

import pika
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileEventHandler(FileSystemEventHandler):
    def __init__(self, channel):
        self.channel = channel

    def on_created(self, event):
        if not event.is_directory:
            message = 'New file created: %s' % event.src_path
            print("Detected new file, sending message: ", message)
            is_delivered = self.channel.basic_publish(exchange='', routing_key='myqueue', body=message)
            print("Message delivery status: ", is_delivered)

def main():
    credentials = pika.PlainCredentials('LBO64L', 'test')
    print('Logged')

    parameters = pika.ConnectionParameters('rabbitmq.developpement-dev-01.svc.cluster.local',
                                   5672,
                                   '/',
                                   credentials,
                                   socket_timeout=2)
    print(parameters)

    connection = pika.BlockingConnection(parameters)
    print("Connection...")
   
    channel = connection.channel()

    channel.queue_declare(queue='myqueue')

    print('Queue created !')

    channel.basic_publish(exchange='', routing_key='myqueue', body='Test message')
    print("Test message sent")

    event_handler = FileEventHandler(channel)

    if os.path.isdir('/mnt/lb064l/data/'):
        print("Directory exists and is accessible")
    else:
        print("Directory does not exist or is not accessible")

    print('Starting observer...')
    observer = Observer()
    observer.schedule(event_handler, path='/mnt/lb064l/data/', recursive=True)
    observer.start()

    try:
        print('Running...')
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

    print('Message sent to queue !')

if __name__ == "__main__":
    main()

This pod connects well to RabbitMQ. My goal is to send a message to a queue everytime a new file is added in the directory /mnt/. /mnt/ contains data that comes from a persistent volume.

The following works well but messages are not sent when new file come in the directory /mnt/:

channel.basic_publish(exchange='', routing_key='myqueue', body='Test message')

Files comes from another service in kubernetes, they are added in an SFTP server and persisted. In the python pod I mount the same volume persisted from the sftp.

Where can I be wrong ?

0

There are 0 answers