Spark type mismatch error

I have the function below:

    def doSomething(line: RDD[(String, String)]): String = {
      val c = line.toLocalIterator.mkString
      val file2 = KeepEverythingExtractor.INSTANCE.getText(c)
      file2
    }

It's of type org.apache.spark.rdd.RDD[(String, String)] => String

I have some files stored in HDFS which I access as follows:

val logData = sc.wholeTextFiles("hdfs://localhost:9000/home/akshat/recipes/recipes/simplyrecipes/*/*/*/*")

It's of type org.apache.spark.rdd.RDD[(String, String)]
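
Each element of this RDD is a (path, content) pair, one pair per file. A quick check with the same logData confirms it (the printed path is whatever file happens to come first):

    val first = logData.first()   // one (file path, file content) pair
    println(first._1)             // prints the HDFS path of that file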

I have to map these files with the doSomething function:

val mapper = logData.map(doSomething)

But I get an error like this:

<console>:32: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, String)] => String
 required: ((String, String)) => ?
       val mapper = logData.map(doSomething)
                                ^

I have defined the input and output types in my function, and the input I pass matches those types. Why does this error occur, and what should I change to fix it?
Thanks in advance!

1 Answer

Answered by zero323:

What is passed to the function given to map is not the RDD[(String, String)] itself but individual pairs of type (String, String), hence the error. In the same way, when you map over a list you don't get the list itself, but the elements of the list, one by one.
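
As a plain-Scala illustration of the same point (the file names here are made up, and no Spark is needed):

    // the function sees one pair at a time, never the whole collection
    val pairs = List(("a.txt", "foo"), ("b.txt", "bar"))
    pairs.map { case (fname, content) => fname }   // List("a.txt", "b.txt")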

Let's say you want to extract the file path; then what you need is something like this:

    def doSomething(x: (String, String)): String = x match {
      case (fname, content) => fname
    }

or simply:

logData.map(_._1)
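
And if the original intent was to run the extractor over each file's content rather than over the whole RDD at once, a sketch along these lines should work (assuming KeepEverythingExtractor is boilerpipe's de.l3s.boilerpipe.extractors.KeepEverythingExtractor, as the question's code suggests):

    import de.l3s.boilerpipe.extractors.KeepEverythingExtractor

    // run the extractor per file; each element is one (path, content) pair
    val extracted = logData.map { case (fname, content) =>
      KeepEverythingExtractor.INSTANCE.getText(content)
    }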