Flink with Kafka source and Iceberg sink doesn't write


Using Flink, I am trying to read data from Kafka, convert each Protobuf event to a JSON string, and write it to a table in Iceberg.

I've written the code by following the official docs, but I must have missed something. The job successfully connects to Kafka, and I can see it producing logs and parsing events, but it never actually writes any data. It does, however, connect to Iceberg and appears to create an empty table. What am I missing? Why is the sink never actually triggered?



import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

// ResponseEnrichedEvent is the Protobuf-generated class for the Kafka events.

public class App {


    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) throws Exception {

        // Check if the S3 endpoint IP argument is provided
        if (args.length < 1) {
            throw new IllegalArgumentException("Please provide the S3 endpoint IP as an argument");
        }
        String s3EndpointIp = args[0];

        LOG.info("JOB Env setup");

        // Set up the Flink environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // create the Nessie catalog
        tableEnv.executeSql(
                String.format(
                        "CREATE CATALOG iceberg WITH ("
                                + "'type'='iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                                + "'uri'='http://catalog:19120/api/v1',"
                                + "'authentication.type'='none',"
                                + "'ref'='main',"
                                + "'client.assume-role.region'='local',"
                                + "'warehouse' = 's3://warehouse',"
                                + "'s3.endpoint'='http://%s:9000'"
                                + ")",
                        s3EndpointIp));

        // Set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");

        // Create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

        // create the table
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.table1 ("
                        + "data STRING"
                        + ")");

        LOG.info("JOB Kafka setup");

        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource
                .<String>builder()
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                .setProperty("ssl.truststore.location", "./jks/truststore.jks")
                .setProperty("ssl.truststore.password", "strong-password")
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                                "username=\"username\" " +
                                "password=\"strong-password\";")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBootstrapServers("my-dns:my-port")
                .setGroupId("datalake-flink-java")
                .setTopics(Arrays.asList("my-kafka-topic"))
                .setDeserializer(new EventDeserializationSchema()); // missing
        KafkaSource<String> kafkaSource = kafkaSourceBuilder.build();


        DataStream<String> processedStream = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    LOG.info("Mapping value");
                    return value;
                }
            });

        FileSink<String> sink = FileSink
            .forRowFormat(new Path("db.table1"), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                    .withRolloverInterval(Duration.ofMinutes(5))
                    .withInactivityInterval(Duration.ofMinutes(2))
                    .withMaxPartSize(MemorySize.ofMebiBytes(256))
                    .build())
            .build();

        processedStream.sinkTo(sink);

        env.execute("Kafka to Iceberg job");

    }


    public static class EventDeserializationSchema implements KafkaRecordDeserializationSchema<String> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector) throws IOException {
            try {
                LOG.info("About to deserialize Kafka record");
                ResponseEnrichedEvent parsed = ResponseEnrichedEvent.parseFrom(record.value());
                String jsonString = JsonFormat.printer().print(parsed);
                collector.collect(jsonString);
                LOG.info("Successfully deserialized Kafka record");
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Failed to deserialize Kafka record", e);
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
    
}

Console output:

datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - Mapping value
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - Successfully deserialized Kafka record
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - About to deserialize Kafka record
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,925 INFO  App                                              [] - Mapping value
datalake-flink-taskmanager-1  | 2023-10-26 17:06:59,926 INFO  App                                              [] - Successfully deserialized Kafka record

I also see that the source gets created, but the sink never does.

I tried rewriting the code so that the sink lives in a separate class, and also tried using only SQL, but I end up with similar results.


There are 2 answers

Answer from Shaflump:

I managed to make it work by changing my approach slightly: I added checkpointing, as the comments suggested, and then wrote through the tableEnv directly via tableEnv.fromDataStream() instead of using a FileSink. The tableEnv was already configured to use Iceberg.

However, the tableEnv.fromDataStream(DataStream, Expression...) overload I used is marked deprecated as of 1.17, so a more future-proof solution could be found (a sketch follows after the code below).

So during init I did:

// Set up the Flink environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        env,
        EnvironmentSettings.newInstance().inStreamingMode().build());

and then instead of using FileSink I did this:


// Convert DataStream to Table
org.apache.flink.table.api.Table processedTable = tableEnv.fromDataStream(processedStream, $("data"));

// Write Table to Iceberg
processedTable.executeInsert("db.table1");
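
For reference, here is a minimal sketch of the non-deprecated path, assuming the stream's single String field should simply become the data column (the rename via as() is positional, so it works regardless of the derived column name):

// fromDataStream(DataStream) derives a single-column schema from the
// String payload; as("data") renames that column to match the table.
org.apache.flink.table.api.Table processedTable = tableEnv
        .fromDataStream(processedStream)
        .as("data");

processedTable.executeInsert("db.table1");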
Answer from Peter Vary:

Your code correctly creates the Iceberg table, but you are trying to write the data using FileSink (I assume this is org.apache.flink.connector.file.sink.FileSink).

If your goal is to write to the Iceberg table, you should use org.apache.iceberg.flink.sink.FlinkSink, as described here: https://iceberg.apache.org/docs/latest/flink/#writing

For example:

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
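
To wire that into the setup from the question you also need a DataStream<RowData> and a TableLoader. A rough sketch, assuming the catalog properties simply mirror the CREATE CATALOG statement from the question (the names rowDataStream, catalogProps and tableLoader, and the exact property keys, are assumptions, not verified configuration):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

// Wrap each JSON string in a single-column RowData record.
DataStream<RowData> rowDataStream = processedStream.map(
        new MapFunction<String, RowData>() {
            @Override
            public RowData map(String json) {
                return GenericRowData.of(StringData.fromString(json));
            }
        });

// Point a TableLoader at the existing Nessie-backed table; the property
// keys are assumed to mirror the CREATE CATALOG statement.
Map<String, String> catalogProps = new HashMap<>();
catalogProps.put("uri", "http://catalog:19120/api/v1");
catalogProps.put("ref", "main");
catalogProps.put("warehouse", "s3://warehouse");
catalogProps.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
catalogProps.put("s3.endpoint", "http://" + s3EndpointIp + ":9000");

TableLoader tableLoader = TableLoader.fromCatalog(
        CatalogLoader.custom(
                "iceberg",
                catalogProps,
                new Configuration(),
                "org.apache.iceberg.nessie.NessieCatalog"),
        TableIdentifier.of("db", "table1"));

FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .append();

Note that the Iceberg sink only commits data files when a checkpoint completes, so env.enableCheckpointing(...) (as in the other answer) is still required for the rows to become visible in the table.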

Or, as you have already discovered, you can use the Table API to write the data:

processedTable.executeInsert("db.table1");

Or the same with actual SQL commands:

INSERT INTO `hive_catalog`.`db`.`table1` SELECT id, data from other_kafka_table;
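
Adapted to the catalog and table from the question, assuming the processed stream is first registered as a temporary view (the view name kafka_events is hypothetical):

// Register the DataStream<String> as a temporary view, then insert with SQL.
// The single column maps positionally onto db.table1's data column.
tableEnv.createTemporaryView("kafka_events", processedStream);
tableEnv.executeSql(
        "INSERT INTO `iceberg`.`db`.`table1` SELECT * FROM kafka_events");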