I am using Pyspark 2.4.4.
I want to load some Parquet files from an S3 bucket into a Spark DataFrame, and I want to read all of these files at once.
I have been looking how to do it in these links:
- How to read parquet data from S3 to spark dataframe Python?
- Unable to read from s3 bucket using spark
- https://gist.github.com/asmaier/5768c7cda3620901440a62248614bbd0
I have tried multiple approaches, but I cannot load the files. For example, I have tried:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import databricks.koalas as ks
import boto3
from boto3.session import Session
import botocore
from zipfile import ZipFile
import urllib
import datetime
import os
from s3fs import S3FileSystem
import dask.dataframe as dd
# Script: build a SparkSession against a Kubernetes cluster manager and read
# Parquet data from S3 through the s3a:// connector.
aws_region = 'ap-southeast-1'

# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<myimage>")
sparkConf.set("spark.kubernetes.container.image.pullSecrets", "<secret>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29413")
sparkConf.set("spark.driver.host", "<HOST>")

# Hadoop options set through SparkConf need the "spark.hadoop." prefix:
# Spark copies only properties with that prefix into the Hadoop Configuration
# used by the s3a:// filesystem. Without the prefix these keys are not passed on.
#
# NOTE: the S3AFileSystem class lives in the hadoop-aws JAR, which (together
# with its matching AWS SDK JAR) must be on the driver and executor classpath,
# e.g. baked into the container image or pulled via "spark.jars.packages"
# with a hadoop-aws version matching the cluster's Hadoop version. Otherwise
# the read below fails with ClassNotFoundException.
sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# NOTE(review): this is an AWS SDK JVM system property; as a bare SparkConf
# key it is presumably not seen by the SDK -- confirm whether it should be
# passed as -Dcom.amazonaws.services.s3.enableV4=true via
# spark.driver.extraJavaOptions / spark.executor.extraJavaOptions.
sparkConf.set("com.amazonaws.services.s3.enableV4", "true")
sparkConf.set("spark.hadoop.fs.s3a.access.key", "<mykey>")
sparkConf.set("spark.hadoop.fs.s3a.secret.key", "<mysecret>")
sparkConf.set("spark.hadoop.fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
sparkConf.set("spark.hadoop.fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

# A directory prefix (or glob) may be given here to read many Parquet files
# at once; the placeholder has no interpolation, so a plain string is used.
df = spark.read.parquet("s3a://<path>")
Also I have tried:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import databricks.koalas as ks
import boto3
from boto3.session import Session
import botocore
from zipfile import ZipFile
import urllib
import datetime
import os
from s3fs import S3FileSystem
import dask.dataframe as dd
# Script: build a SparkSession against a Kubernetes cluster manager, configure
# the S3A connector directly on the Hadoop configuration, and read Parquet
# data from S3.
aws_region = 'ap-southeast-1'

# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<myimage>")
sparkConf.set("spark.kubernetes.container.image.pullSecrets", "<secret>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29413")
sparkConf.set("spark.driver.host", "<HOST>")

# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# Configure the S3A connector on the live Hadoop configuration.
# NOTE: the S3AFileSystem class named below is provided by the hadoop-aws JAR,
# which must be on the classpath; otherwise the read fails with
# ClassNotFoundException.
hadoop_conf = sc._jsc.hadoopConfiguration()
# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", "<KEY>")
hadoop_conf.set("fs.s3a.secret.key", "<SECRET>")
hadoop_conf.set("fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

import pyspark
date = datetime.datetime.today() - datetime.timedelta(days=2)
path = '<path>'

# SparkContext has no parquet() method; Parquet is read through the
# session's DataFrameReader. The existing `spark` session is reused rather
# than wrapping `sc` in a second SparkSession.
df = spark.read.parquet("s3a://" + path)
But I have this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-14c1e166e21f> in <module>
1 date = datetime.datetime.today() - datetime.timedelta(days=2)
----> 2 df = spark.read.parquet(f"s3a://cp-datadumps/MCF/2020/10/17/advances/advances.parquet_0_0_0.snappy.parquet")
/usr/local/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths)
314 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
315 """
--> 316 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
317
318 @ignore_unicode_prefix
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o209.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:644)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
... 30 more
I know that the path is correct because I am able to load the data using dask:
# Credentials shared by the s3fs client and dask's Parquet reader.
storage_options = dict(key="<MYKEY>", secret="<MYSECRET>")

# Open an s3fs handle with those credentials and discard its cached listings.
s3 = S3FileSystem(key=storage_options["key"], secret=storage_options["secret"])
s3.invalidate_cache()

# Load the Parquet data lazily as a dask DataFrame.
df1 = dd.read_parquet(
    f"s3://<path>",
    storage_options=storage_options,
)
The issue is hidden at the end of the Java stack trace and is independent of the file being Parquet. What is missing are the libraries needed for the S3A filesystem: they are not available on the classpath.
You need to make sure that the hadoop-aws JAR is on the classpath. This JAR contains the class
org.apache.hadoop.fs.s3a.S3AFileSystem
which could not be found in the above code. More information about these JARs can be found at https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Getting_Started