I am trying to subscribe to a catalog for notification. But cannot find symbol "thenAppy". Please help.
// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
queryApi.subscribeToNotifications(consumerSettings)
.thenApply(subscription -> {
subscription
.notifications()
.runWith(Sink.foreach(notification ->
// this callback is called each time a new batch publication happens in catalog
System.out.printf("catalog %s has a new version %d\n", catalogHrn, notification.getCatalogVersion())
), myMaterializer);
return subscription.subscriptionControl();
});
[ERROR] COMPILATION ERROR : [INFO] ------------------------------------------------------------- [ERROR] Main.java:[41,25] cannot find symbol symbol: method thenApply((subscript...; }) location: interface org.apache.flink.streaming.api.functions.source.SourceFunction [ERROR] Main.java:[44,65] package akka.stream.javadsl does not exist [ERROR] Main.java:[47,40] cannot find symbol symbol: variable myMaterializer
Based on your compilation error, it appears you're trying to subscribe to a notification within a Flink app. The Flink data client queryApi returns a SourceFunction, not a CompletionStage. You can use it like so: