Using Flink I am trying to read data from Kafka, convert each Protobuf event to a JSON string, and write it to a table in Iceberg.
I've written the code by following the official docs, but I must have missed something. The job successfully connects to Kafka, and I can see it producing logs and parsing events, but it never writes anything to disk. It does, however, connect to Iceberg and appears to create an empty table. What am I missing? Why is the sink never actually triggered?
public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) throws Exception {
        // Check if the S3 endpoint IP argument is provided
        if (args.length < 1) {
            throw new IllegalArgumentException("Please provide the S3 endpoint IP as an argument");
        }
        String s3EndpointIp = args[0];

        LOG.info("JOB Env setup");
        // Set up the Flink environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // create the Nessie catalog
        tableEnv.executeSql(
                String.format(
                        "CREATE CATALOG iceberg WITH ("
                                + "'type'='iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                                + "'uri'='http://catalog:19120/api/v1',"
                                + "'authentication.type'='none',"
                                + "'ref'='main',"
                                + "'client.assume-role.region'='local',"
                                + "'warehouse' = 's3://warehouse',"
                                + "'s3.endpoint'='http://%s:9000'"
                                + ")",
                        s3EndpointIp));

        // Set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");

        // Create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

        // create the table
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.table1 ("
                        + "data STRING"
                        + ")");

        LOG.info("JOB Kafka setup");
        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource
                .<String>builder()
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                .setProperty("ssl.truststore.location", "./jks/truststore.jks")
                .setProperty("ssl.truststore.password", "strong-password")
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                                "username=\"username\" " +
                                "password=\"strong-password\";")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBootstrapServers("my-dns:my-port")
                .setGroupId("datalake-flink-java")
                .setTopics(Arrays.asList("my-kafka-topic"))
                .setDeserializer(new EventDeserializationSchema());
        KafkaSource<String> kafkaSource = kafkaSourceBuilder.build();

        DataStream<String> processedStream = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        LOG.info("Mapping value");
                        return value;
                    }
                });

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("db.table1"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(2))
                                .withMaxPartSize(MemorySize.ofMebiBytes(256))
                                .build())
                .build();
        processedStream.sinkTo(sink);

        env.execute("Kafka to Iceberg job");
    }

    public static class EventDeserializationSchema implements KafkaRecordDeserializationSchema<String> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector) throws IOException {
            try {
                ResponseEnrichedEvent parsed = ResponseEnrichedEvent.parseFrom(record.value());
                LOG.info("About to deserialize Kafka record");
                String jsonString = JsonFormat.printer().print(parsed);
                collector.collect(jsonString);
                LOG.info("Successfully to deserialized Kafka record");
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Failed to deserialize Kafka record", e);
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
}
Console output:
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - Mapping value
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - Successfully to deserialized Kafka record
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - About to deserialize Kafka record
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,925 INFO App [] - Mapping value
datalake-flink-taskmanager-1 | 2023-10-26 17:06:59,926 INFO App [] - Successfully to deserialized Kafka record
I can also see that the source is created, but not the sink.
I tried rewriting the code so that the sink lives in a separate class, and I also tried using only SQL, but I end up with similar results.
I managed to make it work by changing my approach slightly. I added checkpointing, as the comments suggested, and then wrote through a table created directly via tableEnv.fromDataStream(), instead of using a FileSink; the tableEnv was already configured to use the Iceberg catalog directly. However, I do see that tableEnv.fromDataStream is considered deprecated for version 1.17+, so a more future-proof solution could be found.

So during init I did:
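(Roughly the following; the exact interval and mode here are illustrative, not the precise values from my job. What matters is that checkpointing is enabled at all, since as far as I understand the sink only commits data when a checkpoint completes.)

// Enable checkpointing so the sink can actually commit data.
// The 30-second interval and exactly-once mode are illustrative values.
env.enableCheckpointing(30_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);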
and then, instead of using the FileSink, I did this:
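(Again roughly; the stream and table names are the ones from the job above, and the exact code may have differed slightly. executeInsert() submits the write as its own Table API job.)

// Turn the processed JSON stream into a Table and append it to the Iceberg table.
// fromDataStream() on a DataStream<String> yields a single STRING column, which
// maps positionally onto the `data` column of db.table1.
Table events = tableEnv.fromDataStream(processedStream);
events.executeInsert("db.table1");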