RecursiveTask Results to a ConcurrentMap

268 views Asked by At

I'm Trying to create a RecursiveTask<Map<Short, Long>>

I'm using this article as Reference

public class SearchTask2 extends RecursiveTask<Map<Short, Long>> {

    private final int majorDataThreshold = 16000;

    private ConcurrentNavigableMap<Short, Long> dataMap;
    private long fromRange;
    private long toRange;
    private boolean fromInclusive;
    private boolean toInclusive;

    public SearchTask2(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        System.out.println("SearchTask ::  ");
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }

    @Override
    protected Map<Short, Long> compute() {
        System.out.println("SearchTask :: compute ");
        //Map<Short, Long> result = new HashMap<>();
        int size = dataMap.size();
        if (size > majorDataThreshold + 2500) {
             return ForkJoinTask.invokeAll(createSubtasks()).parallelStream().map(ForkJoinTask::join)
             .collect(Collectors.toConcurrentMap(keyMapper, valueMapper));

            //.forEach(entry -> result.put( entry.getKey(), (Long) entry.getValue()));  
        } 
        return  search();
    }

    private List<SearchTask2> createSubtasks() {
        final short lastKey = dataMap.lastKey();
        final short midkey = (short) (lastKey / 2);
        final short firstKey = dataMap.firstKey();
        final List<SearchTask2> dividedTasks = new ArrayList<>();
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(firstKey, true, midkey, false)), fromRange,
                toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(midkey, true, lastKey, true)), fromRange, toRange,
                fromInclusive, toInclusive));
        return dividedTasks;
    }

    private HashMap<Short,Long> search(){
        //My Search logic for values
        return new HashMap<>();
    }
}

Could some one help me keyMapper and 'valueMapper' for my result Map, I tried Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())

But it's showing me an error

Cannot infer type argument(s) for <R, A> collect(Collector<? super T,A,R>)
1

There are 1 answers

2
Michael On BEST ANSWER

Your ForkJoinTask::join is returning a map, so you have a stream of maps. You seem to be expecting a stream of entries. You can use flatMap to get from a stream of maps to a stream of entries like so:

return ForkJoinTask.invokeAll(createSubtasks())
    .parallelStream()
    .map(ForkJoinTask::join)
    .flatMap(map -> map.entrySet().stream())   // you were missing this line
    .collect(
        Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
    );

As a slight improvement, you can also use method references rather than the lambda expressions you were trying:

Collectors.toConcurrentMap(Entry::getKey, Entry::getValue)