I am trying to use a combination of aiobotocore, botocore and s3fs to build an S3 client which refreshes its credentials automatically.
This is my code so far:
The following code creates a refreshable session. The class has two methods: one returns a regular boto3 session and the other returns an aiobotocore session object.
import asyncio
import random
from datetime import datetime, timedelta, timezone

import boto3
import pytz
from boto3 import Session
from botocore.credentials import RefreshableCredentials
from botocore.session import get_session
from dateutil.parser import parse
class RefreshableSession:
    """Create boto3 / aiobotocore sessions whose credentials refresh automatically.

    On construction the caller identity is looked up with STS
    ``GetCallerIdentity``. If the caller ARN contains ``:role``,
    ``instance-profile`` or ``:assumed-role``, that role is assumed with
    ``AssumeRole`` and re-assumed whenever the credentials come within ten
    minutes of their expiry. Otherwise the credentials of the default boto3
    credential chain are used and simply re-read on refresh.

    :param region_name: region used for the STS client and for the boto3
        session returned by :meth:`Session`.
    """

    def __init__(self, region_name: str = 'eu-west-1'):
        session = boto3.Session()
        self.region_name = region_name
        self.sts_client = session.client("sts", region_name=region_name)
        self.session_name = "session" + str(random.randint(1, 1000))
        self.duration = 3600
        self.session = None
        self.session_aio = None
        caller_arn = self.sts_client.get_caller_identity()['Arn']
        self.role_name = caller_arn
        # True when credentials are obtained (and later refreshed) through sts:AssumeRole;
        # False when the default credential chain is used directly.
        self.uses_assume_role = any(
            marker in caller_arn for marker in (':role', 'instance-profile', ':assumed-role')
        )
        if self.uses_assume_role:
            if ':assumed-role' in caller_arn:
                # An assumed-role session cannot be passed to AssumeRole; use the underlying IAM role.
                self.role_name = self._iam_role_arn(caller_arn)
            self.metadata = self._assume_role_metadata(self.session_name)
        else:
            print(
                "There is no role in the constructor or in the env, so creating a standard session with your "
                "env user credentials instead")
            self.session = session
            self.metadata = self._default_chain_metadata(session)

    @staticmethod
    def _iam_role_arn(assumed_role_arn: str) -> str:
        """Convert an STS assumed-role ARN into the ARN of the underlying IAM role.

        ``arn:<partition>:sts::<account>:assumed-role/<role>/<session>`` becomes
        ``arn:<partition>:iam::<account>:role/<role>``. The assumed-role ARN
        does not carry the role's IAM path, so a role created under a
        non-default path is not reconstructed correctly.
        """
        _, partition, _, _, account, resource = assumed_role_arn.split(':', 5)
        role = resource.split('/')[1]
        return f"arn:{partition}:iam::{account}:role/{role}"

    def _assume_role_metadata(self, session_name: str) -> dict:
        """Assume ``self.role_name`` with ``self.sts_client`` and return the credential metadata dict."""
        credentials = self.sts_client.assume_role(
            RoleArn=self.role_name,
            RoleSessionName=session_name,
            DurationSeconds=self.duration,
        )["Credentials"]
        return {
            "access_key": credentials["AccessKeyId"],
            "secret_key": credentials["SecretAccessKey"],
            "token": credentials["SessionToken"],
            "expiry_time": credentials["Expiration"].isoformat(),
            "expired": False,
        }

    @staticmethod
    def _default_chain_metadata(session) -> dict:
        """Return the current default-chain credentials of *session* as a metadata dict."""
        credentials = session.get_credentials().get_frozen_credentials()
        # No expiry is read from the chain here; a one-hour expiry makes the
        # credentials get re-read periodically.
        expiry_time = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
        return {
            "access_key": credentials.access_key,
            "secret_key": credentials.secret_key,
            "token": credentials.token,
            "expiry_time": expiry_time,
            "expired": False,
        }

    def get_metadata(self):
        """Return the credential metadata, refreshing it first if it is about to expire.

        Used both for the initial credentials and as the ``refresh_using``
        callback of the refreshable credentials. Credentials are renewed once
        they are within ten minutes of ``expiry_time``: by re-assuming the role
        in assume-role mode, or by re-reading the default credential chain
        otherwise.

        :return: dict with ``access_key``, ``secret_key``, ``token``,
            ``expiry_time`` (ISO 8601 string) and ``expired`` keys.
        """
        utc_now = datetime.now(timezone.utc)
        # Give a 10 minute interval to make sure the credentials are not expired
        refresh_at = datetime.fromisoformat(self.metadata['expiry_time']) - timedelta(minutes=10)
        if refresh_at < utc_now:
            print("Refreshing credentials")
            print(refresh_at, utc_now)
            # A fresh boto3 session re-reads the environment, so the base
            # credentials used for the STS call are current as well.
            session = boto3.Session()
            if self.uses_assume_role:
                self.sts_client = session.client("sts", region_name=self.region_name)
                self.metadata = self._assume_role_metadata(
                    self.session_name + str(random.randint(1, 1000))
                )
            else:
                self.metadata = self._default_chain_metadata(session)
        return self.metadata

    def Session(self):
        """Return the cached boto3 session, creating an auto-refreshing one on first use.

        In default-credential mode ``__init__`` has already stored a plain
        boto3 session, which is returned unchanged.

        :return: boto3.Session
        """
        if self.session is None:
            credentials = RefreshableCredentials.create_from_metadata(
                metadata=self.get_metadata(),
                refresh_using=self.get_metadata,
                method="sts-assume-role",
            )
            botocore_session = get_session()
            # botocore offers no public setter for a session's credentials.
            botocore_session._credentials = credentials
            botocore_session.set_config_variable("region", self.region_name)
            # Explicit boto3.Session: this method's own name shadows the idea of "Session".
            self.session = boto3.Session(botocore_session=botocore_session)
        return self.session

    def session_aiobotocore(self, region_name: str = 'eu-west-1', profile_name: str = None):
        """Return the cached aiobotocore session, creating one with auto-refreshing credentials on first use.

        :param region_name: AWS region configured on the session (not set when None).
        :param profile_name: currently unused; kept for backward compatibility.
        :return: aiobotocore.session.AioSession
        """
        from aiobotocore.session import get_session as get_session_aio
        from aiobotocore.credentials import AioRefreshableCredentials

        if self.session_aio is None:
            async def refresh_metadata():
                # AioRefreshableCredentials awaits its refresh callback, so the
                # synchronous get_metadata (which may call STS) is wrapped in a
                # coroutine and run in a worker thread to keep the loop responsive.
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, self.get_metadata)

            aio_session = get_session_aio()
            if region_name is not None:
                aio_session.set_config_variable("region", region_name)
            # AioSession has no public setter for its credentials either.
            aio_session._credentials = AioRefreshableCredentials.create_from_metadata(
                metadata=self.get_metadata(),
                refresh_using=refresh_metadata,
                method="sts-assume-role",
            )
            self.session_aio = aio_session
        return self.session_aio
Then I pass a session created by this class to the S3FileSystem constructor, so that its credentials are refreshed automatically:
# RefreshableSession is the class defined above; async_mode selects s3fs's async API.
fs = s3fs.S3FileSystem(anon=False, session=RefreshableSession().session_aiobotocore(), asynchronous=async_mode)
This is a full example that can be used to reproduce the error:
import datetime as dt
import pandas as pd
import s3fs
import os

df = pd.DataFrame(data={'test': [1, 2, 3], 'test2': [4, 5, 6]})
bucket_name = 'xxxx'
# Must end with '/': the second-to-last component is used as the table name below.
bucket_path = 'xxxxxx'

# RefreshableSession is the class defined above.
ses = RefreshableSession().session_aiobotocore()
fs = s3fs.S3FileSystem(anon=False, session=ses, asynchronous=False)

# Set csv file name.
# Take a single timestamp so the year/month/day/hour folders and the file
# name always agree, even if the script runs across an hour boundary.
now = dt.datetime.now(dt.timezone.utc)
file = now.strftime('%Y%m%d%H%M%S') + '.csv'
key_name = bucket_name + '/' + bucket_path + now.strftime('%Y/%m/%d/%H/') + file

table = bucket_path.split('/')
tmp_name = table[-2] + file
df.to_csv(tmp_name, index=False)
try:
    # THIS LINE HERE IS THE ONE FAILING
    fs.put(tmp_name, key_name)
finally:
    # Remove the local temporary file even when the upload raises.
    os.remove(tmp_name)
When I run the code above, I get the following error:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 536, in _put
    trailing_sep(rpath) or await self._isdir(rpath)
  File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 1411, in _isdir
    return bool(await self._lsdir(path))
  File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 706, in _lsdir
    async for c in self._iterdir(
  File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 737, in _iterdir
    await self.set_session()
  File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 527, in set_session
    self._s3 = await s3creator.__aenter__()
  File "/usr/local/lib/python3.10/site-packages/botocore/client.py", line 888, in __getattr__
    raise AttributeError(
AttributeError: 'S3' object has no attribute '__aenter__'
However, if I replace the line fs = s3fs.S3FileSystem(anon=False, session=ses, asynchronous=False)
with fs = s3fs.S3FileSystem(anon=False), i.e. without passing the session I created,
it works just fine.
What is wrong with my session? Is there something I am missing?
The package versions I am using are the following: aiobotocore==2.5.4 boto3==1.28.17 botocore==1.31.17 s3fs==2023.9.2 s3transfer==0.6.2
Any help is much appreciated!