Flink SQL multiple over aggregations from a single source


Here is my current scenario:

  • a Kafka source datastream
  • convert source datastream into a Table with primary key, rowtime and watermark columns
tableEnvironment.fromDataStream(
  sourceKafkaDataStream,
  Schema.newBuilder()
    .primaryKey("id")
    .columnByExpression("proctime", "PROCTIME()")
    .columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(eventtime, 3)")
    .watermark("rowtime", "rowtime - INTERVAL '60' SECOND")
    .build()
)
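For comparison, the same schema can be sketched as DDL. This is only an illustration: the physical column types and the connector options are assumed, since they are not shown above.

```sql
-- Sketch of an equivalent DDL definition; column types and the WITH
-- clause are assumptions, not taken from the original code.
CREATE TABLE sourceKafkaTable (
  id        STRING,
  eventtime BIGINT,
  category  STRING,
  proctime  AS PROCTIME(),
  rowtime   AS TO_TIMESTAMP_LTZ(eventtime, 3),
  PRIMARY KEY (id) NOT ENFORCED,
  WATERMARK FOR rowtime AS rowtime - INTERVAL '60' SECOND
) WITH (
  'connector' = 'upsert-kafka'
  -- topic, bootstrap servers, key/value formats, ...
);
```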
  • create multiple temporary views that extract over aggregations (distinct count) on different columns
select 
  id, 
  rowtime, 
  key1,
  count(*) over last_hour AS cnt, 
  count(distinct category) over last_hour as distinct_categories
from sourceKafkaTable
window last_hour AS (
 PARTITION BY key1 -- different key
 ORDER BY rowtime ASC 
 RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
select 
  id, 
  rowtime, 
  key2,
  count(*) over last_hour AS cnt, 
  count(distinct category) over last_hour as distinct_categories
from sourceKafkaTable
where productId is not null
window last_hour AS (
 PARTITION BY key2 -- different key
 ORDER BY rowtime ASC 
 RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
tableEnvironment.createTemporaryView("Table2", categoryOverAggregation);
tableEnvironment.createTemporaryView("Table3", productOverAggregation);
  • left join the over aggregation views to merge all columns per row and sink to HDFS
select A.id, B.key1, C.key2, B.distinct_categories, C.distinct_categories
from sourceTable A
left join Table2 FOR SYSTEM_TIME AS OF A.rowtime AS B ON A.id = B.id
left join Table3 FOR SYSTEM_TIME AS OF A.rowtime AS C ON A.id = C.id
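For contrast, a regular (non-temporal) left join on the same views would not hit the versioned-table primary-key requirement, at the cost of Flink retaining join state and emitting a continuously updating result. A sketch, not necessarily the intended semantics:

```sql
-- Sketch: the same join without FOR SYSTEM_TIME AS OF. No primary key
-- on the right-hand views is required, but the output is an updating
-- stream rather than a point-in-time lookup.
SELECT A.id, B.key1, C.key2,
       B.distinct_categories AS distinct_categories_by_key1,
       C.distinct_categories AS distinct_categories_by_key2
FROM sourceTable A
LEFT JOIN Table2 B ON A.id = B.id
LEFT JOIN Table3 C ON A.id = C.id
```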

However, the following error is thrown:

Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:

Is this the right way to join over-aggregation queries from a single source table? If so, how can I define primary keys on the temporary views?
