How to insert a time/data-filtered Kafka stream into a Postgres database

I'm learning Kafka as a method to replace the in-house streaming tools we are currently using. The current stack receives geospatial data from a bunch of measurement devices and inserts it into a Postgres+PostGIS database. There are a bunch of Kafka streaming functions that would be very useful to have, but I'm not sure of the vocabulary that describes them, whether they're covered by an existing use case, etc.

I'm working off this Kafka Streams geospatial tutorial.

Let's say my data stream looks like:

Timestamp  DeviceID  Data
10:00      A         0.1
10:00      A         0.1
10:03      A         0.2
10:11      A         0.1
10:12      A         10.1
10:30      B         1.3
10:35      A         0.2
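
In ksqlDB terms, I'm assuming this topic would first be registered as a stream, roughly like the sketch below (the stream/topic name ship_status_reports matches my query further down; the column types are guesses and I haven't tested this):

CREATE STREAM ship_status_reports (
    DeviceID  VARCHAR KEY,   -- device identifier, also the message key
    TimeStamp VARCHAR,       -- measurement time as sent by the device
    Data      DOUBLE         -- the measured value
) WITH (
    KAFKA_TOPIC  = 'ship_status_reports',
    VALUE_FORMAT = 'JSON'
);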

What I want to do is take a stream of messages, filter them so that:

  1. Duplicate messages are removed
  2. Messages that arrive too quickly are dropped (at most one message per device every 10 minutes)
  3. Messages that show large jumps in data are preserved, even when they arrive within the 10-minute window.

The output stream should look like:

Timestamp  DeviceID  Data
10:00      A         0.1
10:11      A         0.1
10:12      A         10.1
10:30      B         1.3
10:35      A         0.2
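
For requirement 3 in particular, the only building block I've found so far is LATEST_BY_OFFSET with a second argument, which keeps the last N values per key, so consecutive readings per device could be compared. This is as far as I've got with that idea (completely untested; names are from my example above):

-- Keep the two most recent readings per device so consecutive values can be compared
CREATE TABLE device_last_two AS
    SELECT
        DeviceID,
        LATEST_BY_OFFSET(Data, 2) AS last_two  -- array of the device's 2 latest values (ordering to be checked)
    FROM ship_status_reports
    GROUP BY DeviceID
    EMIT CHANGES;

-- ...and then somehow keep only rows where ABS(last_two[2] - last_two[1]) exceeds a threshold,
-- but I don't know how to merge that back with the 10-minute throttled stream.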

My very limited experience tells me I should create a new stream/topic, defined by some ksqlDB query that does the filtering, and then use a JDBC sink connector to dump this reduced stream into Postgres, as sketched below.
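
If that plan is roughly right, I imagine the final step would be something like the following, run from ksqlDB itself (the connector name, topic name and connection details are all made up, and I haven't tried this):

-- Sketch of a JDBC sink, assuming the filtered query writes its results to a topic
-- called FILTERED_SHIP_STATUS and Postgres runs locally
CREATE SINK CONNECTOR filtered_to_postgres WITH (
    "connector.class"     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    "connection.url"      = 'jdbc:postgresql://localhost:5432/ships',
    "connection.user"     = 'postgres',
    "connection.password" = 'postgres',
    "topics"              = 'FILTERED_SHIP_STATUS',
    "auto.create"         = 'true',
    "insert.mode"         = 'insert'
);

(I'm also not sure whether the JDBC sink needs Avro or another schema-aware value format rather than plain JSON.)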

Is this correct and/or possible? Can anyone help me figure out what the KSQL would look like? Is there a term for this kind of operation?

My first stab at ksql is:

SELECT
    DeviceID,
    LATEST_BY_OFFSET(TimeStamp),
    LATEST_BY_OFFSET(Data)
FROM ship_status_reports
WINDOW TUMBLING (SIZE 10 MINUTES)
WHERE DeviceID = 'A'
GROUP BY DeviceID
EMIT CHANGES;

but this seems to return a message every 2 minutes or so (the incoming data rate, I assume) rather than one message per 10-minute window.
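
To actually end up with a topic that the JDBC connector could read, I assume I'd eventually need to wrap this in a persistent query along these lines (again only a sketch, with the DeviceID filter dropped so it covers all devices; I don't know whether this changes the every-2-minutes behaviour):

-- Persistent version of the query above, writing its results to a backing topic
CREATE TABLE filtered_ship_status
    WITH (KAFKA_TOPIC = 'FILTERED_SHIP_STATUS', VALUE_FORMAT = 'JSON') AS
    SELECT
        DeviceID,
        LATEST_BY_OFFSET(TimeStamp) AS LastTimeStamp,
        LATEST_BY_OFFSET(Data)      AS LastData
    FROM ship_status_reports
    WINDOW TUMBLING (SIZE 10 MINUTES)
    GROUP BY DeviceID
    EMIT CHANGES;

I've also seen EMIT FINAL mentioned as a way to get only one result per window instead of an update on every message, but I haven't been able to confirm whether it applies here.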
