Reading an Elasticsearch Index from PySpark

4.9k views Asked by At

Could anyone tell me why this test script for PySpark errors out? (python 3.6.8, hadoop 3.3.1, spark 3.2.1, elasticsearch-hadoop 7.14)

from pyspark.sql import SparkSession, SQLContext

myspark = SparkSession.builder \
  .appName("My test.") \
  .master("spark://xx.xx.xx:7077") \
  .config("es.nodes", "xx.xx.xx.xx") \
  .config("es.port", "9200") \
  .config("es.net.http.auth.user", "xxxx") \
  .config("es.net.http.auth.pass", "xxxx") \
  .getOrCreate()

mycontext = SQLContext(myspark)
myquery = '{ "query": { "match_all": {} }}'

myreader = mycontext.read.format("org.elasticsearch.spark.sql") \
  .option("es.nodes", "xx.xx.xx.xx") \
  .option("es.port", "9200") \
  .option("es.net.http.auth.user", "xxxx") \
  .option("es.net.http.auth.pass", "xxxx") \
  .option("es.query", myquery)

myframe = myreader.load("myindex")

The error I get on .load() is:

py4j.protocol.Py4JJavaError: An error occurred while calling 039.load.
: java.lang.NoClassDefFoundError: scala/Product$class
     at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
     at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
     ...

I also have a test snippet using the older SparkConf(), SparkContext(), and .newAPIHadoopRDD() and it works fine connecting to the same spark master and elastic cluster. So that rules out a lot of potential problems with my classpath or firewall or authentication.

1

There are 1 answers

0
Netanel Malka On

In order to work with spark 3.2.1 you need elasticsearch-hadoop version of 8.2.0.

You can see that on the release notes