fsspec: is there a way to get a paginated response from SFTP?


I use fsspec, which relies on paramiko's built-in capabilities for SFTP, but I could not find a way to paginate a directory listing.

Is there a way to get that functionality?

The use case: every directory holds about 100000 files, and loading the whole listing into memory at once seems like a bad idea.

Paramiko has sftp.listdir_iter, but is that capability available in fsspec?

There is 1 answer

VonC (best answer)

listdir_iter would provide a more direct way to achieve pagination since it returns an iterator, allowing you to retrieve items one by one.

from itertools import islice

from fsspec.implementations.sftp import SFTPFileSystem


class PaginatedSFTPFileSystem(SFTPFileSystem):
    def listdir_paginated(self, path, page_size=1000, start_page=0):
        """Return one page of SFTPAttributes entries for a directory."""
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # self.ftp is the paramiko SFTPClient opened by SFTPFileSystem;
        # listdir_iter yields SFTPAttributes objects lazily.
        items_iter = self.ftp.listdir_iter(path)

        # Skip the entries before the requested page, then take one page
        start_index = start_page * page_size
        return list(islice(items_iter, start_index, start_index + page_size))

You could also consider listdir_attr, which loads all entries at once and may be faster. Pagination then means slicing the returned list of SFTPAttributes objects to get the desired page. For example:

from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp

        # Use Paramiko's listdir_attr method
        items = sftp.listdir_attr(path)

        # Slice the items list to implement pagination
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_items = items[start_index:end_index]

        # Extract filenames from the SFTPAttributes objects
        filenames = [item.filename for item in paginated_items]

        return filenames

You would use it as:

fs = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password")
paginated_response = fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)

This approach can be slightly faster than the one using listdir_iter, since it avoids stepping through an iterator entry by entry.
However, it loads every SFTPAttributes object into memory before slicing the list. That overhead only matters when a directory holds a very large number of files (such as the 100000-file case in the question) or memory is limited.
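
Both versions above start the listing again on every call, so fetching page N means reading through the first N pages first. If you want to process the whole directory page by page, a generator that batches a single pass may be a better fit. This is a minimal sketch: iter_pages is a hypothetical helper name, and the generator expression stands in for sftp.listdir_iter(path) so the example runs without a server.

```python
from itertools import islice


def iter_pages(items, page_size=1000):
    """Yield successive lists of at most page_size entries from any iterable."""
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    iterator = iter(items)
    while True:
        # Pull the next page_size entries; only this page is held in memory
        page = list(islice(iterator, page_size))
        if not page:
            return
        yield page


# Stand-in for sftp.listdir_iter(path): any lazy iterator works here
fake_listing = (f"file_{i}.txt" for i in range(2500))
page_sizes = [len(page) for page in iter_pages(fake_listing, page_size=1000)]
print(page_sizes)  # [1000, 1000, 500]
```

With a real connection you would pass self.ftp.listdir_iter(path) instead of fake_listing, and each page would be a list of SFTPAttributes objects.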


To use listdir_iter with fsspec, you can create a custom PaginatedSFTPFileSystem class that inherits from SFTPFileSystem.
The custom class accesses the underlying paramiko SFTP client through the self.ftp attribute and calls listdir_iter on it directly.

from itertools import islice

from fsspec.implementations.sftp import SFTPFileSystem


class PaginatedSFTPFileSystem(SFTPFileSystem):
    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp  # Access the paramiko SFTP client

        # Use Paramiko's listdir_iter method
        items_iter = sftp.listdir_iter(path)

        # Implement pagination by controlling the iterator:
        # skip the earlier pages, then collect one page of entries
        start_index = start_page * page_size
        items = list(islice(items_iter, start_index, start_index + page_size))

        return items

By accessing the paramiko SFTP client in this way, you can use listdir_iter to implement pagination directly, even though it is not part of fsspec.


Using sshfs (an implementation of fsspec for the SFTP protocol using asyncssh), I do not see a SSHFS.listdir-like method.

But sshfs also has a lot of other basic filesystem operations, such as mkdir, touch and find.

You might therefore try and use the find method, which is inherited from the AbstractFileSystem class in fsspec, for pagination:

import sshfs


class PaginatedSSHFS(sshfs.SSHFS):
    def find_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # isdir() and find() are the blocking wrappers that fsspec generates
        # around the async _isdir/_find coroutines, so they return plain
        # values and must not be awaited.
        items = self.find(path, detail=False)

        # Implement pagination by slicing the items list
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_items = items[start_index:end_index]

        return paginated_items

You can use this custom implementation in your project as follows:

fs = PaginatedSSHFS(host="your_host", username="your_username", password="your_password")
paginated_response = fs.find_paginated("/path/to/your/directory", page_size=1000, start_page=0)
print(paginated_response)

This implementation uses the find method with the detail parameter set to False to get a list of file paths.
Then, it implements pagination by slicing the list of items.

Again, this approach loads all the items into memory before slicing the list, which may be inefficient for very large directories.


Do you know if there is a way to pass an existing SFTP connection directly to PaginatedSFTPFileSystem, instead of supplying host, username, and password again? I already have an fsspec.filesystem SFTP object that performs multiple tasks.

I suppose you can pass an existing SFTPFileSystem object to your custom PaginatedSFTPFileSystem class and use its underlying sftp connection.

To do this, you can modify the custom class to accept an SFTPFileSystem object during initialization and use its ftp attribute (the paramiko SFTP client) for listing the directory items.

from itertools import islice

from fsspec.implementations.sftp import SFTPFileSystem


class PaginatedSFTPFileSystem:
    # Deliberately not a subclass here: SFTPFileSystem.__init__ requires a
    # host and opens its own connection, which is what we want to avoid.
    def __init__(self, sftp_fs):
        self.fs = sftp_fs
        self.sftp = sftp_fs.ftp

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.fs.isdir(path):
            raise ValueError("Path should be a directory")

        # Use the sftp connection from the existing SFTPFileSystem object
        items_iter = self.sftp.listdir_iter(path)

        # Implement pagination by controlling the iterator:
        # skip the earlier pages, then collect one page of entries
        start_index = start_page * page_size
        items = list(islice(items_iter, start_index, start_index + page_size))

        return items

Now you can create an SFTPFileSystem object and pass it to the PaginatedSFTPFileSystem:

sftp_fs = SFTPFileSystem(host="your_host", username="your_username", password="your_password")

# Pass the existing SFTPFileSystem object to the custom PaginatedSFTPFileSystem
paginated_fs = PaginatedSFTPFileSystem(sftp_fs)

paginated_response = paginated_fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)
print(paginated_response)

This custom class will now use the sftp connection from the existing SFTPFileSystem object, eliminating the need to provide the host, username, and password again.


Corralien suggests in the comments to use walk(path, maxdepth=None, topdown=True, **kwargs).

You can use this method with your custom PaginatedSFTPFileSystem class, as it inherits from SFTPFileSystem, which in turn inherits from AbstractFileSystem.
This means that the walk method is available to your custom class.

However, that might not be the most suitable choice for pagination, as it returns files and directories in a nested structure, making it harder to paginate the results in a straightforward manner.

If it is enough to paginate over the directories that walk visits, you can add a walk_paginated method to the custom PaginatedSFTPFileSystem class. Each page then holds up to page_size (root, directories, files) tuples, one per directory visited; the file list inside each tuple is not paginated.

from itertools import islice

from fsspec.implementations.sftp import SFTPFileSystem


class PaginatedSFTPFileSystem(SFTPFileSystem):
    def walk_paginated(self, path, page_size=1000, start_page=0, maxdepth=None, topdown=True, **kwargs):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use the original walk method, which yields one
        # (root, directories, files) tuple per directory visited
        walk_generator = super().walk(path, maxdepth=maxdepth, topdown=topdown, **kwargs)

        # Paginate over those tuples: skip the earlier pages, then take one
        # page, without advancing the generator further than needed
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_walk = list(islice(walk_generator, start_index, end_index))

        return paginated_walk

Used with:

fs = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password")

paginated_response = fs.walk_paginated("/path/to/your/directory", page_size=1000, start_page=0)
for root, directories, files in paginated_response:
    print(f"Root: {root}")
    print(f"Directories: {directories}")
    print(f"Files: {files}\n")

Again, this only paginates over the directories that walk visits; the files inside each directory still come back as one complete list.
If you need to paginate the files themselves, consider using the find method or the custom listdir_paginated method, as shown in previous examples.
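
If you do want to paginate over individual files while still using walk, one option is to flatten its nested output into a single stream of paths and slice that stream. The sketch below is an illustration under assumptions: iter_walk_paths and walk_page are hypothetical helper names, and sample_walk is hand-written data in the same (root, directories, files) shape that walk yields, so the example runs without a server. Each directory's listing is still fetched whole by the underlying filesystem; only the flattened result is paginated.

```python
import posixpath
from itertools import islice


def iter_walk_paths(walk_iter):
    """Flatten (root, directories, files) tuples into full file paths."""
    for root, _directories, files in walk_iter:
        for name in files:
            yield posixpath.join(root, name)


def walk_page(walk_iter, page_size=1000, start_page=0):
    """Return one page of file paths from a walk-style iterator."""
    start_index = start_page * page_size
    paths = iter_walk_paths(walk_iter)
    return list(islice(paths, start_index, start_index + page_size))


# Hand-written stand-in for fs.walk("/data")
sample_walk = [
    ("/data", ["a", "b"], ["top.txt"]),
    ("/data/a", [], ["a1.txt", "a2.txt"]),
    ("/data/b", [], ["b1.txt"]),
]
first_page = walk_page(iter(sample_walk), page_size=2, start_page=0)
second_page = walk_page(iter(sample_walk), page_size=2, start_page=1)
print(first_page)   # ['/data/top.txt', '/data/a/a1.txt']
print(second_page)  # ['/data/a/a2.txt', '/data/b/b1.txt']
```

With a real filesystem you would pass fs.walk(path) as walk_iter.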


As noted by mdurant in the comments:

fsspec caches filesystem instances, so if you call SSHFileSystem (or derivative) with the same arguments twice, you do NOT create a new ftp session but reuse the existing one.

See Instance/Listing caching.

Depending on your use-case, you might need to pass skip_instance_cache=True or use_listings_cache=False.

Keep in mind that if you create a PaginatedSFTPFileSystem instance twice with the same arguments, fsspec returns the cached instance, and with it the same SFTP session.
If you want to force the creation of a new SFTP session, pass skip_instance_cache=True when creating the PaginatedSFTPFileSystem instance.

For example:

# Create a new PaginatedSFTPFileSystem instance with a new SFTP session
fs1 = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password", skip_instance_cache=True)

# Create another new PaginatedSFTPFileSystem instance with a new SFTP session
fs2 = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password", skip_instance_cache=True)

In that example, fs1 and fs2 will have separate SFTP sessions, despite having the same host, username, and password, because skip_instance_cache=True makes fsspec create new instances instead of reusing the cached one.