I would like to run the code below in parallel between my local machine and remote AWS machines. After finally being able to SSH into an AWS machine and have it connect back to my computer, I'm having a very difficult time wrapping my head around how to properly set up multi-level parallel code. I've read (and re-read) and attempted many of the plan() variants discussed in the links below, but apparently I'm too dense to grasp this concept.
https://furrr.futureverse.org/articles/remote-connections.html
https://rc2e.com/usefultricks#recipe-parallel_remote
https://gist.github.com/dantonnoriega/87d41db62fc9637abd1447eaedd7613c
https://gist.github.com/DavisVaughan/865d95cf0101c24df27b37f4047dd2e5#gistcomment-2606332
https://stackoverflow.com/questions/59447052/r-package-future-why-does-a-loop-with-remote-workers-hangs-the-local-r-session
The code is straightforward. I run plan(multisession) locally to start 8 workers on my machine and feed .x = bestVars and a custom function .f = run_sim_in_par() to future_map(). What I want is to parallelize across my local machine (as I have been doing with plan(multisession)) and one or more EC2 instances, and also to parallelize across the cores of each local and remote instance.
library(future)
library(furrr)
library(data.table)
library(kit)
library(tictoc)
library(collapse)
library(dplyr) ## needed for %>% and select() below
plan(multisession)
## reprex data
vars <- paste0(letters,1:10)
bestVars <- combn(vars, 4, simplify = F)
df <- data.frame(
matrix(data = rnorm(10000*length(vars),200,500), nrow = 10000, ncol = length(vars))
)
names(df) <- vars
df$value <- rnorm(n = nrow(df), 350, 300)
df <- df %>%
dplyr::select(value, everything())
df <- lapply(split.default(x = df, names(df)), function(x) x[[1]])
## break up the list of variables into chunks otherwise memory error on real data (>11 million)
chunks_run <- collapse::rsplit(1:length(bestVars), ceiling(seq_along(bestVars)/1000))
chunks_list <- vector("list", length = length(chunks_run))
## function to run variable combinations in parallel on the local machine
run_sim_in_par <- function(df, var_to_sim)
{
sampled_rows <- sample(x = 1:length(df[["value"]]), size = 50, replace = F)
varname <- paste(var_to_sim, collapse = "*")
best <- Reduce(df[var_to_sim], f = '*')[sampled_rows]
row_idx <- kit::topn(best, n = 5, decreasing = T, hasna = FALSE, index = TRUE)
best_row_value <- df[["value"]][sampled_rows][row_idx]
sim <- data.table(var = varname,
mean_value = mean(best_row_value))
return(sim)
}
chunks_idx <- 1
for (chunks_idx in seq_along(chunks_run))
{
tic()
simulated_roi <- future_map(
.x = bestVars[ chunks_run[[chunks_idx]] ],
.f = function(x) run_sim_in_par(df = df[c("value", x)],
var_to_sim = x))
toc()
chunks_list[[chunks_idx]] <- rbindlist(simulated_roi)
}
simulated_roi <- rbindlist(chunks_list)
base::closeAllConnections()
## start remote cluster and manually connect (remote=FALSE hangs & doesn't connect)
public_ip <- "x.xx.aa.bc"
ssh_private_key_file <- "path/r.pem"
cl <- makeClusterPSOCK(
# Public IP number of EC2 instance
workers = public_ip,
# User name (always 'ubuntu')
user = "ubuntu",
## Use private SSH key registered with AWS
rshopts = c(
"-o", "StrictHostKeyChecking=no",
"-o", "IdentitiesOnly=yes",
"-i", ssh_private_key_file
),
rscript_args = c(
## Set up .libPaths() for the 'ubuntu' user
"-e", shQuote(
paste0(
"local({",
"p <- Sys.getenv('R_LIBS_USER'); ",
"dir.create(p, recursive = TRUE, showWarnings = FALSE); ",
".libPaths(p)",
"})"
)
),
## Install required libraries
"-e", shQuote("install.packages(c('furrr','purrr','kit','dqrng','tictoc','data.table'))")
),
# Switch this to TRUE to see the code that is run on the workers without
# making the connection
dryrun = FALSE,
manual = TRUE
)
cl
## socket cluster with 1 nodes on host ‘xx.xxx.xx.xxx’
## attempt at running code with local and aws clusters
## doesn't seem like either local clusters or remote clusters are processing anything
local_workers <- makeClusterPSOCK(8)
plan(list(tweak(cluster, workers = c(cl, local_workers)), multisession))
chunks_idx <- 1
for (chunks_idx in seq_along(chunks_run))
{
tic()
simulated_roi <- future_map(
.x = c(1:12), # 8 local, 4 remote
.f = ~{
future_map(
.x = bestVars[ chunks_run[[chunks_idx]] ],
.f = function(x) run_sim_in_par(df = df[c("value", x)],
var_to_sim = x)
)
}
)
toc()
chunks_list[[chunks_idx]] <- rbindlist(simulated_roi)
}
I don't think you want to set up nested parallel workers here. Instead, I suspect you want to keep it a flat structure of parallel workers.
For nested parallelization
If you really want to use nested parallelization, note that the default number of parallel workers at a nested level is one. This is to protect against exploding the parallelization and frying your CPUs. Thus, when nesting, plan(multisession) becomes the same as plan(multisession, workers = 1). To override that, you need to hard-code it to something like tweak(multisession, workers = 8). That is mentioned in https://future.futureverse.org/articles/future-3-topologies.html#an-ad-hoc-compute-cluster.
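For example, adapting the plan() from the question, an explicit two-level topology could be sketched like this (a sketch; cl is the remote cluster created above, and workers = 8 is an assumption about each machine's core count):
plan(list(
  tweak(cluster, workers = cl),     ## outer level: one future per cluster node
  tweak(multisession, workers = 8)  ## inner level: 8 workers on each node
))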
For flat parallelization
See https://parallelly.futureverse.org/reference/makeClusterPSOCK.html. There's an example showing how to launch three parallel workers across two remote hosts:
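A minimal sketch of that pattern (the host names here are placeholders):
## two workers on 'n1.remote.org', one on 'n2.remote.org'
workers <- c("n1.remote.org", "n1.remote.org", "n2.remote.org")
cl <- parallelly::makeClusterPSOCK(workers)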
Note how you specify the same host name multiple times, once per parallel worker.
The analogue when launching two local workers is:
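## a sketch: two workers on the local machine
cl <- parallelly::makeClusterPSOCK(2)
## equivalently, spelled out with one entry per worker:
cl <- parallelly::makeClusterPSOCK(c("localhost", "localhost"))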
Here's an example launching two remote parallel workers on one host, and three local workers:
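## a sketch, re-using public_ip and ssh_private_key_file from the question
cl_remote <- parallelly::makeClusterPSOCK(
  workers = rep(public_ip, 2),  ## same host twice => two remote workers
  user = "ubuntu",
  rshopts = c("-i", ssh_private_key_file)
)
cl_local <- parallelly::makeClusterPSOCK(3) ## three local workers
## c() on cluster objects combines them into a single flat cluster of five nodes
cl <- c(cl_remote, cl_local)
plan(cluster, workers = cl)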
We can use future::futureSessionInfo() to verify that all 5 parallel workers are available. We're interested in the "Backend" output below.
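## a sketch, assuming the five-worker plan(cluster, workers = cl) from above is active
future::nbrOfWorkers()      ## should report 5
future::futureSessionInfo() ## check the "Backend" section for the cluster strategy and its workers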