I'm referring to the following link, https://learn.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader, which describes handling incremental files in Azure Databricks using Spark Structured Streaming. I want to know whether an HDInsight Spark cluster with Data Lake Storage Gen2 supports incremental file ingestion. I tried the example on an HDInsight Spark cluster and I'm getting the following error.
Example code:
# Attempt to read newly arriving JSON files with the Auto Loader "cloudFiles" source
input_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.connectionString", connection_string) \
    .option("cloudFiles.resourceGroup", resource_group) \
    .option("cloudFiles.subscriptionId", subscription_id) \
    .option("cloudFiles.tenantId", tenant_id) \
    .option("cloudFiles.clientId", client_id) \
    .option("cloudFiles.clientSecret", client_secret) \
    .option("cloudFiles.includeExistingFiles", "true") \
    .schema(schema) \
    .load(input_folder)
Error:
Traceback (most recent call last):
File "<stdin>", line 12, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/streaming.py", line 398, in load
return self._df(self._jreader.load(path))
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.load.
: java.lang.ClassNotFoundException: Failed to find data source: cloudFiles. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cloudFiles.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
Unfortunately, Azure HDInsight does not support Auto Loader for new file detection.
Auto Loader is functionality from Databricks that lets you incrementally ingest data into Delta Lake from a variety of data sources. It is an optimized cloud file source for Apache Spark that loads data continuously and efficiently from cloud storage as new data arrives. A data ingestion network of partner integrations allows you to ingest data from hundreds of data sources directly into Delta Lake.
Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.
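For completeness, here is a minimal sketch of what an end-to-end Auto Loader pipeline looks like when run on Databricks itself (not HDInsight). The input, checkpoint, and output paths are hypothetical placeholders:

# Runs only on Databricks, where the cloudFiles source is available
input_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.includeExistingFiles", "true")  # also pick up files already in the directory
    .schema(schema)
    .load("/mnt/raw/input"))  # hypothetical ADLS Gen2 mount to watch

(input_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/raw/_checkpoint")  # records which files were processed
    .outputMode("append")
    .start("/mnt/delta/events"))  # hypothetical Delta Lake target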
For details, see Load files from Azure Blob storage or Azure Data Lake Storage Gen2 using Auto Loader.
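On HDInsight, a practical alternative is Spark's built-in file stream source, which also processes new files incrementally: it lists the input directory each micro-batch and records already-processed files in the checkpoint, though without Auto Loader's notification-based optimizations. A minimal sketch, assuming the cluster is already configured with access to the ADLS Gen2 account and using hypothetical abfss:// paths:

# Built-in JSON file source; no extra package needed, so no cloudFiles ClassNotFoundException
input_df = (spark.readStream
    .format("json")
    .schema(schema)  # streaming file sources require an explicit schema
    .load("abfss://container@account.dfs.core.windows.net/input"))

query = (input_df.writeStream
    .format("parquet")
    .option("checkpointLocation", "abfss://container@account.dfs.core.windows.net/_checkpoint")
    .outputMode("append")
    .start("abfss://container@account.dfs.core.windows.net/output"))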