Does Azure HD Insight support Auto Loader for new file detection?

395 views Asked by At

I'm referring to following link https://learn.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader which handles incremental files in azure databricks using spark streaming. I want to know does the HD insight cluster with Data lake stroage Gen2 support incremental files. I tried the example in HD insight spark cluster Im getting following error

Example code:

input_df = spark.readStream \
            .format("cloudFiles") \
            .option("cloudFiles.format","json") \
            .option("cloudFiles.connectionString", connection_string) \
            .option("cloudFiles.resourceGroup", resource_group) \
            .option("cloudFiles.subscriptionId", subscription_id) \
            .option("cloudFiles.tenantId", tenant_id) \
            .option("cloudFiles.clientId", client_id) \
            .option("cloudFiles.clientSecret", client_secret) \
            .option("cloudFiles.includeExistingFiles", "true") \
            .schema(schema) \
            .load(input_folder) 

Error

  Traceback (most recent call last):
  File "<stdin>", line 12, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/streaming.py", line 398, in load
    return self._df(self._jreader.load(path))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.load.
: java.lang.ClassNotFoundException: Failed to find data source: cloudFiles. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cloudFiles.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)

.

1

There are 1 answers

1
CHEEKATLAPRADEEP On BEST ANSWER

Unfortunately, Azure HDInsight does not support Auto Loader for new file detection.

What is Auto Loader?

Autoloader – new functionality from Databricks allowing to incrementally ingest data into Delta Lake from a variety of data sources. Auto Loader is an optimized cloud file source for Apache Spark that loads data continuously and efficiently from cloud storage as new data arrives. A data ingestion network of partner integrations allow you to ingest data from hundreds of data sources directly into Delta Lake.

Under the hood (in Azure Databricks), running Auto Loader will automatically set up an Azure Event Grid and Queue Storage services. Through these services, auto loader uses the queue from Azure Storage to easily find the new files, pass them to Spark and thus load the data with low latency and at a low cost within your streaming or batch jobs. The Auto Loader logs which files were processed which guarantees an exactly once processing of the incoming data.

Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

enter image description here

For details, see Load files from Azure Blob storage or Azure Data Lake Storage Gen2 using Auto Loader.