I'm learning Kafka as a possible replacement for the in-house streaming tools we currently use. The existing stack receives geospatial data from a fleet of measurement devices and inserts it into a Postgres+PostGIS database. There are several Kafka streaming features that would be very useful to have, but I'm not sure of the vocabulary that describes them, whether they're covered by a standard use case, etc.
I'm working from this Kafka Streams geospatial tutorial.
Let's say my data stream looks like:
| Timestamp | DeviceID | Data |
|---|---|---|
| 10:00 | A | 0.1 |
| 10:00 | A | 0.1 |
| 10:03 | A | 0.2 |
| 10:11 | A | 0.1 |
| 10:12 | A | 10.1 |
| 10:30 | B | 1.3 |
| 10:35 | A | 0.2 |
What I want to do is take a stream of messages and filter them so that:
- Duplicate messages are removed
- Messages that arrive too quickly are dropped (at most one message per device every 10 minutes)
- Messages that show large jumps in the data are preserved, even if they arrive inside the 10-minute window
The output stream should look like:
| Timestamp | DeviceID | Data |
|---|---|---|
| 10:00 | A | 0.1 |
| 10:11 | A | 0.1 |
| 10:12 | A | 10.1 |
| 10:30 | B | 1.3 |
| 10:35 | A | 0.2 |
My very limited experience tells me I should create a new stream, defined by some ksql that does the filtering, and then use a JDBC sink connector to dump this reduced stream into Postgres.
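In case it helps anyone answer, here's the rough shape of the pipeline I have in mind. All the names, column types, and `WITH` properties below are my guesses, not working code:

```sql
-- Register the raw topic as a ksqlDB stream (names and types are guesses)
CREATE STREAM ship_status_reports (
    DeviceID VARCHAR KEY,
    Timestamp VARCHAR,
    Data DOUBLE
  ) WITH (
    KAFKA_TOPIC = 'ship_status_reports',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'Timestamp',       -- use the payload time as event time
    TIMESTAMP_FORMAT = 'HH:mm'     -- whatever format the devices actually send
  );

-- Derived stream with the filtering applied; the JDBC sink connector would
-- then read this stream's backing topic into Postgres.
CREATE STREAM ship_status_filtered
  WITH (KAFKA_TOPIC = 'ship_status_filtered') AS
  SELECT * FROM ship_status_reports
  EMIT CHANGES;  -- placeholder: the filtering logic is the part I'm asking about
```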
Is this correct and/or possible? Can anyone help me figure out what the KSQL would look like? Is there a term for this kind of operation?
My first stab at ksql is:
```sql
SELECT
    DeviceID,
    LATEST_BY_OFFSET(Timestamp),
    LATEST_BY_OFFSET(Data)
  FROM ship_status_reports
  WINDOW TUMBLING (SIZE 10 MINUTES)
  WHERE DeviceID = 'A'
  GROUP BY DeviceID
  EMIT CHANGES;
```
but this seems to emit a result row for every input message (one every two minutes, which I assume is the data rate) rather than one result per 10-minute window.
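Re-reading the docs, I think `EMIT CHANGES` is a push query that emits an updated row for every input record, so maybe what I want is `EMIT FINAL`, which (for windowed aggregations, in recent ksqlDB versions, if I understand correctly) emits only once per closed window:

```sql
SELECT
    DeviceID,
    LATEST_BY_OFFSET(Timestamp) AS ts,
    LATEST_BY_OFFSET(Data) AS data
  FROM ship_status_reports
  WINDOW TUMBLING (SIZE 10 MINUTES)
  WHERE DeviceID = 'A'
  GROUP BY DeviceID
  EMIT FINAL;  -- one row per device per closed 10-minute window (needs event
               -- time to advance past the window end plus any grace period)
```

Even if that fixes the rate limiting, though, I don't see how a plain tumbling-window aggregate would preserve the 10:11 → 10:12 jump from my example, since both records land in the same window.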