tox tests are passing, but not when I bring up my docker-compose: INTERNALERROR


I'm working on a project that creates different connectors. I've already written a connector for an SFTP server and one for a Postgres database, and now I'm working on an S3 connector. My project directory looks like this:

.
├── README.md
├── assets
│   └── gitlab-runner.png
├── docker-compose.yml
├── dockerfile
├── extractor
│   ├── aws_requirements.txt
│   ├── src
│   │   ├── connectors
│   │   │   ├── __init__.py
│   │   │   ├── postgres_connector.py
│   │   │   ├── s3_connector.py
│   │   │   └── sftp_connector.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── connector.py
│   │   │   ├── logger.py
│   │   │   └── secret.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── aws_log_service.py
│   │   │   ├── aws_secret_service.py
│   │   │   └── spark_service.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── common_utils.py
│   ├── test_requirements.txt
│   └── tests
│       ├── conftest.py
│       ├── constants.py
│       ├── test_aws_log_service.py
│       ├── test_aws_secret_service.py
│       ├── test_common_utils.py
│       ├── test_postgres_connector.py
│       ├── test_s3_connector.py
│       ├── test_sftp_connector.py
│       └── test_spark_service.py
├── jars
│   └── postgresql-42.6.0.jar
├── mock
│   ├── config_files
│   │   ├── json.ini
│   │   ├── postgres.ini
│   │   └── sftp.ini
│   ├── dumps
│   │   └── postgres-tables.sql
│   ├── output
│   │   ├── parquet
│   │   │   └── part-00000-98252473-d5d3-4339-9241-bbaf8630a49d-c000.snappy.parquet
│   │   ├── pg
│   │   │   └── part-00000-561992b0-1f7c-4deb-ab93-b4c81bce4f74-c000.csv
│   │   ├── s3
│   │   │   └── part-00000-51fa9b9b-1e9f-4cff-8367-5c4f35f75d91-c000.csv
│   │   └── sftp
│   │       └── part-00000-927da2d0-f115-405c-9559-44ce37f6bfd8-c000.csv
│   └── upload
│       ├── json_data.json
│       ├── parquet_data.parquet
│       └── sftp_data.txt
└── tox.ini

Before I created the s3_connector.py and test_s3_connector.py files, everything went well:

  • When I ran my tox file, coverage was at 100%
  • When I ran docker-compose up, every test passed and there were no errors

But after I created the s3_connector.py and test_s3_connector.py files:

  • After running my tox file, my tests still cover the code 100%
  • BUT when I run docker-compose up, there's a strange error, and it isn't in my S3 files:
datalake-back-unit-test-1  | extractor/tests/test_postgres_connector.py::test_get_credentials_aws 
datalake-back-unit-test-1  | INTERNALERROR> Traceback (most recent call last):
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/main.py", line 271, in wrap_session
datalake-back-unit-test-1  | INTERNALERROR>     session.exitstatus = doit(config, session) or 0
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/main.py", line 325, in _main
datalake-back-unit-test-1  | INTERNALERROR>     config.hook.pytest_runtestloop(session=session)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_hooks.py", line 493, in __call__
datalake-back-unit-test-1  | INTERNALERROR>     return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_manager.py", line 115, in _hookexec
datalake-back-unit-test-1  | INTERNALERROR>     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 152, in _multicall
datalake-back-unit-test-1  | INTERNALERROR>     return outcome.get_result()
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_result.py", line 114, in get_result
datalake-back-unit-test-1  | INTERNALERROR>     raise exc.with_traceback(exc.__traceback__)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 77, in _multicall
datalake-back-unit-test-1  | INTERNALERROR>     res = hook_impl.function(*args)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/main.py", line 350, in pytest_runtestloop
datalake-back-unit-test-1  | INTERNALERROR>     item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_hooks.py", line 493, in __call__
datalake-back-unit-test-1  | INTERNALERROR>     return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_manager.py", line 115, in _hookexec
datalake-back-unit-test-1  | INTERNALERROR>     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 152, in _multicall
datalake-back-unit-test-1  | INTERNALERROR>     return outcome.get_result()
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_result.py", line 114, in get_result
datalake-back-unit-test-1  | INTERNALERROR>     raise exc.with_traceback(exc.__traceback__)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 77, in _multicall
datalake-back-unit-test-1  | INTERNALERROR>     res = hook_impl.function(*args)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 114, in pytest_runtest_protocol
datalake-back-unit-test-1  | INTERNALERROR>     runtestprotocol(item, nextitem=nextitem)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 133, in runtestprotocol
datalake-back-unit-test-1  | INTERNALERROR>     reports.append(call_and_report(item, "call", log))
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 224, in call_and_report
datalake-back-unit-test-1  | INTERNALERROR>     report: TestReport = hook.pytest_runtest_makereport(item=item, call=call)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_hooks.py", line 493, in __call__
datalake-back-unit-test-1  | INTERNALERROR>     return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_manager.py", line 115, in _hookexec
datalake-back-unit-test-1  | INTERNALERROR>     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 130, in _multicall
datalake-back-unit-test-1  | INTERNALERROR>     teardown[0].send(outcome)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/skipping.py", line 266, in pytest_runtest_makereport
datalake-back-unit-test-1  | INTERNALERROR>     rep = outcome.get_result()
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_result.py", line 114, in get_result
datalake-back-unit-test-1  | INTERNALERROR>     raise exc.with_traceback(exc.__traceback__)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/pluggy/_callers.py", line 77, in _multicall
datalake-back-unit-test-1  | INTERNALERROR>     res = hook_impl.function(*args)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/runner.py", line 368, in pytest_runtest_makereport
datalake-back-unit-test-1  | INTERNALERROR>     return TestReport.from_item_and_call(item, call)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/reports.py", line 362, in from_item_and_call
datalake-back-unit-test-1  | INTERNALERROR>     longrepr = item.repr_failure(excinfo)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/python.py", line 1833, in repr_failure
datalake-back-unit-test-1  | INTERNALERROR>     return self._repr_failure_py(excinfo, style=style)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/nodes.py", line 486, in _repr_failure_py
datalake-back-unit-test-1  | INTERNALERROR>     return excinfo.getrepr(
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 701, in getrepr
datalake-back-unit-test-1  | INTERNALERROR>     return fmt.repr_excinfo(self)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 989, in repr_excinfo
datalake-back-unit-test-1  | INTERNALERROR>     reprtraceback = self.repr_traceback(excinfo_)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 913, in repr_traceback
datalake-back-unit-test-1  | INTERNALERROR>     entries = [
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 914, in <listcomp>
datalake-back-unit-test-1  | INTERNALERROR>     self.repr_traceback_entry(entry, excinfo if last == entry else None)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 867, in repr_traceback_entry
datalake-back-unit-test-1  | INTERNALERROR>     path = self._makepath(entry_path)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/_code/code.py", line 883, in _makepath
datalake-back-unit-test-1  | INTERNALERROR>     np = bestrelpath(Path.cwd(), path)
datalake-back-unit-test-1  | INTERNALERROR>   File "/.tox/py310/lib/python3.10/site-packages/_pytest/pathlib.py", line 769, in bestrelpath
datalake-back-unit-test-1  | INTERNALERROR>     reldest = dest.relative_to(base)
datalake-back-unit-test-1  | INTERNALERROR>   File "/usr/local/lib/python3.10/pathlib.py", line 818, in relative_to
datalake-back-unit-test-1  | INTERNALERROR>     raise ValueError("{!r} is not in the subpath of {!r}"
datalake-back-unit-test-1  | INTERNALERROR> ValueError: '//extractor/src/connectors/postgres_connector.py' is not in the subpath of '/' OR one path is relative and the other is absolute.
datalake-back-unit-test-1  | 
datalake-back-unit-test-1  | ============================= 13 passed in 11.68s ==============================
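
From the last frames of that traceback, pytest crashes while formatting the report for a test: _makepath calls bestrelpath(Path.cwd(), path), and pathlib's relative_to refuses because the file path starts with a double slash while the working directory is /. I can reproduce just that final ValueError in a plain Python shell (this is only the pathlib call at the bottom of the traceback, not my setup):

from pathlib import Path

# pathlib treats a path rooted at '//' as having a different root than '/',
# so relative_to raises the same ValueError that shows up in the container log
Path("//extractor/src/connectors/postgres_connector.py").relative_to(Path("/"))

So inside the container the working directory seems to be / and the test file paths come back with a leading //, but I don't understand why adding the S3 files changes that, or why it only happens under docker-compose and not under plain tox.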

These are my files. postgres_connector.py:

from pyspark.sql.readwriter import DataFrameReader
from interfaces.connector import ConnectorInterface
from interfaces.secret import SecretInterface
from services.aws_secret_service import AWSSecretServiceException
from services.spark_service import SparkService, SparkServiceException


class PostgresConnectorException(Exception):
    pass


class PostgresConnector(ConnectorInterface):
    def __init__(self, app_name):
        self.spark_service = SparkService(app_name)

    def get_credentials(
        self, secret_service: SecretInterface, secret_name: str, region_name: str
    ):
        """
        Retrieve the credentials to connect to the Postgres database.
        Args:
            secret_service (SecretInterface): Secret service to get the credentials.
            secret_name (str): The secret's name.
            region_name (str): The region where the secret is stored.
        Raises:
            PostgresConnectorException: An exception if an error occurred.
        Returns:
            str: The secret's value.
        """
        try:
            return secret_service.get_secret(secret_name, region_name)
        except AWSSecretServiceException as secret_error:
            raise PostgresConnectorException(secret_error) from secret_error

    def pull_data(self, parameters: dict) -> DataFrameReader:
        """
        Pull the data from the table that is stored in the database.
        Args:
            parameters (dict): The parameters to connect to the database.
        Raises:
            PostgresConnectorException: An exception if an error occurred.
        Returns:
            DataFrameReader: The data in format of dataframe.
        """
        try:
            return self.spark_service.read_from_database(parameters)
        except SparkServiceException as spark_error:
            raise PostgresConnectorException(spark_error) from spark_error

    def save_data(
        self, dataframe: DataFrameReader, location: str, data_format: str, mode: str
    ):
        """
        Saves the data in the location and format defined.
        Args:
            dataframe (DataFrameReader): The frame with the data.
            location (str): The location where the data will be stored.
            data_format (str): The format of how the data will be stored.
            mode (str): Overwrite or append.
        Raises:
            PostgresConnectorException: An exception if an error occurred.
        """
        try:
            self.spark_service.save_frame(dataframe, location, data_format, mode)
        except SparkServiceException as spark_error:
            raise PostgresConnectorException(spark_error) from spark_error
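
(interfaces/connector.py isn't pasted here; it's just an abstract base class declaring the three methods every connector implements. Roughly, as a simplified sketch of what it looks like, the real file may differ:)

# interfaces/connector.py -- simplified sketch, not the exact file
from abc import ABC, abstractmethod


class ConnectorInterface(ABC):
    @abstractmethod
    def get_credentials(self, secret_service, secret_name: str, region_name: str):
        """Fetch a secret's value from a secret service."""

    @abstractmethod
    def pull_data(self, parameters: dict):
        """Read data from the source."""

    @abstractmethod
    def save_data(self, dataframe, location: str, data_format: str, mode: str):
        """Write the dataframe to the given location, format and mode."""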

test_postgres_connector.py:

import pytest
from pyspark.sql import SparkSession
from connectors.postgres_connector import PostgresConnector, PostgresConnectorException
from constants import PG_URL, REGION_NAME
from services.aws_secret_service import AWSSecretService


def test_get_credentials_aws(aws_secret_manager):
    aws_secret_manager.create_secret(Name="test_pg", SecretString='{"foo": "bar"}')
    expected_output = {"foo": "bar"}
    pg_connector = PostgresConnector("test-pg")
    aws_secret = AWSSecretService()
    output = pg_connector.get_credentials(aws_secret, "test_pg", REGION_NAME)
    assert output == expected_output


def test_get_credentials_aws_exception():
    pg_connector = PostgresConnector("test-pg")
    aws_secret = AWSSecretService()
    with pytest.raises(PostgresConnectorException):
        pg_connector.get_credentials(aws_secret, "not-exist", REGION_NAME)


def test_pull_data():
    pg_params = {
        "url": PG_URL,
        "driver": "org.postgresql.Driver",
        "dbtable": "(SELECT * FROM departments) AS sample",
        "user": "postgres",
        "password": "postgres",
    }
    pg_connector = PostgresConnector("test-pg")
    pg_data = pg_connector.pull_data(pg_params)
    assert pg_data.count() == 3


@pytest.mark.parametrize(
    "pg_params",
    [
        (
            {
                "url": PG_URL,
                "driver": "org.postgresql.Driver",
                "dbtable": "(SELECT * FROM departments) AS sample",
                "user": "postgresa",
                "password": "postgres",
            }
        ),
    ],
)
def test_pull_data_exception(pg_params):
    pg_connector = PostgresConnector("test-pg")
    with pytest.raises(PostgresConnectorException):
        pg_connector.pull_data(pg_params)


def test_save_data():
    output_path = "mock/output/pg"
    data_format = "csv"
    mode = "overwrite"
    spark = SparkSession.builder.appName("test").getOrCreate()
    source_frame = spark.createDataFrame(
        [(1, "RH"), (4, "AR")],
        ["department_id", "department_name"],
    )
    pg_connector = PostgresConnector("test-pg")
    pg_connector.save_data(source_frame, output_path, data_format, mode)


def test_save_data_exception():
    output_path = "/mock/output/pg"
    data_format = "csver"
    mode = "overwrite"
    spark = SparkSession.builder.appName("test").getOrCreate()
    source_frame = spark.createDataFrame(
        [(1, "RH"), (4, "AR")],
        ["department_id", "department_name"],
    )
    with pytest.raises(PostgresConnectorException):
        pg_connector = PostgresConnector("test-pg")
        pg_connector.save_data(source_frame, output_path, data_format, mode)
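
(conftest.py isn't pasted either; the aws_secret_manager fixture is basically a moto Secrets Manager mock plus a boto3 client, something like the sketch below, though the real fixture may differ:)

# tests/conftest.py -- rough sketch of the fixture used by the tests above
import boto3
import pytest
from moto import mock_secretsmanager

from constants import REGION_NAME


@pytest.fixture
def aws_secret_manager():
    # Run each test against moto's in-memory Secrets Manager
    with mock_secretsmanager():
        yield boto3.client("secretsmanager", region_name=REGION_NAME)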

s3_connector.py:

from pyspark.sql.readwriter import DataFrameReader
from interfaces.connector import ConnectorInterface
from interfaces.secret import SecretInterface
from services.aws_secret_service import AWSSecretServiceException
from services.spark_service import SparkService, SparkServiceException


class S3ConnectorException(Exception):
    pass


class S3Connector(ConnectorInterface):
    def __init__(self, app_name):
        self.spark_service = SparkService(app_name)

    def get_credentials(
        self, secret_service: SecretInterface, secret_name: str, region_name: str
    ):
        """
        Retrieve the credentials to connect to the S3 server.
        Args:
            secret_service (SecretInterface): Secret service to get the credentials.
            secret_name (str): The secret's name.
            region_name (str): The region where the secret is stored.
        Raises:
            S3ConnectorException: An exception if an error occurred.
        Returns:
            str: The secret's value.
        """
        try:
            return secret_service.get_secret(secret_name, region_name)
        except AWSSecretServiceException as secret_error:
            raise S3ConnectorException(secret_error) from secret_error

    def pull_data(self, parameters: dict) -> DataFrameReader:
        """
        Pull the data from the file that is stored in S3.
        Args:
            parameters (dict): The parameters to read the file (path, format, options).
        Raises:
            S3ConnectorException: An exception if an error occurred.
        Returns:
            DataFrameReader: The data as a dataframe.
        """
        try:
            return self.spark_service.read_from_file(file_path=parameters.get("file_path"),
                                                     data_format=parameters.get("data_format"),
                                                     parameters=parameters)
        except SparkServiceException as spark_error:
            raise S3ConnectorException(spark_error) from spark_error

    def save_data(
        self, dataframe: DataFrameReader, location: str, data_format: str, mode: str
    ):
        """
        Saves the data in the location and format defined.
        Args:
            dataframe (DataFrameReader): The frame with the data.
            location (str): The location where the data will be stored.
            data_format (str): The format of how the data will be stored.
            mode (str): Overwrite or append.
        Raises:
            S3ConnectorException: An exception if an error occurred.
        """
        try:
            self.spark_service.save_frame(dataframe, location, data_format, mode)
        except SparkServiceException as spark_error:
            raise S3ConnectorException(spark_error) from spark_error

test_s3_connector.py:

import os
import pytest
from moto import mock_s3
import boto3
from pyspark.sql import SparkSession
from connectors.s3_connector import S3Connector, S3ConnectorException
from constants import REGION_NAME
from services.aws_secret_service import AWSSecretService


def test_get_credentials_aws(aws_secret_manager):
    aws_secret_manager.create_secret(Name="test_s3", SecretString='{"foo": "bar"}')
    expected_output = {"foo": "bar"}
    s3_connector = S3Connector("test-s3")
    aws_secret = AWSSecretService()
    output = s3_connector.get_credentials(aws_secret, "test_s3", REGION_NAME)
    assert output == expected_output


def test_get_credentials_aws_exception():
    s3_connector = S3Connector("test-s3")
    aws_secret = AWSSecretService()
    with pytest.raises(S3ConnectorException):
        s3_connector.get_credentials(aws_secret, "not-exist", REGION_NAME)


@mock_s3
def test_pull_data():
    s3 = boto3.client('s3')
    s3_params = {
        "s3_bucket": s3.create_bucket(Bucket='test-bucket'),
        "s3_key": "test-data.csv",
        "file_path": os.path.join(os.path.dirname(__file__)),
        "data_format": "csv"
    }
    s3_connector = S3Connector("test-s3")
    s3_connector.pull_data(s3_params)


@pytest.mark.parametrize(
    "s3_params",
    [
        (
            {
                "s3_bucket": "test-bucket",
                "s3_key": "test-data.csv",
                "file_path": "my_path",
                "data_format": "csv"
            }
        ),
    ],
)
def test_pull_data_exception(s3_params):
    s3_connector = S3Connector("test-s3")
    with pytest.raises(S3ConnectorException):
        s3_connector.pull_data(s3_params)


def test_save_data():
    output_path = "mock/output/s3"
    data_format = "csv"
    mode = "overwrite"
    spark = SparkSession.builder.appName("test").getOrCreate()
    source_frame = spark.createDataFrame(
        [(1, "RH"), (4, "AR")],
        ["department_id", "department_name"],
    )
    s3_connector = S3Connector("test-s3")
    s3_connector.save_data(source_frame, output_path, data_format, mode)


def test_save_data_exception():
    output_path = "/mock/output/pg"
    data_format = "csver"
    mode = "overwrite"
    spark = SparkSession.builder.appName("test").getOrCreate()
    source_frame = spark.createDataFrame(
        [(1, "RH"), (4, "AR")],
        ["department_id", "department_name"],
    )
    with pytest.raises(S3ConnectorException):
        s3_connector = S3Connector("test-s3")
        s3_connector.save_data(source_frame, output_path, data_format, mode)

Also, you should know that these scripts use this Spark service, spark_service.py:

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException, IllegalArgumentException
from pyspark.sql.readwriter import DataFrameReader


class SparkServiceException(Exception):
    pass


class SparkService:
    def __init__(self, app_name: str):
        self.spark = (
            SparkSession.builder.appName(app_name).enableHiveSupport().getOrCreate()
        )

        self.spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
        self.spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
        self.spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
        self.spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
        self.spark.conf.set(
            "spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED"
        )

        spark_context = self.spark.sparkContext
        spark_context.setLogLevel("INFO")

    def read_from_database(self, jdbc_properties: dict) -> DataFrameReader:
        """
        Function to read a table from a database.
        Args:
            jdbc_properties (dict): The options to connect to a table.
        Raises:
            SparkServiceException: An exception if an error occurred.
        Returns:
            [DataFrameReader]: The dataframe with the content of the table.
        """
        jdbc_reader = self.spark.read.format("jdbc").options(**jdbc_properties)
        try:
            return jdbc_reader.load()
        except Py4JJavaError as java_error:
            raise SparkServiceException(java_error) from java_error
        except IllegalArgumentException as illegal_error:
            raise SparkServiceException(illegal_error) from illegal_error

    def read_from_file(
        self, file_path: str, data_format: str, parameters: dict
    ) -> DataFrameReader:
        """
        This function can read the content of a file that is stored in json, csv, parquet or hudi.
        Args:
            file_path (str): The file's path.
            data_format (str): The format the file is stored in, e.g. csv, parquet, etc.
            parameters (dict): The different options to read the file.
        Raises:
            SparkServiceException: Raise exception if something fails.
        Returns:
            [DataFrameReader]: A dataframe with the content of the file.
        """
        try:
            return (
                self.spark.read.format(data_format)
                .options(**parameters)
                .load(file_path)
            )
        except AnalysisException as analysis_error:
            raise SparkServiceException(analysis_error) from analysis_error

    def save_frame(
        self, data_frame: DataFrameReader, path: str, data_format: str, mode: str
    ):
        """
        This function will write a dataframe into a defined path.
        Args:
            data_frame (DataFrameReader): The dataframe that will be written.
            path (str): The path where the file will be stored.
            data_format (str): The data format of how the data will be stored.
            mode (str): Overwrite or Append.
        Raises:
            SparkServiceException: If an error occurred this exception will be raised
        """
        try:
            data_frame.repartition(1).write.format(data_format).mode(mode).save(path)
        except AttributeError as attribute_error:
            raise SparkServiceException(attribute_error) from attribute_error
        except Py4JJavaError as java_error:
            raise SparkServiceException(java_error) from java_error

    def stop_spark(self):
        """
        Function to stop the spark session
        """
        self.spark.stop()

I don't know whether there's some conflict between the S3 and Postgres files.

I'm new to Docker and these technologies. Any ideas?
