Stream.skip behavior with unordered terminal operation

I've already read this and this question, but I still doubt whether the observed behavior of Stream.skip was intended by the JDK authors.

Let's have simple input of numbers 1..20:

List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

Now let's create a parallel stream, combine unordered() with skip() in different ways, and collect the result:

System.out.println("skip-skip-unordered-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .skip(1)
            .unordered()
            .collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .skip(1)
            .collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .unordered()
            .skip(1)
            .skip(1)
            .collect(Collectors.toList()));

The filtering step does essentially nothing here, but it makes things harder for the stream engine: it no longer knows the exact size of the output, so some optimizations are turned off. I got the following results:

skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20]
// absent values: 1, 15
unordered-skip-skip-toList: [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20]
// absent values: 7, 18

The results are completely fine; everything works as expected. In the first case I asked to skip the first two elements, then collect to a list in no particular order. In the second case I asked to skip the first element, then switch to unordered mode and skip one more element (I don't care which one). In the third case I switched to unordered mode first, then skipped two arbitrary elements.

Let's skip one element and collect to a custom collection in unordered mode. Our custom collection will be a HashSet:

System.out.println("skip-toCollection: "
        + input.parallelStream().filter(x -> x > 0)
        .skip(1)
        .unordered()
        .collect(Collectors.toCollection(HashSet::new)));

The output is satisfactory:

skip-toCollection: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// 1 is skipped

So in general I expect that as long as the stream is ordered, skip() skips the first elements; otherwise it skips arbitrary ones.
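
For reference, here is a minimal sketch of that expectation, reusing the input list from above: with an ordered parallel stream and an ordered collector, the result is fully deterministic.

System.out.println("ordered-skip-toList: "
        + input.parallelStream()
            .skip(2)   // ordered stream: must drop exactly the first two elements, 1 and 2
            .collect(Collectors.toList()));
// always prints [3, 4, ..., 20], no matter how the work is split across threads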

However, let's use an equivalent unordered terminal operation, collect(Collectors.toSet()):

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .collect(Collectors.toSet()));

Now the output is:

skip-toSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20]
// 13 is skipped

The same result can be achieved with any other unordered terminal operation (like forEach, findAny, anyMatch, etc.). Removing the unordered() step in this case changes nothing. It seems that while the unordered() step correctly makes the stream unordered starting from the current operation, an unordered terminal operation makes the whole stream unordered from the very beginning, even though this can affect the result if skip() was used. This is completely misleading to me: I expect that using an unordered collector is the same as turning the stream into unordered mode just before the terminal operation and using the equivalent ordered collector.

So my questions are:

  1. Is this behavior intended, or is it a bug?
  2. If yes, is it documented somewhere? I've read the Stream.skip() documentation: it does not say anything about unordered terminal operations. Also, the Characteristics.UNORDERED documentation is not very comprehensive and does not say that ordering will be lost for the whole stream. Finally, the Ordering section in the package summary does not cover this case either. Am I missing something?
  3. If it's intended that an unordered terminal operation makes the whole stream unordered, why does the unordered() step make it unordered only from that point on? Can I rely on this behavior, or was I just lucky that my first tests worked nicely?

There are 2 answers

Brian Goetz (best answer)

Recall that the goal of stream flags (ORDERED, SORTED, SIZED, DISTINCT) is to enable operations to avoid doing unnecessary work. Examples of optimizations that involve stream flags are:

  • If we know the stream is already sorted, then sorted() is a no-op;
  • If we know the size of the stream, we can pre-allocate a correct-sized array in toArray(), avoiding a copy;
  • If we know that the input has no meaningful encounter order, we need not take extra steps to preserve encounter order.

Each stage of a pipeline has a set of stream flags. Intermediate operations can inject, preserve, or clear stream flags. For example, filtering preserves sorted-ness / distinct-ness but not sized-ness; mapping preserves sized-ness but not sorted-ness or distinct-ness. Sorting injects sorted-ness. The treatment of flags for intermediate operations is fairly straightforward, because all decisions are local.
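
A rough way to observe these flags from user code is through the Spliterator characteristics they map to; for example, unordered() should clear the ORDERED characteristic:

// the source spliterator of a List reports ORDERED...
System.out.println(Arrays.asList(1, 2, 3).stream()
        .spliterator().hasCharacteristics(Spliterator.ORDERED));  // true
// ...and unordered() clears it for the rest of the pipeline
System.out.println(Arrays.asList(1, 2, 3).stream().unordered()
        .spliterator().hasCharacteristics(Spliterator.ORDERED));  // false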

The treatment of flags for terminal operations is more subtle. ORDERED is the most relevant flag for terminal ops. And if a terminal op is UNORDERED, then we do back-propagate the unordered-ness.

Why do we do this? Well, consider this pipeline:

set.stream()
   .sorted()
   .forEach(System.out::println);

Since forEach is not constrained to operate in order, the work of sorting the list is completely wasted effort. So we back-propagate this information (until we hit a short-circuiting operation, such as limit), so as not to lose this optimization opportunity. Similarly, we can use an optimized implementation of distinct on unordered streams.
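
To make the forEach case concrete (using the same set as in the snippet above), compare it with forEachOrdered:

set.parallelStream().sorted().forEach(System.out::println);        // order not guaranteed
set.parallelStream().sorted().forEachOrdered(System.out::println); // prints in sorted order
// only forEachOrdered is obliged to respect the encounter order, so only there
// is the sorting effort guaranteed to be visible in the output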

Is this behavior intended, or is it a bug?

Yes :) The back-propagation is intended, as it is a useful optimization that should not produce incorrect results. However, the bug is that we are propagating it past a previous skip, which we shouldn't. So the back-propagation of the UNORDERED flag is overly aggressive, and that's a bug. We'll post a bug.

If yes, is it documented somewhere?

It should be just an implementation detail; if it were correctly implemented, you wouldn't notice (except that your streams would be faster).
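
In the meantime, a workaround sketch: perform the skip under an ordered collector, and only then build the unordered collection, for example:

// keep skip(1) bound to the encounter order by collecting to a List first,
// then copy the result into a HashSet
Set<Integer> result = new HashSet<>(
        input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .collect(Collectors.toList()));
System.out.println(result);  // 1 is always the element that was skipped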

Ruben

The asker commented: "@Ruben, you probably didn't understand my question. Roughly, the problem is why unordered().collect(toCollection(HashSet::new)) behaves differently from collect(toSet()). Of course I know that toSet() is unordered."

Probably, but anyway, I will give it a second try.

Having a look at the Javadocs of Collectors.toSet and Collectors.toCollection, we can see that toSet delivers an unordered collector:

This is an {@link Collector.Characteristics#UNORDERED unordered} Collector.

i.e., a CollectorImpl with the UNORDERED characteristic. Having a look at the Javadoc of Collector.Characteristics#UNORDERED, we can read:

Indicates that the collection operation does not commit to preserving the encounter order of input elements

In the Javadocs of Collector we can also see:

For concurrent collectors, an implementation is free to (but not required to) implement reduction concurrently. A concurrent reduction is one where the accumulator function is called concurrently from multiple threads, using the same concurrently-modifiable result container, rather than keeping the result isolated during accumulation. A concurrent reduction should only be applied if the collector has the {@link Characteristics#UNORDERED} characteristics or if the originating data is unordered

This means to me that, if we set the UNORDERED characteristic, we do not care at all about the order in which the elements of the stream get passed to the accumulator, and, therefore, the elements can be extracted from the pipeline in any order.
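
To see what that looks like on the collector side, here is roughly what Collectors.toSet() hands back, built by hand with Collector.of (the four-argument overload also marks it IDENTITY_FINISH):

// a collector equivalent in spirit to Collectors.toSet(): it declares
// UNORDERED up front, telling the pipeline that the order in which
// elements reach the accumulator is irrelevant
Collector<Integer, Set<Integer>, Set<Integer>> roughlyToSet = Collector.of(
        HashSet::new,                                            // supplier
        Set::add,                                                // accumulator
        (left, right) -> { left.addAll(right); return left; },   // combiner
        Collector.Characteristics.UNORDERED);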

Btw, you get the same behavior if you omit the unordered() in your example:

    System.out.println("skip-toSet: "
            + input.parallelStream().filter(x -> x > 0)
                .skip(1)
                .collect(Collectors.toSet()));

Furthermore, the skip() method in Stream gives us a hint:

While {@code skip()} is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines

and

Using an unordered stream source (such as {@link #generate(Supplier)}) or removing the ordering constraint with {@link #unordered()} may result in significant speedups

When using

Collectors.toCollection(HashSet::new)

you are creating a normal "ordered" collector (one without the UNORDERED characteristic), which to me means that you do care about the ordering; therefore, the elements are extracted in order and you get the expected behavior.
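
You can check this directly by inspecting the characteristics of the two collectors:

System.out.println(Collectors.toCollection(HashSet::new).characteristics());
// e.g. [IDENTITY_FINISH] - no UNORDERED here
System.out.println(Collectors.toSet().characteristics());
// e.g. [UNORDERED, IDENTITY_FINISH]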