Unable to publish data to Kafka Topic using pyflink 1.17.1


I am trying to publish data that was originally a list; I converted it to a string and then tried to push it to a Kafka topic, following the official documentation. I tried the code below:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

sink = KafkaSink.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("test-topic45")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    ) \
    .build()

datastream.sink_to(sink)

but it threw the below error:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')

I even tried setting the key serializer to SimpleStringSchema (which I don't think was needed), but got the same result.

Also, I shouldn't need to convert explicitly, since SimpleStringSchema will handle it for me. I have also made sure my upstream layers are working fine; there is absolutely no problem with them.

The upstream layer is a process function that returns a list of tuples of tuples, and I haven't specified the output_type parameter on it. Should I, or will SimpleStringSchema handle that?

Is there anything else I am missing here?

Any hints are appreciated.


1 Answer

Jaehyeon Kim (accepted answer):

Yes, you're right. The error indicates that the stream records don't match what's been specified in the Kafka sink builder.
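If you do want to keep SimpleStringSchema, the records reaching the sink have to be declared as strings; without an output_type, PyFlink hands them to the JVM as pickled bytes (the [B in the exception), which SimpleStringSchema cannot cast to java.lang.String. A minimal sketch, assuming your existing datastream and that stringifying each record is acceptable:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

# Convert each record to a string AND declare the output type,
# so the Java side sees java.lang.String instead of pickled byte[].
string_stream = datastream.map(lambda e: str(e), output_type=Types.STRING())

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("test-topic45")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

string_stream.sink_to(sink)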

That said, you would likely find JSON more useful than a plain string, and the Flink documentation has an example of how to serialise Kafka keys and values with JSON (see the link).

A quick working example is shown below. Note that the Kafka connector JAR should be available at ./jars/flink-sql-connector-kafka-1.17.1.jar.

import os

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, TimeCharacteristic
from pyflink.datastream.connectors.kafka import (
    KafkaSink,
    KafkaRecordSerializationSchema,
    DeliveryGuarantee,
)
from pyflink.datastream.formats.json import JsonRowSerializationSchema

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:29092")

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
# Register the Kafka connector JAR (expected under ./jars next to this script) with the environment.
SRC_DIR = os.path.dirname(os.path.realpath(__file__))
jar_files = ["flink-sql-connector-kafka-1.17.1.jar"]
jar_paths = tuple([f"file://{os.path.join(SRC_DIR, 'jars', name)}" for name in jar_files])
print(jar_paths)
env.add_jars(*jar_paths)

# Each sink record is a Row with a single string field holding the stringified element.
value_type_info = Types.ROW_NAMED(
    field_names=["data"],
    field_types=[Types.STRING()],
)

# A single element: a list of two tuples of (user, tier) tuples, mimicking the upstream output.
source_stream = env.from_collection(
    collection=[
        [
            (("user1", "gold"), ("user2", "gold"), ("user5", "gold")),
            (("user3", "gold"), ("user4", "gold"), ("user6", "gold")),
        ]
    ]
)


sink = (
    KafkaSink.builder()
    .set_bootstrap_servers(BOOTSTRAP_SERVERS)
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("sink-demo")
        .set_value_serialization_schema(
            JsonRowSerializationSchema.builder().with_type_info(value_type_info).build()
        )
        .build()
    )
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
)

# Wrap each element in a Row that matches value_type_info, then write it to Kafka.
# source_stream.map(lambda e: Row(data=str(e)), output_type=value_type_info).print()
source_stream.map(lambda e: Row(data=str(e)), output_type=value_type_info).sink_to(sink).name(
    "sink-demo"
).uid("sink-demo")

env.execute("Kafka sink example.")

If successful, you should be able to see the resulting message on the sink-demo topic.

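Given the source collection and the Row(data=str(e)) mapping above, the message value should look roughly like this (a sketch, not an actual capture):

{"data": "[(('user1', 'gold'), ('user2', 'gold'), ('user5', 'gold')), (('user3', 'gold'), ('user4', 'gold'), ('user6', 'gold'))]"}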

The message value can also be parsed back into Python objects, as sketched below. Note that this only works from Python, since the data field holds a Python literal, and it wouldn't be recommended if the messages are shared with non-Python consumers.

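A minimal sketch of that parsing, reading the topic with kafka-python (the consumer library here is an assumption; any client that returns the raw message bytes works the same way):

import ast
import json

from kafka import KafkaConsumer  # assumption: kafka-python is installed

consumer = KafkaConsumer(
    "sink-demo",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
)

for message in consumer:
    # The value is JSON with a single "data" field.
    value = json.loads(message.value.decode("utf-8"))
    # The "data" field holds a Python literal produced by str(e); literal_eval restores it.
    record = ast.literal_eval(value["data"])
    print(record)
    break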