Polymorphic Json Marshalling with Apache Camel

499 views Asked by At

I'm refactoring a camel route to hopefully be a little more generic. (I'm also using spring boot, if that helps for any possible bean injection solutions)

from(fromKafka)
                .routeId("Rest Models")
                .removeHeaders("*")
                .aggregate(new GroupedBodyAggregationStrategy())
                .constant(true)
                .completionTimeout(batchingInterval)
                .process(new ListOfJsonToJsonArray())
                .unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
                .enrich("seda:rest", mergeRestResult)

the processor ListOfJsonToJsonArray() takes the json string representation of the kafka message, and joins everything, comma separated, with a {[ ]} on the outside.

The InputArrayPojo.class is thus a wrapper for the array of objects that are coming in from kafka. I need to bundle the objects in order to mini-batch to the REST interface in the enrichment. The objects contained are of format InputPojo.class (effectively just a schema, but also performs some basic data quality checks)

I need a way to generify InputPojo.class such that for our new jobs, we can run the same route, but supply a different InputPojo.class.

I've tried to apply polymorphism and create an interface for InputPojo, however this runs into an error when trying to construct the interface.

@JsonSubTypes({
        @JsonSubTypes.Type(value=InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}

I also tried some parameterisation, but I had no luck there either because it would not apply the constructor of the bean, none of the methods then existed.

I've also included

com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
        "data"
})
public class InputArrayPojo{

    @JsonProperty("data")
    private List<InputPojo> data = null;
    @JsonIgnore
    private Map<String, Object> additionalProperties = new HashMap<String, Object>();

    @JsonProperty("data")
    public List<InputPojo> getData() {
        return data;
    }

    @JsonProperty("data")
    public void setData(List<InputPojo> data) {
        this.data = data;
    }

    @JsonAnyGetter
    public Map<String, Object> getAdditionalProperties() {
        return this.additionalProperties;
    }

    @JsonAnySetter
    public void setAdditionalProperty(String name, Object value) {
        this.additionalProperties.put(name, value);
    }

}

The enrichment also needs to implement some type of generifying logic

@Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
        List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
        List<ModelResultPojo> outputList = new ArrayList<>();

        for (int i = 0; i < originalMessages.size(); ++i) {
            ModelResultPojo output = new ModelResultPojo();
            IngestionOutPojo raw = originalMessages.get(i);
            PredictionPojo enrich = enrichmentMessages.get(i);
            /*
            enrichment logic to create modelResult
            */
            outputList.add(modelResult)
    }
    newExchange.getIn().setBody(outputList);
    return newExchange
}
1

There are 1 answers

1
David Parker On BEST ANSWER

I ended up coming up with a solution by doing the following:

unmarshalled to the default type: Map<String,Object> (without specifying a class, it camel unmarshalls to a Map<String,Object>)

After that I wrote an abstract class that implements a processor. In this processor I take the Map, and apply an abstract editFields() function to the Map.

thus I now have polymorphic handling of business logic through a Map instead of through a POJO.