Python cdll inotify watch descriptor fails only in async task

66 views Asked by At

I'm trying to run a filesystem watcher in an async task which worked well, but when I tried to move the watch descriptor creation into the task it stopped working. Changing where that line was called is the only change that causes the issue. I cannot see anything in the asyncio docs that indicate this shouldn't be possible.

I've made the smallest reproducible example I can below. It doesn't seem to matter where the file descriptor is initialised, only where the watch is called.

import asyncio, ctypes, ctypes.util, io, logging, os

# https://man7.org/linux/man-pages/man7/inotify.7.html
class _INotifyEvent(ctypes.Structure):
    _fields_ = [
        ("wd", ctypes.c_int),
        ("mask", ctypes.c_uint32),
        ("cookie", ctypes.c_uint32),
        ("len", ctypes.c_uint32),
        # ("name", char[])
    ]

_libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
_libc.inotify_init1.argtypes = (ctypes.c_int,)
_libc.inotify_add_watch.argtypes = (ctypes.c_int, ctypes.c_char_p, ctypes.c_uint)
EVENT_SIZE = ctypes.sizeof(_INotifyEvent) + 1

class FilesystemMonitor:
    def __init__(self, path: str, delayed=False):
        self._file_descriptor = _libc.inotify_init1(os.O_CLOEXEC)
        self._path = path
        self._wd = None
        if not delayed:
            self.add_watch()

    def add_watch(self):
        self._wd = _libc.inotify_add_watch(self._file_descriptor, bytes(self._path, "utf-8"), 8)
        print("WD =", self._wd)

    def get(self):
        buffer = io.BytesIO(os.read(self._file_descriptor, EVENT_SIZE))
        event_buffer = buffer.read(EVENT_SIZE)
        event = _INotifyEvent.from_buffer_copy(event_buffer)
        return self._path if event.wd == self._wd else "unknown"

    async def start(self, queue: asyncio.Queue):
        ioloop = asyncio.get_running_loop()

        if self._wd is None:
            self.add_watch()

        def _add_to_queue():
            queue.put_nowait(self.get())

        ioloop.add_reader(self._file_descriptor, _add_to_queue)

async def run_monitor(monitor):
    queue = asyncio.Queue()
    asyncio.gather(monitor.start(queue))
    while True:
        print("!!! EVENT:", await queue.get())

async def run(delayed):
    print("Start, delayed =", delayed)
    path = "/tmp/file.txt"
    monitor = FilesystemMonitor(path, delayed=delayed)
    asyncio.create_task(run_monitor(monitor))

    with open(path, "w") as f:
        f.write("content")

    await asyncio.sleep(0.1)  # yield so run_monitor reads from queue
    print("End")

logging.basicConfig(level=logging.DEBUG)
for val in (True, False):
    asyncio.run(run(val))


The output of which is

$ touch /tmp/file.txt && PYTHONASYNCIODEBUG=1 python3.11 temp.py 
DEBUG:asyncio:Using selector: EpollSelector
Start, delayed = True
WD = 1
End
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
DEBUG:asyncio:Using selector: EpollSelector
Start, delayed = False
WD = 1
!!! EVENT: /tmp/file.txt
End
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>

Adding the following to the end of add_watch shows inotify has created a watch descriptor in both cases.

        with open(f"/proc/{os.getpid()}/fdinfo/{self._file_descriptor}") as f:
            print(f.read())

Reading this other stackoverflow question and a question linked to it, it implies that async's usage of epoll prevents it from working. However, the same watch does work in async if it's created outside of the async loop. Just to check, I tried changing the async event loop policy using the following, and trying all other Selector classes available, but nothing changed the outcome.

import selectors

class MyPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        selector = selectors.SelectSelector()
        return asyncio.SelectorEventLoop(selector)

asyncio.set_event_loop_policy(MyPolicy())

Attempting to PDB through the async behaviour didn't help identify anything. I fear I'm simply misunderstanding something fundamental about the behaviour of async here. Can anyone explain why the async task works for a watch created outside of async but fails for a watch created within it? And is there a way to make it work?

0

There are 0 answers