I have Kafka streams as follows:
topic1: event example in Avro:
{
"ID": {"string":"S1"},
"text": {"string":"department"}
}
{
"ID": {"string":"S2"},
"text":{"string":"cellphone"}
}
{
"ID": {"string":"S3"},
"text":{"string":"fax"}
}
{
"ID": {"string":"S4"},
"text":{"string":"telephone"}
}
stream1 which is a reference has details on any abbrevations built on topic1:
stream1 definitation:
full_name string
example values:
___+______
|ID|text|
___+______
|S1|department
|S2|cellphone
|S3|fax
|S4|telephone
__________________________________________
topic2:
event example in avro:
{
"ID": {"string":"S1"},
"value":{"string":"department"}
}
{
"ID": {"string":"S2"},
"value":{"string":"cellphone"}
}
{
"ID": {"string":"S3"},
"value":{"string":"maximum"}
}
{
"ID": {"string":"S4"},
"value":{"string":"telephone"}
}
stream2 which is a reference has details on any abbrevations built on topic1:
stream2 definitation:
full_name string
example values:
___+______
|ID|value|
___+______
|S1|dt
|S2|cp
|S3|fax
|S4|th
_________________________________________
topic3:
event example in avro:
{
"ID_k": {"string":"K1"},
"received_abbrevation":{"string":"dt"}
}
{
"ID_k": {"string":"K2"},
"received_abbrevation":{"string":"cp"}
}
stream3 which receives event from topic2
stream3 definitation:
ID string
received_abbrevation string
example values:
_____+_____________________
|ID_k|received_abbrevation|
_____+_____________________
|K1 |dt |
|K2 |cp |
what I want as a result is, whenever, there is new event occuring in stream3 it should query stream1 and stream2 from row1 till ..n always
and display the "text" value from stream1 by comparing ID of stream1 and stream2 and get the stream2.value as (select received_abbrevation from stream3) diplay the result as stream4.
note: If there a new event in topic1/stream1, there is no need to query stream2.
the resultant stream should look like:
-------------
|actual_name|
-------------
|department |
|cellphone |
For example, as in SQL query we can write as below: select stream1.text as actual_name from stream1, stream2 where stream1.ID=stream2.ID and stream2.value(select received_abbrevation from stream3).
Kindly let me know how I can achieve this in KSQL.