I'm trying to create a RecursiveTask<Map<Short, Long>>.
I'm using this article as a reference.
public class SearchTask2 extends RecursiveTask<Map<Short, Long>> {

    private final int majorDataThreshold = 16000;
    private ConcurrentNavigableMap<Short, Long> dataMap;
    private long fromRange;
    private long toRange;
    private boolean fromInclusive;
    private boolean toInclusive;

    public SearchTask2(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        System.out.println("SearchTask :: ");
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }

    @Override
    protected Map<Short, Long> compute() {
        System.out.println("SearchTask :: compute ");
        //Map<Short, Long> result = new HashMap<>();
        int size = dataMap.size();
        if (size > majorDataThreshold + 2500) {
            return ForkJoinTask.invokeAll(createSubtasks()).parallelStream().map(ForkJoinTask::join)
                    .collect(Collectors.toConcurrentMap(keyMapper, valueMapper));
            //.forEach(entry -> result.put( entry.getKey(), (Long) entry.getValue()));
        }
        return search();
    }

    private List<SearchTask2> createSubtasks() {
        final short lastKey = dataMap.lastKey();
        final short midkey = (short) (lastKey / 2);
        final short firstKey = dataMap.firstKey();
        final List<SearchTask2> dividedTasks = new ArrayList<>();
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(firstKey, true, midkey, false)), fromRange,
                toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(midkey, true, lastKey, true)), fromRange, toRange,
                fromInclusive, toInclusive));
        return dividedTasks;
    }

    private HashMap<Short,Long> search(){
        //My Search logic for values
        return new HashMap<>();
    }
}
Could someone help me with the keyMapper and valueMapper for my result Map?
I tried Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue()),
but it shows me this error:
Cannot infer type argument(s) for <R, A> collect(Collector<? super T,A,R>)
Your ForkJoinTask::join is returning a map, so you have a stream of maps. You seem to be expecting a stream of entries. You can use flatMap to get from a stream of maps to a stream of entries, for example by adding .flatMap(map -> map.entrySet().stream()) after the map(ForkJoinTask::join) step. As a slight improvement, you can also use method references (Map.Entry::getKey and Map.Entry::getValue) rather than the lambda expressions you were trying.
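To illustrate the flatMap step in isolation, here is a minimal sketch. The class name MergeMapsSketch and the helper merge are hypothetical and not part of the question's code; the list of maps stands in for the joined subtask results. It assumes the partial maps have disjoint keys, which matches the subMap split in createSubtasks.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

// Hypothetical helper class, used only to demonstrate the flatMap + collect step.
class MergeMapsSketch {

    // Flattens a list of partial result maps into one concurrent map.
    // Assumption: keys are unique across the partial maps. Without a merge
    // function, toConcurrentMap throws IllegalStateException on a duplicate key.
    static ConcurrentMap<Short, Long> merge(List<Map<Short, Long>> partialResults) {
        return partialResults.stream()
                // Stream<Map<Short, Long>> -> Stream<Map.Entry<Short, Long>>
                .flatMap(map -> map.entrySet().stream())
                // Method references replace entry -> entry.getKey() / entry.getValue()
                .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // Stand-ins for the maps returned by two joined subtasks (Map.of needs Java 9+).
        Map<Short, Long> left = Map.of((short) 1, 10L, (short) 2, 20L);
        Map<Short, Long> right = Map.of((short) 3, 30L);
        System.out.println(merge(List.of(left, right)));
    }
}
```

In compute(), the same two calls would follow .map(ForkJoinTask::join). If the subtasks could ever return overlapping keys, the three-argument toConcurrentMap overload with a merge function would probably be the safer choice.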