I'm refactoring a camel route to hopefully be a little more generic. (I'm also using spring boot, if that helps for any possible bean injection solutions)
from(fromKafka)
.routeId("Rest Models")
.removeHeaders("*")
.aggregate(new GroupedBodyAggregationStrategy())
.constant(true)
.completionTimeout(batchingInterval)
.process(new ListOfJsonToJsonArray())
.unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
.enrich("seda:rest", mergeRestResult)
the processor ListOfJsonToJsonArray()
takes the json string representation of the kafka message, and joins everything, comma separated, with a {[ ]}
on the outside.
The InputArrayPojo.class is thus a wrapper for the array of objects that are coming in from kafka. I need to bundle the objects in order to mini-batch to the REST interface in the enrichment. The objects contained are of format InputPojo.class (effectively just a schema, but also performs some basic data quality checks)
I need a way to generify InputPojo.class such that for our new jobs, we can run the same route, but supply a different InputPojo.class.
I've tried to apply polymorphism and create an interface for InputPojo, however this runs into an error when trying to construct the interface.
@JsonSubTypes({
@JsonSubTypes.Type(value=InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}
I also tried some parameterisation, but I had no luck there either because it would not apply the constructor of the bean, none of the methods then existed.
I've also included
com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"data"
})
public class InputArrayPojo{
@JsonProperty("data")
private List<InputPojo> data = null;
@JsonIgnore
private Map<String, Object> additionalProperties = new HashMap<String, Object>();
@JsonProperty("data")
public List<InputPojo> getData() {
return data;
}
@JsonProperty("data")
public void setData(List<InputPojo> data) {
this.data = data;
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name, Object value) {
this.additionalProperties.put(name, value);
}
}
The enrichment also needs to implement some type of generifying logic
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
List<ModelResultPojo> outputList = new ArrayList<>();
for (int i = 0; i < originalMessages.size(); ++i) {
ModelResultPojo output = new ModelResultPojo();
IngestionOutPojo raw = originalMessages.get(i);
PredictionPojo enrich = enrichmentMessages.get(i);
/*
enrichment logic to create modelResult
*/
outputList.add(modelResult)
}
newExchange.getIn().setBody(outputList);
return newExchange
}
I ended up coming up with a solution by doing the following:
unmarshalled to the default type: Map<String,Object> (without specifying a class, it camel unmarshalls to a Map<String,Object>)
After that I wrote an abstract class that implements a processor. In this processor I take the Map, and apply an abstract editFields() function to the Map.
thus I now have polymorphic handling of business logic through a Map instead of through a POJO.