I want to download and upload files to S3 asynchronously, and I'm using aiobotocore for that. Rather than creating a new client for every request as mentioned in the docs (which is slow), I decided to reuse the client.
I have this class
class S3Utils:
def __init__(self):
self._session = get_session()
self._s3_client = self._session.create_client('s3', region_name=config.S3_REGION)
async def upload_file(self, file_path):
try:
async with self._s3_client as s3_client:
async with aiofiles.open(file_path, 'rb') as f:
await s3_client.put_object(Bucket=config.S3_BUCKET, Key=file_path, Body=(await f.read()))
except Exception as e:
logger.error('Error uploading file', exc_info=e)
async def download_file(self, file_path):
try:
async with self._s3_client as s3_client:
response = await s3_client.get_object(Bucket=config.S3_BUCKET, Key=file_path)
content = await response['Body'].read()
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
return file_path
except Exception as e:
logger.error('Error downloading file', exc_info=e)
return None
async def download_all_files_in_directory(self, bucket, directory_path):
try:
files = []
async with self._s3_client as s3_client:
response = await s3_client.list_objects_v2(Bucket=bucket, Prefix=directory_path)
if 'Contents' in response:
for file in response['Contents']:
file_path = file['Key']
if file_path.endswith(('jpg', 'png')):
files.append(file_path)
results = await asyncio.gather(*[asyncio.create_task(self.download_file(file)) for file in files])
print(f'results: {results}')
except Exception as e:
logger.error('Error downloading files', exc_info=e)
if __name__ == '__main__':
asyncio.run(S3Utils().download_all_files_in_directory('bucket-name', 'folder-name'))
This is the exception I'm getting:
2024-03-12T07:13:51.177643Z [error ] Error downloading file filename=s3_utils.py func_name=download_file lineno=38
Traceback (most recent call last):
File "<path>\src\utils\s3_utils.py", line 29, in download_file
async with self._s3_client as s3_client:
File "<path>\venv\lib\site-packages\aiobotocore\session.py", line 25, in __aenter__
self._client = await self._coro
RuntimeError: cannot reuse already awaited coroutine
results: [None, None, None, None, None, None, None, None, None, None, None, None]
I'm not sure why I'm receiveing this error. Since I'm awaiting the download_file
function as a task and not as a coroutine, this should've worked.
I tried copying the code of the download_file
function and using it directly inside download_all_files_in_directory
like this:
async def download_all_files_in_directory(self, bucket, directory_path):
try:
files = []
async with self._s3_client as s3_client:
response = await s3_client.list_objects_v2(Bucket=bucket, Prefix=directory_path)
if 'Contents' in response:
for file in response['Contents']:
file_path = file['Key']
if file_path.endswith(('jpg', 'png')):
response = await s3_client.get_object(Bucket=config.S3_BUCKET, Key=file_path)
content = await response['Body'].read()
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
except Exception as e:
logger.error('Error downloading files', exc_info=e)
This worked but this looks ugly since I'm copying the code. I think the original version of this method is cleaner but I'm not sure why it doesn't work.