I'm trying to join the following KSQL table:
CREATE TABLE devices
("current" STRUCT<
"device" VARCHAR,
"group" VARCHAR,
"inventoryState" VARCHAR,
"location" STRUCT<
"geo" STRUCT<
"latitude" DOUBLE,
"longitude" DOUBLE>,
"address" STRUCT<
"city" VARCHAR,
"postalCode" VARCHAR,
"street" VARCHAR,
"houseNumber" VARCHAR,
"floor" VARCHAR,
"company" VARCHAR,
"country" VARCHAR,
"reference" VARCHAR,
"timeZone" VARCHAR,
"region" VARCHAR,
"district" VARCHAR>
>
>)
WITH (KAFKA_TOPIC='device', VALUE_FORMAT='JSON');
... with the following KSQL stream:
CREATE STREAM "events" (
"device" VARCHAR,
"event" VARCHAR,
"firstOccurenceTime" VARCHAR,
"lastOccurenceTime" VARCHAR,
"occurenceCount" INTEGER,
"receiveTime" VARCHAR,
"persistTime" VARCHAR,
"state" VARCHAR,
"context" MAP < VARCHAR, VARCHAR >)
WITH (KAFKA_TOPIC = 'device-event', VALUE_FORMAT = 'JSON');
... to include the location
struct onto a new stream (enriched_events).
This is the KSQL select statement that I'm executing to test that new stream:
SELECT devices."current"->"device" AS "device",
devices."current"->"location" AS "location",
"event",
"firstOccurenceTime",
"lastOccurenceTime",
"receiveTime",
"persistTime",
"state",
"context"
FROM "events"
INNER JOIN devices ON "events".ROWKEY = devices.ROWKEY
EMIT CHANGES;
I'm getting no data back, even when setting auto.offset.reset
to earliest
.
I've checked that both the device
table and the device-event
streams are populated with data.
What am I doing wrong here?
Update
Sample data for devices
table (sensitive values redacted due to company policies):
{
"persistTime" : "2020-10-12T11:48:23.384Z",
"previous" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "connected",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
},
"current" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "connected",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
}
}