Does reading multiple files & collect() bring them to the driver in Spark?


Code snippet :

val inp = sc.textFile("C:\\mk\\logdir\\foldera\\foldera1\\log.txt").collect.mkString(" ")

I know the above code reads the entire file, combines its lines into one string, and materializes it on the driver node (a single execution, not a parallel one).

 val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
 code block{ }
 sc.stop

Q1) Here I am reading multiple files (which are present in the above folder structure). I believe in this case each file will be created as a partition, sent to a separate node, and executed in parallel. Am I correct in my understanding? Can someone confirm this? Or is there any way I can confirm it systematically?

val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
val cont = inp.collect.mkString(" ")
 code block{ }
 sc.stop

Q2) How does Spark handle this case? Although I am calling collect, I assume it will not collect all content from all files, but just the one file. Am I right? Can someone help me understand this?

Thank you very much in advance for your time & help.

There are 2 answers

Ram Ghadiyaram (Best Answer)

Q1) Here I am reading multiple files (which are present in the above folder structure). I believe in this case each file will be created as a partition, sent to a separate node, and executed in parallel. Am I correct in my understanding? Can someone confirm this? Or is there any way I can confirm it systematically?

ANSWER :

SparkContext's textFile method, i.e. sc.textFile, creates an RDD with each line as an element. If there are 10 files in your data folder (say, yourtextfilesfolder), at least 10 partitions will be created. You can verify the number of partitions with:

yourtextfilesfolder.partitions.length

However, partitioning is determined by data locality, which may result in too few partitions by default. AFAIK there is no guarantee of exactly one partition per file; please see the code of SparkContext.textFile.

The minPartitions parameter is the suggested minimum number of partitions for the resulting RDD.

For a better understanding, see the method below.

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

You can pass minPartitions explicitly, as shown above in SparkContext.scala.
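
For example, here is a minimal sketch (the wildcard path is copied from the question, and the partition count 8 is an arbitrary assumption) that passes an explicit minPartitions and then checks how many partitions Spark actually created:

// Hedged sketch: hint at a minimum of 8 partitions for the matched log files.
val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt", minPartitions = 8)

// Spark may create more partitions than the hint (for example, for large files),
// but generally at least one per file.
println(inp.getNumPartitions)   // equivalent to inp.partitions.length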

Q2) How does Spark handle this case? Although I am calling collect, I assume it will not collect all content from all files, but just the one file. Am I right? Can someone help me understand this?

ANSWER: Your RDD is constructed from multiple text files, so collect will gather the data from all partitions, i.e. from all of the files, to the driver, not one file at a time.

You can verify this by inspecting the result of rdd.collect.


However, if you want to read multiple text files you can also use wholeTextFiles. Please see the @note in the method below: small files are preferred; large files are also allowable but may cause bad performance.

See spark-core-sc-textfile-vs-sc-wholetextfiles

Doc:

wholeTextFiles(path: String, minPartitions: Int): RDD[(String, String)]
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI.

/**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
   * key-value pair, where the key is the path of each file, the value is the content of each file.
   *
   * <p> For example, if you have the following files:
   * {{{
   *   hdfs://a-hdfs-path/part-00000
   *   hdfs://a-hdfs-path/part-00001
   *   ...
   *   hdfs://a-hdfs-path/part-nnnnn
   * }}}
   *
   * Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
   *
   * <p> then `rdd` contains
   * {{{
   *   (a-hdfs-path/part-00000, its content)
   *   (a-hdfs-path/part-00001, its content)
   *   ...
   *   (a-hdfs-path/part-nnnnn, its content)
   * }}}
   *
   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
   * @note On some filesystems, `.../path/*` can be a more efficient way to read all files
   *       in a directory rather than `.../path/` or `.../path`
   * @note Partitioning is determined by data locality. This may result in too few partitions
   *       by default.
   *
   * @param path Directory to the input data files, the path can be comma separated paths as the
   *             list of inputs.
   * @param minPartitions A suggestion value of the minimal splitting number for input data.
   * @return RDD representing tuples of file path and the corresponding file content
   */
  def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
.....
  }

Examples:

val distFile = sc.textFile("data.txt")

The above command returns an RDD of the file's lines; collecting it brings the content to the driver:

scala> distFile.collect()
res16: Array[String] = Array(1,2,3, 4,5,6)


SparkContext.wholeTextFiles returns (filename, content) pairs:

val distFile = sc.wholeTextFiles("/tmp/tmpdir")

scala> distFile.collect()
res17: Array[(String, String)] =
Array((maprfs:/tmp/tmpdir/data3.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data2.txt,"1,2,3
4,5,6
"))

In your case I'd prefer SparkContext.wholeTextFiles, where you can get the filename and its content after collect, as described above, if that's what you want.
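
A minimal sketch along those lines (the wildcard path is copied from the question; adjust it to your environment):

// Hedged sketch: each element is a (file path, entire file content) pair.
val logs = sc.wholeTextFiles("C:\\mk\\logdir\\*\\*\\log.txt")

// collect() brings every (path, content) pair back to the driver.
logs.collect().foreach { case (path, content) =>
  println(s"$path -> ${content.length} characters")
}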

himanshuIIITian

Spark is a fast and general engine for large-scale data processing, and it processes data in parallel. So, to answer the first question: yes, in the following case:

val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
code block{ }
sc.stop

Each file will become a partition, be sent to a separate node, and be processed in parallel. But, depending on file size, the number of partitions can be greater than the number of files being processed. For example, if log.txt in folder1 and log.txt in folder2 are only a few KB each, then only 2 partitions are created, because there are 2 files, and they will be processed in parallel.

But if log.txt in folder1 is several GB in size, then multiple partitions will be created for it, and the number of partitions will be greater than the number of files.

However, we can always change the number of partitions of an RDD using the repartition() or coalesce() methods, as in the sketch below.
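
A minimal sketch (the path is copied from the question; the partition counts 16 and 4 are arbitrary examples):

// Hedged sketch: inspect and adjust the number of partitions of an RDD.
val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
println(inp.getNumPartitions)        // partitions Spark chose for the input

val widened  = inp.repartition(16)   // increase partitions (incurs a full shuffle)
val narrowed = widened.coalesce(4)   // decrease partitions (avoids a full shuffle)
println(narrowed.getNumPartitions)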

To answer the second question: in the following case:

val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
val cont = inp.collect.mkString(" ")
code block{ }
sc.stop

Spark will collect the content from all files, not just from one file, since collect() means getting all the content stored in an RDD back to the driver in the form of a collection. A short sketch follows.
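
A minimal sketch (path copied from the question) illustrating that collect() materializes every line of every matched file on the driver:

// Hedged sketch: the resulting array holds the lines of ALL matched files,
// so its length is the total line count across files, not just one file's.
val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
val allLines: Array[String] = inp.collect()
println(s"Total lines collected on the driver: ${allLines.length}")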