How do you set the row group size of files in hdfs?


I am running some experiments on block size (dfs.block.size) and row group size (parquet.block.size) in hdfs.

I have a large set of data in hdfs, and I want to replicate the data with various block sizes and row group sizes for testing. I'm able to copy the data with a different block size using:

hdfs dfs -D dfs.block.size=67108864 -D parquet.block.size=67108864 -cp /new_sample_parquet /new_sample_parquet_64M

But only the dfs.block.size gets changed. I am verifying with hdfs dfs -stat for the block size and parquet-tools meta for the row group size. In fact, if I replace parquet.block.size with blah.blah.blah, it has the same effect. I even went into spark-shell and set the parquet.block.size property manually using

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864).

I am using Hadoop 3.1.0. I got the property name parquet.block.size from here.

Here are the first 10 rows of the output of my attempt:

row group 1:                    RC:4140100 TS:150147503 OFFSET:4
row group 2:                    RC:3520100 TS:158294646 OFFSET:59176084
row group 3:                    RC:880100 TS:80122359 OFFSET:119985867
row group 4:                    RC:583579 TS:197303521 OFFSET:149394540
row group 5:                    RC:585594 TS:194850776 OFFSET:213638039
row group 6:                    RC:2620100 TS:130170698 OFFSET:277223867
row group 7:                    RC:2750100 TS:136761819 OFFSET:332088066
row group 8:                    RC:1790100 TS:86766854 OFFSET:389772650
row group 9:                    RC:2620100 TS:125876377 OFFSET:428147454
row group 10:                   RC:1700100 TS:83791047 OFFSET:483600973

As you can see, the TS (total size) is way larger than 64 MB (67108864 bytes).

My current theory:

I am doing this in spark-shell:

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
val a = spark.read.parquet("my_sample_data")
a.rdd.getNumPartitions // 1034
val s = a.coalesce(27)
s.write.format("parquet").mode("Overwrite").options(Map("dfs.block.size" -> "67108864")).save("/my_new_sample_data")

So perhaps it's because my input data already has 1034 partitions. I'm really not sure. My data has about 118 columns per row.

Answer by Zoltan (accepted):

The parquet.block.size property only affects Parquet writers. The hdfs dfs -cp command, on the other hand, copies files regardless of their contents, so parquet.block.size is ignored by hdfs dfs -cp.

Imagine that you have an application that takes screenshots in either JPG or PNG format, depending on a config file. You make a copy of those screenshots with the cp command. Naturally, even if you change the desired image format in the config file, the cp command will always create output files in the image format of the original files, regardless of the config file. The config file is only used by the screenshot application and not by cp. This is how the parquet.block.size property works as well.

What you can do to change the row group size is to rewrite the file. You mentioned that you have spark-shell; use that to rewrite the Parquet file by issuing

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
var df = spark.read.parquet("/path/to/input.parquet")
df.write.parquet("/path/to/output")

Update: Since you mentioned in the comments that it does not work for you, I ran an experiment and am posting the session transcript below:

$ spark-shell
scala> sc.hadoopConfiguration.setInt("parquet.block.size", 200000)
scala> var df = spark.read.parquet("/tmp/infile.parquet")
df: org.apache.spark.sql.DataFrame = [field0000: binary, field0001: binary ... 78 more fields]
scala> df.write.parquet("/tmp/200K")
scala> df.write.format("parquet").mode("Overwrite").options(Map("parquet.block.size" -> "300000")).save("/tmp/300K")
scala> :quit
$ hadoop fs -copyToLocal /tmp/{200K,300K} /tmp
$ parquet-tools meta /tmp/infile.parquet | grep "row group" | head -n 3
row group 1:  RC:4291 TS:5004800 OFFSET:4
row group 2:  RC:3854 TS:4499360 OFFSET:5004804
row group 3:  RC:4293 TS:5004640 OFFSET:10000000
$ parquet-tools meta /tmp/200K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:169 TS:202080 OFFSET:4
row group 2:   RC:168 TS:201760 OFFSET:190164
row group 3:   RC:169 TS:203680 OFFSET:380324
$ parquet-tools meta /tmp/300K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:254 TS:302720 OFFSET:4
row group 2:   RC:255 TS:303280 OFFSET:284004
row group 3:   RC:263 TS:303200 OFFSET:568884

By looking at the TS values you can see that the input file had row group sizes of 4.5-5M, while the output files have row group sizes of 200K and 300K, respectively. This shows that the value set using sc.hadoopConfiguration becomes the "default", while the other method you mentioned in a comment, involving df.options, overrides this default.
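To have both mechanisms in one place, here is a condensed spark-shell sketch of the same idea (the paths and the 32M override value are just placeholders, not taken from your setup):

// Default row group size for every Parquet write in this session (64M).
sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)

val df = spark.read.parquet("/path/to/input.parquet")

// Uses the 64M default set above.
df.write.parquet("/path/to/output_default")

// Overrides the default for this write only (32M).
df.write.option("parquet.block.size", "33554432").parquet("/path/to/output_override")

Both variants go through the Parquet writer, which is why they take effect where hdfs dfs -cp does not.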

Update 2: Now that you have posted your output, I can see what is going on. In your case, compression is taking place, increasing the amount of data that fits into each row group. The row group size applies to the compressed data, but TS shows the size of the uncompressed data. However, you can deduce the size of the row groups by subtracting consecutive starting offsets. For example, the compressed size of your first row group is 59176084 - 4 = 59176080 bytes or less (since padding can take place as well). I copied your results into /tmp/rowgroups.dat on my computer and calculated your row group sizes by issuing the following command:

$ cat /tmp/rowgroups.dat | sed 's/.*OFFSET://' | numinterval
59176080
60809783
29408673
64243499
63585828
54864199
57684584
38374804
55453519

(The numinterval command is in the num-utils package on Ubuntu.) As you can see, all of your row groups are smaller than the row group size you specified. (The reason why they are not exactly the specified size is PARQUET-1337.)
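If you don't have num-utils installed, the same differences can be computed with a few lines in the Scala REPL instead. This is just a sketch that assumes the parquet-tools output was saved to /tmp/rowgroups.dat as above:

// Parse the OFFSET values and print the gap between consecutive row groups,
// which is an upper bound on each row group's compressed size.
import scala.io.Source

val offsets = Source.fromFile("/tmp/rowgroups.dat").getLines()
  .map(_.replaceAll(".*OFFSET:", "").trim.toLong)
  .toSeq

offsets.sliding(2).foreach { case Seq(a, b) => println(b - a) }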