SparkSQL with ScalaPB: Error when skipping proto fields while converting from DataFrame to proto dataset


I have the following proto message, that needs to be written through Spark using ScalaPB:

message EnforcementData {
  required int32 id = 1;
  required int32 source = 2;
  required int32 flagsEnforceOption = 4;
  required int32 categoryEnforceOption = 5;

  optional TypeA a = 100;
  optional TypeB b = 101;
}

TypeA and TypeB are child classes of EnforcementData on the receiver side, which uses protobuf-net to deserialize the message.

Now, my Spark DataFrame can have either column a or column b. Assuming df is my DataFrame, I call the following:

  • df.withColumn("b", lit(null)).as[EnforcementData].map(_.toByteArray) for TypeA messages
  • df.withColumn("a", lit(null)).as[EnforcementData].map(_.toByteArray) for TypeB messages

But the receiver, which deserializes the message using protobuf-net, throws a StackOverflow exception. I also tried passing a dummy case class instead of null, and it still did not work.
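
For reference, this is roughly what I am running for the TypeA case, written out end to end. The generated-class import (myproto.enforcement.EnforcementData) is a placeholder for whatever package ScalaPB generates from the .proto, and scalapb.spark.Implicits is the sparksql-scalapb encoder import, whose exact path can vary by library version:

  import org.apache.spark.sql.{DataFrame, Dataset, Encoders}
  import org.apache.spark.sql.functions.lit
  import scalapb.spark.Implicits._            // sparksql-scalapb typed encoders (import path may vary by version)
  import myproto.enforcement.EnforcementData  // ScalaPB-generated class; package name is a placeholder

  // For a batch of TypeA messages: blank out the unused child column `b`,
  // decode each row into the generated case class, and serialize to protobuf bytes.
  def typeAToProtoBytes(df: DataFrame): Dataset[Array[Byte]] =
    df.withColumn("b", lit(null))
      .as[EnforcementData]
      .map(_.toByteArray)(Encoders.BINARY)    // explicit encoder for Array[Byte]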

How should I handle this?

Accepted answer, by Bharadwaj Jayaraman:

I was able to resolve this by reconstructing the case class and explicitly skipping the optional child-class fields, i.e.:

 import org.apache.spark.sql.functions.lit

 // for TypeA messages
 df.withColumn("b", lit(null))
   .as[EnforcementData]
   .map { case EnforcementData(id, source, flag, cat, a, _) =>
     EnforcementData(id, source, flag, cat, a = a)
   }

 // for TypeB messages
 df.withColumn("a", lit(null))
   .as[EnforcementData]
   .map { case EnforcementData(id, source, flag, cat, _, b) =>
     EnforcementData(id, source, flag, cat, b = b)
   }
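
A variation on the same idea, in case the positional match becomes awkward (newer ScalaPB versions, for example, add an unknownFields parameter to the generated case class): decode the row as usual and drop the unused child with the clearA/clearB helpers that ScalaPB normally generates for optional fields. This is only a sketch under that assumption, so confirm those methods exist in your generated code:

 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.functions.lit

 // Same effect as the pattern match above: force the unused child field to None
 // before serializing, using the generated clear* helper instead of reconstructing.
 val typeABytes = df.withColumn("b", lit(null))
   .as[EnforcementData]
   .map(_.clearB.toByteArray)(Encoders.BINARY)

 val typeBBytes = df.withColumn("a", lit(null))
   .as[EnforcementData]
   .map(_.clearA.toByteArray)(Encoders.BINARY)

Either way, the point is the same: the unused optional field has to actually be None before toByteArray is called.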