Error when running pyflink datastream api

151 views Asked by At

Version:

Python: 3.8.10

Apache-Flink: 1.18.0

Apache-KafKa: 3.5.1

PyFlink Datastream API displays errors when running

Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'

Code:

import io
import os
import gzip
import requests
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream import DataStream
from pyflink.datastream import TimeCharacteristic

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

deserialization_schema = SimpleStringSchema()

kafka_consumer = FlinkKafkaConsumer(
    topics='test_source_topic',
    deserialization_schema=deserialization_schema,
    properties={
        'bootstrap.servers': 'localhost:9092', 
        'group.id': 'test-consumer-group', 
        'auto.offset.reset': 'earliest'
        }
    )

source = env.add_source(kafka_consumer)

After running the above code, the following error occurred:

Traceback (most recent call last):
  File "/root/flink_scripts/logs_05.py", line 18, in <module>
    kafka_consumer = FlinkKafkaConsumer(
  File "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/connectors/kafka.py", line 203, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/connectors/kafka.py", line 161, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
    ... 14 more

I imported the FlinkKafkaConsumer module into my code, but I still got an error when I ran it.

0

There are 0 answers