java流stream中的Collector源码笔记

package java.util.stream;

import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * A <a href="package-summary.html#Reduction">mutable reduction operation</a> that
 * accumulates input elements into a mutable result container, optionally transforming
 * the accumulated result into a final representation after all input elements
 * have been processed.  Reduction operations can be performed either sequentially
 * or in parallel.
 一个可变的数据归约操作,将输入元素累加到一个可变的结果容器中,也可以在元素经过前面stream的所有操作后,转变累加结果为最终形态。数据归约操作可以是串行的,也可以是并行的。
 *
 * <p>Examples of mutable reduction operations include:
 * accumulating elements into a {@code Collection}; concatenating
 * strings using a {@code StringBuilder}; computing summary information about
 * elements such as sum, min, max, or average; computing "pivot table数据透视表" summaries
 * such as "maximum valued transaction by seller", etc.  The class {@link Collectors}
 * provides implementations of many common mutable reductions.
 数据归约操作包含:收集元素到集合,拼接字符串,计算结果集,计算数据透视表,比如‘根据卖家计算最大交易额’,这个类提供了很多公共的数据归约操作的实现。
 *
 * <p>A {@code Collector} is specified by four functions that work together to
 * accumulate entries into a mutable result container, and optionally perform
 * a final transform on the result.  They are: <ul>
 一个Collector规定由四个方法共同工作来将元素累积到一个可变容器,并且可以在最终结果上执行转换操作。他们是:
 *     <li>creation of a new result container ({@link #supplier()})</li>
 供应者,创建一个结果容器。
 *     <li>incorporating a new data element into a result container ({@link #accumulator()})</li>
 累加器,合并一个新的数据到一个结果容器。
 *     <li>combining two result containers into one ({@link #combiner()})</li>
 组合器,将两个结果容器组合为一个。
 *     <li>performing an optional final transform on the container ({@link #finisher()})</li>
 整理器,可以选择的在最终结果容器上执行一个转换操作。
 * </ul>
 *
 * <p>Collectors also have a set of characteristics, such as
 * {@link Characteristics#CONCURRENT}, that provide hints that can be used by a
 * reduction implementation to provide better performance.
 收集器也有一个参数集合,用于给归约操作的实现类提供线索以便实现类拥有更好的性能。
 *
 * <p>A sequential implementation of a reduction using a collector would
 * create a single result container using the supplier function, and invoke the
 * accumulator function once for each input element.  A parallel implementation
 * would partition the input, create a result container for each partition,
 * accumulate the contents of each partition into a subresult for that partition,
 * and then use the combiner function to merge the subresults into a combined
 * result.
 一个串行的归约实现会用生产者接口只提供一个结果容器,然后对每个输入元素使用累加器。
 一个并行的归约实现会对输入进行分组,对每个分组都单独创建一个结果容器,然后对每个分组的元素应用累加器,各分组分别累加到分了组的子结果容器中,然后使用组合器将子结果集组合成一个。
 *
 * <p>To ensure that sequential and parallel executions produce equivalent
 * results, the collector functions must satisfy an <em>identity</em> and an
 * <a href="package-summary.html#Associativity">associativity</a> constraints.
 为了确保串行和并行的操作生产出的结果是相等的,收集器必须满足同一性和结合性的约束。
 
 
 * <p>The identity constraint says that for any partially accumulated result,
 * combining it with an empty result container must produce an equivalent
 * result.  That is, for a partially accumulated result {@code a} that is the
 * result of any series of accumulator and combiner invocations, {@code a} must
 * be equivalent to {@code combiner.apply(a, supplier.get())}.
 同一性约束是说,对于任一部分累加的结果,将他与空集合组合,必须产生一个相等的集合。
 对于部分累积的结果,任何累加器和组合器的调用,必须满足combiner.apply(a, supplier.get())这个式子。
 *
 * <p>The associativity constraint says that splitting the computation must
 * produce an equivalent result.  That is, for any input elements {@code t1}
 * and {@code t2}, the results {@code r1} and {@code r2} in the computation
 * below must be equivalent:
 同一性是说,拆分开的计算与未拆分开的计算,其计算结果必须相等。
 下面的代码,r1,是先累加了两次,直接结束的
 而r2,是分别累加了两次,再将两次的结果合并起来,才结束的。
 但是最终r1必须等于r2.
 * <pre>{@code
 *     A a1 = supplier.get();
 *     accumulator.accept(a1, t1);
 *     accumulator.accept(a1, t2);
 *     R r1 = finisher.apply(a1);  // result without splitting
 *
 *     A a2 = supplier.get();
 *     accumulator.accept(a2, t1);
 *     A a3 = supplier.get();
 *     accumulator.accept(a3, t2);
 *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
 * } </pre>
 *
 * <p>For collectors that do not have the {@code UNORDERED} characteristic,
 * two accumulated results {@code a1} and {@code a2} are equivalent if
 * {@code finisher.apply(a1).equals(finisher.apply(a2))}.  For unordered
 * collectors, equivalence is relaxed to allow for non-equality related to
 * differences in order.  (For example, an unordered collector that accumulated
 * elements to a {@code List} would consider two lists equivalent if they
 * contained the same elements, ignoring order.)
 对于没有特性的累加操作而言,如果a1和a2满足finisher.apply(a1).equals(finisher.apply(a2)
 ,那么a1就等于a2。
 对于无序的集合而言,相等性的要求会放宽一些,比如一个无序的集合与另一个集合比较时,只要他们包含的元素是一样的,那就视为他们相等。
 *
 * <p>Libraries that implement reduction based on {@code Collector}, such as
 * {@link Stream#collect(Collector)}, must adhere to the following constraints:
 基于Collector接口实现的库,必须坚持一下约束:
 * <ul>
 *     <li>The first argument passed to the accumulator function, both
 *     arguments passed to the combiner function, and the argument passed to the
 *     finisher function must be the result of a previous invocation of the
 *     result supplier, accumulator, or combiner functions.</li>
 累加器的第一个参数必须是之前调用生产者的结果,
 合并器的两个参数必须是之前调用累加器的结果,
 终结器的参数必须是之前调用合并器的结果。
 
 *     <li>The implementation should not do anything with the result of any of
 *     the result supplier, accumulator, or combiner functions other than to
 *     pass them again to the accumulator, combiner, or finisher functions,
 *     or return them to the caller of the reduction operation.</li>
 实现类除了传递生产者,累加器,合并器,终结器的参数,以及将结果返回给调用者外,不得对他们的操作结果进行任何修改。
 
 *     <li>If a result is passed to the combiner or finisher
 *     function, and the same object is not returned from that function, it is
 *     never used again.</li>
  如果一个结果已经传递到了组合器或终结器,那么它将不会再返回一次,它将不会再被使用。
  
 *     <li>Once a result is passed to the combiner or finisher function, it
 *     is never passed to the accumulator function again.</li>
 如果一个结果已经传递到了组合器或终结器,那么它不会再被传递到累加器。
 
 *     <li>For non-concurrent collectors, any result returned from the result
 *     supplier, accumulator, or combiner functions must be serially
 *     thread-confined.  This enables collection to occur in parallel without
 *     the {@code Collector} needing to implement any additional synchronization.
 *     The reduction implementation must manage that the input is properly
 *     partitioned, that partitions are processed in isolation, and combining
 *     happens only after accumulation is complete.</li>
 对于非同步的收集器,任何从生产者,累加器,组合器返回的结果必须被串行的线程限制,这使得收集操作能够并行的执行,其无需依赖于额外的同步操作。
 归并操作的实现类必须确保输入被适当的分区,并且这些分区是隔离的,而且组合操作必须在累加操作完成后进行。
 
 
 *     <li>For concurrent collectors, an implementation is free to (but not
 *     required to) implement reduction concurrently.  A concurrent reduction
 *     is one where the accumulator function is called concurrently from
 *     multiple threads, using the same concurrently-modifiable result container,
 *     rather than keeping the result isolated during accumulation.
 *     A concurrent reduction should only be applied if the collector has the
 *     {@link Characteristics#UNORDERED} characteristics or if the
 *     originating源头 data is unordered.</li>
 对于同步收集器,实现类无需实现同步归并。一个同步的归并操作是累加器被多线程同步的调用,并且使用同步可变容器作为结果容器,而不是在累加期间保持结果的隔离。同步收集器应该仅仅用于无序集合。
 * </ul>
 *
 * <p>In addition to the predefined implementations in {@link Collectors}, the
 * static factory methods {@link #of(Supplier, BiConsumer, BinaryOperator, Characteristics...)}
 * can be used to construct collectors.  For example, you could create a collector
 * that accumulates widgets into a {@code TreeSet} with:
 除了预定义的收集器之外,你也可以自定义收集器,比如自定义一个收集TreeSet的Collector
 *
 * <pre>{@code
 *     Collector<Widget, ?, TreeSet<Widget>> intoSet =
 *         Collector.of(TreeSet::new, TreeSet::add,
 *                      (left, right) -> { left.addAll(right); return left; });
 * }</pre>
 *
 * (This behavior is also implemented by the predefined collector
 * {@link Collectors#toCollection(Supplier)}).
 *
 * @apiNote
 * Performing a reduction operation with a {@code Collector} should produce a
 * result equivalent to:
 使用收集器执行一个归并操作,其结果应该等价于:
 * <pre>{@code
 *     R container = collector.supplier().get();
 *     for (T t : data)
 *         collector.accumulator().accept(container, t);
 *     return collector.finisher().apply(container);
 * }</pre>
 *
 * <p>However, the library is free to partition the input, perform执行 the reduction
 * on the partitions, and then use the combiner function to combine the partial
 * results to achieve a parallel reduction.  (Depending on the specific reduction
 * operation, this may perform better or worse, depending on the relative cost
 * of the accumulator and combiner functions.)
 无论如何,实现类可以自由的分区,并在分区上执行归并,然后使用组合器来组合分区结果来实现并行归并。
 (性能取决于特定的累加和组合操作的性能)
 *
 * <p>Collectors are designed to be <em>composed</em>; many of the methods
 * in {@link Collectors} are functions that take a collector and produce
 * a new collector.  For example, given the following collector that computes
 * the sum of the salaries of a stream of employees:
 收集器被定义为可组合的,很多收集器方法都是获取一个收集器,再生产一个收集器。比方说,下面的收集器可以嵌套使用,收集器里再用收集器。
 *
 * <pre>{@code
 *     Collector<Employee, ?, Integer> summingSalaries
 *         = Collectors.summingInt(Employee::getSalary))
 * }</pre>
 *
 * If we wanted to create a collector to tabulate汇总 the sum of salaries by
 * department, we could reuse the "sum of salaries" logic using
 * {@link Collectors#groupingBy(Function, Collector)}:
 *
 * <pre>{@code
 *     Collector<Employee, ?, Map<Department, Integer>> summingSalariesByDept
 *         = Collectors.groupingBy(Employee::getDepartment, summingSalaries);
 * }</pre>
 *
 * @see Stream#collect(Collector)
 * @see Collectors
 *
 * @param <T> the type of input elements to the reduction operation
 * @param <A> the mutable accumulation type of the reduction operation (often
 *            hidden as an implementation detail)
 * @param <R> the result type of the reduction operation
 * @since 1.8
 */
public interface Collector<T, A, R> {
    /**
     * A function that creates and returns a new mutable result container.
     用于创建和返回一个可变容器的方法
     *
     * @return a function which returns a new, mutable result container
     */
    Supplier<A> supplier();

    /**
     * A function that folds a value into a mutable result container.
     *返回一个可以将值折叠进一个可变容器的方法
     * @return a function which folds a value into a mutable result container
     */
    BiConsumer<A, T> accumulator();

    /**
     * A function that accepts two partial results and merges them.  The
     * combiner function may fold state from one argument into the other and
     * return that, or may return a new result container.
     *接受两个部分结果,然后合并他们。组合器函数可以将状态从一个参数折叠到另一个参数并返回该参数,或者可以返回一个新的结果容器。
     * @return a function which combines two partial results into a combined
     * result
     */
    BinaryOperator<A> combiner();

    /**
     * Perform执行 the final transformation from the intermediate accumulation type
     从中间计算结果执行最终转化操作
     * {@code A} to the final result type {@code R}.
     *
     * <p>If the characteristic {@code IDENTITY_FINISH} is
     * set, this function may be presumed to be an identity transform with an
     * unchecked cast from {@code A} to {@code R}.
     *如果设置了IDENTITY_FINISH参数,这个函数可能被推断为未经过类型校检的同一性转换
     * @return a function which transforms the intermediate result to the final
     * result
     */
    Function<A, R> finisher();

    /**
     * Returns a {@code Set} of {@code Collector.Characteristics} indicating
     * the characteristics of this Collector.  This set should be immutable.
     *返回一个包含指定集合参数的set,set是不可变的。
     该集合包含了收集器的3个特性,在自定义收集器时,收集器的数据有什么特性才添加什么特性,不可随意添加
     * @return an immutable set of collector characteristics
     */
    Set<Characteristics> characteristics();

    /**
     * Returns a new {@code Collector} described by the given {@code supplier},
     * {@code accumulator}, and {@code combiner} functions.  The resulting
     * {@code Collector} has the {@code Collector.Characteristics.IDENTITY_FINISH}
     * characteristic.
     *	根据给定的3个函数,返回具有Collector.Characteristics.IDENTITY_FINISH参数的收集器,不含finisher。
     * @param supplier The supplier function for the new collector
     * @param accumulator The accumulator function for the new collector
     * @param combiner The combiner function for the new collector
     * @param characteristics The collector characteristics for the new
     *                        collector
     * @param <T> The type of input elements for the new collector
     * @param <R> The type of intermediate accumulation result, and final result,
     *           for the new collector
     * @throws NullPointerException if any argument is null
     * @return the new {@code Collector}
     */
    public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
                                              BiConsumer<R, T> accumulator,
                                              BinaryOperator<R> combiner,
                                              Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(characteristics);
        Set<Characteristics> cs = (characteristics.length == 0)
                                  ? Collectors.CH_ID
                                  : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
                                                                           characteristics));
        return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
    }

    /**
     * Returns a new {@code Collector} described by the given {@code supplier},
     * {@code accumulator}, {@code combiner}, and {@code finisher} functions.
     *根据给定的4个函数,返回一个收集器。
     * @param supplier The supplier function for the new collector
     * @param accumulator The accumulator function for the new collector
     * @param combiner The combiner function for the new collector
     * @param finisher The finisher function for the new collector
     * @param characteristics The collector characteristics for the new
     *                        collector
     * @param <T> The type of input elements for the new collector
     * @param <A> The intermediate accumulation type of the new collector
     * @param <R> The final result type of the new collector
     * @throws NullPointerException if any argument is null
     * @return the new {@code Collector}
     */
    public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                                 BiConsumer<A, T> accumulator,
                                                 BinaryOperator<A> combiner,
                                                 Function<A, R> finisher,
                                                 Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(finisher);
        Objects.requireNonNull(characteristics);
        Set<Characteristics> cs = Collectors.CH_NOID;
        if (characteristics.length > 0) {
            cs = EnumSet.noneOf(Characteristics.class);
            Collections.addAll(cs, characteristics);
            cs = Collections.unmodifiableSet(cs);
        }
        return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
    }

    /**
     * Characteristics indicating properties of a {@code Collector}, which can
     * be used to optimize优化 reduction implementations.
     指定收集器的特定参数,用于优化实现类。
     */
    enum Characteristics {
        /**
         * Indicates that this collector is <em>concurrent</em>, meaning that
         * the result container can support the accumulator function being
         * called concurrently with the same result container from multiple
         * threads.
         指出收集器是并发的,意味着结果容器支持累加器(使用同一个结果容器)被多线程调用
         *
         * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
         * then it should only be evaluated concurrently if applied to an
         * unordered data source.
         如果收集器是无序的,只有在其运用于无序数据源时,才应认为他是并发的。
         
         加上这个特性后,中间结果容器将只有一个且会被多个线程调用(在累加阶段),因此结果容器必须是线程安全的,否则会有一定几率报错
         如果自定义收集器用到这个特性,绝对不要在累加器中做多余操作(比如打印集合),否则可能会报错,因为一个集合在多个线程修改的同时又被遍历就会抛出同步修改异常。
         加上这个特性,且使用parallelStream,那么combiner函数的lamboda就不会被调用,因为中间结果容器只有一个,不用合并了。
         如果一个集合使用parallelStream,但收集器没有这个属性,那么累加器会创建多个中间结果容器(由supplier函数不断提供),如果使用了parallelStream,收集器又有这个特性,那么累加器只会使用一个中间结果容器。
         */
        CONCURRENT,

        /**
         * Indicates that the collection operation does not commit to preserving保存
         * the encounter遭遇 order of input elements.  (This might be true if the
         * result container has no intrinsic内在的 order, such as a {@link Set}.)
         指出集合操作不保证保留输入元素的顺序(如果集合容器没有内在顺序的话,比如Set)
         */
        UNORDERED,

        /**
         * Indicates that the finisher function is the identity function and
         * can be elided.  If set, it must be the case that an unchecked cast
         * from A to R will succeed.
         指出终结函数就是同一性函数并且可以被省略(指中间结果可以直接转换为最终结果)。如果设置了这个参数,一个未经类型检查的结果映射操作一定会成功(中间结果强制转换为最终结果必须成功)。
         */
        IDENTITY_FINISH
    }
}



import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
自定义一个收集器,将set收集为map
**/
public class MyCollector2<T> implements Collector<T, Set<T>, HashMap<T,T>> {
    @Override
    public Supplier<Set<T>> supplier() {
        return HashSet<T>::new;
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        return (set, item)->{
            System.out.println(Thread.currentThread().getName());
            set.add(item);};
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        return (set1, set2)->{set1.addAll(set2);return set1;};
    }

    @Override
    public Function<Set<T>, HashMap<T, T>> finisher() {
        return (set)->{
            HashMap<T, T> map = new HashMap<>();
            set.forEach(item->{map.put(item,item);});
            return map;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome", "hello", "a", "b", "d", "c", "e", "f", "g", "h", "J");
        HashSet<String> set = new HashSet<>();
        set.addAll(list);
        System.out.println("set: "+ set);
        HashMap<String, String> map = set.parallelStream().collect(new MyCollector2<>());
        System.out.println(map);
    }
}

原文地址:https://www.cnblogs.com/Lothlorien/p/12142422.html