How to use flink-fs-azure-hadoop plugin in pyflink 1.17.2?

60 views Asked by At

Im trying to use Azure Delta Storage as data destination.

I'm runing the job in my local machine using pycharm, also i installed the plugin flink-fs-azure-hadoop who give me the conection with azure support, just as the flink documentation says, this is my workaspace.

but when i run my code i geting this exception (full ex):

Traceback (most recent call last):   File "test.py", line 72, in \<module\>     kafka_to_delta_lake()   File "test.py", line 65, in kafka_to_delta_lake     table.execute_insert('my_sink').wait()   File "table_result.py", line 76, in wait     get_method(self.\_j_table_result, "await")()   File "java_gateway.py", line 1322, in __call__     return_value = get_return_value(   File "exceptions.py", line 146, in deco     return f(\*a, \*\*kw)   File "protocol.py", line 326, in get_return_value     raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o153.await. : java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish  at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)   at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)  at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)  at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)  at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)    at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish  at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)   at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)   at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)     at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)    ... 1 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)   at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)    at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)   ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.  at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)  at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)   at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)   at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)  at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)  at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)   at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)   at akka.dispatch.OnComplete.internal(Future.scala:300)  at akka.dispatch.OnComplete.internal(Future.scala:297)  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)     at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)   at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)  at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)  at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)  at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)     at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)     at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)   at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)   at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)     at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)     at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)  at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)   at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)  at akka.actor.Actor.aroundReceive(Actor.scala:537)  at akka.actor.Actor.aroundReceive$(Actor.scala:535)     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)     at akka.actor.ActorCell.invoke(ActorCell.scala:547)     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)  at akka.dispatch.Mailbox.run(Mailbox.scala:231)     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)    ... 5 more **Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin(s): flink-fs-azure-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.**   at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBucketWriter(StreamingFileSink.java:430)  at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:440)   at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:96)    at org.apache.flink.connector.file.table.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:81)    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)     at java.base/java.lang.Thread.run(Thread.java:829)

The important info i think is:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin(s): flink-fs-azure-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

If anyone knows how to solve it I would appreciate it :)

I'm using pyflink v1.17.2, python 8 and windows 11 and i'm trying to connect to a delta storage via wasb.

0

There are 0 answers