I designed a RecursiveTask.
Here is the code for the task I designed.
public class SearchTask extends RecursiveTask<Map<Short, Long>> {
private static final long serialVersionUID = 1L;
private int majorDataThreshold = 16001;
private ConcurrentNavigableMap<Short, Long> dataMap;
private long fromRange;
private long toRange;
private boolean fromInclusive;
private boolean toInclusive;
public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
final boolean fromInclusive, final boolean toInclusive) {
this.dataMap = new ConcurrentSkipListMap<>(dataSource);
this.fromRange = fromRange;
this.toRange = toRange;
this.fromInclusive = fromInclusive;
this.toInclusive = toInclusive;
}
@Override
protected Map<Short, Long> compute() {
final int size = dataMap.size();
// This is not a perfect RecursiveTask: the +1000 in the condition below works around a StackOverflowError when the map is filled with 32k entries
if (size > majorDataThreshold+1000) {
// List<SearchTask> tasks = createSubtasks();
// tasks.get(0).fork();
// tasks.get(1).fork();
// Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
// map.putAll(tasks.get(1).join());
// return map;
return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
}
return search();
}
private List<SearchTask> createSubtasks() {
final short lastKey = dataMap.lastKey();
final short midkey = (short) (lastKey / 2);
final short firstKey = dataMap.firstKey();
final List<SearchTask> dividedTasks = new ArrayList<>();
dividedTasks.add(
new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
fromRange, toRange, fromInclusive, toInclusive));
dividedTasks
.add(new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
fromRange, toRange, fromInclusive, toInclusive));
return dividedTasks;
}
private Map<Short, Long> search() {
final Map<Short, Long> result = dataMap.entrySet().stream()
.filter(serchPredicate(fromRange, toRange, fromInclusive, toInclusive))
.collect(Collectors.toConcurrentMap(p -> p.getKey(), p -> p.getValue()));
return result;
}
private static Predicate<? super Entry<Short, Long>> serchPredicate(final long fromValue, final long toValue,
final boolean fromInclusive, final boolean toInclusive) {
if (fromInclusive && !toInclusive)
return p -> (p.getValue() >= fromValue && p.getValue() < toValue);
else if (!fromInclusive && toInclusive)
return p -> (p.getValue() > fromValue && p.getValue() <= toValue);
else if (fromInclusive && toInclusive)
return p -> (p.getValue() >= fromValue && p.getValue() <= toValue);
else
return p -> (p.getValue() > fromValue && p.getValue() < toValue);
}
}
The maximum amount of data this task handles is 32,000 (32k) entries.
In the code, I split the task when the map size exceeds a threshold:
if (size > majorDataThreshold)
When I reduce majorDataThreshold below 16001, I get the following error.
Stack trace:
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackOverflowError
... 1024 more
Caused by: java.lang.StackOverflowError
... 1024 more
.................Same trace
Caused by: java.lang.StackOverflowError
at java.util.Collection.stream(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
To solve this, I tried:
Collectors.toMap()
ConcurrentHashMap
Joining manually
None of these resolved the issue.
Could someone help me find what is wrong with my RecursiveTask?
Unit Test Code
public class Container32kUniqueDataTest {
private ForkJoinRangeContainer forkJoinContianer;
@Before
public void setUp(){
long[] data = genrateTestData();
forkJoinContianer = new ForkJoinRangeContainer(data);
}
private long[] genrateTestData(){
long[] data= new long[32000];
for (int i = 0; i < 32000; i++) {
data[i]=i+1;
}
return data;
}
@Test
public void runARangeQuery_forkJoin(){
Set<Short> ids = forkJoinContianer.findIdsInRange(14, 17, true, true);
assertEquals(true, ids.size()>0);
}
}
A trimmed-down version of the container code
public class ForkJoinRangeContainer {
private Map<Short, Long> dataSource = new HashMap<Short, Long>();
public ForkJoinRangeContainer(long[] data) {
populateData(data);
}
private void populateData(final long[] data) {
for (short i = 0; i < data.length; i++) {
dataSource.put(i, data[i]);
}
}
public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
Map<Short, Long> map = forkJoinPool.invoke(task);
forkJoinPool.shutdown();
return map.keySet();
}
public static void main(String[] args) {
long[] data = new long[32000];
for (int i = 0; i < 32000; i++) {
data[i] = i + 1;
}
ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
if (ids.size() > 0) {
System.out.println("Found Ids");
}
}
}
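You’re stuck in a never-ending recursion at this line in SearchTask.compute(): return ForkJoinTask.invokeAll(createSubtasks())
createSubtasks() creates subtasks over and over again with the same values, because the dataMap size is never reduced. midkey is computed as lastKey / 2 rather than as the midpoint between firstKey and lastKey, so once a subtask's firstKey reaches lastKey / 2, its second subMap covers the entire map again.
F/J works by splitting an object into Left and Right. Each Left and Right then creates a new Left and Right, each with half of its data. This halving continues until a threshold is reached, at which point you “do the work.”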
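To see why the data never shrinks, here is a small sketch that applies the same split key as the question's createSubtasks() (lastKey / 2) three times in a row and prints the size of the second half each time. The class name SplitDemo and the helper rightHalf are made up for this illustration; the map contents mirror the 32k test data from the question.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SplitDemo {
    // Second half as produced by the question's split: the split key is
    // lastKey / 2, regardless of where the map actually starts.
    static ConcurrentNavigableMap<Short, Long> rightHalf(ConcurrentNavigableMap<Short, Long> map) {
        short lastKey = map.lastKey();
        short midKey = (short) (lastKey / 2);
        return new ConcurrentSkipListMap<Short, Long>(map.subMap(midKey, true, lastKey, true));
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<Short, Long> map = new ConcurrentSkipListMap<>();
        for (short i = 0; i < 32000; i++) {
            map.put(i, (long) i + 1);
        }
        // Keep taking the "second half"; after the first round it stops shrinking.
        for (int round = 1; round <= 3; round++) {
            map = rightHalf(map);
            System.out.println("round " + round + ": " + map.size());
        }
        // prints:
        // round 1: 16001
        // round 2: 16001
        // round 3: 16001
    }
}
```

This is also why 16001 is the magic number in the question: the second half always holds 16001 entries, so any threshold below that makes the task split forever.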
The first lesson I ever learned in programming was to Keep It Simple.
You’re mixing Map, ArrayList, ConcurrentSkipListMap, ConcurrentNavigableMap, List, stream.Collectors, HashMap and Set along with the F/J classes. That is confusing, makes the code very difficult to follow, and usually leads to failure. Simple is better.
When you create a List for ForkJoinTask.invokeAll(), create the whole List up front, before the call to invokeAll(). The List should contain all the subtasks needed to complete the work, each subtask covering half the data of the one before. Don’t use a stream; you don’t have a stream, just a few subtasks in a List.
Either that, or split into Left and Right and call Left.fork() and Right.fork(). Each forked task then splits again with half the data, and so on.
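As a rough illustration of the Left/Right approach, here is a minimal sketch that searches a plain long[] by index range instead of a map. Everything in it (the class name RangeSearchTask, the THRESHOLD value, the array-based layout, the inclusive-only range check) is an assumption made for the example, not the code from the question. It also uses a common variant of the pattern: Left is forked, while Right is computed directly in the current thread instead of being forked too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSearchTask extends RecursiveTask<List<Integer>> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off for "do the work"

    private final long[] data;
    private final int lo;    // first index, inclusive
    private final int hi;    // last index, exclusive
    private final long from; // lower bound of the value range, inclusive
    private final long to;   // upper bound of the value range, inclusive

    RangeSearchTask(long[] data, int lo, int hi, long from, long to) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
        this.from = from;
        this.to = to;
    }

    @Override
    protected List<Integer> compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: scan this slice sequentially.
            List<Integer> hits = new ArrayList<>();
            for (int i = lo; i < hi; i++) {
                if (data[i] >= from && data[i] <= to) {
                    hits.add(i);
                }
            }
            return hits;
        }
        // Split the index range in half; each half is strictly smaller.
        int mid = lo + (hi - lo) / 2;
        RangeSearchTask left = new RangeSearchTask(data, lo, mid, from, to);
        RangeSearchTask right = new RangeSearchTask(data, mid, hi, from, to);
        left.fork();                               // run Left asynchronously
        List<Integer> rightHits = right.compute(); // run Right in this thread
        List<Integer> result = new ArrayList<>(left.join());
        result.addAll(rightHits);
        return result;
    }

    public static void main(String[] args) {
        long[] data = new long[32000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        List<Integer> hits = ForkJoinPool.commonPool()
                .invoke(new RangeSearchTask(data, 0, data.length, 14, 17));
        System.out.println(hits); // prints [13, 14, 15, 16]
    }
}
```

Because each task only carries two indexes into the shared array, nothing is copied on a split, and the range is guaranteed to shrink at every level.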
Exactly how you reduce the size of dataMap on each split is up to you.
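One possible way to do that while keeping a map, sketched under some assumptions: take the split key as the midpoint between firstKey and lastKey, and hand each subtask a headMap/tailMap view. The class name MidpointSearchTask and the THRESHOLD value are invented for this example, and the range check is inclusive on both ends only. Note that this halves the key range, not the entry count, so the halves are only balanced when the keys are dense, as they are in the question's test data.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MidpointSearchTask extends RecursiveTask<Map<Short, Long>> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off

    private final ConcurrentNavigableMap<Short, Long> data;
    private final long from; // inclusive
    private final long to;   // inclusive

    MidpointSearchTask(ConcurrentNavigableMap<Short, Long> data, long from, long to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Map<Short, Long> compute() {
        if (data.size() <= THRESHOLD) {
            // Small enough: filter the entries sequentially.
            Map<Short, Long> hits = new TreeMap<>();
            for (Map.Entry<Short, Long> e : data.entrySet()) {
                if (e.getValue() >= from && e.getValue() <= to) {
                    hits.put(e.getKey(), e.getValue());
                }
            }
            return hits;
        }
        int first = data.firstKey();
        int last = data.lastKey();
        // Midpoint between first and last key. Rounding up keeps firstKey in the
        // left half and lastKey in the right half, so both halves are non-empty
        // and strictly smaller than the current map.
        short mid = (short) (first + (last - first + 1) / 2);
        MidpointSearchTask left = new MidpointSearchTask(data.headMap(mid, false), from, to);
        MidpointSearchTask right = new MidpointSearchTask(data.tailMap(mid, true), from, to);
        invokeAll(left, right); // returns once both subtasks have completed
        Map<Short, Long> merged = new TreeMap<>(left.join());
        merged.putAll(right.join());
        return merged;
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<Short, Long> source = new ConcurrentSkipListMap<>();
        for (short i = 0; i < 32000; i++) {
            source.put(i, (long) i + 1);
        }
        Map<Short, Long> hits = ForkJoinPool.commonPool()
                .invoke(new MidpointSearchTask(source, 14, 17));
        System.out.println(hits.keySet()); // prints [13, 14, 15, 16]
    }
}
```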