Version:
Python: 3.8.10
Apache-Flink: 1.18.0
Apache-Kafka: 3.5.1
Running a PyFlink DataStream API job fails with the following error:
Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'
Code:
import io
import os
import gzip
import requests
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream import DataStream
from pyflink.datastream import TimeCharacteristic
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
deserialization_schema = SimpleStringSchema()
kafka_consumer = FlinkKafkaConsumer(
    topics='test_source_topic',
    deserialization_schema=deserialization_schema,
    properties={
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'test-consumer-group',
        'auto.offset.reset': 'earliest'
    }
)
source = env.add_source(kafka_consumer)
After running the above code, the following error occurred:
Traceback (most recent call last):
  File "/root/flink_scripts/logs_05.py", line 18, in <module>
    kafka_consumer = FlinkKafkaConsumer(
  File "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/connectors/kafka.py", line 203, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/connectors/kafka.py", line 161, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
    ... 14 more
The Python import of the FlinkKafkaConsumer class succeeds, but the job still fails with this error as soon as the consumer is constructed.
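For reference, the TypeError above says the Java dependencies can be supplied through the 'pipeline.jars' option. Below is a minimal sketch of what that might look like from Python, assuming the Kafka connector JAR matching the Flink version has already been downloaded. The helper names `to_jar_url` and `build_env` and the JAR path in the usage note are hypothetical and not part of the original script. `StreamExecutionEnvironment.add_jars` takes URL-style paths, so the local path is converted to a `file://` URL first.

```python
from pathlib import Path


def to_jar_url(jar_path: str) -> str:
    """Turn a local JAR path into an absolute file:// URL (hypothetical helper)."""
    return Path(jar_path).resolve().as_uri()


def build_env(jar_path: str):
    """Create an execution environment with the connector JAR registered.

    Requires pyflink; it is imported lazily so that to_jar_url can be used
    (and checked) on a machine without pyflink installed.
    """
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # add_jars registers the JAR with the job, which is the programmatic
    # counterpart of the 'pipeline.jars' option named in the error message.
    env.add_jars(to_jar_url(jar_path))
    return env
```

Usage would be something like `env = build_env("/path/to/flink-sql-connector-kafka.jar")` in place of the plain `get_execution_environment()` call, where the path is a placeholder for wherever the connector JAR actually lives. Whether this resolves the error depends on the JAR matching the installed Flink version.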