I've been struggling to make s3fs and ProcessPoolExecutor work together. Essentially, the issue is that s3fs, by default, holds on to some session information for its connections, which doesn't play well with forked processes.
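If I understand correctly, part of the problem is fsspec's instance caching: repeated constructor calls with the same arguments return the same cached object, so a forked worker inherits the parent's filesystem instance along with whatever session state it holds. This is my reading of the behaviour, not something I've verified against the internals, but a quick check would be something like:

import s3fs

# My assumption: fsspec caches filesystem instances, so both names
# point at the same object, and a forked child inherits it.
a = s3fs.S3FileSystem()
b = s3fs.S3FileSystem()
print(a is b)  # I expect this to print True because of the instance cache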
I know that s3fs.S3FileSystem has a constructor that takes asynchronous=True, loop=, and so on, and that asynchronous versions of most of the useful calls exist (prepended with _). That said, I can't find any examples of how to marry s3fs with ProcessPoolExecutor, nor can I figure out how to put them together myself, so I'm looking for some help.
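For reference, this is roughly how I understand the asynchronous interface is meant to be used on its own, outside of any process pool (adapted from the s3fs docs as I read them; the bucket/prefix below is just a placeholder):

import asyncio

import s3fs


async def main():
    # Construct in asynchronous mode; the underlying aiobotocore session
    # has to be set up from inside the running event loop.
    s3 = s3fs.S3FileSystem(asynchronous=True)
    session = await s3.set_session()

    # The async counterparts of the sync methods are prefixed with an underscore.
    listing = await s3._ls("s3://<bucket name>/<prefix>")
    print(listing)

    await session.close()


asyncio.run(main())

What I can't work out is how (or whether) this pattern is supposed to carry over to worker processes.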
The minimal test case I can think of is below. It works with the initial release of s3fs (0.4.x, which I believe predates its integration with fsspec). The ProcessPoolExecutor.map() call deadlocks with any version more recent than that (I'm trying with 2023.10.0). The case is overly simple, bordering on useless, but it should give an idea of what I'm after, and if I can get it working I should easily be able to move forward.
import concurrent.futures

import s3fs


def get_tags(uri):
    # one filesystem per call; fetch the tags of a single object
    s3 = s3fs.S3FileSystem()
    return s3.get_tags(uri)


def get_files(filepath):
    s3 = s3fs.S3FileSystem()
    return s3.ls(filepath)


if __name__ == "__main__":
    files = get_files('s3://<bucket name>/<prefix>')

    with concurrent.futures.ProcessPoolExecutor() as executor:
        # this map() call is what deadlocks on recent s3fs versions
        results = executor.map(get_tags, files)
        print([r for r in results])
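One thing I've considered (but would like confirmation on) is forcing the "spawn" start method so each worker begins in a fresh interpreter, and skipping fsspec's instance cache inside the workers. I'm not sure this is the right approach, and it sidesteps the async API entirely, but a sketch of what I mean:

import concurrent.futures
import multiprocessing

import s3fs


def get_tags(uri):
    # Assumption: skip_instance_cache forces a fresh filesystem (and session)
    # in each worker instead of reusing a cached instance.
    s3 = s3fs.S3FileSystem(skip_instance_cache=True)
    return s3.get_tags(uri)


def get_files(filepath):
    s3 = s3fs.S3FileSystem()
    return s3.ls(filepath)


if __name__ == "__main__":
    files = get_files('s3://<bucket name>/<prefix>')

    # "spawn" starts workers as fresh interpreters instead of forking, so no
    # event loop, session, or cached instance is inherited from the parent.
    ctx = multiprocessing.get_context("spawn")
    with concurrent.futures.ProcessPoolExecutor(mp_context=ctx) as executor:
        results = executor.map(get_tags, files)
        print(list(results))

Even if that happens to work, I'd still like to understand how s3fs is actually meant to be used with process pools.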
It shouldn't matter, but just in case it does: I'm using Python 3.12.