Spark/Hadoop/Yarn cluster communication requires external ip?

860 views Asked by At

I deployed Spark (1.3.1) with yarn-client on Hadoop (2.6) cluster using bdutil, by default, the instances are created with Ephemeral external ips, and so far spark works fine. With some security concerns, and assuming the cluster is internal accessed only, I removed the external ips from the instances; after that, the spark-shell will not even run, and seemed it cannot communicate with Yarn/Hadoop, and just stuck indefinitely. Only after I added the external ips back, the spark-shell starts working properly.

My question is, is external ips of the nodes required to run spark over yarn, and why? If yes, will there be any concerns regarding security, etc? Thanks!

1

There are 1 answers

2
Dennis Huo On BEST ANSWER

Short Answer

You need external IP addresses to access GCS, and default bdutil settings set GCS as the default Hadoop filesystem, including for control files. Use ./bdutil -F hdfs ... deploy to use HDFS as the default instead.

Security shouldn't be a concern when using external IP addresses unless you've added too many permissive rules to your firewall rules in your GCE network config.

EDIT: At the moment there appears to be a bug where we set spark.eventLog.dir to a GCS path even if the default_fs is hdfs. I filed https://github.com/GoogleCloudPlatform/bdutil/issues/35 to track this. In the meantime just manually edit /home/hadoop/spark-install/conf/spark-defaults.conf on your master (you might need to sudo -u hadoop vim.tiny /home/hadoop/spark-install/conf/spark-defaults.conf to have edit permissions on it) to set spark.eventLog.dir to hdfs:///spark-eventlog-base or something else in HDFS, and run hadoop fs -mkdir -p hdfs:///spark-eventlog-base to get it working.

Long Answer

By default, bdutil also configures Google Cloud Storage as the "default Hadoop filesystem", which means that control files used by Spark and YARN require access to Google Cloud Storage. Additionally, external IPs are required in order to access Google Cloud Storage.

I did manage to partially repro your case after manually configuring intra-network SSH; during startup I actually see the following:

15/06/26 17:23:05 INFO yarn.Client: Preparing resources for our AM container
15/06/26 17:23:05 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop2
15/06/26 17:23:26 WARN http.HttpTransport: exception thrown while executing request
java.net.SocketTimeoutException: connect timed out
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:579)
  at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:625)
  at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
  at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
  at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
  at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:275)
  at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:371)
  at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:191)
  at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
  at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:177)
  at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:153)
  at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93)
  at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:410)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:343)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:460)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getBucket(GoogleCloudStorageImpl.java:1557)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1512)
  at com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage.getItemInfo(CacheSupplementedGoogleCloudStorage.java:516)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1016)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.exists(GoogleCloudStorageFileSystem.java:382)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configureBuckets(GoogleHadoopFileSystemBase.java:1639)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.configureBuckets(GoogleHadoopFileSystem.java:71)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1587)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:776)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:739)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
  at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:216)
  at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:384)
  at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:102)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)
  at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:381)
  at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1016)
  at $line3.$read$$iwC$$iwC.<init>(<console>:9)
  at $line3.$read$$iwC.<init>(<console>:18)
  at $line3.$read.<init>(<console>:20)
  at $line3.$read$.<init>(<console>:24)
  at $line3.$read$.<clinit>(<console>)
  at $line3.$eval$.<init>(<console>:7)
  at $line3.$eval$.<clinit>(<console>)
  at $line3.$eval.$print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
  at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
  at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
  at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
  at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
  at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:973)
  at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:157)
  at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
  at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
  at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:990)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

As expected, simply by calling org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start it tries to contact Google Cloud Storage, and fails because there's not GCS access without external IPs.

To get around this, you can simply use -F hdfs when creating your cluster to use HDFS as your default filesystem; in that case everything should work intra-cluster even without external IP addresses. In that mode, you can still even continue to use GCS whenever you have external IP addresses assigned by specifying full gs://bucket/object paths as your Hadoop arguments. However, note that in that case, as long as you've removed the external IP addresses, you won't be able to use GCS unless you also configure a proxy server and funnal all data through your proxy; the GCS configs for that is fs.gs.proxy.address.

In general, there's no need to worry about security just because of having external IP addresses unless you've opened up new permissive rules in your "default" network firewall rules in Google Compute Engine.