How can I read/write data from Azurite using Spark?


I have tried to read/write Parquet files from/to Azurite using Spark like this:

import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.scalatest.WordSpec

class SimpleAzuriteSpec extends WordSpec with DatasetSuiteBase {
  val AzuriteHost = "localhost"
  val AzuritePort = 10000
  val AzuriteAccountName = "devstoreaccount1"
  val AzuriteAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
  val AzuriteContainer = "container1"
  val AzuriteDirectory = "dir1"
  val AzuritePath = s"wasb://$AzuriteContainer@$AzuriteAccountName.blob.core.windows.net/$AzuriteDirectory/"

  override final def conf: SparkConf = {
    val cfg = super.conf
    val settings =
      Map(
        s"spark.hadoop.fs.azure.storage.emulator.account.name" -> AzuriteAccountName,
        s"spark.hadoop.fs.azure.account.key.${AzuriteAccountName}.blob.core.windows.net" -> AzuriteAccountKey
      )
    settings.foreach { case (k, v) =>
      cfg.set(k, v)
    }
    cfg
  }

  "Spark" must {
    "write to/read from Azurite" in {
      import spark.implicits._
      val xs = List(Rec(1, "Alice"), Rec(2, "Bob"))
      val inputDs = spark.createDataset(xs)

      inputDs.write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(AzuritePath)

      val ds = spark.read
        .format("parquet")
        .load(AzuritePath)
        .as[Rec]

      ds.show(truncate = false)

      val actual = ds.collect().toList.sortBy(_.id)
      assert(actual == xs)
    }
  }
}

case class Rec(id: Int, name: String)

  • I have tried both Azurite 3.9.0 and Azurite 2.7.0 (both in Docker). I can transfer files to and from Azurite using the az CLI (also dockerized).

  • The test above runs on the Docker host. Azurite is reachable from the Docker host.

I am using Spark 2.4.5, Hadoop 2.10.0, and this dependency:

libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "2.10.0"

When using az, this connection string works:

AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-3.9.0:10000/devstoreaccount1;QueueEndpoint=http://azurite-3.9.0:10001/devstoreaccount1;"

yet I do not know how to configure this in Spark.

My question: How can I configure the host, the port, credentials etc. (in the path or in SparkConf)?
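
For reference, the connection string that works with az already carries the host, port, account name, and key in one value. A minimal sketch of splitting it into those parts, using only the Python standard library (the helper name `parse_connection_string` is hypothetical, and the queue endpoint is left out for brevity):

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value;' pairs into a dict."""
    parts = {}
    for item in conn_str.split(";"):
        if not item:
            continue  # skip the empty piece after a trailing ';'
        key, _, value = item.partition("=")  # split on the first '=' only, the key ends in '=='
        parts[key] = value
    return parts


conn = (
    "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
    "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
    "BlobEndpoint=http://azurite-3.9.0:10000/devstoreaccount1;"
)
fields = parse_connection_string(conn)
blob = urlparse(fields["BlobEndpoint"])  # host and port live inside the endpoint URL
print(fields["AccountName"], blob.hostname, blob.port)
```

These are the values that would have to be mapped onto Spark/Hadoop settings; whether hadoop-azure accepts a custom blob endpoint like this one is exactly the open question here.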


There are 2 answers

Frank

Don't use Azurite; connect to a real Azure Storage account instead. Add these JARs in your Spark Dockerfile:

# Set JARS env
ENV JARS=${SPARK_HOME}/jars/azure-storage-${AZURE_STORAGE_VER}.jar,${SPARK_HOME}/jars/hadoop-azure-${HADOOP_AZURE_VER}.jar,${SPARK_HOME}/jars/jetty-util-ajax-${JETTY_VER}.jar,${SPARK_HOME}/jars/jetty-util-${JETTY_VER}.jar

RUN echo "spark.jars ${JARS}" >> $SPARK_HOME/conf/spark-defaults.conf

Set your configuration:

import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().set(f"fs.azure.account.key.{os.environ['AZURE_STORAGE_ACCOUNT']}.blob.core.windows.net", os.environ['AZURE_STORAGE_KEY'])

Then you can read it:

df = spark.read.parquet("wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>")

Vasily Malakhin

Yes, that's possible, but for wasb Azurite must be reachable at 127.0.0.1:10000 (if it runs on another machine, port forwarding will help). Then pass the following Spark arguments, for example:

./pyspark --conf "spark.hadoop.fs.defaultFS=wasb://container@azurite" --conf "spark.hadoop.fs.azure.storage.emulator.account.name=azurite"

The default file system will then be backed by your Azurite instance.
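
The same settings can be assembled in code. The sketch below follows the pattern of the command above and assumes that the account part of the wasb URI has to match the configured emulator account name exactly, with no `.blob.core.windows.net` suffix. The helper names `azurite_wasb_settings` and `build_session` are hypothetical, and this has not been verified against a running Azurite instance:

```python
def azurite_wasb_settings(container, emulator_account="azurite", directory=""):
    """Return (spark_conf_dict, wasb_path) for hadoop-azure's storage emulator mode."""
    # Assumption: the URI authority is "<container>@<emulator account name>".
    root = f"wasb://{container}@{emulator_account}"
    conf = {
        "spark.hadoop.fs.azure.storage.emulator.account.name": emulator_account,
        "spark.hadoop.fs.defaultFS": root,
    }
    directory = directory.strip("/")
    path = f"{root}/{directory}" if directory else f"{root}/"
    return conf, path


def build_session(conf):
    """Apply the settings to a SparkSession (needs pyspark plus the hadoop-azure JARs)."""
    from pyspark.sql import SparkSession  # imported lazily so the helper above runs without Spark

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Values taken from the question: container1 / devstoreaccount1 / dir1
conf, path = azurite_wasb_settings("container1", "devstoreaccount1", "dir1")
print(path)
```

With a session from `build_session(conf)`, reading and writing would then go through `path` rather than a `...blob.core.windows.net` URL.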