Unmarshalling protobufs in Scala


I previously got unmarshalling working in Scala using ScalaPB and the following marshallers:

implicit def marshaller[T <: GeneratedMessage]: ToEntityMarshaller[T] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[T](r => r.toByteArray)

implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[Request]](implicit companion: GeneratedMessageCompanion[Request]): FromEntityUnmarshaller[Request] = {
    Unmarshaller.byteArrayUnmarshaller.map[Request](bytes => companion.parseFrom(bytes))
  }

This allows my Route to accept incoming messages of type Request, defined as:

syntax = "proto3";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message RegisterRequest {
  string username = 1;
  optional string password = 2;
}

message Request {
  string hostID = 1;

  oneof requestType {
    RegisterRequest registerRequest = 2;
  }
}

I have added another Route to the system, which takes in DataRequest types. This is defined as:

syntax = "proto3";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message DataRequest {
  string hostID = 1;
  string data = 2;
}

As a result, I have modified my Akka actors and routes to use generic ("wildcard") type parameters for the messages they take in and respond with, defined as:

  final case class ActorRequest[T, E](request: T, replyTo: ActorRef[ActorResponse[E]])

  final case class ActorResponse[T](response: T)

To reduce duplicate code, I moved the Route creation into a superclass. The superclass, Layer, looks like:

trait Marshalling extends DefaultJsonProtocol with SprayJsonSupport {
  
  implicit def marshaller[E <: GeneratedMessage]: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[T]](implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
  
}

abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)
  extends CORSHandler with Marshalling {

  implicit val timeout: Timeout = Timeout.create(SYSTEM.settings.config.getDuration("my-app.routes.ask-timeout"))

  private var systemActor: ActorRef[ActorRequest[T, E]] = null

  def createResponse(request: T): ActorResponse[E]

  private def createRoutes(): Route = {
    pathPrefix(HOST_ID) {
      path(directivePath) {
        post {
          entity(as[T]) { request =>
            onComplete(handle(request)) {
              case Success(response) =>
                complete(response.response)
              case Failure(exception) => complete(InternalServerError, s"An error occurred ${exception.getMessage}")
            }
          }
        }
      }
    }
  }

...
}

When switching to the wildcard Unmarshaller, I get the following error:

I found:

    akka.http.scaladsl.unmarshalling.Unmarshaller.
      messageUnmarshallerFromEntityUnmarshaller[T](
      akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.
        sprayJsonUnmarshaller[T](/* missing */summon[spray.json.RootJsonReader[T]])
    )

But no implicit values were found that match type spray.json.RootJsonReader[T].
          entity(as[T]) { request =>

Can anyone help me identify the issue? The error seems to be complaining that it cannot find a FromRequestUnmarshaller, but the unmarshaller was not one previously either, when the message type was defined concretely. Any suggestions?

Minimal reproducible example: https://github.com/ritcat14/hydra_broken_marshalling


There are 2 answers

thesamet (best answer)

The implicit def unmarshaller in trait Marshalling can't be used from the Layer class: the unmarshaller needs a GeneratedMessageCompanion[T], but Layer has no guarantee that such a companion will be available for the T it is instantiated with, and therefore you get a compile error. The solution is to add the implicit companion as a constructor parameter of class Layer so that it can be provided to def unmarshaller.

This would be the minimal definition for Marshalling (the unnecessary JSON stuff cleared out, but that wasn't the cause of the issue):

trait Marshalling[T <: GeneratedMessage, E <: GeneratedMessage] {
  implicit def protobufMarshaller: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def protobufUnmarshaller(implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
}

Then, the Layer class signature can capture the implicit companion:

abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)(implicit cmp: GeneratedMessageCompanion[T])
  extends CORSHandler with Marshalling[T, E] {

However, since the instance cmp isn't needed directly in the implementation of Layer, this could be rewritten with a context bound:

abstract class Layer[T <: GeneratedMessage : GeneratedMessageCompanion, E <: GeneratedMessage](name: String, directivePath: String)
  extends CORSHandler with Marshalling[T, E] {
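
The mechanism can be tried in isolation. The following is a minimal sketch that uses plain Scala stand-ins instead of Akka HTTP and ScalaPB: Message, Companion, Decoder, Decoding, Ping and PingLayer are hypothetical names invented for illustration, not part of either library. It only shows that a context bound on the class makes the implicit companion available to an inherited implicit def.

```scala
// Hypothetical stand-ins for GeneratedMessage, GeneratedMessageCompanion
// and FromEntityUnmarshaller; none of these are real library types.
trait Message { def toByteArray: Array[Byte] }
trait Companion[A <: Message] { def parseFrom(bytes: Array[Byte]): A }
trait Decoder[A] { def decode(bytes: Array[Byte]): A }

// A toy message whose "wire format" is just its text as UTF-8 bytes.
final case class Ping(text: String) extends Message {
  def toByteArray: Array[Byte] = text.getBytes("UTF-8")
}

object Ping {
  // An implicit companion instance, found through Ping's implicit scope.
  implicit val companion: Companion[Ping] = new Companion[Ping] {
    def parseFrom(bytes: Array[Byte]): Ping = Ping(new String(bytes, "UTF-8"))
  }
}

trait Decoding[T <: Message] {
  // Resolvable only where an implicit Companion[T] is in scope.
  implicit def decoder(implicit c: Companion[T]): Decoder[T] = new Decoder[T] {
    def decode(bytes: Array[Byte]): T = c.parseFrom(bytes)
  }
}

// The context bound `T: Companion` adds an implicit Companion[T] constructor
// parameter, so the inherited `decoder` can be resolved inside the class body.
abstract class Layer[T <: Message : Companion] extends Decoding[T] {
  def handle(bytes: Array[Byte]): T = implicitly[Decoder[T]].decode(bytes)
}

// The compiler supplies Ping.companion when the concrete layer is defined.
object PingLayer extends Layer[Ping]
```

Removing the context bound from Layer in this sketch makes the implicitly[Decoder[T]] call fail to compile, which mirrors the situation in the question: the generic class has no evidence that a companion exists for T.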

Kris Rice

So, whilst this "solves" the problem, it is a nasty workaround that puts the fix on the protobuf side rather than fixing the wildcard marshalling, but for now it works.

I created a ProtoMessage protobuf whose only payload is a oneof field covering ALL message types, like so:

syntax = "proto3";

import "Request.proto";
import "DataRequest.proto";
import "Response.proto";
import "DataResponse.proto";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message UnknownType {}

message ProtoMessage {
  string hostID = 1;

  oneof messageType {
    Request request = 2;
    DataRequest dataRequest = 3;

    Response response = 4;
    DataResponse dataResponse = 5;

    UnknownType unknownType = 6;
  }
}

I then marshal/unmarshal this object instead:

  // Marshals a ProtoMessage to its protobuf wire bytes (the unused type parameter is dropped).
  implicit def protobufMarshaller: ToEntityMarshaller[ProtoMessage] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[ProtoMessage](r => r.toByteArray)

  // Despite its name, this is the unmarshaller: it parses the request bytes into a ProtoMessage.
  implicit def requestMarshaller(implicit companion: GeneratedMessageCompanion[ProtoMessage]): FromEntityUnmarshaller[ProtoMessage] = {
    Unmarshaller.byteArrayUnmarshaller.map[ProtoMessage](bytes => companion.parseFrom(bytes))
  }

And I replaced all the wildcard class types with ProtoMessage.

This is A solution, yes. Is it horrible? Absolutely. Is it a permanent fix? Nope. I'm still open to suggestions, since with this fix I have to filter on the messageType field every time I receive a message on each Route.