/* * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ /** * Classes to support functional-style operations on streams of elements, such * as map-reduce transformations on collections. For example: * *
{@code
 *     int sum = widgets.stream()
 *                      .filter(b -> b.getColor() == RED)
 *                      .mapToInt(b -> b.getWeight())
 *                      .sum();
 * }
* *

Here we use {@code widgets}, a {@code Collection}, * as a source for a stream, and then perform a filter-map-reduce on the stream * to obtain the sum of the weights of the red widgets. (Summation is an * example of a reduction * operation.) * *

The key abstraction introduced in this package is stream. The * classes {@link java.util.stream.Stream}, {@link java.util.stream.IntStream}, * {@link java.util.stream.LongStream}, and {@link java.util.stream.DoubleStream} * are streams over objects and the primitive {@code int}, {@code long} and * {@code double} types. Streams differ from collections in several ways: * *

* * Streams can be obtained in a number of ways. Some examples include: * * *

Additional stream sources can be provided by third-party libraries using * these techniques. * *

Stream operations and pipelines

* *

Stream operations are divided into intermediate and * terminal operations, and are combined to form stream * pipelines. A stream pipeline consists of a source (such as a * {@code Collection}, an array, a generator function, or an I/O channel); * followed by zero or more intermediate operations such as * {@code Stream.filter} or {@code Stream.map}; and a terminal operation such * as {@code Stream.forEach} or {@code Stream.reduce}. * *

Intermediate operations return a new stream. They are always * lazy; executing an intermediate operation such as * {@code filter()} does not actually perform any filtering, but instead * creates a new stream that, when traversed, contains the elements of * the initial stream that match the given predicate. Traversal * of the pipeline source does not begin until the terminal operation of the * pipeline is executed. * *

Terminal operations, such as {@code Stream.forEach} or * {@code IntStream.sum}, may traverse the stream to produce a result or a * side-effect. After the terminal operation is performed, the stream pipeline * is considered consumed, and can no longer be used; if you need to traverse * the same data source again, you must return to the data source to get a new * stream. In almost all cases, terminal operations are eager, * completing their traversal of the data source and processing of the pipeline * before returning. Only the terminal operations {@code iterator()} and * {@code spliterator()} are not; these are provided as an "escape hatch" to enable * arbitrary client-controlled pipeline traversals in the event that the * existing operations are not sufficient to the task. * *

Processing streams lazily allows for significant efficiencies; in a * pipeline such as the filter-map-sum example above, filtering, mapping, and * summing can be fused into a single pass on the data, with minimal * intermediate state. Laziness also allows avoiding examining all the data * when it is not necessary; for operations such as "find the first string * longer than 1000 characters", it is only necessary to examine just enough * strings to find one that has the desired characteristics without examining * all of the strings available from the source. (This behavior becomes even * more important when the input stream is infinite and not merely large.) * *

Intermediate operations are further divided into stateless * and stateful operations. Stateless operations, such as {@code filter} * and {@code map}, retain no state from previously seen element when processing * a new element -- each element can be processed * independently of operations on other elements. Stateful operations, such as * {@code distinct} and {@code sorted}, may incorporate state from previously * seen elements when processing new elements. * *

Stateful operations may need to process the entire input * before producing a result. For example, one cannot produce any results from * sorting a stream until one has seen all elements of the stream. As a result, * under parallel computation, some pipelines containing stateful intermediate * operations may require multiple passes on the data or may need to buffer * significant data. Pipelines containing exclusively stateless intermediate * operations can be processed in a single pass, whether sequential or parallel, * with minimal data buffering. * *

Further, some operations are deemed short-circuiting operations. * An intermediate operation is short-circuiting if, when presented with * infinite input, it may produce a finite stream as a result. A terminal * operation is short-circuiting if, when presented with infinite input, it may * terminate in finite time. Having a short-circuiting operation in the pipeline * is a necessary, but not sufficient, condition for the processing of an infinite * stream to terminate normally in finite time. * *

Parallelism

* *

Processing elements with an explicit {@code for-}loop is inherently serial. * Streams facilitate parallel execution by reframing the computation as a pipeline of * aggregate operations, rather than as imperative operations on each individual * element. All streams operations can execute either in serial or in parallel. * The stream implementations in the JDK create serial streams unless parallelism is * explicitly requested. For example, {@code Collection} has methods * {@link java.util.Collection#stream} and {@link java.util.Collection#parallelStream}, * which produce sequential and parallel streams respectively; other * stream-bearing methods such as {@link java.util.stream.IntStream#range(int, int)} * produce sequential streams but these streams can be efficiently parallelized by * invoking their {@link java.util.stream.BaseStream#parallel()} method. * To execute the prior "sum of weights of widgets" query in parallel, we would * do: * *

{@code
 *     int sumOfWeights = widgets.}parallelStream(){@code
 *                               .filter(b -> b.getColor() == RED)
 *                               .mapToInt(b -> b.getWeight())
 *                               .sum();
 * }
* *

The only difference between the serial and parallel versions of this * example is the creation of the initial stream, using "{@code parallelStream()}" * instead of "{@code stream()}". When the terminal operation is initiated, * the stream pipeline is executed sequentially or in parallel depending on the * orientation of the stream on which it is invoked. Whether a stream will execute in serial or * parallel can be determined with the {@code isParallel()} method, and the * orientation of a stream can be modified with the * {@link java.util.stream.BaseStream#sequential()} and * {@link java.util.stream.BaseStream#parallel()} operations. When the terminal * operation is initiated, the stream pipeline is executed sequentially or in * parallel depending on the mode of the stream on which it is invoked. * *

Except for operations identified as explicitly nondeterministic, such * as {@code findAny()}, whether a stream executes sequentially or in parallel * should not change the result of the computation. * *

Most stream operations accept parameters that describe user-specified * behavior, which are often lambda expressions. To preserve correct behavior, * these behavioral parameters must be non-interfering, and in * most cases must be stateless. Such parameters are always instances * of a functional interface such * as {@link java.util.function.Function}, and are often lambda expressions or * method references. * *

Non-interference

* * Streams enable you to execute possibly-parallel aggregate operations over a * variety of data sources, including even non-thread-safe collections such as * {@code ArrayList}. This is possible only if we can prevent * interference with the data source during the execution of a stream * pipeline. Except for the escape-hatch operations {@code iterator()} and * {@code spliterator()}, execution begins when the terminal operation is * invoked, and ends when the terminal operation completes. For most data * sources, preventing interference means ensuring that the data source is * not modified at all during the execution of the stream pipeline. * The notable exception to this are streams whose sources are concurrent * collections, which are specifically designed to handle concurrent modification. * Concurrent stream sources are those whose {@code Spliterator} reports the * {@code CONCURRENT} characteristic. * *

Accordingly, behavioral parameters in stream pipelines whose source might * not be concurrent should never modify the stream's data source. * A behavioral parameter is said to interfere with a non-concurrent * data source if it modifies, or causes to be * modified, the stream's data source. The need for non-interference applies * to all pipelines, not just parallel ones. Unless the stream source is * concurrent, modifying a stream's data source during execution of a stream * pipeline can cause exceptions, incorrect answers, or nonconformant behavior. * * For well-behaved stream sources, the source can be modified before the * terminal operation commences and those modifications will be reflected in * the covered elements. For example, consider the following code: * *

{@code
 *     List l = new ArrayList(Arrays.asList("one", "two"));
 *     Stream sl = l.stream();
 *     l.add("three");
 *     String s = sl.collect(joining(" "));
 * }
* * First a list is created consisting of two strings: "one"; and "two". Then a * stream is created from that list. Next the list is modified by adding a third * string: "three". Finally the elements of the stream are collected and joined * together. Since the list was modified before the terminal {@code collect} * operation commenced the result will be a string of "one two three". All the * streams returned from JDK collections, and most other JDK classes, * are well-behaved in this manner; for streams generated by other libraries, see * Low-level stream * construction for requirements for building well-behaved streams. * *

Stateless behaviors

* * Stream pipeline results may be nondeterministic or incorrect if the behavioral * parameters to the stream operations are stateful. A stateful lambda * (or other object implementing the appropriate functional interface) is one * whose result depends on any state which might change during the execution * of the stream pipeline. An example of a stateful lambda is the parameter * to {@code map()} in: * *
{@code
 *     Set seen = Collections.synchronizedSet(new HashSet<>());
 *     stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
 * }
* * Here, if the mapping operation is performed in parallel, the results for the * same input could vary from run to run, due to thread scheduling differences, * whereas, with a stateless lambda expression the results would always be the * same. * *

Note also that attempting to access mutable state from behavioral parameters * presents you with a bad choice with respect to safety and performance; if * you do not synchronize access to that state, you have a data race and * therefore your code is broken, but if you do synchronize access to that * state, you risk having contention undermine the parallelism you are seeking * to benefit from. The best approach is to avoid stateful behavioral * parameters to stream operations entirely; there is usually a way to * restructure the stream pipeline to avoid statefulness. * *

Side-effects

* * Side-effects in behavioral parameters to stream operations are, in general, * discouraged, as they can often lead to unwitting violations of the * statelessness requirement, as well as other thread-safety hazards. * *

If the behavioral parameters do have side-effects, unless explicitly * stated, there are no guarantees as to the * visibility * of those side-effects to other threads, nor are there any guarantees that * different operations on the "same" element within the same stream pipeline * are executed in the same thread. Further, the ordering of those effects * may be surprising. Even when a pipeline is constrained to produce a * result that is consistent with the encounter order of the stream * source (for example, {@code IntStream.range(0,5).parallel().map(x -> x*2).toArray()} * must produce {@code [0, 2, 4, 6, 8]}), no guarantees are made as to the order * in which the mapper function is applied to individual elements, or in what * thread any behavioral parameter is executed for a given element. * *

Many computations where one might be tempted to use side effects can be more * safely and efficiently expressed without side-effects, such as using * reduction instead of mutable * accumulators. However, side-effects such as using {@code println()} for debugging * purposes are usually harmless. A small number of stream operations, such as * {@code forEach()} and {@code peek()}, can operate only via side-effects; * these should be used with care. * *

As an example of how to transform a stream pipeline that inappropriately * uses side-effects to one that does not, the following code searches a stream * of strings for those matching a given regular expression, and puts the * matches in a list. * *

{@code
 *     ArrayList results = new ArrayList<>();
 *     stream.filter(s -> pattern.matcher(s).matches())
 *           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
 * }
* * This code unnecessarily uses side-effects. If executed in parallel, the * non-thread-safety of {@code ArrayList} would cause incorrect results, and * adding needed synchronization would cause contention, undermining the * benefit of parallelism. Furthermore, using side-effects here is completely * unnecessary; the {@code forEach()} can simply be replaced with a reduction * operation that is safer, more efficient, and more amenable to * parallelization: * *
{@code
 *     Listresults =
 *         stream.filter(s -> pattern.matcher(s).matches())
 *               .collect(Collectors.toList());  // No side-effects!
 * }
* *

Ordering

* *

Streams may or may not have a defined encounter order. Whether * or not a stream has an encounter order depends on the source and the * intermediate operations. Certain stream sources (such as {@code List} or * arrays) are intrinsically ordered, whereas others (such as {@code HashSet}) * are not. Some intermediate operations, such as {@code sorted()}, may impose * an encounter order on an otherwise unordered stream, and others may render an * ordered stream unordered, such as {@link java.util.stream.BaseStream#unordered()}. * Further, some terminal operations may ignore encounter order, such as * {@code forEach()}. * *

If a stream is ordered, most operations are constrained to operate on the * elements in their encounter order; if the source of a stream is a {@code List} * containing {@code [1, 2, 3]}, then the result of executing {@code map(x -> x*2)} * must be {@code [2, 4, 6]}. However, if the source has no defined encounter * order, then any permutation of the values {@code [2, 4, 6]} would be a valid * result. * *

For sequential streams, the presence or absence of an encounter order does * not affect performance, only determinism. If a stream is ordered, repeated * execution of identical stream pipelines on an identical source will produce * an identical result; if it is not ordered, repeated execution might produce * different results. * *

For parallel streams, relaxing the ordering constraint can sometimes enable * more efficient execution. Certain aggregate operations, * such as filtering duplicates ({@code distinct()}) or grouped reductions * ({@code Collectors.groupingBy()}) can be implemented more efficiently if ordering of elements * is not relevant. Similarly, operations that are intrinsically tied to encounter order, * such as {@code limit()}, may require * buffering to ensure proper ordering, undermining the benefit of parallelism. * In cases where the stream has an encounter order, but the user does not * particularly care about that encounter order, explicitly de-ordering * the stream with {@link java.util.stream.BaseStream#unordered() unordered()} may * improve parallel performance for some stateful or terminal operations. * However, most stream pipelines, such as the "sum of weight of blocks" example * above, still parallelize efficiently even under ordering constraints. * *

Reduction operations

* * A reduction operation (also called a fold) takes a sequence * of input elements and combines them into a single summary result by repeated * application of a combining operation, such as finding the sum or maximum of * a set of numbers, or accumulating elements into a list. The streams classes have * multiple forms of general reduction operations, called * {@link java.util.stream.Stream#reduce(java.util.function.BinaryOperator) reduce()} * and {@link java.util.stream.Stream#collect(java.util.stream.Collector) collect()}, * as well as multiple specialized reduction forms such as * {@link java.util.stream.IntStream#sum() sum()}, {@link java.util.stream.IntStream#max() max()}, * or {@link java.util.stream.IntStream#count() count()}. * *

Of course, such operations can be readily implemented as simple sequential * loops, as in: *

{@code
 *    int sum = 0;
 *    for (int x : numbers) {
 *       sum += x;
 *    }
 * }
* However, there are good reasons to prefer a reduce operation * over a mutative accumulation such as the above. Not only is a reduction * "more abstract" -- it operates on the stream as a whole rather than individual * elements -- but a properly constructed reduce operation is inherently * parallelizable, so long as the function(s) used to process the elements * are associative and * stateless. * For example, given a stream of numbers for which we want to find the sum, we * can write: *
{@code
 *    int sum = numbers.stream().reduce(0, (x,y) -> x+y);
 * }
* or: *
{@code
 *    int sum = numbers.stream().reduce(0, Integer::sum);
 * }
* *

These reduction operations can run safely in parallel with almost no * modification: *

{@code
 *    int sum = numbers.parallelStream().reduce(0, Integer::sum);
 * }
* *

Reduction parallellizes well because the implementation * can operate on subsets of the data in parallel, and then combine the * intermediate results to get the final correct answer. (Even if the language * had a "parallel for-each" construct, the mutative accumulation approach would * still required the developer to provide * thread-safe updates to the shared accumulating variable {@code sum}, and * the required synchronization would then likely eliminate any performance gain from * parallelism.) Using {@code reduce()} instead removes all of the * burden of parallelizing the reduction operation, and the library can provide * an efficient parallel implementation with no additional synchronization * required. * *

The "widgets" examples shown earlier shows how reduction combines with * other operations to replace for loops with bulk operations. If {@code widgets} * is a collection of {@code Widget} objects, which have a {@code getWeight} method, * we can find the heaviest widget with: *

{@code
 *     OptionalInt heaviest = widgets.parallelStream()
 *                                   .mapToInt(Widget::getWeight)
 *                                   .max();
 * }
* *

In its more general form, a {@code reduce} operation on elements of type * {@code } yielding a result of type {@code } requires three parameters: *

{@code
 *  U reduce(U identity,
 *              BiFunction accumulator,
 *              BinaryOperator combiner);
 * }
* Here, the identity element is both an initial seed value for the reduction * and a default result if there are no input elements. The accumulator * function takes a partial result and the next element, and produces a new * partial result. The combiner function combines two partial results * to produce a new partial result. (The combiner is necessary in parallel * reductions, where the input is partitioned, a partial accumulation computed * for each partition, and then the partial results are combined to produce a * final result.) * *

More formally, the {@code identity} value must be an identity for * the combiner function. This means that for all {@code u}, * {@code combiner.apply(identity, u)} is equal to {@code u}. Additionally, the * {@code combiner} function must be associative and * must be compatible with the {@code accumulator} function: for all {@code u} * and {@code t}, {@code combiner.apply(u, accumulator.apply(identity, t))} must * be {@code equals()} to {@code accumulator.apply(u, t)}. * *

The three-argument form is a generalization of the two-argument form, * incorporating a mapping step into the accumulation step. We could * re-cast the simple sum-of-weights example using the more general form as * follows: *

{@code
 *     int sumOfWeights = widgets.stream()
 *                               .reduce(0,
 *                                       (sum, b) -> sum + b.getWeight())
 *                                       Integer::sum);
 * }
* though the explicit map-reduce form is more readable and therefore should * usually be preferred. The generalized form is provided for cases where * significant work can be optimized away by combining mapping and reducing * into a single function. * *

Mutable reduction

* * A mutable reduction operation accumulates input elements into a * mutable result container, such as a {@code Collection} or {@code StringBuilder}, * as it processes the elements in the stream. * *

If we wanted to take a stream of strings and concatenate them into a * single long string, we could achieve this with ordinary reduction: *

{@code
 *     String concatenated = strings.reduce("", String::concat)
 * }
* *

We would get the desired result, and it would even work in parallel. However, * we might not be happy about the performance! Such an implementation would do * a great deal of string copying, and the run time would be O(n^2) in * the number of characters. A more performant approach would be to accumulate * the results into a {@link java.lang.StringBuilder}, which is a mutable * container for accumulating strings. We can use the same technique to * parallelize mutable reduction as we do with ordinary reduction. * *

The mutable reduction operation is called * {@link java.util.stream.Stream#collect(Collector) collect()}, * as it collects together the desired results into a result container such * as a {@code Collection}. * A {@code collect} operation requires three functions: * a supplier function to construct new instances of the result container, an * accumulator function to incorporate an input element into a result * container, and a combining function to merge the contents of one result * container into another. The form of this is very similar to the general * form of ordinary reduction: *

{@code
 *  R collect(Supplier supplier,
 *               BiConsumer accumulator,
 *               BiConsumer combiner);
 * }
*

As with {@code reduce()}, a benefit of expressing {@code collect} in this * abstract way is that it is directly amenable to parallelization: we can * accumulate partial results in parallel and then combine them, so long as the * accumulation and combining functions satisfy the appropriate requirements. * For example, to collect the String representations of the elements in a * stream into an {@code ArrayList}, we could write the obvious sequential * for-each form: *

{@code
 *     ArrayList strings = new ArrayList<>();
 *     for (T element : stream) {
 *         strings.add(element.toString());
 *     }
 * }
* Or we could use a parallelizable collect form: *
{@code
 *     ArrayList strings = stream.collect(() -> new ArrayList<>(),
 *                                                (c, e) -> c.add(e.toString()),
 *                                                (c1, c2) -> c1.addAll(c2));
 * }
* or, pulling the mapping operation out of the accumulator function, we could * express it more succinctly as: *
{@code
 *     List strings = stream.map(Object::toString)
 *                                  .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
 * }
* Here, our supplier is just the {@link java.util.ArrayList#ArrayList() * ArrayList constructor}, the accumulator adds the stringified element to an * {@code ArrayList}, and the combiner simply uses {@link java.util.ArrayList#addAll addAll} * to copy the strings from one container into the other. * *

The three aspects of {@code collect} -- supplier, accumulator, and * combiner -- are tightly coupled. We can use the abstraction of a * {@link java.util.stream.Collector} to capture all three aspects. The * above example for collecting strings into a {@code List} can be rewritten * using a standard {@code Collector} as: *

{@code
 *     List strings = stream.map(Object::toString)
 *                                  .collect(Collectors.toList());
 * }
* *

Packaging mutable reductions into a Collector has another advantage: * composability. The class {@link java.util.stream.Collectors} contains a * number of predefined factories for collectors, including combinators * that transform one collector into another. For example, suppose we have a * collector that computes the sum of the salaries of a stream of * employees, as follows: * *

{@code
 *     Collector summingSalaries
 *         = Collectors.summingInt(Employee::getSalary);
 * }
* * (The {@code ?} for the second type parameter merely indicates that we don't * care about the intermediate representation used by this collector.) * If we wanted to create a collector to tabulate the sum of salaries by * department, we could reuse {@code summingSalaries} using * {@link java.util.stream.Collectors#groupingBy(java.util.function.Function, java.util.stream.Collector) groupingBy}: * *
{@code
 *     Map salariesByDept
 *         = employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
 *                                                            summingSalaries));
 * }
* *

As with the regular reduction operation, {@code collect()} operations can * only be parallelized if appropriate conditions are met. For any partially * accumulated result, combining it with an empty result container must * produce an equivalent result. That is, for a partially accumulated result * {@code p} that is the result of any series of accumulator and combiner * invocations, {@code p} must be equivalent to * {@code combiner.apply(p, supplier.get())}. * *

Further, however the computation is split, it must produce an equivalent * result. For any input elements {@code t1} and {@code t2}, the results * {@code r1} and {@code r2} in the computation below must be equivalent: *

{@code
 *     A a1 = supplier.get();
 *     accumulator.accept(a1, t1);
 *     accumulator.accept(a1, t2);
 *     R r1 = finisher.apply(a1);  // result without splitting
 *
 *     A a2 = supplier.get();
 *     accumulator.accept(a2, t1);
 *     A a3 = supplier.get();
 *     accumulator.accept(a3, t2);
 *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
 * }
* *

Here, equivalence generally means according to {@link java.lang.Object#equals(Object)}. * but in some cases equivalence may be relaxed to account for differences in * order. * *

Reduction, concurrency, and ordering

* * With some complex reduction operations, for example a {@code collect()} that * produces a {@code Map}, such as: *
{@code
 *     Map> salesByBuyer
 *         = txns.parallelStream()
 *               .collect(Collectors.groupingBy(Transaction::getBuyer));
 * }
* it may actually be counterproductive to perform the operation in parallel. * This is because the combining step (merging one {@code Map} into another by * key) can be expensive for some {@code Map} implementations. * *

Suppose, however, that the result container used in this reduction * was a concurrently modifiable collection -- such as a * {@link java.util.concurrent.ConcurrentHashMap}. In that case, the parallel * invocations of the accumulator could actually deposit their results * concurrently into the same shared result container, eliminating the need for * the combiner to merge distinct result containers. This potentially provides * a boost to the parallel execution performance. We call this a * concurrent reduction. * *

A {@link java.util.stream.Collector} that supports concurrent reduction is * marked with the {@link java.util.stream.Collector.Characteristics#CONCURRENT} * characteristic. However, a concurrent collection also has a downside. If * multiple threads are depositing results concurrently into a shared container, * the order in which results are deposited is non-deterministic. Consequently, * a concurrent reduction is only possible if ordering is not important for the * stream being processed. The {@link java.util.stream.Stream#collect(Collector)} * implementation will only perform a concurrent reduction if *

    *
  • The stream is parallel;
  • *
  • The collector has the * {@link java.util.stream.Collector.Characteristics#CONCURRENT} characteristic, * and;
  • *
  • Either the stream is unordered, or the collector has the * {@link java.util.stream.Collector.Characteristics#UNORDERED} characteristic. *
* You can ensure the stream is unordered by using the * {@link java.util.stream.BaseStream#unordered()} method. For example: *
{@code
 *     Map> salesByBuyer
 *         = txns.parallelStream()
 *               .unordered()
 *               .collect(groupingByConcurrent(Transaction::getBuyer));
 * }
* (where {@link java.util.stream.Collectors#groupingByConcurrent} is the * concurrent equivalent of {@code groupingBy}). * *

Note that if it is important that the elements for a given key appear in * the order they appear in the source, then we cannot use a concurrent * reduction, as ordering is one of the casualties of concurrent insertion. * We would then be constrained to implement either a sequential reduction or * a merge-based parallel reduction. * *

Associativity

* * An operator or function {@code op} is associative if the following * holds: *
{@code
 *     (a op b) op c == a op (b op c)
 * }
* The importance of this to parallel evaluation can be seen if we expand this * to four terms: *
{@code
 *     a op b op c op d == (a op b) op (c op d)
 * }
* So we can evaluate {@code (a op b)} in parallel with {@code (c op d)}, and * then invoke {@code op} on the results. * *

Examples of associative operations include numeric addition, min, and * max, and string concatenation. * *

Low-level stream construction

* * So far, all the stream examples have used methods like * {@link java.util.Collection#stream()} or {@link java.util.Arrays#stream(Object[])} * to obtain a stream. How are those stream-bearing methods implemented? * *

The class {@link java.util.stream.StreamSupport} has a number of * low-level methods for creating a stream, all using some form of a * {@link java.util.Spliterator}. A spliterator is the parallel analogue of an * {@link java.util.Iterator}; it describes a (possibly infinite) collection of * elements, with support for sequentially advancing, bulk traversal, and * splitting off some portion of the input into another spliterator which can * be processed in parallel. At the lowest level, all streams are driven by a * spliterator. * *

There are a number of implementation choices in implementing a * spliterator, nearly all of which are tradeoffs between simplicity of * implementation and runtime performance of streams using that spliterator. * The simplest, but least performant, way to create a spliterator is to * create one from an iterator using * {@link java.util.Spliterators#spliteratorUnknownSize(java.util.Iterator, int)}. * While such a spliterator will work, it will likely offer poor parallel * performance, since we have lost sizing information (how big is the * underlying data set), as well as being constrained to a simplistic * splitting algorithm. * *

A higher-quality spliterator will provide balanced and known-size * splits, accurate sizing information, and a number of other * {@link java.util.Spliterator#characteristics() characteristics} of the * spliterator or data that can be used by implementations to optimize * execution. * *

Spliterators for mutable data sources have an additional challenge; * timing of binding to the data, since the data could change between the time * the spliterator is created and the time the stream pipeline is executed. * Ideally, a spliterator for a stream would report a characteristic of * {@code IMMUTABLE} or {@code CONCURRENT}; if not it should be * late-binding. If a source * cannot directly supply a recommended spliterator, it may indirectly supply * a spliterator using a {@code Supplier}, and construct a stream via the * {@code Supplier}-accepting versions of * {@link java.util.stream.StreamSupport#stream(Supplier, int, boolean) stream()}. * The spliterator is obtained from the supplier only after the terminal * operation of the stream pipeline commences. * *

These requirements significantly reduce the scope of potential * interference between mutations of the stream source and execution of stream * pipelines. Streams based on spliterators with the desired characteristics, * or those using the Supplier-based factory forms, are immune to * modifications of the data source prior to commencement of the terminal * operation (provided the behavioral parameters to the stream operations meet * the required criteria for non-interference and statelessness). See * Non-Interference * for more details. * * @since 1.8 */ package java.util.stream; import java.util.function.BinaryOperator; import java.util.function.UnaryOperator;