Null check on columns takes a long time to complete

I need to null-check several dataframe columns and update another dataframe column in an efficient way.

For example, the dataframe has columns Column1 to Column20 plus an error_notes column. I null-check every column (Column1 to Column20) and update error_notes as shown below:

val df1 = data.withColumn("error_notes", when(col("Column1").isNull, concat_ws("!", lit("Column1 is null"), col("error_notes"))).otherwise(col("error_notes")))

val df2 = df1.withColumn("error_notes", when(col("Column2").isNull, concat_ws("!", lit("Column2 is null"), col("error_notes"))).otherwise(col("error_notes")))

val df3 = df2.withColumn("error_notes", when(col("Column3").isNull, concat_ws("!", lit("Column3 is null"), col("error_notes"))).otherwise(col("error_notes")))

// ... the same pattern repeats for the columns in between ...

val df20 = df19.withColumn("error_notes", when(col("Column20").isNull, concat_ws("!", lit("Column20 is null"), col("error_notes"))).otherwise(col("error_notes")))

Running the null check for all the columns and updating the error_notes column takes a long time (almost 4 hours) because the dataframe is huge. Is there a more efficient and performant way to do this?

There are 3 answers

Answer by pasha701:

The checks for all columns can be prepared up front, so that withColumn is called only once:

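import org.apache.spark.sql.functions.{col, concat_ws, lit, when}
import spark.implicits._ // needed for toDF; assumes an active SparkSession named `spark`

val columnsToCheck = Seq("column1", "column2", "column3")
// Small sample dataframe; None becomes a null cell
val df = Seq(
  (Some(1), Some(2), Some(3)),
  (Some(4), None, None)
).toDF(columnsToCheck: _*)

// One expression per column: the message if the column is null, otherwise null
val errorMessages = columnsToCheck.map(colName =>
  when(col(colName).isNull, lit(colName + " is null")).otherwise(null)
)

// concat_ws skips null arguments, so only the messages for null columns are joined
val result = df.withColumn("error_note", concat_ws("!", errorMessages: _*))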

Result is:

+-------+-------+-------+-------------------------------+
|column1|column2|column3|error_note                     |
+-------+-------+-------+-------------------------------+
|1      |2      |3      |                               |
|4      |null   |null   |column2 is null!column3 is null|
+-------+-------+-------+-------------------------------+
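The question appends to an existing error_notes column rather than creating a new one. A minimal sketch of how the single-pass approach above might be adapted for that, assuming an existing string column named error_notes as in the question (the object and method names are made up for illustration):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, lit, when}

object NullNotes {
  // Appends "<name> is null" for every null column in a single projection.
  def withNullNotes(df: DataFrame, columnsToCheck: Seq[String]): DataFrame = {
    // when(...) without otherwise evaluates to null if the column is not null,
    // and concat_ws skips null arguments.
    val messages = columnsToCheck.map(c => when(col(c).isNull, lit(s"$c is null")))
    // Keep whatever was already in error_notes (skipped if it is null).
    // Note: concat_ws yields an empty string, not null, when nothing is joined.
    df.withColumn("error_notes", concat_ws("!", (messages :+ col("error_notes")): _*))
  }
}
```

In the chained version each new message is prepended, so the last column checked ends up first. Here the messages appear in column order, followed by the pre-existing notes. Whether that matters depends on how error_notes is consumed.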

Answer by Chris:

Do not use withColumn; it is the primary reason for the slowdown. You should also avoid strings and use dedicated status fields instead (see Quality's storage model for a less wasteful and faster example).

The only reason I add this on top of pasha701's answer is that you will hit an upper limit on the number of when clauses you can add. They are compiled into a single function, which cannot escape the 64k code size restriction on any Spark version. Some versions throw a Janino error directly because of this, and others blow the stack while trying to optimise. Quality's approach to such rules handles this directly. YMMV; otherwise pasha701's response fits the bill.
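As a rough illustration of the "dedicated status fields" idea, here is a minimal sketch that adds one boolean flag per checked column in a single select instead of building a string. This is only one possible reading of the advice and is not Quality's actual storage model; the object name, method name, and the `_is_null` suffix are assumptions made for the example.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object NullFlags {
  // Adds one boolean column "<name>_is_null" per checked column,
  // using a single select rather than repeated withColumn calls.
  def withNullFlags(df: DataFrame, columnsToCheck: Seq[String]): DataFrame = {
    val flags = columnsToCheck.map(c => col(c).isNull.as(s"${c}_is_null"))
    // col("*") keeps every existing column; the flags are appended after them.
    df.select((col("*") +: flags): _*)
  }
}
```

The flags can be filtered or aggregated directly, and a human-readable message can still be derived from them later if one is needed.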

Answer by Junhua.xie:

If the dataframe has a primary key (pk), you may be able to use a join to speed this up, like this:

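// Keep only the columns needed for this check (error_notes must be selected to be used below)
val df1 = data.select("pk", "column1", "error_notes")
// Only the rows where column1 is null need a new note
val df2 = df1.where(col("column1").isNull)
val df3 = df2.select(col("pk"), concat_ws("!", lit("Column1 is null"), col("error_notes")).as("new_error_notes"))
// Assumption: the result is joined back onto the full dataframe; new_error_notes is null for unaffected rows
val df4 = data.join(df3, Seq("pk"), "left")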