I'm wondering whether Apache Beam supports IO for Windows Azure Storage Blob (WASB) files. Is there any support yet?
I'm asking because I've deployed an Apache Beam application to run a job on an Azure Spark cluster, and so far it has been impossible to read or write WASB files from the storage container associated with that Spark cluster. Is there an alternative solution?
Context: I'm trying to run the WordCount example on my Azure Spark cluster. I had already set up some components as described here, believing that would help. Below is the part of my code where I set the Hadoop configuration:
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;

final SparkPipelineOptions options =
    PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
options.setAppName("WordCountExample");
options.setRunner(SparkRunner.class);
options.setSparkMaster("yarn");

// Register the WASB file system and the storage account key on the Hadoop
// configuration of the Spark context the pipeline will run on.
JavaSparkContext context = new JavaSparkContext();
Configuration conf = context.hadoopConfiguration();
conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
conf.set("fs.azure.account.key.<storage-account>.blob.core.windows.net",
    "<key>");
options.setProvidedSparkContext(context);

Pipeline pipeline = Pipeline.create(options);
But unfortunately I keep ending up with the following error:
java.lang.IllegalStateException: Failed to validate wasb://<storage-container>@<storage-account>.blob.core.windows.net/user/spark/kinglear.txt
at org.apache.beam.sdk.io.TextIO$Read$Bound.apply(TextIO.java:288)
at org.apache.beam.sdk.io.TextIO$Read$Bound.apply(TextIO.java:195)
at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
at org.apache.beam.runners.spark.SparkRunner.apply(SparkRunner.java:129)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:400)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:323)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:58)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:173)
at spark.example.WordCount.main(WordCount.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.io.IOException: Unable to find handler for wasb://<storage-container>@<storage-account>.blob.core.windows.net/user/spark/kinglear.txt
at org.apache.beam.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:187)
at org.apache.beam.sdk.io.TextIO$Read$Bound.apply(TextIO.java:283)
... 13 more
The root cause seems to be that no IO channel factory is registered for the wasb:// scheme, so TextIO can't resolve the path. If it comes to that, I was thinking about implementing a custom Azure Storage Blob IO for Apache Beam as a solution; I'd like to check with the community whether that's a reasonable alternative.
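For reference, this is roughly the kind of blob access such a custom connector would have to wrap. A minimal sketch, assuming the legacy azure-storage Java SDK is on the classpath; the connection string, container, and blob path are placeholders matching the error message above:

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

public class BlobReadSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection string for the storage account backing the cluster.
    String connection = "DefaultEndpointsProtocol=https;"
        + "AccountName=<storage-account>;AccountKey=<key>";

    CloudStorageAccount account = CloudStorageAccount.parse(connection);
    CloudBlobClient client = account.createCloudBlobClient();
    CloudBlobContainer container = client.getContainerReference("<storage-container>");

    // Download the same file the pipeline tries to read via TextIO.
    CloudBlockBlob blob = container.getBlockBlobReference("user/spark/kinglear.txt");
    System.out.println(blob.downloadText());
  }
}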
Apache Beam doesn't have a built-in connector for Windows Azure Storage Blob (WASB) at this moment.
There's an active effort in the Apache Beam project to add support for the HadoopFileSystem. I believe WASB has a connector for HadoopFileSystem in the hadoop-azure module. This would make WASB available with Beam indirectly -- this is likely the easiest path forward, and it should be ready very soon.
Now, it would be great to have native support for WASB in Beam. It would likely enable another level of performance, and it should be relatively straightforward to implement. As far as I'm aware, nobody is actively working on it, but it would be an awesome contribution to the project! (If you are personally interested in contributing, please reach out!)