RecursiveTask Throwing StackOverflowError While executing ForkJoin


I designed a RecursiveTask; here is the code for it:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SearchTask extends RecursiveTask<Map<Short, Long>> {

private static final long serialVersionUID = 1L;
private int majorDataThreshold = 16001;
private ConcurrentNavigableMap<Short, Long> dataMap;
private long fromRange;
private long toRange;
private boolean fromInclusive;
private boolean toInclusive;

public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
        final boolean fromInclusive, final boolean toInclusive) {
    this.dataMap = new ConcurrentSkipListMap<>(dataSource);
    this.fromRange = fromRange;
    this.toRange = toRange;
    this.fromInclusive = fromInclusive;
    this.toInclusive = toInclusive;
}

@Override
protected Map<Short, Long> compute() {
    final int size = dataMap.size();
    // Not a perfect RecursiveTask: the +1000 here is a workaround for a
    // StackOverflowError that occurs once the map is filled with 32k entries
    if (size > majorDataThreshold + 1000) {
        // List<SearchTask> tasks = createSubtasks();
        // tasks.get(0).fork();
        // tasks.get(1).fork();

        // Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
        // map.putAll(tasks.get(1).join());
        // return map;

        return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
    }
    return search();
}

private List<SearchTask> createSubtasks() {
    final short lastKey = dataMap.lastKey();
    final short midkey = (short) (lastKey / 2);
    final short firstKey = dataMap.firstKey();
    final List<SearchTask> dividedTasks = new ArrayList<>();
    dividedTasks.add(
            new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
                    fromRange, toRange, fromInclusive, toInclusive));
    dividedTasks
            .add(new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
                    fromRange, toRange, fromInclusive, toInclusive));
    return dividedTasks;
}

private Map<Short, Long> search() {
    final Map<Short, Long> result = dataMap.entrySet().stream()
            .filter(searchPredicate(fromRange, toRange, fromInclusive, toInclusive))
            .collect(Collectors.toConcurrentMap(p -> p.getKey(), p -> p.getValue()));
    return result;
}

private static Predicate<? super Entry<Short, Long>> searchPredicate(final long fromValue, final long toValue,
        final boolean fromInclusive, final boolean toInclusive) {
    if (fromInclusive && !toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() < toValue);
    else if (!fromInclusive && toInclusive)
        return p -> (p.getValue() > fromValue && p.getValue() <= toValue);
    else if (fromInclusive && toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() <= toValue);
    else
        return p -> (p.getValue() > fromValue && p.getValue() < toValue);
}
}

The maximum amount of data handled by this task is 32,000 entries (32k).

In the code I split the task whenever its size passes a threshold:

 if (size > majorDataThreshold)

When I reduce majorDataThreshold below 16001, I get an error.

Stack Trace

at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackOverflowError
    ... 1024 more
Caused by: java.lang.StackOverflowError
    ... 1024 more
    .................Same trace
Caused by: java.lang.StackOverflowError
    at java.util.Collection.stream(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
    at java.util.concurrent.RecursiveTask.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)

To solve this I tried:

Collectors.toMap()
ConcurrentHashMap
joining manually

The issue still wasn't resolved. Could someone help me find what is wrong in my RecursiveTask?

Unit Test Code

import java.util.Set;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;

public class Container32kUniqueDataTest {

private ForkJoinRangeContainer forkJoinContainer;

@Before
public void setUp(){
    long[] data = generateTestData();
    forkJoinContainer = new ForkJoinRangeContainer(data);
}

private long[] generateTestData(){
    long[] data= new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i]=i+1;
    }
    return data;
}

@Test
public void runARangeQuery_forkJoin(){
    Set<Short> ids = forkJoinContainer.findIdsInRange(14, 17, true, true);
    assertTrue(ids.size() > 0);
}
}   

A skimmed version of the container code:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinRangeContainer {

private Map<Short, Long> dataSource = new HashMap<Short, Long>();

public ForkJoinRangeContainer(long[] data) {
    populateData(data);
}

private void populateData(final long[] data) {
    for (short i = 0; i < data.length; i++) {
        dataSource.put(i, data[i]);
    }
}

public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
    Map<Short, Long> map = forkJoinPool.invoke(task);
    forkJoinPool.shutdown();   // note: shutdown() has no effect on the common pool
    return map.keySet();
}

public static void main(String[] args) {

    long[] data = new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i] = i + 1;
    }
    ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
    Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
    if (ids.size() > 0) {
        System.out.println("Found Ids");
    }
}

1 Answer

Answer by edharned:

You’re stuck in never-ending recursion at SearchTask’s return ForkJoinTask.invokeAll(createSubtasks()).

createSubtasks() creates subtasks over and over again with the same values, since you never reduce the size of dataMap.

F/J works by splitting an object into Left and Right. Each Left and Right creates a new Left and Right with half the values of its parent. This halving keeps going until a threshold is reached, where you “do the work.”
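That halving pattern can be sketched with a minimal, self-contained example (a generic array-sum task, not the poster's SearchTask; names here are illustrative). The key property is that the midpoint is computed from the current task's own bounds, so both halves always shrink:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Canonical Left/Right halving: each task owns a half-open range [lo, hi),
// and the midpoint is computed from THIS task's bounds, so every split
// strictly shrinks the range and the recursion depth stays O(log n).
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {            // small enough: do the work
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = lo + (hi - lo) / 2;          // midpoint of the CURRENT range
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                           // Left runs asynchronously
        long rightSum = right.compute();       // Right reuses this thread
        return left.join() + rightSum;
    }
}
```

Invoked as ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length)), this terminates for any array size, because each half is strictly smaller than its parent.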

The first lesson I ever learned in programming was to Keep It Simple.

You’re mixing Map, ArrayList, ConcurrentSkipListMap, ConcurrentNavigableMap, List, stream Collectors, HashMap and Set along with the F/J classes. That is very confusing, makes the code difficult to follow, and usually leads to failure. Simple is better.

When you create a List for ForkJoinTask.invokeAll(), build the whole List at once, before the invoke. The List should contain all the subtasks needed to complete the work, each subtask half the size of the one before. Don’t use a stream; you don’t have a stream, just a few subtasks in a List.
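A sketch of that approach, assuming the work can be pre-chunked (the task, chunk size, and countAtLeast helper here are illustrative, not from the question; equal-sized chunks are used instead of successive halves, for brevity):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Build the complete List of leaf subtasks up front, then hand it to
// ForkJoinTask.invokeAll() once -- no recursive re-splitting at all.
class CountTask extends RecursiveTask<Long> {
    private final long[] data;
    private final int lo, hi;
    private final long min;

    CountTask(long[] data, int lo, int hi, long min) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
        this.min = min;
    }

    @Override
    protected Long compute() {                 // leaf work only, no splitting
        long count = 0;
        for (int i = lo; i < hi; i++) if (data[i] >= min) count++;
        return count;
    }

    static long countAtLeast(long[] data, long min) {
        final int chunk = 4_096;               // fixed chunk size, chosen up front
        List<CountTask> tasks = new ArrayList<>();
        for (int lo = 0; lo < data.length; lo += chunk) {
            tasks.add(new CountTask(data, lo, Math.min(lo + chunk, data.length), min));
        }
        long total = 0;
        for (CountTask t : ForkJoinTask.invokeAll(tasks)) total += t.join();
        return total;
    }
}
```

Because every subtask already exists before the invoke, the depth of the call chain is constant no matter how large the data set is.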

Either that, or split into Left and Right and call Left.fork() and Right.fork(). Each forked task then splits again with half the values, and so on.

Exactly how to reduce the object dataMap “size to split” is up to you.
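Concretely, the split in the posted createSubtasks() appears to stop making progress because midkey = lastKey / 2 is an absolute value rather than a midpoint of the current bounds: once firstKey reaches or passes it, the upper subMap is the whole map again. A small illustrative demonstration (the key range mirrors the question's 32k data set, but the class itself is hypothetical):

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class MidpointDemo {
    public static void main(String[] args) {
        // Simulate the question's upper-half subtask: keys 15999..31999
        ConcurrentSkipListMap<Short, Long> upper = new ConcurrentSkipListMap<>();
        for (short k = 15999; k <= 31999; k++) upper.put(k, (long) k);

        short firstKey = upper.firstKey();        // 15999
        short lastKey = upper.lastKey();          // 31999

        short badMid = (short) (lastKey / 2);     // 15999, i.e. == firstKey
        // subMap(badMid, true, lastKey, true) is the whole map again: no
        // progress, so the subtask keeps re-creating itself until the stack
        // overflows
        System.out.println(upper.subMap(badMid, true, lastKey, true).size());    // 16001

        short goodMid = (short) (firstKey + (lastKey - firstKey) / 2);  // 23999
        // a midpoint relative to the current bounds makes both halves shrink
        System.out.println(upper.subMap(firstKey, true, goodMid, false).size()); // 8000
    }
}
```

That also matches the observed threshold: the upper half holds 16001 entries, so any majorDataThreshold below 16001 makes that subtask split again, endlessly.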