I'm using Great Expectations library for data validation. I'm trying to create a custom expectation with SQL data source (using SqlAlchemyExecutionEngine
) which takes some columns as input and uses a custom method to calculate True/False based on their values. It works fine in the sample test (using Pandas), but breaks down when using SQL: the column_list
variable contains no data, just SQLAlchemy column objects for data access.
I checked the documentation, which mentions the problem I'm facing (they say it's a feature). As a second option they mention the possibility of executing a SQL query inside the expectation, but I don't need (nor want) this — I just need the data per row.
Here is the actual code:
"""
This is a template for creating custom MulticolumnMapExpectations.
For detailed instructions on how to use it, please see:
https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/how_to_create_custom_multicolumn_map_expectations
"""
from typing import Optional
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.compatibility.sqlalchemy import sqlalchemy as sa
from great_expectations.core.metric_domain_types import MetricDomainTypes
from great_expectations.exceptions import InvalidExpectationConfigurationError
from great_expectations.execution_engine import (
PandasExecutionEngine,
SparkDFExecutionEngine,
SqlAlchemyExecutionEngine,
)
from great_expectations.expectations.expectation import MulticolumnMapExpectation
from great_expectations.expectations.metrics.map_metric_provider import (
MulticolumnMapMetricProvider,
multicolumn_condition_partial,
# metric_partial
)
import pandas as pd
from d_dquality.expectations.helper_expect_seats import check_vehicle_seats
# This class defines a Metric to support your Expectation.
# For most MulticolumnMapExpectations, the main business logic for calculation will live in this class.
class MulticolumnValuesSensibleVehicleSeats(MulticolumnMapMetricProvider):
    """Metric: True per row when the seat count is sensible for the vehicle type.

    The per-row business rule lives in ``check_vehicle_seats``; this class
    only wires it into Great Expectations' map-metric machinery.
    """

    # This is the id string that will be used to reference your metric.
    condition_metric_name = "multicolumn_values.sensible_vehicle_seats"

    # These point your metric at the provided keys to facilitate calculation
    condition_domain_keys = (
        "batch_id",
        "table",
        "column_list",
        "row_condition",
        "condition_parser",
        "ignore_row_if",
    )
    condition_value_keys = ()

    # Core logic for the PandasExecutionEngine.
    @multicolumn_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column_list, **kwargs):
        # BUG FIX: the original read only the first row (`.values[0]`) and
        # returned a single-element Series, which silently misaligns results
        # for any batch with more than one row (the template examples happen
        # to use one-row data, masking this). Evaluate the rule row by row
        # instead, keeping the result indexed like the incoming DataFrame.
        return column_list.apply(
            lambda row: check_vehicle_seats(
                vehicle_type=row["vehicle_type"], seats=row["seats"]
            ),
            axis=1,
        )

    # Business logic for the SqlAlchemyExecutionEngine.
    #
    # NOTE(review): under this engine `column_list` holds SQLAlchemy *column
    # expressions*, not row data — by design the condition is pushed down to
    # the database, so per-row Python callbacks like check_vehicle_seats
    # cannot run here. The rule must be re-expressed as a SQLAlchemy boolean
    # expression (e.g. sa.case keyed on vehicle_type with seat-range
    # comparisons). The leftover `breakpoint()` debug call — which would hang
    # any non-interactive validation run — has been removed.
    @multicolumn_condition_partial(engine=SqlAlchemyExecutionEngine)
    def _sqlalchemy(cls, column_list, **kwargs):
        # TODO: placeholder condition only — translate check_vehicle_seats
        # into SQL before relying on this engine.
        return column_list[0].in_(['BUS'])
# This class defines the Expectation itself
class ExpectVehicleSeatsToBeSensible(MulticolumnMapExpectation):
    """Expect number of seats to be in certain range for given vehicle type."""

    # These examples will be shown in the public gallery.
    # They will also be executed as unit tests for your Expectation.
    # NOTE: example titles are now unique — the original reused
    # "basic_positive_test" for two different cases, which makes test
    # output ambiguous.
    examples = [
        {
            "data": {
                "vehicle_type": ['TRAILER'],
                "seats": [0]
            },
            "only_for": ["pandas"],
            "tests": [
                {
                    "title": "trailer_with_zero_seats_positive_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column_list": ["vehicle_type", "seats"]},
                    "out": {
                        "success": True,
                    },
                }
            ],
        },
        {
            "data": {
                "vehicle_type": ['VEHICLE'],
                "seats": [5]
            },
            "only_for": ["pandas"],
            "tests": [
                {
                    "title": "vehicle_with_five_seats_positive_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column_list": ["vehicle_type", "seats"]},
                    "out": {
                        "success": True,
                    },
                }
            ],
        },
        {
            "data": {
                "vehicle_type": ['VEHICLE'],
                "seats": [0]
            },
            "only_for": ["pandas"],
            "tests": [
                {
                    "title": "vehicle_with_zero_seats_negative_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column_list": ["vehicle_type", "seats"]},
                    "out": {
                        "success": False,
                    },
                }
            ],
        }
    ]

    # This is the id string of the Metric used by this Expectation.
    # It must match the `condition_metric_name` defined in the Metric class above.
    map_metric = "multicolumn_values.sensible_vehicle_seats"

    # Parameter names that can affect whether the Expectation evaluates to True or False.
    success_keys = (
        "column_list",
        "mostly",
    )

    # Default values for any parameters that should have them.
    default_kwarg_values = {}

    def validate_configuration(
        self, configuration: Optional[ExpectationConfiguration] = None
    ) -> None:
        """Validate that the required configuration arguments are present.

        Args:
            configuration: An optional ExpectationConfiguration used to
                configure the expectation; falls back to ``self.configuration``
                inside the superclass when omitted.

        Returns:
            None.

        Raises:
            InvalidExpectationConfigurationError: if the configuration does
                not validate successfully (raised by the superclass check).
        """
        # Debug `print()` calls and the commented-out breakpoint/assert
        # template removed: the superclass validation already covers the
        # required keys for a MulticolumnMapExpectation, and stray prints
        # pollute validation output in production runs.
        super().validate_configuration(configuration)

    # This object contains metadata for display in the public Gallery
    library_metadata = {
        "tags": [],  # Tags for this Expectation in the Gallery
        "contributors": [  # Github handles for all contributors to this Expectation.
            "@your_name_here",  # Don't forget to add your github handle here!
        ],
    }
# Script entry point: running this file directly prints the Expectation's
# diagnostic checklist (GE's self-test harness for custom expectations).
if __name__ == "__main__":
    ExpectVehicleSeatsToBeSensible().print_diagnostic_checklist()
Maybe I got this all wrong — anyway, I'd appreciate any advice.