KSQLDB right-hand side of inner joined streams is always null

Running KSQLDB Version: 0.12.0

I'm having an issue with joined streams.

When I create a stream from a windowed inner join query, the right-side fields are all null, even the ones that are part of the join condition. When I run the same query on its own, I get the fields as expected.

So here's the setup:

Running the server with docker-compose:

services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.12.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: mybroker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'

The stream and join definitions

CREATE STREAM generic_orders (
    ...
    clientrequestid_ string KEY,
    ...
)   WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);

CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
)   WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);

Joining the two streams above didn't work, so I tried making request_clientid_ a key as well. I suspected the join failed because clientrequestid_ and request_clientid_ have different types, so I created another stream:

CREATE STREAM specific_order_typed AS
    select originatoruserid_ ,
    CAST(request_clientid_ AS string) as request_clientid_ KEY, 
    bidquoteinfo_volume_ 
    FROM specific_order
    EMIT CHANGES
    ;

That did not help...
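
For reference, the key column and the column types that ksqlDB registered for each stream can be checked with DESCRIBE. A minimal sketch, using the stream names from the statements above:

```sql
-- Show column names, types, and which column is marked as the key.
DESCRIBE generic_orders;
DESCRIBE specific_order_typed;
```

If the join columns show different types, or the expected column is not flagged as the key, that would point at the stream definitions rather than at the join itself.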

Here's the joined stream:

CREATE STREAM enriched_orders AS
    SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq  FROM specific_order_typed i
    INNER JOIN generic_orders o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
    EMIT CHANGES;

I also tried swapping the FROM stream and the JOIN stream. The right-hand column is always null:

|623562762 |null
...
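
The sink topic can also be inspected directly, which bypasses the stream's schema. This is only a sketch: it assumes the CREATE STREAM ... AS SELECT statement did not set KAFKA_TOPIC, in which case the topic would normally be named after the stream in upper case.

```sql
-- Assumption: default sink topic name (upper-cased stream name).
-- Prints the first five raw records, key and value, from the start of the topic.
PRINT 'ENRICHED_ORDERS' FROM BEGINNING LIMIT 5;
```

Comparing this raw output with the stream's rows may help tell whether the nulls are written by the persistent query or only appear when the stream is read back.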

When I run the query directly, though, I get what I expected:

    SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq  FROM specific_order_typed i
    INNER JOIN generic_orders o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
    EMIT CHANGES;
|623562762 |623562762

Does anybody have an idea of what's going on? I've exhausted all the ideas I had.

There is 1 answer

Answer by Maresh (best answer)

So I solved my problem by downgrading to 0.10.2.

The first difference I noticed is that when trying to join the original streams generic_orders and specific_order, I get a proper error about the comparison between the string and the bigint:

CREATE STREAM generic_orders (
    ...
    clientrequestid_ string,
    ...
)   WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);

CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
)   WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);

Then I just had to cast the bigint to a string (I had already tried that with 0.12 as well):

CREATE STREAM enriched_orders AS
    SELECT * FROM specific_order_typed i
    INNER JOIN generic_orders o WITHIN 1 HOURS ON o.clientrequestid_ = CAST(i.request_clientid_ AS STRING)
    EMIT CHANGES;

And finally the values are not null anymore when reading that stream.
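
A quick check along these lines, assuming the stream name enriched_orders from the statement above:

```sql
-- Push query against the joined stream; terminates after five rows.
SELECT * FROM enriched_orders EMIT CHANGES LIMIT 5;
```

With KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET set to 'earliest', as in the docker-compose file, this reads from the beginning of the stream's topic.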