How can I use a SparkDFDataset with my Great Expectations validator?


I'd appreciate your advice on the following issue.

I am testing whether Great Expectations is viable for use on my Hive tables. Ideally, I want to open an HTML file that shows my expectations in a user-friendly page.
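For concreteness, the user-friendly page I have in mind is what Great Expectations calls Data Docs. Once a validator is working, I expect to render it with something like the following (a sketch based on the GE quickstart; the checkpoint name is a placeholder):

>>> # Sketch: assumes a working validator (see the session below).
>>> checkpoint = context.add_or_update_checkpoint(name="my_checkpoint", validator=validator)
>>> checkpoint.run()
>>> context.build_data_docs()  # renders the suite and validation results as static HTML
>>> context.open_data_docs()   # opens the page in a browser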

I am not initializing a Great Expectations project. Basically, I enter pyspark3 and run the following commands, which eventually fail with a message that my SparkDFDataset has no attribute persist:

>>> import great_expectations as ge
>>> sk = SparkSession.builder.appName("GE_TEST").getOrCreate()
>>> sk.sql("use DB1")
>>> hive_table=sk.sql("SELECT * FROM TABLEX")
>>> df_ge = ge.dataset.SparkDFDataset(hive_table)
>>> context = ge.get_context()
>>> datasource = context.sources.add_spark("my_spark_datasource")
>>> name = "my_df_asset"
>>> data_asset = datasource.add_dataframe_asset(name=name)
>>> my_batch_request = data_asset.build_batch_request(dataframe=df_ge)
>>> expectation_suite_name = "test"
>>> context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
>>> validator = context.get_validator(batch_request=my_batch_request,expectation_suite_name=expectation_suite_name,)
23/11/22 17:58:26 WARN  sql.SparkSession: [Thread-3]: Using an existing Spark session; only runtime SQL configurations will take effect.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/GX/testenv/lib64/python3.8/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 2388, in get_validator
    return self.get_validator_using_batch_list(
  File "/GX/testenv/lib64/python3.8/site-packages/great_expectations/data_context/data_context/abstract_data_context.py", line 2444, in get_validator_using_batch_list
    validator = Validator(
  File "/GX/testenv/lib64/python3.8/site-packages/great_expectations/validator/validator.py", line 211, in __init__
    self.load_batch_list(batch_list=batches)
  File "/GX/testenv/lib64/python3.8/site-packages/great_expectations/validator/validator.py", line 322, in load_batch_list
    self._execution_engine.batch_manager.load_batch_list(batch_list=batch_list)
  File "/GX/testenv/lib64/python3.8/site-packages/great_expectations/core/batch_manager.py", line 156, in load_batch_list
    self._execution_engine.load_batch_data(
  File "/GX/testenv/lib64/python3.8/site-packages/great_expectations/execution_engine/sparkdf_execution_engine.py", line 248, in load_batch_data
    batch_data.dataframe.persist()
AttributeError: 'SparkDFDataset' object has no attribute 'persist'

Does anyone know how to get around this? Or is there an alternative way to achieve what I need? toPandas() does not seem to work either, as I keep getting a Java heap error: hive_table has at least 4 million rows with many columns.
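From the traceback, my guess is that the fluent datasource API expects a plain pyspark DataFrame rather than the legacy SparkDFDataset wrapper, since the execution engine calls .persist() on whatever it receives, and only the raw DataFrame has that method. Something like this is what I imagine the fix looks like, though I have not confirmed it (the column name is a placeholder):

>>> # Sketch: pass hive_table (a real pyspark DataFrame), not the df_ge wrapper.
>>> my_batch_request = data_asset.build_batch_request(dataframe=hive_table)
>>> validator = context.get_validator(
...     batch_request=my_batch_request,
...     expectation_suite_name=expectation_suite_name,
... )
>>> validator.expect_column_values_to_not_be_null("some_column")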

Thank you all for your help!
