Value registerAsTable is not a member of org.apache.spark.rdd.RDD[Tweet]

I am trying to extract Twitter data through the REST API in Zeppelin. I tried both registerAsTable and registerTempTable, and neither works. Please help me resolve the error. I get the error below when running the Zeppelin tutorial code:

error: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Tweet]
).foreachRDD(rdd=> rdd.registerAsTable("tweets")

There are 3 answers

0
sag On

An RDD cannot be registered as a table, but a DataFrame can. Convert your RDD to a DataFrame, then register the resulting DataFrame as a temp table or save it as a table.

You can convert an RDD to a DataFrame as follows:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._  // brings toDF() into scope for RDDs of case classes
rdd.toDF()

See "How to convert rdd object to dataframe in spark" and http://spark.apache.org/docs/latest/sql-programming-guide.html
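To make the conversion concrete, here is a minimal sketch of the whole round trip on Spark 2.x: build an RDD of a case class, convert it with toDF(), register it, and query it. The sample rows, the app name, and the local master are assumptions for illustration only; in Zeppelin or spark-shell a session normally already exists and getOrCreate() simply returns it. It uses createOrReplaceTempView, which replaced registerTempTable in Spark 2.0.

```scala
import org.apache.spark.sql.SparkSession

// Record type mirroring the Tweet class from the question.
case class Tweet(createdAt: Long, text: String)

// Assumption: local master and app name are placeholders; an existing session is reused.
val spark = SparkSession.builder().appName("rdd-to-table").master("local[*]").getOrCreate()
import spark.implicits._ // provides toDF() on RDDs of case classes

// Made-up sample data standing in for real tweets.
val rdd = spark.sparkContext.parallelize(Seq(Tweet(1L, "hello"), Tweet(2L, "world")))

// The RDD itself has no register method; the DataFrame does.
val df = rdd.toDF()
df.createOrReplaceTempView("tweets")

val rows = spark.sql("SELECT createdAt, text FROM tweets ORDER BY createdAt").collect()
```

The same two calls (toDF, then register) are what go inside foreachRDD in the streaming case.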

0
donald On

In the Zeppelin interpreter settings, add the external dependency org.apache.bahir:spark-streaming-twitter_2.11:2.0.0 from the GUI, then run the following with Spark 2.0.1:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.io.Source
//import org.apache.spark.Logging
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess

import scala.collection.mutable.HashMap
/** Configures the OAuth credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String): Unit = {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach{ case(key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}


// Configure Twitter credentials. The values below are placeholders for illustration and will not work; substitute your own.
val apiKey = "7AVLnhssAqumpgY6JtMa59w6Tr"
val apiSecret = "kRLstZgz0BYazK6nqfMkPvtJas7LEqF6IlCp9YB1m3pIvvxrRZl"
val accessToken = "79438845v6038203392-CH8jDX7iUSj9xmQRLpHqLzgvlLHLSdQ"
val accessTokenSecret = "OXUpYu5YZrlHnjSacnGJMFkgiZgi4KwZsMzTwA0ALui365"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

// Twitter receiver from the Bahir dependency added above
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkContext._

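// Streaming context with a 2-second batch interval
val ssc = new StreamingContext(sc, Seconds(2))

// None makes twitter4j read the OAuth system properties set above
val tweets = TwitterUtils.createStream(ssc, None)
// Each windowed RDD holds the last 10 seconds of tweets
val twt = tweets.window(Seconds(10))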

//twt.print


val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Tweet(createdAt:Long, text:String)

val tweet = twt.map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
)


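// Re-register every windowed RDD as temp table "tweets" (registerTempTable is deprecated since Spark 2.0 in favor of createOrReplaceTempView)
tweet.foreachRDD(rdd=>rdd.toDF.registerTempTable("tweets"))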
ssc.start()
//ssc.stop()

After that, query the table from another Zeppelin cell:

%sql select createdAt, text  from tweets   limit 50
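One detail of the credential helper in this answer is easy to miss: key.replace("api", "consumer") is what turns the argument names into the property names twitter4j reads. The sketch below isolates just that mapping in plain Scala; toTwitter4jKey is a hypothetical name introduced here for illustration and is not part of the answer's code.

```scala
// Hypothetical helper: the same string mapping configureTwitterCredentials applies to each key.
def toTwitter4jKey(key: String): String =
  "twitter4j.oauth." + key.replace("api", "consumer")

val keys = Seq("apiKey", "apiSecret", "accessToken", "accessTokenSecret")

// Only the two "api*" keys are renamed; the access token keys pass through unchanged.
val mapped = keys.map(toTwitter4jKey)
mapped.foreach(println)
```

If the stream fails to authenticate, checking that these four system properties are set before TwitterUtils.createStream runs is a reasonable first step.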

1
vaquar khan On
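val data = sc.textFile("/FileStore/tables/uy43p2971496606385819/testweet.json")

// Convert the RDD to a DataFrame; toDF() on an RDD needs the session implicits in scope
import spark.implicits._
// The result has a single string column named "value" holding each raw JSON line
val inputs = data.toDF()
inputs.createOrReplaceTempView("tweets")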
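Because textFile followed by toDF() leaves every JSON record as one unparsed string, a query such as select text from tweets would not find a text column. A possible alternative, not what this answer uses, is to let Spark parse the file with spark.read.json. The sketch below is a minimal illustration under stated assumptions: the temp file and its two records are made up, and the local master is a placeholder.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: local master and app name are placeholders; an existing session is reused.
val spark = SparkSession.builder().appName("json-to-view").master("local[*]").getOrCreate()

// Made-up sample file in JSON Lines form (one JSON object per line).
val path = Files.createTempFile("tweets", ".json")
val sample = "{\"createdAt\":1,\"text\":\"hello\"}\n{\"createdAt\":2,\"text\":\"world\"}\n"
Files.write(path, sample.getBytes("UTF-8"))

// spark.read.json infers the schema, so createdAt and text become real columns.
val tweetsDf = spark.read.json(path.toUri.toString)
tweetsDf.createOrReplaceTempView("tweets")

val texts = spark.sql("SELECT text FROM tweets ORDER BY createdAt").collect().map(_.getString(0)).toSeq
```

With a real file, the path passed to spark.read.json would be the one from the answer instead of the temp file.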