Boto3 Moto bucket not found after mocking it

2.1k views Asked by At

I am trying to used Boto3 moto's mocks3 utility to test my code that connects to s3, the function basically list down all the folders with partition date and returns the latest one. I see no exception in mocking s3 buckets however the test code doesn't seem to find that buclet

My Test spec

import os
import unittest
from botocore.client import ClientError
from moto import mock_s3
from src.utils.aws_utils import *
import logging

log = logging.getLogger("my-logger")
MY_BUCKET = "mock_s3_bucket"
MY_PREFIX = "mock_folder"


@mock_s3
class TestPysparkUtils(unittest.TestCase):

    def setUp(self):
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        s3.create_bucket(Bucket="{}".format(MY_BUCKET))
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201223/file_20201223.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='def')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file_20201222.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='abc')

    def tearDown(self):
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        bucket = s3.Bucket(MY_BUCKET)
        for key in bucket.objects.all():
            key.delete()
        bucket.delete()

    def test_get_latest_file_path_inter(self):
        print('{}/{}'.format(MY_BUCKET, MY_PREFIX))
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        try:
            s3.meta.client.head_bucket(Bucket=MY_BUCKET)
            print("Bucket Exists!")
        except ClientError:
            log.info('The bucket does not exist or you have no access.')
        result = get_latest_file_path_inter(log, s3, 's3://{}/{}/'.format(MY_BUCKET, MY_PREFIX), 'partition_date')
        print('------------------------')
        print(result)
        desired_result = ["foo.json", "bar.json"]
        self.assertCountEqual(result, desired_result)


if __name__ == "__main__":
    unittest.main()

Test Function

def get_latest_file_path_inter(logger, s3_client, base_path, partition):
    """
    Returns full s3 path of latest partition assuming partition date is of format yyyyMMdd
    :type (object, str, str) -> (str)
    :parameter
    :param logger Logger object
    :param s3_client boto3 s3 client object
    :param base_path Base s3 path
    :param partition column name
    """
    print("Inside get_latest_file_path_inter() : Given: {} {}".format(base_path, partition))
    start = base_path.find("//") + 2
    end = base_path.find("/", start)
    bucket_in = base_path[start:end]
    prefix_in = base_path[base_path.find(bucket_in) + len(bucket_in) + 1:]
    print(
        "bucket: {} | prefix: {} | partition: {} | path: s3://{}/{}".format(bucket_in, prefix_in, partition,
                                                                            bucket_in, prefix_in))
    objects = list(s3_client.Bucket(bucket_in).objects.filter(Prefix=prefix_in))
    print("total objects found: {}".format(len(objects)))
    dict_out = {}
    if len(objects) == 0:
        logger.info("Error. no files found")
        return
    for i in range(0, len(objects)):
        file_str = objects[i].key
        start = file_str.find(partition) + len(partition)
        end = file_str.find("/", start)
        part_found = file_str[start:end]
        partial_path = file_str[:file_str.find(partition) + len(partition) + 8]
        dict_out[part_found] = partial_path
    dict_sort = collections.OrderedDict(sorted(dict_out.items()))
    last = list(dict_sort.keys())[len(dict_sort) - 1]
    path_final = "s3://{}/{}/".format(bucket_in, dict_sort.get(last))
    print("path_final: {} for base_path: {} and partition: {} and last: {} and dict_sort: {}".format(
        path_final, base_path, partition, last, dict_sort))
    return path_final

Output

mock_s3_bucket/mock_folder
Inside get_latest_file_path_inter() : Given: s3://mock_s3_bucket/mock_folder/ partition_date
bucket: mock_s3_bucket | prefix: mock_folder/ | partition: partition_date | path: s3://mock_s3_bucket/mock_folder/
s3.Bucket(name='mock_s3_bucket')
total objects found: 0
------------------------
None
2

There are 2 answers

2
Explorer On BEST ANSWER

Got it working, I was mixing boto3 client and boto3 resource apis in test spec and its corresponding functions. After figuring out the difference between both, I changed everything to boto3 client api and got it working. Below is the modified function and its corresponding spec.

ssl._create_default_https_context = ssl._create_unverified_context

MY_BUCKET = "mock_s3_bucket"
MY_PREFIX = "mock_folder/mock_sub_folder"
MY_ANOTHER_PREFIX = "mock_folder/mock_another_sub_folder"


class TestPysparkUtils(unittest.TestCase):
    mock_s3 = mock_s3()
    LOGGER = logging.getLogger("my-logger")

    def setUp(self):
        self.mock_s3.start()
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        s3.create_bucket(Bucket="{}".format(MY_BUCKET))
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201223/file_20201223.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='def')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file_20201222.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='abc')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file1_20201222.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='xyz')

        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201225/file_20201225.txt'
                                        .format(MY_BUCKET, MY_ANOTHER_PREFIX), Body='mno')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201225/_SUCCESS'
                                        .format(MY_BUCKET, MY_ANOTHER_PREFIX), Body='pqr')

    def tearDown(self):
        self.mock_s3.stop()

    def test_get_latest_file_path_inter(self):
        boto3_s3_client = boto3.client("s3")
        result = get_latest_file_path_from_s3(self.LOGGER, boto3_s3_client, 's3://{}/{}/'.format(MY_BUCKET, MY_PREFIX),
                                              'partition_date')
        desired_result = 's3://mock_s3_bucket/mock_folder/mock_sub_folder/partition_date=20201223/'
        self.assertEqual(result, desired_result)

        with pytest.raises(KeyError):
            get_latest_file_path_from_s3(self.LOGGER, boto3_s3_client, 's3://{}/{}/'.format(MY_BUCKET, 'unavailable_prefix'),
                                         'partition_date')
def get_latest_file_path_from_s3(logger, boto_s3_client, base_path, partition):
    """
    Returns full s3 path of latest partition assuming partition date is of format yyyyMMdd
    :type (object, str, str) -> (str)
    :parameter
    :param logger Logger object
    :param boto_s3_client boto3 s3 client object
    :param base_path Base s3 path i.e. path till partition column name
    :param partition final partition column name
    """
    logger.info("Inside get_latest_file_path_inter() : Given: {} {}".format(base_path, partition))
    start = base_path.find("//") + 2
    end = base_path.find("/", start)
    bucket_in = base_path[start:end]
    prefix_in = base_path[base_path.find(bucket_in) + len(bucket_in) + 1:]
    logger.info("bucket: {} | prefix: {} | partition: {} | path: s3://{}/{}".format(bucket_in, prefix_in, partition,
                                                                                    bucket_in, prefix_in))
    try:
        s3_files = boto_s3_client.list_objects_v2(Bucket=bucket_in, Prefix='{}/{}'.format(bucket_in, prefix_in))['Contents']
    except KeyError:
        logger.error("Exception while listing objects from path : {}/{}".format(bucket_in, prefix_in))
        raise

    if len(s3_files) == 0:
        raise FileNotFoundError("Error. no files found at provided path, path: s3://{}/{} "
                                "and partition: {}".format(bucket_in, prefix_in, partition))

    latest_partition_date = 0
    for obj in s3_files:
        file_str = obj['Key']
        folder_path = file_str.rsplit('/', 1)[0]
        partition_date = int(folder_path.rpartition('=')[2])
        if partition_date > latest_partition_date:
            latest_partition_date = partition_date
    path_final = "s3://{}/{}{}={}/".format(bucket_in, prefix_in, partition, latest_partition_date)
    logger.info("path_final: {} for base_path: {} and partition: {}".format(
        path_final, base_path, partition))
    return path_final
0
Ben On

I'm not sure if this will help anyone, and it could be a confusion on sessions/resources/clients, however I had the same problem that buckets weren't created using moto (v4.1.6). I got it working by creating the session separately not using resource correction directly e.g.

WORKS

session = boto3.Session(profile_name='xyzbatch')
s3=session.resource('s3', region_name=REGION)
s3.create_bucket(Bucket=self.bucket_name,CreateBucketConfiguration={'LocationConstraint': REGION})

did not work....

s3resource = session.resource('s3', region_name='us-east-2')
s3resource.create_bucket(Bucket=bucketname, CreateBucketConfiguration={'LocationConstraint': REGION})