Java 8: stop reduction operation from examining all Stream elements

I am trying to understand whether there is a way to terminate a reduction operation without examining the whole stream, and I cannot figure one out.

The use case is roughly as follows: there is a long list of Integers which needs to be folded into an Accumulator. Examining each element is potentially expensive, so within the accumulating function I first check the incoming Accumulator to see whether the expensive operation is even needed; if it isn't, I simply return the accumulator unchanged.

This is obviously fine for small(er) lists, but huge lists incur unnecessary element-visiting costs that I'd like to avoid.

Here's a code sketch - assume serial reductions only.

class Accumulator {
    private final Set<A> setA = new HashSet<>();
    private final Set<B> setB = new HashSet<>();
}

class ResultSupplier implements Supplier<Result> {

    private final List<Integer> ids;

    @Override
    public Result get() {
        Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);

        return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
    }

    private static BiFunction<Accumulator, Integer, Accumulator> f() {
        return (acc, element) -> {
            if (acc.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
            return acc;
        };
    }
}

In addition to having to traverse the whole Stream, there is another thing I dislike: I have to check the same condition twice (namely, the setA size check).

I have considered map() and collect() operations, but they seemed like more of the same; I didn't find that they materially change the fact that I just can't finish the fold without examining the entire stream.
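
For reference, here is roughly the collect() variant I had in mind (just a sketch with the same placeholder accumulation); it still visits every element:

Accumulator acc = ids.stream().collect(
        Accumulator::new,
        (a, id) -> {
            if (a.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
        },
        (left, right) -> { /* combiner unused for serial reduction */ });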

Additionally, my thinking is that a hypothetical takeWhile(p : (A) => boolean) counterpart in the Stream API would also buy us nothing, as the terminating condition depends on the accumulator, not on the stream elements per se.

Bear in mind I am a relative newcomer to FP, so: is there a way to make this work as I expect? Have I set up the whole problem improperly, or is this a limitation by design?

There are 6 answers

Answer by the8472 (accepted)

Instead of starting with ids.stream(), you can:

  1. use ids.spliterator()
  2. wrap resulting spliterator into custom spliterator that has a volatile boolean flag
  3. have the custom spliterator's tryAdvance return false if the flag is changed
  4. turn your custom spliterator into a stream with StreamSupport.stream(Spliterator<T>, boolean)
  5. continue your stream pipeline as before
  6. shut down the stream by toggling the boolean when your accumulator is full

Add some static helper methods to keep it functional.

The resulting API could look roughly like this:

Accumulator acc = terminateableStream(ids, (stream, terminator) ->
   stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));
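
One possible sketch of these helpers (the Terminator name and the exact terminateableStream signature are just illustrative, not part of the original answer):

import java.util.Collection;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class TerminateableStreams {

    // Handed to the accumulator; flipping the flag stops the stream.
    static final class Terminator {
        private volatile boolean terminated;
        void terminate() { terminated = true; }
        boolean isTerminated() { return terminated; }
    }

    // Wraps the source spliterator so tryAdvance stops once the flag is set.
    static <T, R> R terminateableStream(Collection<T> source,
            BiFunction<Stream<T>, Terminator, R> pipeline) {
        Terminator terminator = new Terminator();
        Spliterator<T> original = source.spliterator();
        Spliterator<T> terminating = new Spliterators.AbstractSpliterator<T>(
                original.estimateSize(), 0) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                return !terminator.isTerminated() && original.tryAdvance(action);
            }
        };
        return pipeline.apply(StreamSupport.stream(terminating, false), terminator);
    }
}

The Accumulator would then call terminator.terminate() once it is full.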

Additionally, my thinking is that imaginary takeWhile(p : (A) => boolean) Stream API correspondent would also buy us nothing

It does work if the condition is dependent on the accumulator state and not on the stream members. That's essentially the approach I've outlined above.

It probably would be forbidden in a takeWhile provided by the JDK but a custom implementation using spliterators is free to take a stateful approach.

Answer by Tagir Valeev

I think it's possible to throw a RuntimeException of a special type from your custom collector (or reduce operation) which carries the result inside the exception object, and to catch it outside the collect operation, unwrapping the result. I know that using exceptions for non-exceptional control flow is not idiomatic, but it should work in your case, even for parallel streams.
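
A rough sketch of that idea (the exception type, the size check, and all names here are just for illustration):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Carries the partially built result out of the reduction.
class ShortCircuitException extends RuntimeException {
    final Set<Integer> partialResult;

    ShortCircuitException(Set<Integer> partialResult) {
        super(null, null, false, false); // no message, no stack trace: cheap to throw
        this.partialResult = partialResult;
    }
}

class ExceptionShortCircuit {
    static Set<Integer> reduceUntilFull(List<Integer> ids, int limit) {
        try {
            return ids.stream().reduce(new HashSet<Integer>(), (acc, id) -> {
                acc.add(id); // stand-in for the expensive operation
                if (acc.size() >= limit) {
                    throw new ShortCircuitException(acc); // bail out early
                }
                return acc;
            }, (a, b) -> { a.addAll(b); return a; });
        } catch (ShortCircuitException e) {
            return e.partialResult; // unwrap the result carried by the exception
        }
    }
}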

Actually, there are many cases where a short-circuiting reduction could be useful. For example, collecting enum values into an EnumSet (you can stop as soon as you discover that all possible enum values have already been collected), or intersecting all the elements of a Stream<Set> (you can stop if the resulting set becomes empty after some step: continuing the reduction is useless). Internally there's a SHORT_CIRCUIT flag used in stream operations like findFirst, but it's not exposed in the public API.

Answer by Lukas Eder

Of course, there will be an interesting, purely FP answer that might help solve this problem in the way you intend.

In the meantime, why use FP at all? The simple solution is pragmatically imperative: your original data source is a List anyway, which is already fully materialised, and you will use serial reduction, not parallel reduction. Write this instead:

@Override
public Result get() {
    Accumulator acc = new Accumulator();

    for (Integer id : ids) {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }

        // Easy:
        if (enough)
            break;
    }

    return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
}

Answer by ggalmazor

I agree with all previous answers. You're doing it wrong by forcing a reduction on a mutable accumulator. Also, the process that you're describing can't be expressed as a pipeline of transformations and reductions.

If you really, really need to do it FP style, I'd do as @the8472 points out.

Anyway, here is a new, more compact alternative, similar to @lukas-eder's solution, using an Iterator:

// costlyComputation stands in for the expensive per-element work;
// hasEnough() and add(...) are assumed to exist on Accumulator.
Function<Integer, Integer> costlyComputation = Function.identity();

Accumulator acc = new Accumulator();

Iterator<Integer> ids = Arrays.asList(1, 2, 3).iterator();

while (!acc.hasEnough() && ids.hasNext())
    costlyComputation.andThen(acc::add).apply(ids.next());

You have two different concerns regarding FP here:

How to stop iterating

As you're depending on mutable state, FP is only going to make your life harder. You can iterate over the collection externally or use an Iterator as I propose.

Then, use an if() to stop iteration.

You can think of different strategies, but at the end of the day, this is what you're doing.

I prefer the iterator because it is more idiomatic (it expresses your intention better in this case).

How to design the Accumulator and the costly operation

This is the most interesting for me.

A pure function can't have state; it must receive something and return something, and always the same something for the same input (like a mathematical function). Can you express your costly operation like this?

Does it need some state shared with the Accumulator? Maybe that shared state doesn't belong to either of them.

Will you transform your input and then append it to the Accumulator, or is that the Accumulator's responsibility? Does it make sense to inject the function into the Accumulator?
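
A sketch of that last idea (all names here are hypothetical, purely to illustrate injecting the function):

import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical accumulator that owns the costly function and skips it once it has enough.
class InjectingAccumulator {
    private final Function<Integer, Integer> costlyComputation;
    private final Set<Integer> results = new HashSet<>();

    InjectingAccumulator(Function<Integer, Integer> costlyComputation) {
        this.costlyComputation = costlyComputation;
    }

    void add(Integer id) {
        if (!hasEnough()) {
            // only pay for the expensive work while more results are needed
            results.add(costlyComputation.apply(id));
        }
    }

    boolean hasEnough() {
        return results.size() > 1; // placeholder threshold
    }
}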

Answer by Holger

There is no real FP solution, simply because your entire accumulator isn't FP. We can't help you in this regard, as we don't know what it is actually doing. All we see is that it relies on two mutable collections and hence can't be part of a pure FP solution.

If you accept the limitations, and that there is no clean way using the Stream API, you might strive for the simple way. The simple way incorporates a stateful Predicate, which is not the best thing around but is sometimes unavoidable:

public Result get() {
    int limit = 1;
    Set<A> setA = new HashSet<>();
    Set<B> setB = new HashSet<>();
    return ids.stream().anyMatch(i -> {
        // perform expensive ops and accumulate results
        return setA.size() > limit;
    }) ? Result.invalid() : Result.valid(setB);
}

But I want to note that, given your specific logic, i.e. that your result is considered invalid when the set grows too large, your attempt to avoid processing too many elements is an optimization of the erroneous case. You shouldn't waste effort optimizing that. If a valid result is the result of processing all elements, then process all elements…

Answer by Marco13

As mentioned in the comments: the usage scenario sounds a bit dubious, on the one hand because of the use of reduce instead of collect, and on the other hand because the condition that should be used for stopping the reduction also appears in the accumulator. It sounds like simply limiting the stream to a certain number of elements, or based on a condition as shown in another question, may be more appropriate here.

Of course, in the real application, it might be that the condition is in fact unrelated to the number of elements that have been processed. For this case, I sketched a solution here that basically corresponds to the answer by the8472, and is very similar to the solution from the question mentioned above: It uses a Stream that is created from a Spliterator that simply delegates to the original Spliterator, unless the stopping condition is met.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        ResultSupplier r = new ResultSupplier();
        System.out.println(r.get());
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class ResultSupplier implements Supplier<String>
{
    private final List<Integer> ids;
    ResultSupplier()
    {
        ids = new ArrayList<Integer>(Collections.nCopies(20, 1));
    }

    public String get()
    {
        //return getOriginal();
        return getStopping();
    }

    private String getOriginal()
    {
        Accumulator acc =
            ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private String getStopping()
    {
        Spliterator<Integer> originalSpliterator = ids.spliterator();
        Accumulator accumulator = new Accumulator();
        Spliterator<Integer> stoppingSpliterator = 
            new Spliterators.AbstractSpliterator<Integer>(
                originalSpliterator.estimateSize(), 0)
            {
                @Override
                public boolean tryAdvance(Consumer<? super Integer> action)
                {
                    return accumulator.set.size() > 10 ? false : 
                        originalSpliterator.tryAdvance(action);
                }
            };
        Stream<Integer> stream = 
            StreamSupport.stream(stoppingSpliterator, false);
        Accumulator acc =
            stream.reduce(accumulator, f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private static int counter = 0;
    private static BiFunction<Accumulator, Integer, Accumulator> f()
    {
        return (acc, element) -> {

            System.out.print("Step " + counter);
            if (acc.set.size() <= 10)
            {
                System.out.print(" expensive");
                acc.set.add(counter);
            }
            System.out.println();
            counter++;
            return acc;
        };
    }
}

Edit in response to the comments:

Of course, it is possible to write it "more functional". However, due to the vague description in the question and the rather "sketchy" code example, it's hard to find "THE" most appropriate solution here. (And "appropriate" refers to the specific caveats of the actual task, and to the question of how functional it should be without sacrificing readability.)

Possible functionalization steps might include the creation of a generic StoppingSpliterator class that operates on a delegate Spliterator and has a Supplier<Boolean> as its stopping condition, and feeding this with a Predicate on the actual Accumulator, together with using some utility methods and method references here and there.

But again: It is debatable whether this is actually an appropriate solution, or whether one should not rather use the simple and pragmatic solution from the answer by Lukas Eder...

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        List<Integer> collection = 
            new ArrayList<Integer>(Collections.nCopies(20, 1));
        System.out.println(compute(collection));
    }

    private static String compute(List<Integer> collection)
    {
        Predicate<Accumulator> stopCondition = (a) -> a.set.size() > 10;
        Accumulator result = reduceStopping(collection, 
            new Accumulator(), StopStreamReduction::accumulate, stopCondition);
        return (result.set.size() > 11) ? "invalid" : String.valueOf(result.set);
    }

    private static int counter;
    private static Accumulator accumulate(Accumulator a, Integer element)
    {
        System.out.print("Step " + counter);
        if (a.set.size() <= 10)
        {
            System.out.print(" expensive");
            a.set.add(counter);
        }
        System.out.println();
        counter++;
        return a;
    }

    static <U, T> U reduceStopping(
        Collection<T> collection, U identity,
        BiFunction<U, ? super T, U> accumulator,
        Predicate<U> stopCondition)
    {
       // This assumes that the accumulator always returns
       // the identity instance (with the accumulated values).
       // This may not always be true!
       return StreamSupport.stream(
           new StoppingSpliterator<T>(
               collection.spliterator(), 
               () -> stopCondition.test(identity)), false).
                   reduce(identity, accumulator, (x, y) -> null);
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class StoppingSpliterator<T> extends Spliterators.AbstractSpliterator<T>
{
    private final Spliterator<T> delegate;
    private final Supplier<Boolean> stopCondition;

    StoppingSpliterator(Spliterator<T> delegate, Supplier<Boolean> stopCondition)
    {
        super(delegate.estimateSize(), 0);
        this.delegate = delegate;
        this.stopCondition = stopCondition;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (stopCondition.get())
        {
            return false;
        }
        return delegate.tryAdvance(action);
    }
}