future_map not finding global values when running in multisession


I'm trying to understand how to pass future_map a list of character vectors in .x so that .f can evaluate them. In the code below, I feed bestVars (a list of character vectors of variable names) to .x, and each element of the list is passed on to a custom function, run_sim_in_par. Before the call, mget(.x) looks up the value of each variable named in .x among the globals; the custom function then Reduces these values and performs a few other steps.

When I attempt to run the run_sim_in_par function in multisession I keep getting:

Error in (function (.x, .f, ..., .progress = FALSE) :
  ℹ In index: 1.
  Caused by error:
  ! value for 'a1' not found

After reading several questions and other sources on this error, I understand that it's not possible to automatically identify global variables specified via a character string (https://cran.r-project.org/web/packages/future/vignettes/future-4-issues.html). In my example, what is the proper way to have future_map get global values referenced in .x using a list of character vectors? I've been unsuccessful in the many different ways I've set globals and options arguments.

The workaround suggested in the future vignette link above recommends the following:

The workaround is to tell the future framework what additional globals are needed. This can be done via argument globals using:

> f <- future(my_sum("a"), globals = structure(TRUE, add = "a"))
> y <- value(f)
> y
[1] 6
or by injecting variable a at the beginning of the future expression, e.g.

> f <- future({ a; my_sum("a") })
> y <- value(f)
> y
[1] 6

But I'm having a hard time understanding how to modify my code based on the suggested action.

I'm sure this will come up so I'll pre-emptively mention it: the reason I'm assigning the df values to my global environment is because I'm trying to lower the size of the globals exported by future as this is significantly slowing the code when running multisession on remote AWS clusters.

library(future)
library(furrr)
library(kit)
library(tidyverse)

## reprex data
vars <- paste0(letters,1:10)
bestVars <- combn(vars, 5, simplify = F)
df <- data.frame(
  matrix(data = rnorm(50000*length(vars),200,500), nrow = 50000, ncol = length(vars))
)
names(df) <- vars
df$value <- rnorm(n = nrow(df), 350, 300)
df <- df %>%
  dplyr::select(value, everything())
df <- lapply(split.default(x = df, names(df)), function(x) x[[1]])

list2env(df, globalenv())

rm(df)

run_sim_in_par <- function(vars_to_sim)
{
  sampled_rows <- sample(x = 1:length(value), size = 50, replace = F)
  varname <- paste(names(vars_to_sim), collapse = "*")
  best <- Reduce(vars_to_sim, f = '*')[sampled_rows]
  row_idx <- kit::topn(best, n = 5, decreasing = T, hasna = FALSE, index = TRUE)
  
  best_row_value <- value[sampled_rows][row_idx]
  
  sim <- data.frame(var = varname,
                    mean_value = mean(best_row_value))
  return(sim)
}

## working when explicitly declaring .x
x <- bestVars[[1]]
simulated_res <- run_sim_in_par(vars_to_sim = mget(x))

## not recognizing .x
simulated_res <- future_map_dfr(
  .x = bestVars,
  .f = ~run_sim_in_par(vars_to_sim = mget(.x))
)

# Error in (function (.x, .f, ..., .progress = FALSE)  : 
#             ℹ In index: 1.
#           Caused by error:
#             ! value for 'a1' not found

## same error when setting furrr_options for 'globals'
simulated_res <- future_map_dfr(
  .x = bestVars,
  .f = ~run_sim_in_par(vars_to_sim = mget(.x)),
  .options = furrr_options(globals = TRUE) 
)

## attempt at declaring all globals for just the first element of bestVars
simulated_res <- future_map_dfr(
  .x = bestVars[[1]],
  .f = ~run_sim_in_par(vars_to_sim = mget(.x)),
  .options = furrr_options(globals = c(bestVars[[1]], "run_sim_in_par", "value"))
)

# Error in (function (.x, .f, ..., .progress = FALSE)  : 
#             ℹ In index: 1.
#           Caused by error:
#             ! value for 'a1' not found

There are 3 answers

Umar On
library(future)
library(furrr)
library(kit)
library(tidyverse)

# Reproducible data generation
set.seed(123)

vars <- paste0(letters, 1:10)
bestVars <- combn(vars, 5, simplify = FALSE)

df <- data.frame(
  matrix(data = rnorm(50000 * length(vars), 200, 500), nrow = 50000, ncol = length(vars))
)
names(df) <- vars
df$value <- rnorm(n = nrow(df), 350, 300)
df <- df %>%
  dplyr::select(value, everything())
df <- lapply(split.default(x = df, names(df)), function(x) x[[1]])

list2env(df, globalenv())

rm(df)

# Explicitly declare the global variables used in run_sim_in_par
globals <- c("value")

run_sim_in_par <- function(vars_to_sim)
{
  # Set the seed for reproducibility
  set.seed(123)
  
  # Extract value from the global environment
  value <- get("value", envir = .GlobalEnv)
  
  # Make sure each variable in vars_to_sim is available globally
  for (var in vars_to_sim) {
    assign(var, as.numeric(get(var, envir = .GlobalEnv)), envir = .GlobalEnv)
  }
  
  sampled_rows <- sample(x = 1:length(value), size = 50, replace = FALSE)
  varname <- paste(names(vars_to_sim), collapse = "*")
  best <- Reduce('*', lapply(vars_to_sim, function(var) as.numeric(get(var, envir = .GlobalEnv))))[sampled_rows]
  row_idx <- kit::topn(best, n = 5, decreasing = TRUE, hasna = FALSE, index = TRUE)
  
  best_row_value <- value[sampled_rows][row_idx]
  
  sim <- data.frame(var = varname,
                    mean_value = mean(best_row_value))
  return(sim)
}

# Use future_map_dfr with furrr_options to pass the globals and set seed
simulated_res <- future_map_dfr(
  .x = bestVars,
  .f = ~run_sim_in_par(vars_to_sim = .x),
  .options = furrr_options(
    globals = c("value", "run_sim_in_par"),
    seed = TRUE
  )
)

# View the results
ls(simulated_res)
head(simulated_res)
#>   var mean_value
#> 1     196.645210
#> 2     178.927198
#> 3       9.733175
#> 4     106.984018
#> 5     372.041245
#> 6     378.836707
jan-glx On

In my other answer I showed how to resolve the error message and how to control which globals are available for each future. Here I give hints on how to speed up your computations in general.

One thing that helps a lot is not computing the product for the rows that are not sampled:

set.seed(1)

vars <- paste0(letters,1:10)

bestVars <- combn(vars, 5, simplify = F)
names(bestVars) <- sapply(bestVars, paste, collapse = "*")

df <- data.frame(
  matrix(data = rnorm(50000*length(vars),200,500), nrow = 50000, ncol = length(vars))
)
names(df) <- vars
df$value <- rnorm(n = nrow(df), 350, 300)

N <- nrow(df)
run_sim_in_par <- function(vars_to_sim)
{
  sampled_rows <- sample(x = N, size = 50, replace = F)
  
  vars_to_sim = df[sampled_rows, vars_to_sim]
  
  best <- Reduce(vars_to_sim, f = '*')
  # indices of the 5 largest products (replaces kit::topn(best, n = 5, decreasing = T, hasna = FALSE, index = TRUE))
  row_idx <- head(order(-best), n = 5)
  
  best_row_value <- df[sampled_rows[row_idx], "value"]
  
  mean(best_row_value)
}

stime <- Sys.time()
simulated_res <- sapply(bestVars, run_sim_in_par)
simulated_res <- data.frame(var=names(simulated_res), mean = as.vector(simulated_res))
tail(simulated_res)
#>                  var     mean
#> 65775 u1*v2*w3*x4*y5 438.0177
#> 65776 u1*v2*w3*x4*z6 543.4459
#> 65777 u1*v2*w3*y5*z6 520.7860
#> 65778 u1*v2*x4*y5*z6 226.6832
#> 65779 u1*w3*x4*y5*z6 307.0623
#> 65780 v2*w3*x4*y5*z6 493.8572
Sys.time()-stime
#> Time difference of 15.72114 secs
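
As a small sanity check on the idea above, the following base-R sketch (with a hypothetical data frame `demo_df` and made-up column names, not the data from the question) suggests that subsetting the sampled rows before multiplying gives the same products as multiplying full columns and subsetting afterwards, while touching far fewer elements:

```r
set.seed(42)

# Hypothetical stand-in for the data frame used in the answer
demo_df <- data.frame(p = rnorm(1000), q = rnorm(1000), r = rnorm(1000))
rows <- sample(nrow(demo_df), size = 50)

# Multiply the full columns first, then keep only the sampled rows
full_then_subset <- Reduce(`*`, demo_df[c("p", "q", "r")])[rows]

# Keep only the sampled rows first, then multiply the much shorter columns
subset_then_full <- Reduce(`*`, demo_df[rows, c("p", "q", "r")])

# Both orders yield the same 50 products
same_result <- isTRUE(all.equal(full_then_subset, subset_then_full))
```
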
jan-glx On

You are just missing a , inherits = TRUE in the mget call here. Without it, mget (in contrast to get, which inherits by default) only searches the environment from which it is called. In your direct test that happens to be the global environment, the same environment that contains your variables. A more accurate title for the question would thus be "mget not finding global variables when called from child environment": (function(){x <- bestVars[[1]]; simulated_res <- run_sim_in_par(vars_to_sim = mget(x))})() fails with Error: value for ‘a1’ not found too.
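
A minimal base-R sketch of this lookup behaviour, independent of future and furrr. The variable `a1_demo` and both helper functions are hypothetical names introduced only for illustration:

```r
# Hypothetical variable defined at top level, outside any function
a1_demo <- c(1, 2, 3)

lookup_without_inherits <- function(nm) {
  # mget() defaults to inherits = FALSE, so only this function's own
  # frame is searched; the error message is captured instead of thrown
  tryCatch(mget(nm), error = function(e) conditionMessage(e))
}

lookup_with_inherits <- function(nm) {
  # inherits = TRUE also searches the enclosing environments
  mget(nm, inherits = TRUE)
}

res_fail <- lookup_without_inherits("a1_demo")  # an error message string
res_ok <- lookup_with_inherits("a1_demo")       # a named list holding a1_demo
```

Calling mget("a1_demo") directly at top level would succeed, which is why the direct test in the question appears to work.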

But the globals available during future evaluation are the same for each iteration of future_map_dfr, even if not every iteration needs all of them. Thus, you don't save anything by adding them to globals individually. You could, however, work around that limitation by replacing future_map_dfr with your own variant that uses separate globals for each iteration:

simulated_res <- lapply(bestVars, function(vars_to_sim) future(run_sim_in_par(vars_to_sim = mget(vars_to_sim, inherits = TRUE)), globals = vars_to_sim))
simulated_res <- lapply(simulated_res, future::value)
simulated_res <- dplyr::bind_rows(simulated_res)

But this will not help at all here, because the number of columns is small compared to the combinations of columns that you iterate over.

Make sure your data is transferred only once to each worker: use manual chunking and a nested loop inside each iteration if necessary. (EDIT: this only applies to the batchtools backends; with plan(multisession) and furrr::furrr_options(scheduling = 1) (the default) this is the case already.)
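
One way manual chunking with a nested loop might look, sketched in base R only. The names `items`, `process_item`, and `n_chunks` are hypothetical, and plain lapply stands in for whatever parallel map is used for the outer loop:

```r
# Hypothetical stand-ins for the real inputs and the per-item work
items <- as.list(1:10)
process_item <- function(x) x^2

# Split the items into a fixed number of chunks (e.g. one per worker)
n_chunks <- 3
chunk_id <- cut(seq_along(items), breaks = n_chunks, labels = FALSE)
chunks <- split(items, chunk_id)

# Outer loop: one call per chunk (this is the call that would become a future).
# Inner loop: every item of that chunk is processed within the same call,
# so shared data would only need to be shipped once per chunk.
chunk_results <- lapply(chunks, function(chunk) lapply(chunk, process_item))

# Flatten back to one result per item, in the original order
results <- unlist(chunk_results, recursive = FALSE, use.names = FALSE)
```
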