S3FS fails when given aiobotocore session to initialize the File System

252 views Asked by At

I am trying to use a combination of aiobotocore, botocore and s3fs to build an S3 client which refreshes its credentials automatically.

This is my code so far:

Code to create a refreshable session. It has two methods, one to generate a normal boto3 session and another one to generate an aiobotocore session object.

import boto3

from boto3 import Session
from botocore.session import get_session
from botocore.credentials import RefreshableCredentials
import random
import pytz
from datetime import datetime, timedelta
from dateutil.parser import parse

class RefreshableSession:

    def __init__(self):
        session = boto3.Session()
        self.sts_client = session.client("sts", region_name='eu-west-1')
        self.session_name = "session" + str(random.randint(1, 1000))
        self.duration = 3600
        self.session = None
        self.session_aio = None
        response = self.sts_client.get_caller_identity()
        arn_role = response['Arn']
        self.role_name = arn_role
        word_list = [':role', 'instance-profile', ':assumed-role']
        if [ele for ele in word_list if (ele in self.role_name)]:
            if ':assumed-role' in self.role_name:
                sts_to_iam = self.role_name.replace('arn:aws:sts::', '').replace(':assumed-role', '')[:-1].split('/')
                self.role_name = 'arn:aws:iam::' + sts_to_iam[0] + ':role/' + sts_to_iam[1]
            params = {
                "RoleArn": self.role_name,
                "RoleSessionName": self.session_name,
                "DurationSeconds": self.duration,
            }
            response = self.sts_client.assume_role(**params).get("Credentials")
            self.metadata = {
                "access_key": response.get("AccessKeyId"),
                "secret_key": response.get("SecretAccessKey"),
                "token": response.get("SessionToken"),
                "expiry_time": response.get("Expiration").isoformat(),
                "expired": False
            }
        else:
            print(
                "There is no role in the constructor or in the env, so creating a standard session with your "
                "env user credentials instead")
            self.session = session
            credentials = session.get_credentials().get_frozen_credentials()
            expiry_time = (pytz.utc.localize(datetime.utcnow()) + timedelta(hours=1)).isoformat()
            self.metadata = {
                "access_key": credentials.access_key,
                "secret_key": credentials.secret_key,
                "token": credentials.token,
                "expiry_time": expiry_time,
                "expired": False
            }

    def get_metadata(self):
        """
        If the credentials has not been automatically refreshed, this method will refresh.
        :return: credentials
        """
        # Give a 10 minute interval to make sure the credentials are not expired
        utc_now = pytz.utc.localize(datetime.utcnow())
        expiry_time = parse(self.metadata['expiry_time']) - timedelta(minutes=10)

        if expiry_time < utc_now:
            session = boto3.Session()
            # Refresh session
            print("Refreshing credentials")
            print(expiry_time, utc_now)
            params = {
                "RoleArn": self.role_name,
                "RoleSessionName": self.session_name + str(random.randint(1, 1000)),
                "DurationSeconds": self.duration,
            }
            self.sts_client = session.client("sts", region_name='eu-west-1')
            response2 = self.sts_client.assume_role(**params).get("Credentials")
            self.metadata = {
                "access_key": response2.get("AccessKeyId"),
                "secret_key": response2.get("SecretAccessKey"),
                "token": response2.get("SessionToken"),
                "expiry_time": response2.get("Expiration").isoformat()
            }
        return self.metadata

    def Session(self):
        """
        Works as Singleton returning the existing session, if not, create a new one
        :return: if the session already exists return exisiting session
        """
        if self.session is None:
            session_credentials = RefreshableCredentials.create_from_metadata(
                metadata=self.get_metadata(),
                refresh_using=self.get_metadata,
                method="sts-assume-role",
            )
            session = get_session()
            session._credentials = session_credentials
            session.set_config_variable("region", 'eu-west-1')
            autorefresh_session = Session(botocore_session=session)
            self.session = autorefresh_session
        return self.session

    def session_aiobotocore(self, region_name: str = 'eu-west-1', profile_name: str = None):
        """
        Get a basic aiobotocore.session.AioSession session with use a vy_session.
        :param region_name -> aws_region to use
        :param profile_name -> local aws profile name to use to create the session
        :return: aiobotocore.session.AioSession
        """
        from aiobotocore.session import get_session as get_session_aio
        from aiobotocore.credentials import AioRefreshableCredentials

        if self.session_aio is None:
            aio_session = get_session_aio()
            if region_name is not None:
                aio_session.set_config_variable("region", region_name)

            credentials = AioRefreshableCredentials.create_from_metadata(
                metadata=self.get_metadata(),
                refresh_using=self.get_metadata,
                method="sts-assume-role",
            )
            aio_session._credentials = credentials
            if region_name is not None:
                aio_session.set_config_variable("region", region_name)
            self.session_aio = aio_session
        return self.session_aio

Then, I use this class to give a session in the S3FileSystem creation so that the session can be refreshed automatically:

fs = s3fs.S3FileSystem(anon=False, session=AutoRefreshSession().session_aiobotocore(), asynchronous=async_mode)

This is a full example that can be used to reproduce the erorr:

import datetime as dt
import pandas as pd
import s3fs
import os


df = pd.DataFrame(data={'test': [1,2,3], 'test2': [4,5,6]})
bucket_name = 'xxxx'
bucket_path = 'xxxxxx'

ses = AutoRefreshSession().session_aiobotocore()
fs = s3fs.S3FileSystem(anon=False, session=ses, asynchronous=False)

# Set csv file name
folder_name_year = dt.datetime.utcnow().strftime('%Y/')
folder_name_month = dt.datetime.utcnow().strftime('%m/')
folder_name_day = dt.datetime.utcnow().strftime('%d/')
folder_name_hour = dt.datetime.utcnow().strftime('%H/')
file_name = dt.datetime.utcnow().strftime('%Y%m%d%H%M%S')
file = file_name + '.csv'
key_name = bucket_name + '/' + bucket_path + folder_name_year + folder_name_month + folder_name_day + folder_name_hour + file
table = bucket_path.split('/')
tmp_name = table[-2] + file
df.to_csv(tmp_name, index=False)

# THIS LINE HERE IS THE ONE FAILING
fs.put(tmp_name, key_name)
os.remove(tmp_name)

When i run the code above, I get the following error:

Trace back (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper return sync(self.loop, func, *args, **kwargs) File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 10 3, in sync raise return_result File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner result[0] = aw ait coro File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 536, in _put trailing_sep(rpath) or await self._isdir(rp ath) File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 1411, in _isdir return bool(await self._lsdir(path)) File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 706, in _lsdir async for c in self._iterdir( File "/usr/local/lib/python 3.10/site-packages/s3fs/core.py", line 737, in _iterdir await self.set_session() File "/usr/local/lib/python3.10/site-packages/s3fs/c ore.py", line 527, in set_session self._s3 = await s3creator.aenter() File "/usr/local/lib/python3.10/site-packages/botocore/clie nt.py", line 888, in getattr raise AttributeError( AttributeError: 'S3' object has no attribute 'aenter'

However, if I change the line fs = s3fs.S3FileSystem(anon=False, session=ses, asynchronous=False) and I remove the session I created fs = s3fs.S3FileSystem(anon=False) it works just fine.

What is wrong with my session? Is there something I am missing?

The package versions I am using are the following: aiobotocore==2.5.4 boto3==1.28.17 botocore==1.31.17 s3fs==2023.9.2 s3transfer==0.6.2

Any help is much appreciated!

0

There are 0 answers