I'm writing a unit test for a Flink SQL statement that uses match_recognize. I'm setting up the test data like this:
    Table data = tEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
            DataTypes.FIELD("foobar", DataTypes.STRING()),
            ....
        ),
        row(...),
        row(...)
    );
I have two questions:
- How do I designate event_time as the field for watermarking (i.e. as the rowtime attribute)?
- Less important: how do I give the created table a meaningful name?
FLINK VERSION: 1.11
You've hit a current limitation of the Table API: it's not possible to define watermarks and rowtime attributes in combination with the fromValues method; you need a connector. There are a couple of options to work around it:
1. Use a csv connector that you stack up with your VALUES, as shown in this example.
2. Use the built-in DataGen connector. Since you're putting together a unit test for CEP, I imagine you want some degree of control over the data that is generated, so this is probably not a viable option. Thought I'd mention it anyway.
Note: Using SQL DDL syntax has been the recommended way to create tables since Flink 1.10. It would make both things you're trying to do (i.e. defining a watermark and naming your table) more straightforward.
The watermark is declared in the table definition with a WATERMARK FOR clause on the event-time column, and there are multiple watermark strategies you can opt to use. Please check this documentation page for more details.
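As a rough sketch of what such a DDL statement could look like for the two fields in the question: the table name events, the file path, the choice of the filesystem connector with the csv format, and the 5-second out-of-orderness bound are all assumptions made for illustration, not values taken from the question.

```sql
-- Hypothetical table name and path; filesystem + csv is just one possible connector choice.
CREATE TABLE events (
  event_time TIMESTAMP(3),
  foobar     STRING,
  -- Declares event_time as the rowtime attribute and
  -- lets the watermark lag 5 seconds behind the largest timestamp seen so far.
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///path/to/test-data.csv',
  'format'    = 'csv'
);
```

The statement can be submitted with tEnv.executeSql(...), after which the table is registered under the name given in CREATE TABLE and can be referenced directly in the match_recognize query.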