In Flink SQL 1.12, I would like to know how to obtain the start time of a window and use it in a calculation. This is the error:

Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: HOP_START(TIMESTAMP(3), INTERVAL SECOND(3) NOT NULL, INTERVAL SECOND(3) NOT NULL)
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:847) ~[flink-table-blink_2.11-1.12.2-h0.cbu.dli.233.r28.jar

This is my SQL:
CREATE TABLE kafkaSource (
  user_id INT,
  goods_id INT,
  exposure_pv INT,
  key STRING,
  create_time TIMESTAMP(3),
  click_pv INT,
  addcart_pv INT,
  collect_pv INT,
  WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '***',
  'properties.bootstrap.servers' = '***',
  'properties.group.id' = 'test_flink',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
CREATE TABLE printSink (
  user_id INT,
  last_hour_click_cnt INT,
  two_click_total_cnt INT,
  total_click_total_cnt INT
) WITH (
  'connector' = 'print'
);
insert into printSink
SELECT
  user_id,
  SUM(CASE WHEN create_time >= TIMESTAMPADD(MINUTE, -3, HOP_ROWTIME(create_time, INTERVAL '1' MINUTE, INTERVAL '6' MINUTE)) THEN click_pv ELSE 0 END) AS last_hour_click_cnt,
  SUM(CASE WHEN create_time BETWEEN TIMESTAMPADD(MINUTE, 15, HOP_START(create_time, INTERVAL '1' MINUTE, INTERVAL '6' MINUTE))
                                AND TIMESTAMPADD(MINUTE, 0, HOP_START(create_time, INTERVAL '1' MINUTE, INTERVAL '6' MINUTE))
      THEN click_pv ELSE 0 END) AS two_click_total_cnt,
  SUM(click_pv) AS total_click_total_cnt
FROM kafkaSource
GROUP BY
  user_id,
  HOP(create_time, INTERVAL '1' MINUTE, INTERVAL '6' MINUTE);
This problem was also reported in "Having an equivalent to HOP_START inside an aggregation primitive in Flink", and based on the information provided there, it appears to happen because HOP_START is being used inside an aggregate function.
My suggestion is to upgrade to Flink 1.13 or later and use windowing TVFs instead of group window aggregations. With windowing TVFs, the window start and end are exposed as ordinary window_start and window_end columns that can be referenced inside aggregate expressions, which should avoid the problem you have run into; see the sketch below.
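For reference, here is a minimal sketch of what the query could look like with the HOP windowing TVF in Flink 1.13+. It reuses the kafkaSource and printSink tables from your statements; the CASE boundaries are only illustrative approximations of the ones in your original query, not an exact equivalent:

insert into printSink
SELECT
  user_id,
  -- window_end is an ordinary column here, so it can be used inside SUM(CASE ...)
  SUM(CASE WHEN create_time >= window_end - INTERVAL '3' MINUTE
           THEN click_pv ELSE 0 END) AS last_hour_click_cnt,
  SUM(CASE WHEN create_time >= window_start
            AND create_time < window_start + INTERVAL '3' MINUTE
           THEN click_pv ELSE 0 END) AS two_click_total_cnt,
  SUM(click_pv) AS total_click_total_cnt
FROM TABLE(
  HOP(TABLE kafkaSource, DESCRIPTOR(create_time), INTERVAL '1' MINUTE, INTERVAL '6' MINUTE))
GROUP BY
  user_id,
  window_start,
  window_end;

Note that window TVF aggregations require grouping by both window_start and window_end, which is why both appear in the GROUP BY clause.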