Running ksqlDB version: 0.12.0
I'm having an issue with joined streams.
When I create a stream from a windowed inner join query, the right-side fields are all null, even the ones that are part of the join condition. When I run the same query on its own, I get the fields as expected.
So here's the setup:
Running the server with docker-compose:
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.12.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: mybroker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'
The stream and join definitions:
CREATE STREAM generic_orders (
    ...
    clientrequestid_ string KEY,
    ...
) WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
) WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);
When joining the two streams above didn't work, I tried making request_clientid_ a key as well.
I suspected the join failed because clientrequestid_ and request_clientid_ have different types, so I created another stream:
CREATE STREAM specific_order_typed AS
    SELECT originatoruserid_,
           CAST(request_clientid_ AS string) AS request_clientid_ KEY,
           bidquoteinfo_volume_
    FROM specific_order
    EMIT CHANGES;
That did not help...
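To illustrate why I suspected the type mismatch: the sketch below is plain Python, not ksqlDB code, and it assumes both keys are written with Kafka's default string and long serializers (UTF-8 bytes and an 8-byte big-endian integer). The function names are made up for the illustration. Under that assumption, the same ID produces different key bytes depending on its declared type, so the two sides cannot match without a cast.

```python
import struct


def serialize_string_key(value: str) -> bytes:
    # Assumption: a STRING key is written as its UTF-8 bytes.
    return value.encode("utf-8")


def serialize_bigint_key(value: int) -> bytes:
    # Assumption: a BIGINT key is written as an 8-byte big-endian signed integer.
    return struct.pack(">q", value)


string_key = serialize_string_key("623562762")
bigint_key = serialize_bigint_key(623562762)

# The string key is 9 bytes long, the bigint key is 8 bytes long.
print(len(string_key), len(bigint_key))
# The raw key bytes differ, so a byte-wise key comparison fails.
print(string_key == bigint_key)
```

Casting the bigint to a string before joining makes both sides use the same representation, which is what specific_order_typed is meant to do.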
Here's the joined stream:
CREATE STREAM enriched_orders AS
    SELECT i.request_clientid_ AS reqid, o.clientrequestid_ AS orderreq
    FROM specific_order_typed i
    INNER JOIN generic_orders o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
    EMIT CHANGES;
I also tried swapping the FROM stream and the JOIN stream. The results are always null:
|623562762 |null
...
When I run the query directly, though, I get what I expected:
SELECT i.request_clientid_ AS reqid, o.clientrequestid_ AS orderreq
FROM specific_order_typed i
INNER JOIN generic_orders o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
|623562762 |623562762
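For reference, this is the behaviour I expect from the windowed inner join, sketched in plain Python rather than ksqlDB. The function name, the record tuples, and the timestamps are hypothetical and only serve the illustration: two records pair up when their keys are equal and their timestamps are at most one hour apart.

```python
from datetime import datetime, timedelta


def windowed_inner_join(left, right, within):
    """Pair (key, timestamp) records whose keys are equal and whose
    timestamps differ by at most `within`."""
    matches = []
    for l_key, l_ts in left:
        for r_key, r_ts in right:
            if l_key == r_key and abs(l_ts - r_ts) <= within:
                matches.append((l_key, r_key))
    return matches


t0 = datetime(2020, 1, 1, 12, 0)  # made-up event time
typed_orders = [("623562762", t0)]
generic_orders = [
    ("623562762", t0 + timedelta(minutes=5)),  # inside the 1 hour window
    ("623562762", t0 + timedelta(hours=3)),    # outside the window
]

result = windowed_inner_join(typed_orders, generic_orders, timedelta(hours=1))
print(result)  # [('623562762', '623562762')]

# With an uncast integer key on the left side, nothing matches.
untyped_result = windowed_inner_join([(623562762, t0)], generic_orders, timedelta(hours=1))
print(untyped_result)  # []
```

Both columns of a matched row carry the same ID, as in the output of the direct query above. An inner join should never emit a row where one side of the join condition is null.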
Does anybody have an idea of what's going on? I've exhausted all the ideas I had.
So I solved my problem by downgrading to 0.10.2.
The first difference I noticed is that when I try to join the original streams generic_orders and specific_order, I get a proper error for the comparison between the string and the bigint.
Then I just had to cast the bigint to a string (I had tried that with 0.12 as well).
And finally, the values are no longer null when reading that stream.