I am trying to read data into Spark using the mongo-hadoop connector. The problem is that when I set a limit on the data to be read, the RDD ends up with limit * the number of partitions elements.
mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants");
mongodbConfig.set("mongo.input.limit","3");
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
long count = documents.count();
System.out.println("Collection Count: " + count);
System.out.println("Partitions: " + documents.partitions().size());
//9 elements in the RDD = limit * nrOfPartitions = 3 * 3
//3 partitions
This behavior is reproducible with other limits (I always get limit * 3).
I get similar behavior if I query by a single ObjectId: the resulting RDD contains the same document once per partition (in my case, 3 elements holding the same document).
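For illustration, a query by _id could be passed through the mongo.input.query key; a minimal sketch, assuming the mongodbConfig object from the snippet above and using a placeholder ObjectId:

// Sketch only: restrict the input to a single document by _id,
// written in extended JSON ($oid). The ObjectId value is a placeholder.
mongodbConfig.set("mongo.input.query",
        "{ \"_id\": { \"$oid\": \"56d3e2d9a4f8a8b1f4b7c111\" } }");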
I can also provide the script for creating the mongo collection if it would be helpful.
It is a feature, not a bug.
mongo.input.limit is used to set the limit parameter for each MongoInputSplit, hence it is applied on a partition-by-partition basis, not globally. In general it is not possible (or, to be precise, practical) to limit the number of fetched records globally: each split is processed independently, and typically there is no a priori knowledge of how many records each split will yield.
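If a global cap of n records is actually needed, it can be enforced on the Spark side after the read; a minimal sketch, assuming the documents RDD and the sc JavaSparkContext from the question:

import java.util.List;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BSONObject;

// Pull at most 3 records to the driver; this limit is global,
// regardless of how many splits the connector created.
List<Tuple2<Object, BSONObject>> firstThree = documents.take(3);

// If an RDD is needed for further processing, re-parallelize the
// small driver-side list (reasonable for small limits only).
JavaPairRDD<Object, BSONObject> limited = sc.parallelizePairs(firstThree);

For larger limits where the data should stay distributed, something along the lines of zipWithIndex followed by a filter would avoid routing the records through the driver.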