I've tried to run my own Pregel method for a relatively small graph (250k vertices, 1.5M edges). The algorithm which I use may (high chances are) be non-convergent meaning in most cases maxIterations setting is actually acting as hard stop finishing all calculations.

I'm using AWS EMR with apache spark and m5.2xlarge instances for all nodes in a setup with EMR-managed scaling. Initially, cluster is set to run 1 master and 4 worker nodes with expansion up to 8 max.

For the same setup of cluster, I was increasing the number of maxIterations gradually from 100 to 500 with step of 100 [100, 200, 300, 400, 500]. I was under the assumption that setup enough for 100 iterations will be also enough for any other number just because not used memory will be freeing up.

However, when I ran a set of jobs with maxIterations increasing from 100 to 500 I found that all jobs with maxIterations > 100 were terminated due to step error. I've checked logs of Spark to find issues and this is what I got:

log start

21/02/13 21:23:24 INFO ApplicationMaster: Preparing Local resources
21/02/13 21:23:25 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1613251201422_0001_000001
21/02/13 21:23:25 INFO ApplicationMaster: Starting the user application in a separate Thread
21/02/13 21:23:25 INFO ApplicationMaster: Waiting for spark context initialization...
21/02/13 21:23:25 INFO SparkContext: Running Spark version 2.4.7-amzn-0
21/02/13 21:23:25 INFO SparkContext: Submitted application: Read JDBC Datasites2
21/02/13 21:23:25 INFO Utils: Successfully started service 'sparkDriver' on port 41117.
21/02/13 21:23:25 INFO SparkEnv: Registering MapOutputTracker
21/02/13 21:23:25 INFO SparkEnv: Registering BlockManagerMaster
21/02/13 21:23:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/02/13 21:23:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/02/13 21:23:25 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1613251201422_0001/blockmgr-bc544c91-1a59-41f3-890f-faaa392bea09
21/02/13 21:23:25 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1613251201422_0001/blockmgr-14e3f36f-6d3f-4ffe-a28c-fa3f81f0c5c9
21/02/13 21:23:26 INFO MemoryStore: MemoryStore started with capacity 1008.9 MB
21/02/13 21:23:26 INFO SparkEnv: Registering OutputCommitCoordinator
21/02/13 21:23:26 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /jobs, /jobs/json, /jobs/job, /jobs/job/json, /stages, /stages/json, /stages/stage, /stages/stage/json, /stages/pool, /stages/pool/json, /storage, /storage/json, /storage/rdd, /storage/rdd/json, /environment, /environment/json, /executors, /executors/json, /executors/threadDump, /executors/threadDump/json, /static, /, /api, /jobs/job/kill, /stages/stage/kill.
21/02/13 21:23:26 INFO Utils: Successfully started service 'SparkUI' on port 43659.
21/02/13 21:23:26 INFO SparkUI: Bound SparkUI to, and started at http://ip-172-31-21-88.ec2.internal:43659
21/02/13 21:23:26 INFO YarnClusterScheduler: Created YarnClusterScheduler
21/02/13 21:23:26 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1613251201422_0001 and attemptId Some(appattempt_1613251201422_0001_000001)
21/02/13 21:23:26 INFO Utils: Using initial executors = 100, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/02/13 21:23:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34665.
21/02/13 21:23:26 INFO Utils: Using initial executors = 100, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/02/13 21:23:26 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
21/02/13 21:23:27 INFO RMProxy: Connecting to ResourceManager at ip-172-31-29-
then approximately 2Mbytes of same output and then it finishes:

21/02/13 21:28:25 INFO TaskSetManager: Finished task 199.0 in stage 37207.0 (TID 93528) in 8 ms on ip-172-31-25-102.ec2.internal (executor 2) (158/200)

21/02/13 21:28:25 INFO BlockManagerInfo: Added rdd_2738_31 in memory on ip-172-31-21-88.ec2.internal:45667 (size: 252.3 KB, free: 2.1 GB)
21/02/13 21:28:25 ERROR ApplicationMaster: Exception from Reporter thread.
org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: Application attempt appattempt_1613251201422_0001_000001 doesn't exist in ApplicationMasterService cache.
    at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:353)
    at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
    at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:507)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1034)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1003)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:931)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2854)

21/02/13 21:28:25 INFO ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Application attempt appattempt_1613251201422_0001_000001 doesn't exist in ApplicationMasterService cache.
  • Am I correct that Pregel doesn't finish 200 or more iterations due to OutOfMemory error on some of the cluster nodes?
  • If so, how does Pregel work that 100 iterations are not causing it and 200 or 300 are causing? My understand before this issue was that Pregel as many other iterative approaches only 'store' previous and current iteration values and results and iteration by iteration values are changing, but their quantity is not increasing, meaning it is still graph with 250k vertices and 1.5m edges and only messages valid for current iteration are adding up to the heap.
  • Throughout the log I was not able to find any information on low memory and as seen, there are Gigabytes of it available on each node before it terminates

