MongoHadoop Connector used with Spark duplicates results by number of partitions


I am trying to read data into Spark using the mongo-hadoop connector. The problem is that when I set a limit on the data being read, the RDD ends up containing limit * the number of partitions elements.

mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants");
mongodbConfig.set("mongo.input.limit","3");
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    long count = documents.count();
    System.out.println("Collection Count: " + count);
    System.out.println("Partitions: " + documents.partitions().size());

// 9 elements in the RDD = limit * nrOfPartitions = 3 * 3
// 3 partitions
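
To confirm that the duplication happens per partition, here is a quick diagnostic sketch (it uses glom(), which collects every partition into a list on the driver, so it is only suitable for small data like this); with mongo.input.limit set to 3, every partition reports 3 records:

    import java.util.List;
    import scala.Tuple2;

    // Collect each partition into a List and print its size.
    List<List<Tuple2<Object, BSONObject>>> perPartition = documents.glom().collect();
    for (int i = 0; i < perPartition.size(); i++) {
        System.out.println("Partition " + i + ": " + perPartition.get(i).size() + " records");
    }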

This behavior is reproducible for other limits (I always get limit * 3).

I get similar behavior if I query by a single ObjectId: the resulting RDD contains the same document once per partition (in my case, 3 copies of the same document).
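
For reference, the ObjectId variant is configured roughly like this (a sketch; mongo.input.query takes an extended-JSON query, and the ObjectId value below is just a placeholder for one taken from my collection):

    // Placeholder ObjectId: substitute an _id that actually exists in test.restaurants.
    mongodbConfig.set("mongo.input.query",
            "{\"_id\": {\"$oid\": \"<some-object-id>\"}}");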

I can also provide the script for creating the MongoDB collection if it would be helpful.
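
For completeness, a minimal sketch of how such a collection could be populated with the MongoDB Java driver (the field names and values here are only illustrative, not the actual test data):

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    MongoClient mongo = new MongoClient("localhost", 27017);
    MongoCollection<Document> restaurants =
            mongo.getDatabase("test").getCollection("restaurants");
    // Illustrative sample documents; the real collection has more fields.
    for (int i = 1; i <= 10; i++) {
        restaurants.insertOne(new Document("name", "Restaurant " + i)
                .append("cuisine", "Cuisine " + i));
    }
    mongo.close();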


1 Answer

zero323:

It is a feature, not a bug. mongo.input.limit is used to set the limit parameter for each MongoInputSplit, so it is applied on a per-partition basis, not globally.

In general it is not possible (or, to be precise, not practical) to limit the number of fetched records globally. Each split is processed independently, and there is typically no a priori knowledge of how many records each split will yield.
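
If an effective global limit is still needed, one possible approach (a minimal sketch, assuming the per-split over-read is tolerable and using the documents RDD and sc from the question) is to truncate on the driver side after loading:

    import java.util.List;
    import scala.Tuple2;

    // take(n) returns the first n records across partitions to the driver;
    // parallelizePairs turns them back into an RDD if one is still needed.
    List<Tuple2<Object, BSONObject>> firstThree = documents.take(3);
    JavaPairRDD<Object, BSONObject> limited = sc.parallelizePairs(firstThree);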