TensorFlow IO: Corrupted reads of gzip-compressed Parquet files written by PySpark


I'm trying to read compressed parquet files containing tf.Examples (serialized to bytes) created by a Spark job.

When I read the compressed files using tfio.IODataset.from_parquet, there is some data corruption once the size of the records crosses a threshold. The bytes read back do not match the input bytes written to the table.

OTOH, if I write the output uncompressed, the bytes read back match the bytes written to the table exactly (the uncompressed control is sketched after the repro below).

I've observed a threshold at roughly 5,073 records (total payload around 2,400,000 bytes) beyond which the compressed data read back no longer matches the input.

Any ideas what could be going on here? The 5,072/5,073 boundary seems arbitrary to me; I have a hard time believing it's a constant that factors into gzip-compressed records. The boundary is consistent for a given example schema across varying payload sizes.

If I add or remove features from the example payload, then the threshold will change.
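Roughly how I narrowed the boundary down, as a sketch: run_trial(n) here is a hypothetical wrapper around Steps 1-4 of the repro below that writes n records and returns True when every record read back matches its input.

def find_corruption_threshold(lo=4096, hi=8192):
    # Bisect on the record count, assuming reads match below the threshold
    # and stop matching above it. run_trial is a hypothetical wrapper
    # around Steps 1-4 of the repro below.
    while lo + 1 < hi:
        mid = (lo + hi) // 2
        if run_trial(mid):   # True => every record read back matched
            lo = mid
        else:
            hi = mid
    return hi  # first record count where the read-back bytes stop matching

# print("Corruption starts at:", find_corruption_threshold())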

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, BinaryType
import random
import string
import tensorflow as tf
import tensorflow_io as tfio

### Step 1: Create the input data
def make_tf_example_list(num):
    out = []
    for i in range(num):
        ex = tf.train.Example()
        ex.features.feature["int_label_1"].int64_list.value.append(random.randint(0, 1))
        ex.features.feature["bytes_label_1"].bytes_list.value.append(str(random.randint(0, 2**64)).encode())
        ex.features.feature["bytes_feature_1"].bytes_list.value.append(str(random.randint(0, 2**64)).encode())
        ex.features.feature["bytes_feature_1"].bytes_list.value.append(''.join(random.choices(string.ascii_lowercase, k=random.randint(0, 100))).encode())
        for _ in range(64):  # 64 random floats per example; use _ to avoid shadowing the outer loop index
            ex.features.feature["float_feature_1"].float_list.value.append(random.random())
        ex.features.feature["int_feature_2"].int64_list.value.append(random.randint(0, 2**12))
        out.append(ex.SerializeToString())
    return out

NUM_EXAMPLES = 1024*4 + 976
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("tf_example", BinaryType(), nullable=True)
])
tf_examples = list(enumerate(make_tf_example_list(NUM_EXAMPLES)))
print("Num examples:", len(tf_examples))
print("Examples bytes:", sum([len(ex) for (i, ex) in tf_examples]))

### Step 2: Write to parquet via spark dataframe
spark = SparkSession.builder.master('local[*]').appName('test-app').getOrCreate()
out_path_sp_compress = 'file:/dbfs/tmp/out.sp.gz'
df = spark.createDataFrame(spark.sparkContext.parallelize(tf_examples), schema)
df.write.parquet(out_path_sp_compress, mode="overwrite", compression='gzip')

### Step 3: Read from parquet via tfio
filenames = tf.data.Dataset.list_files(out_path_sp_compress[5:] + '/*.parquet', shuffle=False)  # [5:] strips the 'file:' scheme
ds_sp_compress = filenames.interleave(
    lambda f: tfio.IODataset.from_parquet(f, columns={"id": tf.int32, "tf_example": tf.string}),
    num_parallel_calls=1
)
ex_compress = [(ex['id'], ex['tf_example']) for ex in ds_sp_compress]

### Step 4: Compare input and output data, record by record
ids_match = all(in_id == int(out_id) for (in_id, _), (out_id, _) in zip(tf_examples, ex_compress))
data_match = all(in_ex == out_ex.numpy() for (_, in_ex), (_, out_ex) in zip(tf_examples, ex_compress))
print("ids equal to input:", ids_match)
print("data equal to input:", data_match)

Here are some experiments I've run and their results:

Records    Size (bytes)    In/out match
4880       2310636         Matched
5064       2401177         Matched
5064       2400739         Matched
5096       2415187         Not matched
5096       2414892         Not matched
5080       2406966         Not matched
5072       2404494         Matched
5073       2406032         Not matched
5072       2407153         Matched
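
One check I can think of, not reflected in the table above, is to read the same gzip-compressed part files back with plain pyarrow and compare against the input bytes; if that comparison passes, the files themselves are fine and the mismatch is somewhere on the tfio read path. A sketch (the local /dbfs/tmp/out.sp.gz path mirrors the write path above):

import glob
import pyarrow.parquet as pq

# Read the gzip-compressed part files directly with pyarrow and compare
# the bytes against the input written by Spark.
rows = []
for part in sorted(glob.glob('/dbfs/tmp/out.sp.gz/*.parquet')):
    tbl = pq.read_table(part, columns=['id', 'tf_example'])
    rows.extend(zip(tbl.column('id').to_pylist(), tbl.column('tf_example').to_pylist()))
rows.sort(key=lambda r: r[0])  # align with the input ordering by id
print("pyarrow read matches input:",
      all(in_ex == out_ex for (_, in_ex), (_, out_ex) in zip(tf_examples, rows)))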