I'm trying to read compressed parquet files containing tf.Examples (serialized to bytes) that were created by a Spark job.
When I read the compressed files using tfio.IODataset.from_parquet, there is data corruption once the size of the records crosses a threshold: the bytes read back no longer match the input bytes written to the table.
OTOH, if I write the output uncompressed, the bytes read back match the bytes written to the table exactly.
I've observed a threshold at 5073 records (total size around 2,400,000 bytes) beyond which the compressed data read back no longer matches the input.
Any ideas what could be going on here? The 5072/5073 threshold seems arbitrary to me; I have a hard time believing it's a constant that factors into gzip-compressed records. It is consistent for a given example schema across varying payload sizes.
If I add or remove features from the example payload, the threshold changes.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, BinaryType
import random
import string
import tensorflow as tf
import tensorflow_io as tfio
### Step 1: Create the input data
def make_tf_example_list(num):
    """Build `num` serialized tf.train.Example protos with a mix of int/bytes/float features."""
    out = []
    for _ in range(num):
        ex = tf.train.Example()
        ex.features.feature["int_label_1"].int64_list.value.append(random.randint(0, 1))
        ex.features.feature["bytes_label_1"].bytes_list.value.append(str(random.randint(0, 2**64)).encode())
        ex.features.feature["bytes_feature_1"].bytes_list.value.append(str(random.randint(0, 2**64)).encode())
        ex.features.feature["bytes_feature_1"].bytes_list.value.append(''.join(random.choices(string.ascii_lowercase, k=random.randint(0, 100))).encode())
        for _ in range(64):
            ex.features.feature["float_feature_1"].float_list.value.append(random.random())
            ex.features.feature["int_feature_2"].int64_list.value.append(random.randint(0, 2**12))
        out.append(ex.SerializeToString())
    return out
NUM_EXAMPLES = 1024*4 + 976  # 5072, right at the observed threshold
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("tf_example", BinaryType(), nullable=True)
])
tf_examples = list(enumerate(make_tf_example_list(NUM_EXAMPLES)))
print("Num examples:", len(tf_examples))
print("Examples bytes:", sum(len(ex) for (i, ex) in tf_examples))
### Step 2: Write to parquet via spark dataframe
spark = SparkSession.builder.appName('test-app').getOrCreate()
out_path_sp_compress = 'file:/dbfs/tmp/out.sp.gz'
df = spark.createDataFrame(spark.sparkContext.parallelize(tf_examples), schema)
df.write.parquet(out_path_sp_compress, mode="overwrite", compression='gzip')
### Step 3: Read from parquet via tfio
# out_path_sp_compress[5:] strips the 'file:' prefix to get the local /dbfs path
filenames = tf.data.Dataset.list_files(out_path_sp_compress[5:] + '/*.parquet', shuffle=False)
ds_sp_compress = filenames.interleave(
    lambda f: tfio.IODataset.from_parquet(f, columns={"id": tf.int32, "tf_example": tf.string}),
    num_parallel_calls=1
)
ex_compress = [(int(ex['id'].numpy()), ex['tf_example'].numpy()) for ex in ds_sp_compress]
### Step 4: Compare input and output data
# Compare every record, keyed by id, so that file/interleave ordering doesn't matter
out_by_id = dict(ex_compress)
print("ids equal to input:", sorted(out_by_id) == [i for (i, _) in tf_examples])
print("data equal to input:", all(out_by_id.get(i) == ex for (i, ex) in tf_examples))
Here are some experiments I've run and their results (a sketch for automating this sweep follows the table):
| Records | Size (bytes) | Input/Output Match |
|---|---|---|
| 4880 | 2310636 | Matched |
| 5064 | 2401177 | Matched |
| 5064 | 2400739 | Matched |
| 5096 | 2415187 | Not matched |
| 5096 | 2414892 | Not matched |
| 5080 | 2406966 | Not matched |
| 5072 | 2404494 | Matched |
| 5073 | 2406032 | Not matched |
| 5072 | 2407153 | Matched |
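Each row corresponds to a run with a different NUM_EXAMPLES value. A rough sketch of how the sweep could be automated, assuming Steps 1-4 are wrapped into a hypothetical round_trip_matches(n) helper (not defined above) that returns the total serialized size and whether every record matched:
# Hypothetical harness: round_trip_matches(n) is assumed to wrap Steps 1-4
# (generate n examples, write gzip parquet via Spark, read back via tfio,
# compare by id) and return (total_bytes, all_matched).
def sweep(record_counts):
    for n in record_counts:
        total_bytes, all_matched = round_trip_matches(n)
        print(n, total_bytes, "Matched" if all_matched else "Not matched")

sweep([4880, 5064, 5072, 5073, 5080, 5096])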