I'm trying to join two existing Kafka topics in KSQL. Some data samples from Kafka (actual values redacted due to corporate environment):
device topic:
{
"persistTime" : "2020-10-06T13:30:25.373Z",
"previous" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
},
"current" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
}
}
device-event topic (1st sample):
{
"device" : "REDACTED",
"event" : "403151",
"firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
"lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
"occurrenceCount" : 1,
"receiveTime" : "2020-09-30T11:03:50.000Z",
"persistTime" : "2020-09-30T14:32:59.580Z",
"state" : "open",
"context" : {
"2" : "25",
"3" : "0",
"4" : "60",
"1" : "REDACTED"
}
}
device-event topic (2nd sample):
{
"device" : "REDACTED",
"event" : "402004",
"firstOccurrenceTime" : "2020-10-07T07:02:48Z",
"lastOccurrenceTime" : "2020-10-07T07:02:48Z",
"occurrenceCount" : 1,
"receiveTime" : "2020-10-07T07:02:48Z",
"persistTime" : "2020-10-07T07:15:28.533Z",
"state" : "open",
"context" : {
"2" : "2020-10-07T07:02:48.0000000Z",
"1" : "REDACTED"
}
}
The issue I'm facing is that the number of keys inside context varies from message to message in the device-event topic.
I've tried the following statements for creating the events stream on ksqlDB:
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');
The second statement only brings in data that has all four context variables present ("1", "2", "3" and "4").
How would one go about creating the KSQL equivalent stream for the device-event Kafka topic?
You need to use a MAP rather than a STRUCT. BTW, you also don't need the \ line separator any more :)
Here's a working example using ksqlDB 0.12.
Load the sample data into a topic
In ksqlDB, declare the stream:
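A minimal sketch of such a declaration, assuming a stream called DEVICE_EVENTS (a hypothetical name) over the device-event topic from the question. The context column is declared as MAP<VARCHAR, VARCHAR>, so each message can carry any set of keys. The column names here are quoted and spelled exactly like the JSON fields (for example "firstOccurrenceTime" with a double "r"); the statements in the question spell them "Occurence", which would likely leave those columns NULL.

```sql
-- DEVICE_EVENTS is a hypothetical stream name; the topic name is taken from the question.
CREATE STREAM DEVICE_EVENTS (
    "device"              VARCHAR,
    "event"               VARCHAR,
    "firstOccurrenceTime" VARCHAR,
    "lastOccurrenceTime"  VARCHAR,
    "occurrenceCount"     INTEGER,
    "receiveTime"         VARCHAR,
    "persistTime"         VARCHAR,
    "state"               VARCHAR,
    -- A MAP accepts a varying set of string keys, unlike a STRUCT with fixed fields.
    "context"             MAP<VARCHAR, VARCHAR>
) WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');
```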
Query the stream to check things work:
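A quick check could look like the following sketch. It assumes a stream named DEVICE_EVENTS (hypothetical) declared over the device-event topic with quoted lower-case column names and "context" typed as MAP<VARCHAR, VARCHAR>. Setting auto.offset.reset to earliest makes the query read messages already in the topic.

```sql
-- Read from the start of the topic so existing messages are included.
SET 'auto.offset.reset' = 'earliest';

-- DEVICE_EVENTS is a hypothetical stream name; LIMIT 2 stops after two rows.
SELECT "device", "event", "context"
  FROM DEVICE_EVENTS
  EMIT CHANGES
  LIMIT 2;
```

Both sample events should show up, each with its own set of keys in the context column.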
Use the [''] syntax to access specific keys within the map:
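For illustration, a sketch of key access, again assuming a hypothetical stream DEVICE_EVENTS whose "context" column is a MAP<VARCHAR, VARCHAR>. The aliases CONTEXT_1 to CONTEXT_4 are made up for this example.

```sql
-- DEVICE_EVENTS and the CONTEXT_n aliases are hypothetical names.
SELECT "device",
       "event",
       "context"['1'] AS CONTEXT_1,
       "context"['2'] AS CONTEXT_2,
       "context"['3'] AS CONTEXT_3,
       "context"['4'] AS CONTEXT_4
  FROM DEVICE_EVENTS
  EMIT CHANGES
  LIMIT 2;
```

A key that is missing from a given message's map comes back as NULL, so events with only keys "1" and "2" still appear in the result instead of being dropped.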