I'm using Esper 8.9 for complex event processing in a Spring Boot application. Despite configuring my events and patterns correctly, the getUnderlying() method in my UpdateListener implementation returns null for events that successfully match my patterns. Here's a simplified version of my setup:
@Bean
public IntegrationFlow mqttEventInboundFlow(@Qualifier("mqttClientFactoryAasEvents") MqttPahoClientFactory factory) {
ObjectMapper objectMapper = new ObjectMapper();
return IntegrationFlow.from(mqttEventInboundAdapter(factory))
.channel(mqttInputChannelEventInbound())
.handle(message -> {
String jsonPayload = (String) message.getPayload();
EventWrapperDto payload;
try {
payload = objectMapper.readValue(jsonPayload, EventWrapperDto.class);
log.info("Sending event to Esper: {}", objectMapper.writeValueAsString(payload));
log.info("The pojo of the payload {}", payload);
epService.getEventService().sendEventBean(payload, "EventWrapperDto");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
log.info("Received message");
})
.get();
}
statement.addListener((newData, oldData, stmt, rt) -> {
log.info("Received new data: {}", newData.toString());
log.info("Received old data: {}", oldData);
if (newData != null) {
try {
List<EventWrapperDto> matchedEvents = Arrays.stream(newData)
.map(eventBean -> {
log.info("Received eventBean: {}", eventBean + " toString: " + eventBean.toString());
log.info("Underlying instance type: {}", eventBean.getUnderlying().getClass().getName());
log.info("Underlying map contents: {}", eventBean.getUnderlying());
EventWrapperDto eventWrapperDto = (EventWrapperDto) eventBean.getUnderlying();
log.info("Converted to EventWrapperDto: {}", eventWrapperDto);
return eventWrapperDto;
})
.toList();
log.info("Pattern found, combining events");
} catch (Exception e) {
log.error("Error processing events", e);
}
and the Esper configuration is this
@Configuration
public class EsperConfiguration {
@Bean
public EPRuntime epServiceProvider() {
com.espertech.esper.common.client.configuration.Configuration config = new com.espertech.esper.common.client.configuration.Configuration();
config.getCommon().addEventType("EventWrapperDto", EventWrapperDto.class.getName());
return EPRuntimeProvider.getDefaultRuntime(config);
}
}
the logs are:
2024-02-22T17:59:56.080+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : Sending event to Esper: {"event":{"source":{"timestamp":1708617595993,"value":"0","assetName":"IF","name":"OperationalData:temperature_of_the_bar","valueType":{"name":"STRING"},"unit":null,"@type":"Attribute"},"assetName":"IF","name":"OperationalData:temperature_of_the_bar","eventData":[],"timestamp":null,"@type":"ValueSetEventDTO"}}
2024-02-22T17:59:56.080+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : The pojo of the payload EventWrapperDto(event=EventDto(type=ValueSetEventDTO, source=SourceDto(type=Attribute, timestamp=Thu Feb 22 17:59:55 EET 2024, value=0, assetName=IF, name=OperationalData:temperature_of_the_bar, valueType=ValueTypeDto(name=STRING), unit=null), assetName=IF, name=OperationalData:temperature_of_the_bar, eventData=[], timestamp=null))
2024-02-22T17:59:56.085+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : Received message
2024-02-22T18:00:10.152+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : Sending event to Esper: {"event":{"source":{"timestamp":1708617610147,"value":"bar","assetName":"IF","name":"OperationalData:product_id","valueType":{"name":"STRING"},"unit":null,"@type":"Attribute"},"assetName":"IF","name":"OperationalData:product_id","eventData":[],"timestamp":null,"@type":"ValueSetEventDTO"}}
2024-02-22T18:00:10.152+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : The pojo of the payload EventWrapperDto(event=EventDto(type=ValueSetEventDTO, source=SourceDto(type=Attribute, timestamp=Thu Feb 22 18:00:10 EET 2024, value=bar, assetName=IF, name=OperationalData:product_id, valueType=ValueTypeDto(name=STRING), unit=null), assetName=IF, name=OperationalData:product_id, eventData=[], timestamp=null))
2024-02-22T18:00:10.157+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Received new data: [Lcom.espertech.esper.common.client.EventBean;@60c3cec2
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Received old data: {}
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Received eventBean: MapEventBean eventType=com.espertech.esper.common.internal.event.map.MapEventType@66d8c66b toString: MapEventBean eventType=com.espertech.esper.common.internal.event.map.MapEventType@66d8c66b
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Underlying instance type: java.util.HashMap
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Underlying map contents: {}
As you may see despite when i send the events the POJOs are correct, in the listener the getUnderlying is null.
You would need to provide the EPL as part of your post so that one can see what is selected. I would recommend trying this out without MQTT and without JSON just to see what you are getting. Output that is a Map-type object and that contains no value for some map keys is normal.