Joining a stream and a table with ksqlDB

160 views Asked by At

I'm trying to join the following KSQL table:

CREATE TABLE devices
    ("current" STRUCT<
        "device" VARCHAR, 
        "group" VARCHAR, 
        "inventoryState" VARCHAR, 
        "location" STRUCT<
            "geo" STRUCT<
                "latitude" DOUBLE, 
                "longitude" DOUBLE>, 
            "address" STRUCT<
                "city" VARCHAR, 
                "postalCode" VARCHAR, 
                "street" VARCHAR, 
                "houseNumber" VARCHAR, 
                "floor" VARCHAR, 
                "company" VARCHAR, 
                "country" VARCHAR, 
                "reference" VARCHAR, 
                "timeZone" VARCHAR, 
                "region" VARCHAR, 
                "district" VARCHAR>
            >
        >)
WITH (KAFKA_TOPIC='device', VALUE_FORMAT='JSON');

... with the following KSQL stream:

CREATE STREAM "events" (
    "device" VARCHAR,
    "event" VARCHAR,
    "firstOccurenceTime" VARCHAR,
    "lastOccurenceTime" VARCHAR,
    "occurenceCount" INTEGER,
    "receiveTime" VARCHAR,
    "persistTime" VARCHAR,
    "state" VARCHAR,
    "context" MAP < VARCHAR, VARCHAR >) 
WITH (KAFKA_TOPIC = 'device-event', VALUE_FORMAT = 'JSON');

... to include the location struct onto a new stream (enriched_events).

This is the KSQL select statement that I'm executing to test that new stream:

SELECT devices."current"->"device" AS "device",
    devices."current"->"location" AS "location",
    "event",
    "firstOccurenceTime",
    "lastOccurenceTime",
    "receiveTime",
    "persistTime",
    "state",
    "context"
FROM "events"
INNER JOIN devices ON "events".ROWKEY = devices.ROWKEY
EMIT CHANGES;

I'm getting no data back, even when setting auto.offset.reset to earliest.

I've checked that both the device table and the device-event streams are populated with data.


What am I doing wrong here?


Update

Sample data for devices table (sensitive values redacted due to company policies):

{
  "persistTime" : "2020-10-12T11:48:23.384Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "connected",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "connected",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}
0

There are 0 answers