Proper way to parallelize across local and AWS instances and cores in future_map


I would like to run the code below in parallel across my local machine and remote AWS machines. I've finally managed to SSH into an AWS machine and have it connect back to my computer, but I'm having a very difficult time wrapping my head around how to properly set up multi-level parallel code. I've read (and re-read) the links below and tried many of the plan() variants they discuss, but apparently I'm too dense to grasp the concept.

https://furrr.futureverse.org/articles/remote-connections.html
https://rc2e.com/usefultricks#recipe-parallel_remote
https://gist.github.com/dantonnoriega/87d41db62fc9637abd1447eaedd7613c
https://gist.github.com/DavisVaughan/865d95cf0101c24df27b37f4047dd2e5#gistcomment-2606332
https://stackoverflow.com/questions/59447052/r-package-future-why-does-a-loop-with-remote-workers-hangs-the-local-r-session

The code is straightforward. I run plan(multisession) locally to start 8 workers on my machine, and pass .x = bestVars and the custom function run_sim_in_par() (as .f) to future_map(). What I want is to parallelize across my local instance (as I have been doing with plan(multisession)) and one or more EC2 instances, and, within each machine, across its cores.
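For reference, the local-only part described above reduces to roughly the following minimal sketch. The toy data, the two-worker count, and toy_task() (a simplified, hypothetical stand-in for run_sim_in_par()) are assumptions chosen so it runs quickly. furrr_options(seed = TRUE) is included because the real task calls sample() inside the workers.

```r
library(future)
library(furrr)

# Two local background R sessions (the question uses the default, 8).
plan(multisession, workers = 2)

# Toy stand-ins for the question's data (hypothetical, much smaller).
set.seed(1)
df <- list(value = rnorm(100), a = rnorm(100), b = rnorm(100), c = rnorm(100))
bestVars <- combn(c("a", "b", "c"), 2, simplify = FALSE)

# Simplified stand-in for run_sim_in_par(): multiply the selected columns,
# keep a random sample of rows, and summarise them.
toy_task <- function(df, var_to_sim) {
  rows <- sample(seq_along(df[["value"]]), size = 10)
  prod_col <- Reduce(`*`, df[var_to_sim])[rows]
  data.frame(var = paste(var_to_sim, collapse = "*"),
             mean_value = mean(prod_col))
}

# seed = TRUE gives each element its own parallel-safe RNG stream.
res <- future_map(bestVars,
                  function(x) toy_task(df[c("value", x)], x),
                  .options = furrr_options(seed = TRUE))
out <- do.call(rbind, res)
print(out)

plan(sequential)  # shut down the background sessions
```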

library(future)
library(furrr)
library(data.table)
library(kit)
library(tictoc)
library(collapse)

plan(multisession)

## reprex data
vars <- paste0(letters,1:10)
bestVars <- combn(vars, 4, simplify = F)
df <- data.frame(
  matrix(data = rnorm(10000*length(vars),200,500), nrow = 10000, ncol = length(vars))
)
names(df) <- vars
df$value <- rnorm(n = nrow(df), 350, 300)
## put 'value' first (dplyr is not attached, so call it without %>%)
df <- dplyr::select(df, value, dplyr::everything())
df <- lapply(split.default(x = df, names(df)), function(x) x[[1]])

## break up the list of variables into chunks otherwise memory error on real data (>11 million)
chunks_run <- collapse::rsplit(1:length(bestVars), ceiling(seq_along(bestVars)/1000))
chunks_list <- vector("list", length = length(chunks_run))

## function that evaluates one combination of variables; run in parallel on the local machine
run_sim_in_par <- function(df, var_to_sim)
{     
  sampled_rows <- sample(x = 1:length(df[["value"]]), size = 50, replace = F)
  
  varname <- paste(var_to_sim, collapse = "*")
  best <- Reduce(df[var_to_sim], f = '*')[sampled_rows]
  row_idx <- kit::topn(best, n = 5, decreasing = T, hasna = FALSE, index = TRUE)
  
  best_row_value <- df[["value"]][sampled_rows][row_idx]
  
  sim <- data.table(var = varname,
                    mean_value = mean(best_row_value))
  
  return(sim)
}

chunks_idx <- 1
for (chunks_idx in seq_along(chunks_run))
{
  tic()
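  simulated_roi <- future_map(
    .x = bestVars[ chunks_run[[chunks_idx]] ],
    .f = function(x) run_sim_in_par(df = df[c("value", x)],
                                    var_to_sim = x),
    ## run_sim_in_par() calls sample(), so request parallel-safe RNG streams
    .options = furrr_options(seed = TRUE))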
  toc()
  
  chunks_list[[chunks_idx]] <- rbindlist(simulated_roi)
}

simulated_roi <- rbindlist(chunks_list)

base::closeAllConnections()

## start remote cluster and manually connect (remote=FALSE hangs & doesn't connect)
public_ip <- "x.xx.aa.bc"
ssh_private_key_file <- "path/r.pem"

cl <- makeClusterPSOCK(
  
  # Public IP number of EC2 instance
  workers = public_ip,
  
  # User name ('ubuntu' on Ubuntu AMIs)
  user = "ubuntu",
  
  ## Use private SSH key registered with AWS
  rshopts = c(
    "-o", "StrictHostKeyChecking=no",
    "-o", "IdentitiesOnly=yes",
    "-i", ssh_private_key_file
  ),
  
  rscript_args = c(
    ## Set up .libPaths() for the 'ubuntu' user
    "-e", shQuote(
      paste0(
        "local({",
        "p <- Sys.getenv('R_LIBS_USER'); ",
        "dir.create(p, recursive = TRUE, showWarnings = FALSE); ",
        ".libPaths(p)",
        "})"
      )
    ),
    ## Install required libraries
    "-e", shQuote("install.packages(c('furrr','purrr','kit','dqrng','tictoc','data.table'))")
  ),
  
  # Switch this to TRUE to see the code that is run on the workers without
  # making the connection
  dryrun = FALSE,
  manual = TRUE
)

cl

## socket cluster with 1 nodes on host ‘xx.xxx.xx.xxx’

## attempt at running the code with both local and AWS workers
## neither the local nor the remote workers seem to be processing anything
local_workers <- makeClusterPSOCK(8)
plan(list(tweak(cluster, workers = c(cl, local_workers)), multisession))

chunks_idx <- 1
for (chunks_idx in seq_along(chunks_run))
{
  tic()
  simulated_roi <- future_map(
    .x = c(1:12), # 8 local, 4 remote
    .f = ~{
      future_map(
        .x = bestVars[ chunks_run[[chunks_idx]] ],
        .f = function(x) run_sim_in_par(df = df[c("value", x)],
                                        var_to_sim = x)
      )
    }
  )
  toc()
  chunks_list[[chunks_idx]] <- rbindlist(simulated_roi)
}


Answer by HenrikB

I don't think you want to set up nested parallel workers here. Instead, I suspect you want a flat structure of parallel workers.

For nested parallelization

If you really want to use nested parallelization, note that the default number of parallel workers at a nested level is one. This protects against exploding the parallelization and frying your CPUs. Thus, when nested, plan(multisession) becomes the same as plan(multisession, workers = 1). To override that, you need to hard-code the number of workers, e.g. tweak(multisession, workers = 8). This is mentioned in https://future.futureverse.org/articles/future-3-topologies.html#an-ad-hoc-compute-cluster.
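A minimal sketch of that override, using two levels of local multisession workers so it can run on a single machine. The worker counts (2 outer, 2 inner) are arbitrary assumptions. Each outer worker reports how many workers its own, inner plan level provides; without the second tweak() that number would fall back to one.

```r
library(future)
library(furrr)

# Level 1: 2 outer workers. Level 2: each outer worker may use 2 inner workers.
# Without tweak() on level 2, the nested multisession would default to 1 worker.
plan(list(
  tweak(multisession, workers = 2),
  tweak(multisession, workers = 2)
))

# Inside an outer worker, nbrOfWorkers() describes the next (inner) level.
inner_workers <- unlist(future_map(1:2, ~ future::nbrOfWorkers()))
print(inner_workers)

plan(sequential)  # shut down the outer workers
```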

For flat parallelization

See https://parallelly.futureverse.org/reference/makeClusterPSOCK.html. It has an example showing how to launch three parallel workers on two remote hosts:

workers <- c("n1.remote.org", "n2.remote.org", "n1.remote.org")
cl <- makeClusterPSOCK(workers)

Note how you specify the same host name multiple times; one per parallel worker.

The analogue when launching two local workers is:

cl <- makeClusterPSOCK(rep("localhost", times = 2))

Here's an example that launches two remote parallel workers on one host and three local workers:

library(parallelly)
library(future)

cl_remote <- makeClusterPSOCK(rep("c4-dev3.ucsf.edu", times = 2))
print(cl_remote)
## Socket cluster with 2 nodes where 2 nodes are on host 'c4-dev3.ucsf.edu'
## (R version 4.3.2 (2023-10-31), platform x86_64-pc-linux-gnu)

cl_local <- makeClusterPSOCK(3)
print(cl_local)
## Socket cluster with 3 nodes where 3 nodes are on host 'localhost'
## (R version 4.3.2 (2023-10-31), platform x86_64-pc-linux-gnu)

cl_all <- c(cl_remote, cl_local)
print(cl_all)
## Socket cluster with 5 nodes where 3 nodes are on host 'localhost'
## (R version 4.3.2 (2023-10-31), platform x86_64-pc-linux-gnu),
## 2 nodes are on host 'c4-dev3.ucsf.edu' (R version 4.3.2 (2023-10-31),
## platform x86_64-pc-linux-gnu)

plan(cluster, workers = cl_all)
nbrOfWorkers()
## [1] 5

We can use future::futureSessionInfo() to verify that all 5 parallel workers are available. We're interested in the "Backends" output below.

> future::futureSessionInfo()
future 1.33.0, parallelly 1.36.0, parallel 4.3.2, globals 0.16.2, listenv 0.9.0

*** Allocations
availableCores():
system  nproc 
     8      8 
availableWorkers():
$nproc
[1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
[7] "localhost" "localhost"

$system
[1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
[7] "localhost" "localhost"


*** Settings
- future.plan=<not set>
- future.fork.multithreading.enable=<not set>
- future.globals.maxSize=<not set>
- future.globals.onReference=<not set>
- future.resolve.recursive=<not set>
- future.rng.onMisuse=<not set>
- future.wait.timeout=<not set>
- future.wait.interval=<not set>
- future.wait.alpha=<not set>
- future.startup.script=<not set>

*** Backends
Number of workers: 5
List of future strategies:
1. cluster:
   - args: function (..., workers = "<Socket cluster with 5 nodes where 3 nodes are on host 'localhost' (R version 4.3.2 (2023-10-31), platform x86_64-pc-linux-gnu), 2 nodes are on host 'c4-dev3.ucsf.edu' (R version 4.3.2 (2023-10-31), platform x86_64-pc-linux-gnu)>", envir = parent.frame())
   - tweaked: TRUE
   - call: plan(cluster, workers = cl_all)

*** Basic tests
Main R session details:
     pid     r sysname          release
1 148300 4.3.2   Linux 6.2.0-37-generic
                                                           version nodename
1 #38~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov  2 18:01:13 UTC 2  host001
  machine   login    user effective_user
1  x86_64 user001 user001        user001
Worker R session details:
  worker    pid     r sysname                     release
1      1  27770 4.3.2   Linux 3.10.0-1160.49.1.el7.x86_64
2      2  27954 4.3.2   Linux 3.10.0-1160.49.1.el7.x86_64
3      3 148379 4.3.2   Linux            6.2.0-37-generic
4      4 148378 4.3.2   Linux            6.2.0-37-generic
5      5 148377 4.3.2   Linux            6.2.0-37-generic
                                                           version nodename
1                              #1 SMP Tue Nov 30 15:51:32 UTC 2021  host001
2                              #1 SMP Tue Nov 30 15:51:32 UTC 2021  host001
3 #38~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov  2 18:01:13 UTC 2  host002
4 #38~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov  2 18:01:13 UTC 2  host002
5 #38~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov  2 18:01:13 UTC 2  host002
  machine   login    user effective_user
1  x86_64 user001 user001        user001
2  x86_64 user001 user001        user001
3  x86_64 user001 user001        user001
4  x86_64 user001 user001        user001
5  x86_64 user001 user001        user001
Number of unique worker PIDs: 5 (as expected)
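Applied to the question, a flat plan needs only one level of future_map(), with no outer loop over worker indices. The sketch below rests on assumptions: it combines two local clusters with c() as stand-ins for cl_remote and cl_local so that it runs without SSH, and it uses a toy paste() task instead of run_sim_in_par().

```r
library(parallelly)
library(future)
library(furrr)

# Stand-ins for cl_remote and cl_local (both local here); with a real host,
# build the remote part with makeClusterPSOCK(rep(<host>, times = n), ...).
cl_a <- makeClusterPSOCK(1)
cl_b <- makeClusterPSOCK(2)
cl_all <- c(cl_a, cl_b)
plan(cluster, workers = cl_all)
n_workers <- nbrOfWorkers()

# Toy workload (hypothetical): each element is one task, and furrr spreads the
# elements over all workers in cl_all, so no nested future_map() is needed.
bestVars <- combn(paste0("v", 1:6), 2, simplify = FALSE)
res <- future_map_chr(bestVars, function(x) paste(x, collapse = "*"),
                      # not needed for this toy task, but required once
                      # the task calls sample() like run_sim_in_par() does
                      .options = furrr_options(seed = TRUE))
print(res)

plan(sequential)
```

With real remote workers, any globals the task uses (such as df) are sent to each worker over the SSH connection, so large data can dominate the run time.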