I'm working on a project to create different connectors. I've already built a connector for an SFTP server and one for a Postgres database, and now I'm working on an S3 connector. My project directory looks like this:
.
├── README.md
├── assets
│ └── gitlab-runner.png
├── docker-compose.yml
├── dockerfile
├── extractor
│ ├── aws_requirements.txt
│ ├── src
│ │ ├── connectors
│ │ │ ├── __init__.py
│ │ │ ├── postgres_connector.py
│ │ │ ├── s3_connector.py
│ │ │ └── sftp_connector.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── connector.py
│ │ │ ├── logger.py
│ │ │ └── secret.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── aws_log_service.py
│ │ │ ├── aws_secret_service.py
│ │ │ └── spark_service.py
│ │ └── utils
│ │ ├── __init__.py
│ │ └── common_utils.py
│ ├── test_requirements.txt
│ └── tests
│ ├── conftest.py
│ ├── constants.py
│ ├── test_aws_log_service.py
│ ├── test_aws_secret_service.py
│ ├── test_common_utils.py
│ ├── test_postgres_connector.py
│ ├── test_s3_connector.py
│ ├── test_sftp_connector.py
│ └── test_spark_service.py
├── jars
│ └── postgresql-42.6.0.jar
├── mock
│ ├── config_files
│ │ ├── json.ini
│ │ ├── postgres.ini
│ │ └── sftp.ini
│ ├── dumps
│ │ └── postgres-tables.sql
│ ├── output
│ │ ├── parquet
│ │ │ └── part-00000-98252473-d5d3-4339-9241-bbaf8630a49d-c000.snappy.parquet
│ │ ├── pg
│ │ │ └── part-00000-561992b0-1f7c-4deb-ab93-b4c81bce4f74-c000.csv
│ │ ├── s3
│ │ │ └── part-00000-51fa9b9b-1e9f-4cff-8367-5c4f35f75d91-c000.csv
│ │ └── sftp
│ │ └── part-00000-927da2d0-f115-405c-9559-44ce37f6bfd8-c000.csv
│ └── upload
│ ├── json_data.json
│ ├── parquet_data.parquet
│ └── sftp_data.txt
└── tox.ini
Before I created the s3_connector.py and test_s3_connector.py files, everything went well:
- When I ran my tox file, coverage was at 100%
- When I ran docker-compose up, every test passed and there were no errors
But after I created the s3_connector.py and test_s3_connector.py files:
- After running my tox file, my tests still cover the code 100%
- BUT when I run docker-compose up, there's a strange error, and it doesn't even come from my S3 files:
datalake-back-unit-test-1 | extractor/tests/test_postgres_connector.py::test_get_credentials_aws
datalake-back-unit-test-1 | INTERNALERROR> Traceback (most recent call last):
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/main.py", line 271, in wrap_session
datalake-back-unit-test-1 | INTERNALERROR> session.exitstatus = doit(config, session) or 0
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/main.py", line 325, in _main
datalake-back-unit-test-1 | INTERNALERROR> config.hook.pytest_runtestloop(session=session)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_hooks.py", line 493, in __call__
datalake-back-unit-test-1 | INTERNALERROR> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_manager.py", line 115, in _hookexec
datalake-back-unit-test-1 | INTERNALERROR> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 152, in _multicall
datalake-back-unit-test-1 | INTERNALERROR> return outcome.get_result()
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_result.py", line 114, in get_result
datalake-back-unit-test-1 | INTERNALERROR> raise exc.with_traceback(exc.__traceback__)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 77, in _multicall
datalake-back-unit-test-1 | INTERNALERROR> res = hook_impl.function(*args)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/main.py", line 350, in pytest_runtestloop
datalake-back-unit-test-1 | INTERNALERROR> item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_hooks.py", line 493, in __call__
datalake-back-unit-test-1 | INTERNALERROR> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_manager.py", line 115, in _hookexec
datalake-back-unit-test-1 | INTERNALERROR> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 152, in _multicall
datalake-back-unit-test-1 | INTERNALERROR> return outcome.get_result()
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_result.py", line 114, in get_result
datalake-back-unit-test-1 | INTERNALERROR> raise exc.with_traceback(exc.__traceback__)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 77, in _multicall
datalake-back-unit-test-1 | INTERNALERROR> res = hook_impl.function(*args)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 114, in pytest_runtest_protocol
datalake-back-unit-test-1 | INTERNALERROR> runtestprotocol(item, nextitem=nextitem)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 133, in runtestprotocol
datalake-back-unit-test-1 | INTERNALERROR> reports.append(call_and_report(item, "call", log))
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 224, in call_and_report
datalake-back-unit-test-1 | INTERNALERROR> report: TestReport = hook.pytest_runtest_makereport(item=item, call=call)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_hooks.py", line 493, in __call__
datalake-back-unit-test-1 | INTERNALERROR> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_manager.py", line 115, in _hookexec
datalake-back-unit-test-1 | INTERNALERROR> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 130, in _multicall
datalake-back-unit-test-1 | INTERNALERROR> teardown[0].send(outcome)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/skipping.py", line 266, in pytest_runtest_makereport
datalake-back-unit-test-1 | INTERNALERROR> rep = outcome.get_result()
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_result.py", line 114, in get_result
datalake-back-unit-test-1 | INTERNALERROR> raise exc.with_traceback(exc.__traceback__)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 77, in _multicall
datalake-back-unit-test-1 | INTERNALERROR> res = hook_impl.function(*args)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 368, in pytest_runtest_makereport
datalake-back-unit-test-1 | INTERNALERROR> return TestReport.from_item_and_call(item, call)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/reports.py", line 362, in from_item_and_call
datalake-back-unit-test-1 | INTERNALERROR> longrepr = item.repr_failure(excinfo)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/python.py", line 1833, in repr_failure
datalake-back-unit-test-1 | INTERNALERROR> return self._repr_failure_py(excinfo, style=style)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/nodes.py", line 486, in _repr_failure_py
datalake-back-unit-test-1 | INTERNALERROR> return excinfo.getrepr(
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 701, in getrepr
datalake-back-unit-test-1 | INTERNALERROR> return fmt.repr_excinfo(self)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 989, in repr_excinfo
datalake-back-unit-test-1 | INTERNALERROR> reprtraceback = self.repr_traceback(excinfo_)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 913, in repr_traceback
datalake-back-unit-test-1 | INTERNALERROR> entries = [
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 914, in <listcomp>
datalake-back-unit-test-1 | INTERNALERROR> self.repr_traceback_entry(entry, excinfo if last == entry else None)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 867, in repr_traceback_entry
datalake-back-unit-test-1 | INTERNALERROR> path = self._makepath(entry_path)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 883, in _makepath
datalake-back-unit-test-1 | INTERNALERROR> np = bestrelpath(Path.cwd(), path)
datalake-back-unit-test-1 | INTERNALERROR> File "/.tox/py310/lib/python3.10/site-packages/_pytest/pathlib.py", line 769, in bestrelpath
datalake-back-unit-test-1 | INTERNALERROR> reldest = dest.relative_to(base)
datalake-back-unit-test-1 | INTERNALERROR> File "/usr/local/lib/python3.10/pathlib.py", line 818, in relative_to
datalake-back-unit-test-1 | INTERNALERROR> raise ValueError("{!r} is not in the subpath of {!r}"
datalake-back-unit-test-1 | INTERNALERROR> ValueError: '//extractor/src/connectors/postgres_connector.py' is not in the subpath of '/' OR one path is relative and the other is absolute.
datalake-back-unit-test-1 |
datalake-back-unit-test-1 | ============================= 13 passed in 11.68s ==============================
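Digging into that ValueError on its own, I can reproduce it outside pytest: pathlib treats a path that begins with a double slash as having a different anchor than "/", so relative_to() refuses it. A minimal repro of just the ValueError (not the whole pytest crash):

from pathlib import Path

# pathlib keeps '//' as a distinct anchor (POSIX allows paths beginning with
# exactly two slashes to be treated specially), so this path does not count
# as being under '/':
Path("//extractor/src/connectors/postgres_connector.py").relative_to("/")
# ValueError: '//extractor/src/connectors/postgres_connector.py' is not in
# the subpath of '/' OR one path is relative and the other is absolute.

So somewhere a file path is being built with a doubled leading slash, and pytest's bestrelpath trips over it while formatting a failure report.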
These are my files.
postgres_connector.py:
from pyspark.sql.readwriter import DataFrameReader
from interfaces.connector import ConnectorInterface
from interfaces.secret import SecretInterface
from services.aws_secret_service import AWSSecretServiceException
from services.spark_service import SparkService, SparkServiceException
class PostgresConnectorException(Exception):
pass
class PostgresConnector(ConnectorInterface):
def __init__(self, app_name):
self.spark_service = SparkService(app_name)
def get_credentials(
self, secret_service: SecretInterface, secret_name: str, region_name: str
):
"""
        Retrieve the credentials to connect to the Postgres database.
Args:
secret_service (SecretInterface): Secret service to get the credentials.
secret_name (str): The secret's name.
region_name (str): The region where the secret is stored.
Raises:
PostgresConnectorException: An exception if an error occurred.
Returns:
            dict: The secret's value.
"""
try:
return secret_service.get_secret(secret_name, region_name)
except AWSSecretServiceException as secret_error:
raise PostgresConnectorException(secret_error) from secret_error
def pull_data(self, parameters: dict) -> DataFrameReader:
"""
        Pull the data from a table stored in the database.
        Args:
            parameters (dict): The parameters to connect to the database.
        Raises:
            PostgresConnectorException: An exception if an error occurred.
        Returns:
            DataFrameReader: The data as a dataframe.
"""
try:
return self.spark_service.read_from_database(parameters)
except SparkServiceException as spark_error:
raise PostgresConnectorException(spark_error) from spark_error
def save_data(
self, dataframe: DataFrameReader, location: str, data_format: str, mode: str
):
"""
        Saves the data in the defined location and format.
        Args:
            dataframe (DataFrameReader): The frame with the data.
            location (str): The location where the data will be stored.
            data_format (str): The format in which the data will be stored.
mode (str): Overwrite or append.
Raises:
PostgresConnectorException: An exception if an error occurred.
"""
try:
self.spark_service.save_frame(dataframe, location, data_format, mode)
except SparkServiceException as spark_error:
raise PostgresConnectorException(spark_error) from spark_error
test_postgres_connector.py:
import pytest
from pyspark.sql import SparkSession
from connectors.postgres_connector import PostgresConnector, PostgresConnectorException
from constants import PG_URL, REGION_NAME
from services.aws_secret_service import AWSSecretService
def test_get_credentials_aws(aws_secret_manager):
aws_secret_manager.create_secret(Name="test_pg", SecretString='{"foo": "bar"}')
expected_output = {"foo": "bar"}
pg_connector = PostgresConnector("test-pg")
aws_secret = AWSSecretService()
output = pg_connector.get_credentials(aws_secret, "test_pg", REGION_NAME)
assert output == expected_output
def test_get_credentials_aws_exception():
pg_connector = PostgresConnector("test-pg")
aws_secret = AWSSecretService()
with pytest.raises(PostgresConnectorException):
pg_connector.get_credentials(aws_secret, "not-exist", REGION_NAME)
def test_pull_data():
pg_params = {
"url": PG_URL,
"driver": "org.postgresql.Driver",
"dbtable": "(SELECT * FROM departments) AS sample",
"user": "postgres",
"password": "postgres",
}
pg_connector = PostgresConnector("test-pg")
pg_data = pg_connector.pull_data(pg_params)
assert pg_data.count() == 3
@pytest.mark.parametrize(
"pg_params",
[
(
{
"url": PG_URL,
"driver": "org.postgresql.Driver",
"dbtable": "(SELECT * FROM departments) AS sample",
"user": "postgresa",
"password": "postgres",
}
),
],
)
def test_pull_data_exception(pg_params):
pg_connector = PostgresConnector("test-pg")
with pytest.raises(PostgresConnectorException):
pg_connector.pull_data(pg_params)
def test_save_data():
output_path = "mock/output/pg"
data_format = "csv"
mode = "overwrite"
spark = SparkSession.builder.appName("test").getOrCreate()
source_frame = spark.createDataFrame(
[(1, "RH"), (4, "AR")],
["department_id", "department_name"],
)
pg_connector = PostgresConnector("test-pg")
pg_connector.save_data(source_frame, output_path, data_format, mode)
def test_save_data_exception():
output_path = "/mock/output/pg"
data_format = "csver"
mode = "overwrite"
spark = SparkSession.builder.appName("test").getOrCreate()
source_frame = spark.createDataFrame(
[(1, "RH"), (4, "AR")],
["department_id", "department_name"],
)
with pytest.raises(PostgresConnectorException):
pg_connector = PostgresConnector("test-pg")
pg_connector.save_data(source_frame, output_path, data_format, mode)
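(For completeness: the aws_secret_manager fixture used in both test files comes from conftest.py. Simplified, and from memory rather than verbatim, it's essentially this:)

import boto3
import pytest
from moto import mock_secretsmanager

from constants import REGION_NAME

@pytest.fixture
def aws_secret_manager():
    # moto intercepts the Secrets Manager calls, so no real AWS is touched
    with mock_secretsmanager():
        yield boto3.client("secretsmanager", region_name=REGION_NAME)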
s3_connector.py:
from pyspark.sql.readwriter import DataFrameReader
from interfaces.connector import ConnectorInterface
from interfaces.secret import SecretInterface
from services.aws_secret_service import AWSSecretServiceException
from services.spark_service import SparkService, SparkServiceException
class S3ConnectorException(Exception):
pass
class S3Connector(ConnectorInterface):
def __init__(self, app_name):
self.spark_service = SparkService(app_name)
def get_credentials(
self, secret_service: SecretInterface, secret_name: str, region_name: str
):
"""
        Retrieve the credentials to connect to S3.
Args:
secret_service (SecretInterface): Secret service to get the credentials.
secret_name (str): The secret's name.
region_name (str): The region where the secret is stored.
Raises:
            S3ConnectorException: An exception if an error occurred.
Returns:
            dict: The secret's value.
"""
try:
return secret_service.get_secret(secret_name, region_name)
except AWSSecretServiceException as secret_error:
raise S3ConnectorException(secret_error) from secret_error
    def pull_data(self, parameters: dict) -> DataFrameReader:
        """
        Pull the data from a file stored in S3.
        Args:
            parameters (dict): The parameters to read the file from S3.
        Raises:
            S3ConnectorException: An exception if an error occurred.
        Returns:
            DataFrameReader: The file's content as a dataframe.
        """
        try:
            return self.spark_service.read_from_file(
                file_path=parameters.get("file_path"),
                data_format=parameters.get("data_format"),
                parameters=parameters,
            )
except SparkServiceException as spark_error:
raise S3ConnectorException(spark_error) from spark_error
def save_data(
self, dataframe: DataFrameReader, location: str, data_format: str, mode: str
):
"""
        Saves the data in the defined location and format.
        Args:
            dataframe (DataFrameReader): The frame with the data.
            location (str): The location where the data will be stored.
            data_format (str): The format in which the data will be stored.
            mode (str): Overwrite or append.
        Raises:
            S3ConnectorException: An exception if an error occurred.
"""
try:
self.spark_service.save_frame(dataframe, location, data_format, mode)
except SparkServiceException as spark_error:
raise S3ConnectorException(spark_error) from spark_error
test_s3_connector.py:
import os
import pytest
from moto import mock_s3
import boto3
from pyspark.sql import SparkSession
from connectors.s3_connector import S3Connector, S3ConnectorException
from constants import REGION_NAME
from services.aws_secret_service import AWSSecretService
def test_get_credentials_aws(aws_secret_manager):
aws_secret_manager.create_secret(Name="test_s3", SecretString='{"foo": "bar"}')
expected_output = {"foo": "bar"}
s3_connector = S3Connector("test-s3")
aws_secret = AWSSecretService()
output = s3_connector.get_credentials(aws_secret, "test_s3", REGION_NAME)
assert output == expected_output
def test_get_credentials_aws_exception():
s3_connector = S3Connector("test-s3")
aws_secret = AWSSecretService()
with pytest.raises(S3ConnectorException):
s3_connector.get_credentials(aws_secret, "not-exist", REGION_NAME)
@mock_s3
def test_pull_data():
s3 = boto3.client('s3')
s3_params = {
"s3_bucket": s3.create_bucket(Bucket='test-bucket'),
"s3_key": "test-data.csv",
"file_path": os.path.join(os.path.dirname(__file__)),
"data_format": "csv"
}
s3_connector = S3Connector("test-s3")
s3_connector.pull_data(s3_params)
@pytest.mark.parametrize(
"s3_params",
[
(
{
"s3_bucket": "test-bucket",
"s3_key": "test-data.csv",
"file_path": "my_path",
"data_format": "csv"
}
),
],
)
def test_pull_data_exception(s3_params):
s3_connector = S3Connector("test-s3")
with pytest.raises(S3ConnectorException):
s3_connector.pull_data(s3_params)
def test_save_data():
output_path = "mock/output/s3"
data_format = "csv"
mode = "overwrite"
spark = SparkSession.builder.appName("test").getOrCreate()
source_frame = spark.createDataFrame(
[(1, "RH"), (4, "AR")],
["department_id", "department_name"],
)
s3_connector = S3Connector("test-s3")
s3_connector.save_data(source_frame, output_path, data_format, mode)
def test_save_data_exception():
output_path = "/mock/output/pg"
data_format = "csver"
mode = "overwrite"
spark = SparkSession.builder.appName("test").getOrCreate()
source_frame = spark.createDataFrame(
[(1, "RH"), (4, "AR")],
["department_id", "department_name"],
)
with pytest.raises(S3ConnectorException):
s3_connector = S3Connector("test-s3")
s3_connector.save_data(source_frame, output_path, data_format, mode)
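(constants.py just holds the shared test values; the real values are equivalent to this sketch but not verbatim:)

# constants.py (sketch; simplified values)
REGION_NAME = "us-east-1"
PG_URL = "jdbc:postgresql://postgres:5432/postgres"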
Also, you should know that these scripts use this Spark service, spark_service.py:
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException, IllegalArgumentException
from pyspark.sql.readwriter import DataFrameReader
class SparkServiceException(Exception):
pass
class SparkService:
def __init__(self, app_name: str):
self.spark = (
SparkSession.builder.appName(app_name).enableHiveSupport().getOrCreate()
)
self.spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
self.spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
self.spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
self.spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
self.spark.conf.set(
"spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED"
)
spark_context = self.spark.sparkContext
spark_context.setLogLevel("INFO")
def read_from_database(self, jdbc_properties: dict) -> DataFrameReader:
"""
Function to read a table from a database.
Args:
jdbc_properties (dict): The options to connect to a table.
Raises:
            SparkServiceException: An exception if an error occurred.
Returns:
[DataFrameReader]: The dataframe with the content of the table.
"""
jdbc_reader = self.spark.read.format("jdbc").options(**jdbc_properties)
try:
return jdbc_reader.load()
except Py4JJavaError as java_error:
raise SparkServiceException(java_error) from java_error
        except IllegalArgumentException as illegal_error:
            raise SparkServiceException(illegal_error) from illegal_error
def read_from_file(
self, file_path: str, data_format: str, parameters: dict
) -> DataFrameReader:
"""
        This function reads the content of a file stored as json, csv, parquet or hudi.
        Args:
            file_path (str): The file's path.
            data_format (str): The format the file is stored in, e.g. csv, parquet.
            parameters (dict): The different options to read the file.
        Raises:
            SparkServiceException: Raised if something fails.
        Returns:
            [DataFrameReader]: A dataframe with the content of the file.
"""
try:
return (
self.spark.read.format(data_format)
.options(**parameters)
.load(file_path)
)
        except AnalysisException as analysis_error:
            raise SparkServiceException(analysis_error) from analysis_error
def save_frame(
self, data_frame: DataFrameReader, path: str, data_format: str, mode: str
):
"""
This function will write a dataframe into a defined path.
Args:
data_frame (DataFrameReader): The dataframe that will be written.
path (str): The path where the file will be stored.
            data_format (str): The format in which the data will be stored.
mode (str): Overwrite or Append.
Raises:
SparkServiceException: If an error occurred this exception will be raised
"""
try:
data_frame.repartition(1).write.format(data_format).mode(mode).save(path)
except AttributeError as attribute_error:
raise SparkServiceException(attribute_error) from attribute_error
except Py4JJavaError as java_error:
raise SparkServiceException(java_error) from java_error
def stop_spark(self):
"""
        Function to stop the Spark session.
"""
self.spark.stop()
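For context on how everything ties together, a connector run looks roughly like this (a simplified, hypothetical driver, not my real entry point; the secret name and connection values here are made up):

from connectors.postgres_connector import PostgresConnector
from services.aws_secret_service import AWSSecretService

connector = PostgresConnector("extractor")
# fetch DB credentials from Secrets Manager (secret name is hypothetical)
credentials = connector.get_credentials(AWSSecretService(), "pg-secret", "us-east-1")
frame = connector.pull_data(
    {
        "url": "jdbc:postgresql://postgres:5432/postgres",  # made-up URL
        "driver": "org.postgresql.Driver",
        "dbtable": "(SELECT * FROM departments) AS sample",
        "user": credentials.get("user"),
        "password": credentials.get("password"),
    }
)
# write the extracted table out as CSV
connector.save_data(frame, "mock/output/pg", "csv", "overwrite")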
I don't know what's happening, or whether there's a conflict between the S3 and Postgres files. I'm new to Docker and these technologies, so any ideas are welcome!