I have the following code that is used in a Kubernetes pod:
import pika
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileEventHandler(FileSystemEventHandler):
def __init__(self, channel):
self.channel = channel
def on_created(self, event):
if not event.is_directory:
message = 'New file created: %s' % event.src_path
print("Detected new file, sending message: ", message)
is_delivered = self.channel.basic_publish(exchange='', routing_key='myqueue', body=message)
print("Message delivery status: ", is_delivered)
def main():
credentials = pika.PlainCredentials('LBO64L', 'test')
print('Logged')
parameters = pika.ConnectionParameters('rabbitmq.developpement-dev-01.svc.cluster.local',
5672,
'/',
credentials,
socket_timeout=2)
print(parameters)
connection = pika.BlockingConnection(parameters)
print("Connection...")
channel = connection.channel()
channel.queue_declare(queue='myqueue')
print('Queue created !')
channel.basic_publish(exchange='', routing_key='myqueue', body='Test message')
print("Test message sent")
event_handler = FileEventHandler(channel)
if os.path.isdir('/mnt/lb064l/data/'):
print("Directory exists and is accessible")
else:
print("Directory does not exist or is not accessible")
print('Starting observer...')
observer = Observer()
observer.schedule(event_handler, path='/mnt/lb064l/data/', recursive=True)
observer.start()
try:
print('Running...')
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
print('Message sent to queue !')
if __name__ == "__main__":
main()
This pod connects well to RabbitMQ. My goal is to send a message to a queue everytime a new file is added in the directory /mnt/. /mnt/ contains data that comes from a persistent volume.
The following works well but messages are not sent when new file come in the directory /mnt/:
channel.basic_publish(exchange='', routing_key='myqueue', body='Test message')
Files comes from another service in kubernetes, they are added in an SFTP server and persisted. In the python pod I mount the same volume persisted from the sftp.
Where can I be wrong ?