Using HDFS with Apache Spark on Amazon EC2


I have a Spark cluster that I set up with the spark-ec2 script. I am now trying to put a file on HDFS so that the cluster can work on it.

On my master I have a file data.txt. I added it to HDFS by running ephemeral-hdfs/bin/hadoop fs -put data.txt /data.txt

Now, in my code, I have:

JavaRDD<String> rdd = sc.textFile("hdfs://data.txt",8);

I get an exception when doing this:

Exception in thread "main" java.net.UnknownHostException: unknown host: data.txt
    at org.apache.hadoop.ipc.Client$Connection.<init>(Client.java:214)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1196)
    at org.apache.hadoop.ipc.Client.call(Client.java:1050)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:123)
    at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:62)
    at org.apache.spark.rdd.RDD.sortBy(RDD.scala:488)
    at org.apache.spark.api.java.JavaRDD.sortBy(JavaRDD.scala:188)
    at SimpleApp.sortBy(SimpleApp.java:118)
    at SimpleApp.main(SimpleApp.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

How do I properly put this file into HDFS so that my cluster can start working on the dataset? I also tried just passing the local file path:

JavaRDD<String> rdd = sc.textFile("/home/ec2-user/data.txt",8);

When I do this and submit the job as:

./spark/bin/spark-submit --class SimpleApp --master spark://ec2-xxx.amazonaws.com:7077 --total-executor-cores 8 /home/ec2-user/simple-project-1.0.jar

I only get one executor, and the worker nodes in the cluster don't seem to get involved. I assumed this was because I was using a local file and EC2 does not have an NFS.


There are 2 answers

Holden

The first component after the // in an hdfs:// URI is the hostname, so hdfs://data.txt is read as a host named data.txt with no file path, which is exactly what the UnknownHostException reports. The URI should be hdfs://{active_master}:9000/data.txt instead. (In case it is useful in the future: with the spark-ec2 scripts, the default port for the persistent HDFS is 9010.)
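To make the parsing issue concrete, here is a minimal sketch using only java.net.URI from the standard library. It is not Spark or Hadoop code, and the class name HdfsUriCheck, the helper names, and the host namenode.example.com are hypothetical placeholders. It shows that "data.txt" ends up in the host slot of the question's URI, and how a URI with an explicit host, port, and path can be assembled.

```java
import java.net.URI;
import java.net.URISyntaxException;

class HdfsUriCheck {
    // Returns the host component of a URI string, as java.net.URI parses it.
    static String hostOf(String uri) {
        return URI.create(uri).getHost();
    }

    // Builds hdfs://<host>:<port><path>. Host and port are supplied by the caller;
    // the values used in main() are placeholders, not real cluster settings.
    static String hdfsUri(String nameNodeHost, int port, String absolutePath) {
        try {
            return new URI("hdfs", null, nameNodeHost, port, absolutePath, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid HDFS URI parts", e);
        }
    }

    public static void main(String[] args) {
        // In the question's URI, "data.txt" lands in the host slot and the path is empty.
        System.out.println(hostOf("hdfs://data.txt"));                          // data.txt
        System.out.println(URI.create("hdfs://data.txt").getPath().isEmpty());  // true

        // With a host and port in place, the file name moves into the path.
        String fixed = hdfsUri("namenode.example.com", 9000, "/data.txt");
        System.out.println(fixed);          // hdfs://namenode.example.com:9000/data.txt
        System.out.println(hostOf(fixed));  // namenode.example.com
    }
}
```

A string built this way, with the real master hostname substituted for the placeholder, is what would be passed as the first argument to sc.textFile.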

Alex North

Amazon Elastic MapReduce (EMR) now supports Spark natively and includes HDFS out of the box.

See http://aws.amazon.com/elasticmapreduce/details/spark/, with more detail and a walkthrough in the introductory blog post.

Spark on EMR uses EMRFS to access data in S3 directly, without first copying it into HDFS.

The walkthrough includes an example of loading data from S3.