Possible bug when using the PyArrow is_null function with Delta tables


I've noticed an issue while trying to apply filters on PyArrow datasets initialised from Delta tables. Specifically, an is_null filter expression only seems to return rows when every row in the underlying partition/Parquet file has a null value for the filtered column. I'm using the delta-rs library to create the PyArrow dataset from the Delta table, so I'm not sure whether this is a bug in PyArrow, a bug in delta-rs, or not a bug at all and I'm simply using the wrong operation for my use case.

I've created a minimal example with the following data, written to a Delta table using PySpark:

import pyspark
from delta import configure_spark_with_delta_pip
from pyspark.sql.types import StructType, StructField, StringType

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

data = [['a', None],
        ['a', 'exists'],
        ['b', None],
        ['b', None]]

schema = StructType([
    StructField("key", StringType()),
    StructField('null_column', StringType())
])

spark_df = spark.createDataFrame(data=data, schema=schema)
# Repartition on "key" so the rows for 'a' and 'b' end up in separate Parquet files
spark_df.repartition(2, "key").write.format("delta").mode("overwrite").save("./table")
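
As a sanity check, reading the table back with Spark shows all four rows, with the three nulls in null_column:

# Read the Delta table back to confirm what was written
spark.read.format("delta").load("./table").show()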

This results in the following commit in the _delta_log (included to show that the per-file statistics correctly record all 3 null entries):

{"commitInfo":{"timestamp":1687784715118,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"4","numOutputBytes":"1436"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"d718e2b3-c062-4783-987f-97cab1c5f110"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"eec58790-7456-4bfb-9fd5-6a576e46851b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"null_column\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1687784710499}}
{"add":{"path":"part-00000-ada91cf1-2cd6-4928-af3b-0b1bcc987bfc-c000.snappy.parquet","partitionValues":{},"size":745,"modificationTime":1687784712864,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":\"a\",\"null_column\":\"exists\"},\"maxValues\":{\"key\":\"a\",\"null_column\":\"exists\"},\"nullCount\":{\"key\":0,\"null_column\":1}}"}}
{"add":{"path":"part-00001-85bbf245-a29d-40bc-be96-86391569710c-c000.snappy.parquet","partitionValues":{},"size":691,"modificationTime":1687784712854,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":\"b\"},\"maxValues\":{\"key\":\"b\"},\"nullCount\":{\"key\":0,\"null_column\":2}}"}}

Now, in a separate file, I ran the following code using the delta-rs library to try to extract all rows that have a null value in null_column:

from deltalake import DeltaTable
import pyarrow.compute as pc

ds = DeltaTable("./table").to_pyarrow_dataset()
pat = ds.to_table(filter=(pc.field("null_column").is_null()))
print(pat.num_rows)
print(pat.to_pandas().head())

I expected the result to have 3 rows (the 2 rows with key 'b' and the 1 row with key 'a'), but it only has the 2 rows with key 'b'. Any input on whether this is a bug or the expected behaviour would be great. Additionally, if this is indeed the expected behaviour, I'd really appreciate some help on how to get what I need, i.e. all 3 rows. Thanks!
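
For reference, the only fallback I can think of is to materialise the whole table into memory and filter there, so that no per-file pruning is involved; assuming the table fits in memory, I'd expect this sketch to return all 3 rows:

from deltalake import DeltaTable
import pyarrow.compute as pc

# Load every file of the Delta table into a single in-memory PyArrow table,
# then filter with a boolean mask instead of a dataset-level expression
table = DeltaTable("./table").to_pyarrow_table()
null_rows = table.filter(pc.is_null(table["null_column"]))
print(null_rows.num_rows)

But this obviously gives up predicate pushdown, so I'd still like to understand the dataset behaviour.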
