Example code:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment


# streaming mode with the Blink planner
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# two CSV-backed source tables; each declares its ts column as an
# event-time (rowtime) attribute via the WATERMARK clause
table_env.execute_sql(
    """
    CREATE TABLE table1 (
        id INT,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data1.csv'
    )
"""
)


table_env.execute_sql(
    """
    CREATE TABLE table2 (
        id2 INT,
        ts2 TIMESTAMP(3),
        WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data2.csv'
    )
"""
)

table1 = table_env.from_path("table1")
table2 = table_env.from_path("table2")

# join on both the key and the event-time attribute
print(table1.join(table2).where("ts = ts2 && id = id2").select("id, ts").to_pandas())

Gives an error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalLegacySink(name=[collect], fields=[id, ts])
+- FlinkLogicalCalc(select=[id, ts])
   +- FlinkLogicalJoin(condition=[AND(=($2, $5), =($0, $3))], joinType=[inner])
      :- FlinkLogicalCalc(select=[id, ts, CAST(ts) AS ts0])
      :  +- FlinkLogicalWatermarkAssigner(rowtime=[ts], watermark=[-($1, 5000:INTERVAL SECOND)])
      :     +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table1, source: [CsvTableSource(read fields: id, ts)]]], fields=[id, ts])
      +- FlinkLogicalCalc(select=[id2, ts2, CAST(ts2) AS ts20])
         +- FlinkLogicalWatermarkAssigner(rowtime=[ts2], watermark=[-($1, 5000:INTERVAL SECOND)])
            +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table2, source: [CsvTableSource(read fields: id2, ts2)]]], fields=[id2, ts2])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

This seems different from other similar questions such as this one, because I have followed the instructions in the docs and specified both an equi-join predicate and a time comparison (ts = ts2 && id = id2):

An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, the following predicates are valid interval join conditions:

  • ltime = rtime

If the problem is that these are not append-only tables, I don't know how to make them so.

Setting the time characteristic doesn't help:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic

StreamExecutionEnvironment.get_execution_environment().set_stream_time_characteristic(
    TimeCharacteristic.EventTime
)

If I use processing time instead, with ts AS PROCTIME(), then the query succeeds. But I think I need to use event time, and I don't understand why there's this difference.
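For reference, the processing-time variant that succeeds looks roughly like this (the table name table1_proc and the reuse of the same connector options are illustrative, not taken from the original code):

table_env.execute_sql(
    """
    CREATE TABLE table1_proc (
        id INT,
        ts AS PROCTIME()
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data1.csv'
    )
"""
)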


There is 1 answer:

twalthr (best answer):

Joins between two regular tables in SQL are always expressed in the same way using FROM a, b or a JOIN b.

However, Flink provides two types of join operators under the hood for the same syntax. One is an interval join, which requires time attributes to relate both tables to each other based on time. The other is the regular SQL join, implemented in a generic way, as you know it from databases.

Interval joins are just a streaming optimization to keep the state size low at runtime and to produce no updates in the result. The regular SQL join operator can produce the same result as an interval join in the end, but with higher maintenance costs.

In order to distinguish between an interval join and a regular join, the optimizer searches the WHERE clause for a predicate that works on time attributes. For the interval join, the output can always contain the two rowtime attributes for downstream temporal operators, because both rowtime attributes are still aligned with the underlying watermarking system. This means that, e.g., an outer window or another interval join could work with the time attribute again.
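To illustrate the distinction, here is a hypothetical pair of queries over two tables a and b that both expose a rowtime attribute (all names invented for this sketch):

# recognized as an interval join: the rowtime attributes are bounded on
# both sides by range predicates
interval_join_sql = """
    SELECT a.id
    FROM a, b
    WHERE a.id = b.id
      AND a.rowtime BETWEEN b.rowtime - INTERVAL '1' MINUTE
                        AND b.rowtime + INTERVAL '1' MINUTE
"""

# nothing bounds the rowtime attributes, so this falls back to a regular
# join with generic state handling
regular_join_sql = """
    SELECT a.id
    FROM a, b
    WHERE a.id = b.id
"""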

However, the implementation of interval joins has some shortcomings that are known and covered in FLINK-10211. Due to the bad design, we cannot distinguish between an interval join and a regular join at certain locations. Thus, we need to assume that the regular join could be an interval join and cannot automatically cast the time attribute to TIMESTAMP for users. Instead, we currently forbid time attributes in the output of regular joins.

At some point this limitation will hopefully be gone; until then, a user has two possibilities:

  1. Don't use a regular join on tables that contain a time attribute. You can also just project it away with a nested SELECT clause or do a CAST before joining.

  2. Cast the time attribute to a regular timestamp using CAST(col AS TIMESTAMP) in the SELECT clause. It will be pushed down into the join operation; see the sketch after this list.
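Applied to the query from the question, possibility 2 could look like the following sketch (untested; the CAST materializes the rowtime attributes before the join, so the regular join no longer sees time attributes in its input):

t1 = table_env.sql_query("SELECT id, CAST(ts AS TIMESTAMP) AS ts FROM table1")
t2 = table_env.sql_query("SELECT id2, CAST(ts2 AS TIMESTAMP) AS ts2 FROM table2")
print(t1.join(t2).where("ts = ts2 && id = id2").select("id, ts").to_pandas())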

Your exception indicates that you are using a regular join. Interval joins need a range to operate (even if it is only 1 ms); they don't support plain equality.
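If you do need event time, a bounded range keeps the query an interval join and preserves the rowtime attributes. A minimal sketch against the tables from the question (the 5-second bound is an arbitrary choice for illustration):

result = table_env.sql_query(
    """
    SELECT t1.id, t1.ts
    FROM table1 t1, table2 t2
    WHERE t1.id = t2.id2
      AND t1.ts BETWEEN t2.ts2 - INTERVAL '5' SECOND
                    AND t2.ts2 + INTERVAL '5' SECOND
    """
)
print(result.to_pandas())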