Kafka topic with a variable nested JSON object as a ksqlDB stream


I'm trying to join two existing Kafka topics in KSQL. Some data samples from Kafka (actual values redacted due to corporate environment):

device topic:

{
  "persistTime" : "2020-10-06T13:30:25.373Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}

device-event topic (1st sample):

{
  "device" : "REDACTED",
  "event" : "403151",
  "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-09-30T11:03:50.000Z",
  "persistTime" : "2020-09-30T14:32:59.580Z",
  "state" : "open",
  "context" : {
    "2" : "25",
    "3" : "0",
    "4" : "60",
    "1" : "REDACTED"
  }
}

device-event topic (2nd sample):

{
  "device" : "REDACTED",
  "event" : "402004",
  "firstOccurrenceTime" : "2020-10-07T07:02:48Z",
  "lastOccurrenceTime" : "2020-10-07T07:02:48Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-10-07T07:02:48Z",
  "persistTime" : "2020-10-07T07:15:28.533Z",
  "state" : "open",
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}

The issue I'm facing is that the number of keys inside context varies from message to message in the device-event topic.

I've tried the following statements for creating the events stream on ksqlDB:

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurrenceTime" VARCHAR, \
"lastOccurrenceTime" VARCHAR, \
"occurrenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurrenceTime" VARCHAR, \
"lastOccurrenceTime" VARCHAR, \
"occurrenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');

The second statement only brings in records that contain all four context keys ("1", "2", "3" and "4").

How would one go about creating the KSQL equivalent stream for the device-event Kafka topic?

Answer by Robin Moffatt (best answer):

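You need to use a MAP rather than a STRUCT. A STRUCT declares a fixed set of named fields, whereas a MAP accepts whatever keys each message happens to carry, which is what a context object with varying keys needs.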

BTW you also don't need the \ line separator any more :)

Here's a working example using ksqlDB 0.12.

  • Load the sample data into a topic

    kafkacat -b localhost:9092 -P -t events <<EOF
    { "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
    { "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
    EOF
    
  • In ksqlDB, declare the stream:

    CREATE STREAM "events" (
        "device" VARCHAR,
        "event" VARCHAR,
        "firstOccurrenceTime" VARCHAR,
        "lastOccurrenceTime" VARCHAR,
        "occurrenceCount" INTEGER,
        "receiveTime" VARCHAR,
        "persistTime" VARCHAR,
        "state" VARCHAR,
        "context" MAP < VARCHAR, VARCHAR >
    ) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
    
  • Query the stream to check things work:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
    +----------+--------+--------------------------+--------+------------------------------------+
    |device    |event   |receiveTime               |state   |context                             |
    +----------+--------+--------------------------+--------+------------------------------------+
    |REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
    |REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
    |          |        |                          |        |000000Z}                            |
    
  • Use the [''] syntax to access specific keys within the map (a key that is missing from a record comes back as null):

    ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
    +-----------+--------+------------------------------------+-----------+-----------+
    |device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
    +-----------+--------+------------------------------------+-----------+-----------+
    |REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
    |REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
    |           |        |000000Z}                            |           |           |
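
Because a lookup on a missing key returns null, downstream queries usually need to decide what to do with it. The following is only a sketch, assuming the "events" stream declared above and a ksqlDB version that provides COALESCE; the default value 'n/a' and the CONTEXT_3 / CONTEXT_4 aliases are illustrative choices, not part of the original example.

```sql
-- Sketch only: two ways to deal with keys that are absent from the "context" map.
-- Assumes the "events" stream declared above; 'n/a' is an arbitrary placeholder.

-- 1. Substitute a default when the key is absent (the map lookup yields NULL).
SELECT "device",
       "event",
       COALESCE("context"['3'], 'n/a') AS CONTEXT_3
FROM "events"
EMIT CHANGES;

-- 2. Keep only the records that actually carry a given key.
SELECT "device",
       "event",
       "context"['4'] AS CONTEXT_4
FROM "events"
WHERE "context"['4'] IS NOT NULL
EMIT CHANGES;
```

With the two sample records, the first query should emit a row for both events, while the second should only emit the 403151 event, since the 402004 event has no key "4" in its context.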