Python unit tests for Foundry's transforms?


I would like to set up tests for my transforms in Foundry, passing test inputs and checking that the output is the expected one. Is it possible to call a transform with dummy datasets (a .csv file in the repo), or should I create functions inside the transform to be called by the tests (data created in code)?

1 answer

Answered by vanhooser

If you check your platform documentation under Code Repositories -> Python Transforms -> Python Unit Tests, you'll find quite a few resources there that will be helpful.

The sections on writing and running tests, in particular, are what you're looking for.


// START DOCUMENTATION

Writing a Test

Full documentation can be found at https://docs.pytest.org

PyTest finds tests in any Python file whose name begins with test_. It is recommended to put all your tests into a test package under the src directory of your project. Tests are simply Python functions that are also named with the test_ prefix, and assertions are made using Python's assert statement. PyTest will also run tests written using Python's built-in unittest module. For example, in transforms-python/src/test/test_increment.py a simple test would look like this:

def increment(num):
    return num + 1

def test_increment():
    assert increment(3) == 5

Running this test will cause checks to fail with a message that looks like this:

============================= test session starts =============================
collected 1 item

test_increment.py F                                                       [100%]

================================== FAILURES ===================================
_______________________________ test_increment ________________________________

    def test_increment():
>       assert increment(3) == 5
E       assert 4 == 5
E        +  where 4 = increment(3)

test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================
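
For comparison, a version of this test that passes simply asserts the value the function actually returns:

def test_increment():
    assert increment(3) == 4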

Testing with PySpark

PyTest fixtures are a powerful feature that enables injecting values into test functions simply by adding a parameter of the same name. This feature is used to provide a spark_session fixture for use in your test functions. For example:

def test_dataframe(spark_session):
    df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
    assert df.schema.names == ['letter', 'number']

// END DOCUMENTATION
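
To answer your question directly: the usual pattern is to factor the transform's core logic into a plain function that takes and returns DataFrames, and call that function from a test with input data built in code. A minimal sketch (the module path and the names filter_numbers / test_filter_numbers below are placeholders for your own):

# myproject/datasets/clean.py
def filter_numbers(df):
    # Core logic kept outside the @transform_df decorator,
    # so tests can call it directly with any DataFrame
    return df.filter(df.number > 1)


# transforms-python/src/test/test_clean.py
from myproject.datasets.clean import filter_numbers

def test_filter_numbers(spark_session):
    # Dummy input created in code via the spark_session fixture
    input_df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
    result = filter_numbers(input_df)
    assert result.count() == 1
    assert result.first().letter == 'b'

Your decorated compute function then just delegates to filter_numbers, so the tests exercise the same code path as the real transform.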


If you don't want to specify your schemas in code, you can also read a file from your repository by following the instructions in the documentation under How To -> Read file in Python repository.


// START DOCUMENTATION

Read file in Python repository

You can read other files from your repository into the transform context. This might be useful in setting parameters for your transform code to reference.

To start, edit setup.py in your Python repository:

setup(
    name=os.environ['PKG_NAME'],
    # ...
    package_data={
        '': ['*.yaml', '*.csv']
    }
)

This tells Python to bundle YAML and CSV files into the package. Then place a config file (for example config.yaml, though it can also be a .csv or .txt file) next to your Python transform (e.g. read_yml.py, shown below):

- name: tbl1
  primaryKey:
  - col1
  - col2
  update:
  - column: col3
    with: 'XXX'

You can read it in your transform read_yml.py with the code below:

from transforms.api import transform_df, Input, Output
from pkg_resources import resource_stream
import yaml
import json

@transform_df(
    Output("/Demo/read_yml")
)
def my_compute_function(ctx):
    # Load the bundled config.yaml from the package resources;
    # safe_load avoids the deprecated default-Loader form of yaml.load
    stream = resource_stream(__name__, "config.yaml")
    docs = yaml.safe_load(stream)
    return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])

So your project structure would be:

  • some_folder
    • config.yaml
    • read_yml.py

This will produce a dataset with a single row and one column, "result", containing:

[{"primaryKey": ["col1", "col2"], "update": [{"column": "col3", "with": "XXX"}], "name": "tbl1"}]

// END DOCUMENTATION
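
Putting the two together: if you prefer a dummy .csv checked into the repository over schemas defined in code, bundle it via package_data as above and load it inside a test with the spark_session fixture. A sketch, assuming a file input.csv sits next to the test module and filter_numbers is the helper from the earlier example:

# transforms-python/src/test/test_clean_from_csv.py
from pkg_resources import resource_filename
from myproject.datasets.clean import filter_numbers

def test_filter_numbers_from_csv(spark_session):
    # Resolve the bundled CSV to a real filesystem path that Spark can read
    csv_path = resource_filename(__name__, 'input.csv')
    input_df = spark_session.read.csv(csv_path, header=True, inferSchema=True)
    result = filter_numbers(input_df)
    assert result.count() > 0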