PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function


I'm very new to PyFlink and am trying to register a custom UDF using the Python API. I currently hit the same issue in both the server environment and my local IDE environment.

When I try to execute the example below, I get this error message: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'.

Of course, I've added the required property to my flink-conf.yaml and checked that pyflink-shell.sh initializes the environment using the specified configuration, but it makes no difference and I still get the error.

Here is a code example that I'm trying to run:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        [
            "uid", "url"
        ]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()

Thanks for your help!


There is 1 answer

Rinat Sharipov

The answer is pretty simple; many thanks to the people on the mailing list.

  • this property has to be set manually via the table environment API;
  • flink-conf.yaml isn't applied to the Flink environment when running pyflink-shell.sh.

Any property can be configured in the following way:

bt_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
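
To make it clearer where this call fits, here is a minimal sketch that wraps the same line in a small helper. The helper name set_task_off_heap_size and the constant OFF_HEAP_KEY are my own (hypothetical, not part of PyFlink); the only PyFlink calls used are get_config(), get_configuration() and set_string(), exactly as in the line above.

```python
# Configuration key taken from the error message in the question.
OFF_HEAP_KEY = "taskmanager.memory.task.off-heap.size"


def set_task_off_heap_size(table_env, size="80m"):
    """Set the task off-heap memory on a table environment.

    `table_env` is assumed to be a PyFlink table environment such as the
    `bt_env` from the question; `size` is a memory string such as "80m".
    This helper is hypothetical, it only wraps the one-liner from the answer.
    """
    configuration = table_env.get_config().get_configuration()
    configuration.set_string(OFF_HEAP_KEY, size)
    return configuration
```

In the script from the question, this would presumably be called as set_task_off_heap_size(bt_env) right after BatchTableEnvironment.create(env) and before execute_sql, so that the value is already in place when the job with the Python UDF is submitted. The default of 80m is simply the value used above, chosen to exceed the 79 mb reported in the error.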