Java spark and MongoDB: query only needed data


I've been upgrading a Java Spark project from reading txt file input to reading from MongoDB. My question is: can we query only the data we need? For example, I have millions of records, and I want to get only the records from the beginning of this week and start processing on them.
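For reference, "the beginning of this week" can be computed on the driver side with plain `java.time` (a small sketch; `WeekStart` is just an illustrative class name, and ISO Monday is assumed as the week start):

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;

public class WeekStart {
    // Returns the Monday on or before the given date (ISO week start).
    static LocalDate startOfWeek(LocalDate date) {
        return date.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
    }

    public static void main(String[] args) {
        // 2018-03-15 was a Thursday, so its week starts on 2018-03-12.
        System.out.println(startOfWeek(LocalDate.of(2018, 3, 15)));
    }
}
```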

Looking at MongoDB documentation, they all start like this:

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Load data and infer schema; disregard toDF() name as it returns Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();

Basically, MongoSpark loads the whole collection into the context and then transforms it into a DataFrame, which means that even if I only need 1000 records from this week, the program still has to fetch the whole 1 million records before doing anything else. Is there something that allows me to pass the query directly to MongoSpark instead of doing this?

Thank you.

1 Answer

Answered by Ross

A DataFrame or even an RDD represents a lazy collection, so doing:

Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();

will not cause any computation to happen inside Spark, and no data will be requested from MongoDB.

Only when you perform an action will Spark request data to be processed. At that stage the Mongo Spark Connector partitions the data you have requested and returns the partition information to the Spark driver. The driver allocates tasks to the Spark workers, and each worker asks the Mongo Spark Connector for its relevant partition.

One of the nice features of DataFrames / Datasets is that when using filters the underlying Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark. This means that not all the data is sent across the wire! Just the data you need.
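As a sketch of what that looks like in practice (the `created` date field and the cut-off date are hypothetical; adjust them to your schema), a filter on the Dataset is translated by the connector into an aggregation `$match` in MongoDB, so only matching documents cross the wire:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// Load lazily, then filter. The connector pushes the predicate down
// to MongoDB, so only this week's documents are sent to Spark.
// "created" is a hypothetical field name - adjust to your schema.
Dataset<Row> ds = MongoSpark.load(jsc).toDF();
Dataset<Row> thisWeek = ds.filter(
        col("created").geq(lit(java.sql.Date.valueOf("2018-03-12"))));
thisWeek.explain();  // the physical plan shows the pushed-down filter
```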

Things to be aware of: make sure you are using the latest Mongo Spark Connector. There is also a ticket to push the filters down into the partitioning logic as well, potentially reducing the number of empty partitions and providing further speed-ups.
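Separately, if you are working at the RDD level rather than with Datasets, the connector's `JavaMongoRDD.withPipeline` lets you pass an aggregation pipeline directly, so the `$match` runs in MongoDB before anything reaches Spark (a sketch; the `created` field name and the date value are placeholders):

```java
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.bson.Document;
import java.util.Collections;

// Apply a $match stage server-side; only matching documents are
// returned to Spark. "created" is a hypothetical field name.
JavaMongoRDD<Document> thisWeekRdd = MongoSpark.load(jsc)
    .withPipeline(Collections.singletonList(Document.parse(
        "{ $match: { created: { $gte: { $date: \"2018-03-12T00:00:00Z\" } } } }")));
```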