I am trying to implement a sliding window using Apache Flink in Java. I have been referring to the below document:
As per my use case, I receive an incoming data stream from Kafka. The stream contains the speed of the user, captured from the user's device. My task is to compute the user's average speed over the last 10 minutes, recalculated every minute. Hence I am using a sliding window of size 10 minutes with a slide interval of 1 minute. Since I am in the initial phase of development, I am just printing the result to the console.
Let's say I start my Flink job at 11:00 hrs. My first output should be produced at 11:10 hrs, followed by outputs at 11:11, 11:12, and so on.
But the first output is printed to the console at 11:01, followed by an output every minute.
Below is my code:
package org.example;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class DEStationarySample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableForceAvro();
        env.setParallelism(2);

        // Avro schema for rider location events
        final Schema riderLocationSchema = new Schema.Parser().parse("avro_schema");

        // Kafka source for rider location events
        KafkaSource<GenericRecord> riderLocationEventsSource = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("bootstrapServers")
                .setGroupId("consumer_group_id")
                .setTopics("kafka_topic")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(riderLocationSchema, "schema_registry_url"))
                .build();

        // Event-time stream: timestamps come from the "eventTime" field,
        // and watermarks allow events to arrive up to 60 seconds out of order
        DataStream<GenericRecord> riderLocationEventsStream = env.fromSource(
                riderLocationEventsSource,
                WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                        .withTimestampAssigner((genericRecord, timestamp) -> (long) genericRecord.get("eventTime")),
                "Kafka Source");

        // 10-minute sliding window, evaluated every minute, keyed by tripId
        DataStream<Map<String, Object>> outputResult = riderLocationEventsStream
                .keyBy(data -> data.get("tripId"))
                .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
                .aggregate(new AggregateFunction<GenericRecord, Map<String, Object>, Map<String, Object>>() {

                    @Override
                    public Map<String, Object> createAccumulator() {
                        Map<String, Object> m = new HashMap<>();
                        m.put("tripId", null);
                        m.put("firstEventTime", 0L);
                        m.put("lastEventTime", 0L);
                        m.put("speed", 0.0);
                        m.put("count", 0);
                        return m;
                    }

                    @Override
                    public Map<String, Object> add(GenericRecord genericRecord, Map<String, Object> accumulator) {
                        // Remember the first and last event time seen in this window
                        long firstEventTime = (long) accumulator.get("firstEventTime");
                        if (firstEventTime == 0L) {
                            accumulator.put("firstEventTime", genericRecord.get("eventTime"));
                        }
                        accumulator.put("lastEventTime", genericRecord.get("eventTime"));
                        accumulator.put("tripId", genericRecord.get("tripId"));
                        // Accumulate the running sum of speeds and the event count
                        accumulator.put("speed", (double) accumulator.get("speed") + (double) genericRecord.get("speed"));
                        accumulator.put("count", (int) accumulator.get("count") + 1);
                        return accumulator;
                    }

                    @Override
                    public Map<String, Object> getResult(Map<String, Object> accumulator) {
                        double speed = (double) accumulator.get("speed");
                        int count = (int) accumulator.get("count");
                        String tripId = accumulator.get("tripId").toString();
                        Map<String, Object> result = new HashMap<>();
                        result.put("tripId", tripId);
                        result.put("averageSpeed", speed / count);
                        return result;
                    }

                    @Override
                    public Map<String, Object> merge(Map<String, Object> acc0, Map<String, Object> acc1) {
                        // merge() is only invoked for merging window assigners (e.g. session
                        // windows), so it is never called for sliding windows
                        return null;
                    }
                });

        outputResult.print();
        env.execute("Sample Processing");
    }
}
A sample event from the input Kafka stream looks like this:
{
"eventName": "RIDER_LOCATION_ACTIVITY",
"eventType": "RIDER_LOCATION_UPDATED",
"scmVersion": 1,
"eventTime": 1706887297150,
"correlationId": "1b27f143-f22f-4642-bb24-f8c3d18f35c8-652-681",
"riderId": "999999",
"lat": 12.876188,
"lng": 77.60885,
"battery": 69,
"network": "LTE",
"capturedAt": 1706887297977,
"speed": 0.0027558245,
"orderId": "",
"orderStatus": "",
"tripId": "trip_1234342",
"employerReferenceId": "EMP12345"
}
It is guaranteed that each event has a tripId associated with it.
I tried changing the slide interval and observed that the first output was produced as soon as the first slide interval ended; the results were still produced before the full window size had elapsed. I am new to Flink. Any help is appreciated.
That isn't the case, since your watermark assigner is based on eventTime, and not on processing time. That's also what you want; otherwise you would get different results every time you start the job.
So the timestamps and watermarks are driven by the eventTime values being read, starting from the very first event, and it is the event timestamps, not the job start time, that determine your window boundaries. Flink aligns sliding windows to the epoch: with a size of 10 minutes and a slide of 1 minute, every event falls into ten overlapping windows, and the earliest of them closes within one slide interval of the event's timestamp. If your first eventTime is 1706887297150, that equals 15:21:37.150 UTC, so the event is assigned to the windows [15:12:00, 15:22:00), [15:13:00, 15:23:00), ..., [15:21:00, 15:31:00). The first of these fires as soon as the watermark passes 15:22:00, which is why you see a result after one slide interval rather than after the full window size.
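If you want to verify the alignment yourself, here is a minimal standalone sketch (the class name WindowAlignmentCheck is mine; the timestamp, size, and slide come from your setup). It reuses Flink's TimeWindow.getWindowStartWithOffset helper and replays the same loop that SlidingEventTimeWindows uses internally to assign an event to its windows:

import java.time.Instant;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowAlignmentCheck {

    public static void main(String[] args) {
        long eventTime = 1706887297150L;   // your first event: 2024-02-02T15:21:37.150Z
        long size = 10 * 60 * 1000L;       // window size: 10 minutes
        long slide = 60 * 1000L;           // slide interval: 1 minute

        // Window starts are aligned to the epoch (offset 0),
        // not to the first event that arrives
        long lastStart = TimeWindow.getWindowStartWithOffset(eventTime, 0, slide);
        for (long start = lastStart; start > eventTime - size; start -= slide) {
            System.out.println("window [" + Instant.ofEpochMilli(start)
                    + ", " + Instant.ofEpochMilli(start + size) + ")");
        }
    }
}

Running this prints the ten windows listed above. The earliest one, [15:12:00Z, 15:22:00Z), closes less than a minute after your first event's timestamp and fires once the watermark (max eventTime minus your 60-second out-of-orderness bound) passes 15:22:00Z.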