Collector 源码分析

Collectors

Collectors 配合 stream 可以实现 MapReduce 操作,也可以单独完成流中元素的收集。

收集器接口和实现

/**
 * 收集器接口
 */
public interface Collector<T, A, R> {
    /**
     *  创建并返回一个可变的结果容器
     */
    Supplier<A> supplier();

    /**
     *  创建并返回一个将值归约到可变结果容器中的累加器
     */
    BiConsumer<A, T> accumulator();

    /**
     *  创建并返回一个将两个结果容器进行合并的组合器,适用于并行计算
     */
    BinaryOperator<A> combiner();

    /**
     * 创建并返回一个将结果容器转换为最终输出的完成器
     */
    Function<A, R> finisher();

    /**
     *  此收集器所持有的 Collector.Characteristics 特征集
     */
    Set<Characteristics> characteristics();

    /**
     *  用于优化归约操作的收集器特征值
     */
    enum Characteristics {
        /**
         *  结果容器支持并发处理
         */
        CONCURRENT,

        /**
         *  收集器处理元素时是无序的
         */
        UNORDERED,

        /**
         *  结果容器就是最终返回的结果,不需要执行完成器操作
         */
        IDENTITY_FINISH
    }
}

    /**
     * Collector 接口的简单实现
     */
    static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A,R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }

        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }

        @Override
        public Function<A, R> finisher() {
            return finisher;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    }

简单的结果收集

  • toList():将流中的元素收集到一个 ArrayList 中
    /**
     *  将流中的元素收集到一个 ArrayList 中
     */
    public static <T>
    Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                (left, right) -> { left.addAll(right); return left; },
                CH_ID);
    }

ReferencePipeline#
    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        // 1)如果是并行收集 && 收集器的结果容器支持并发处理 && 元素是无序的
        if (isParallel()
                && collector.characteristics().contains(Collector.Characteristics.CONCURRENT)
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            // 通过共享的结果容器完成收集
            container = collector.supplier().get();
            final BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        // 2)收集过程是串行的 || 并行处理的收集器结果容器不支持并发
        else {
            // 执行收集过程,并返回结果容器
            container = evaluate(ReduceOps.makeRef(collector));
        }
        /**
         * 1)如果是 IDENTITY_FINISH,则直接返回结果容器
         * 2)否则执行合并操作
         */
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                ? (R) container
                        : collector.finisher().apply(container);
    }

ReduceOps#
    /**
     *  创建一个用于归约引用对象的终端收集操作
     */
    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        // 获取结果容器的生成器
        final Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 获取累加器的生成器
        final BiConsumer<I, ? super T> accumulator = collector.accumulator();
        // 获取合并器的生成器
        final BinaryOperator<I> combiner = collector.combiner();
        // 定义 ReducingSink
        class ReducingSink extends Box<I>
        implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                // 生成结果容器并写入 state
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 使用累加器进行归约
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                // 使用合并器进行两个结果容器的合并
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                        ? StreamOpFlag.NOT_ORDERED
                                : 0;
            }
        };
    }

    /**
     *  用于执行 reduce 操作的终端 sink
     */
    private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
            extends TerminalSink<T, R> {
        void combine(K other);
    }

    /**
     * 用于保存单个结果值的容器
     */
    private abstract static class Box<U> {
        /**
         *  结果值
         */
        U state;

        Box() {} // Avoid creation of special accessor

        public U get() {
            return state;
        }
    }

    /**
     * 一个终端归约操作,用于评估流水线中产生的每个元素,并将其发送到收集器中
     */
    private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
            implements TerminalOp<T, R> {
        /**
         *  输入元素类型
         */
        private final StreamShape inputShape;

        ReduceOp(StreamShape shape) {
            inputShape = shape;
        }

        /**
         * 创建 sink
         */
        public abstract S makeSink();

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

        /**
         *  串行处理流水线中生成的元素
         */
        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        /**
         *  并行处理流水线中生成的元素
         */
        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
    }

    /**
     * A {@code ForkJoinTask} for performing a parallel reduce operation.
     */
    @SuppressWarnings("serial")
    private static final class ReduceTask<P_IN, P_OUT, R,
                                          S extends AccumulatingSink<P_OUT, R, S>>
            extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
        /**
         * 终端归约操作
         */
        private final ReduceOp<P_OUT, R, S> op;

        ReduceTask(ReduceOp<P_OUT, R, S> op,
                   PipelineHelper<P_OUT> helper,
                   Spliterator<P_IN> spliterator) {
            super(helper, spliterator);
            this.op = op;
        }

        ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
                   Spliterator<P_IN> spliterator) {
            super(parent, spliterator);
            this.op = parent.op;
        }

        /**
         * 创建一个子任务
         */
        @Override
        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, spliterator);
        }

        /**
         * 执行此叶子任务的元素收集
         */
        @Override
        protected S doLeaf() {
            return helper.wrapAndCopyInto(op.makeSink(), spliterator);
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            // 如果不是叶子任务,则需要合并两个子节点的结果
            if (!isLeaf()) {
                // 读取左子节点的任务执行结果
                S leftResult = leftChild.getLocalResult();
                // 读取右子节点的任务执行结果,并使用合并器进行累加
                leftResult.combine(rightChild.getLocalResult());
                // 写入结果到当前任务中
                setLocalResult(leftResult);
            }
            // GC spliterator, left and right child
            super.onCompletion(caller);
        }
    }
  • toCollection():将流中的元素收集到一个 Collection 中
    /**
     *  将流中的元素收集到一个 Collection 中
     */
    public static <T, C extends Collection<T>>
    Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
        return new CollectorImpl<>(collectionFactory, Collection<T>::add,
                (r1, r2) -> { r1.addAll(r2); return r1; },
                CH_ID);
    }
  • toSet():将流中的元素收集到一个 HashSet 中
    /**
     *  将流中的元素收集到一个 HashSet 中
     */
    public static <T>
    Collector<T, ?, Set<T>> toSet() {
        return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                (left, right) -> {
                    // 根据结果容器的大小,优化合并过程
                    if (left.size() < right.size()) {
                        right.addAll(left); return right;
                    } else {
                        left.addAll(right); return left;
                    }
                },
                CH_UNORDERED_ID);
    }
  • toMap():将流中的元素收集到一个 map 中,键不能重复、值不能为 null。如果键出现重复,则抛出 IllegalStateException 异常
    /**
     * 将流中的元素收集到一个 map 中,如果键出现重复,则抛出 IllegalStateException 异常。
     * 
     * @param <T> 输入元素类型
     * @param <K> 键元素类型
     * @param <U> 值元素类型
     * @param keyMapper 基于输入元素生成键
     * @param valueMapper 基于输入元素生成值
     */
    public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper) {
        return new CollectorImpl<>(HashMap::new,
                uniqKeysMapAccumulator(keyMapper, valueMapper),
                uniqKeysMapMerger(),
                CH_ID);
    }

    private static <T, K, V>
    BiConsumer<Map<K, V>, T> uniqKeysMapAccumulator(Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends V> valueMapper) {
        return (map, element) -> {
            // 生成键
            final K k = keyMapper.apply(element);
            // 生成值,值不能为 null
            final V v = Objects.requireNonNull(valueMapper.apply(element));
            // 如果键已经存在,则抛出 IllegalStateException 异常
            final V u = map.putIfAbsent(k, v);
            if (u != null) {
                throw duplicateKeyException(k, u, v);
            }
        };
    }

    /**
     *  将两个 map 中的键值对进行合并【考虑计算 size 进行优化处理】。
     *  键出现重复,则抛出 IllegalStateException 异常
     */
    private static <K, V, M extends Map<K,V>>
    BinaryOperator<M> uniqKeysMapMerger() {
        return (m1, m2) -> {
            for (final Map.Entry<K,V> e : m2.entrySet()) {
                final K k = e.getKey();
                final V v = Objects.requireNonNull(e.getValue());
                final V u = m1.putIfAbsent(k, v);
                if (u != null) {
                    throw duplicateKeyException(k, u, v);
                }
            }
            return m1;
        };
    }
  • toMap():将流中的元素收集到一个 map 中,合并时执行 megre 操作【键冲突不会跑异常】
    /**
     *  将流中的元素收集到一个 map 中,合并时执行 megre 操作【键冲突不会跑异常】
     */
    public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper,
            BinaryOperator<U> mergeFunction) {
        return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
    }

    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper,
            BinaryOperator<U> mergeFunction,
            Supplier<M> mapFactory) {
        // 使用 merge 函数进行归约
        final BiConsumer<M, T> accumulator
        = (map, element) -> map.merge(keyMapper.apply(element),
                valueMapper.apply(element), mergeFunction);
        return new CollectorImpl<>(mapFactory, accumulator, mapMerger(mergeFunction), CH_ID);
    }

    /**
     * 将右侧的 map 合并进左侧的 map,键冲突时使用 merge 函数进行处理
     */
    private static <K, V, M extends Map<K,V>>
    BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) {
        return (m1, m2) -> {
            for (final Map.Entry<K,V> e : m2.entrySet()) {
                m1.merge(e.getKey(), e.getValue(), mergeFunction);
            }
            return m1;
        };
    }
  • toConcurrentMap:将流中的元素收集到一个 ConcurrentHashMap 中,键和值都不能为 null,键不能重复。
    /**
     *  将流中的元素收集到一个 ConcurrentHashMap 中
     */
    public static <T, K, U>
    Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper) {
        return new CollectorImpl<>(ConcurrentHashMap::new,
                uniqKeysMapAccumulator(keyMapper, valueMapper),
                uniqKeysMapMerger(),
                CH_CONCURRENT_ID);
    }
  • toUnmodifiableList():将流中的元素收集到一个不可变的 List 中
    /**
     *  将流中的元素收集到一个不可变的 List 中
     * @since 10
     */
    @SuppressWarnings("unchecked")
    public static <T>
    Collector<T, ?, List<T>> toUnmodifiableList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                (left, right) -> { left.addAll(right); return left; },
                list -> (List<T>)List.of(list.toArray()),
                CH_NOID);
    }
  • toUnmodifiableSet():将流中的元素收集到一个不可变的 set 中
    /**
     *  将流中的元素收集到一个不可变的 set 中
     * @since 10
     */
    @SuppressWarnings("unchecked")
    public static <T>
    Collector<T, ?, Set<T>> toUnmodifiableSet() {
        return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                (left, right) -> {
                    if (left.size() < right.size()) {
                        right.addAll(left); return right;
                    } else {
                        left.addAll(right); return left;
                    }
                },
                set -> (Set<T>)Set.of(set.toArray()),
                CH_UNORDERED_NOID);
    }
  • toUnmodifiableMap():将流中的元素收集到一个不可变的 Map 中
    /**
     *  将流中的元素收集到一个不可变的 Map 中
     * @since 10
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <T, K, U>
    Collector<T, ?, Map<K,U>> toUnmodifiableMap(Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper) {
        Objects.requireNonNull(keyMapper, "keyMapper");
        Objects.requireNonNull(valueMapper, "valueMapper");
        return collectingAndThen(
                toMap(keyMapper, valueMapper),
                map -> (Map<K,U>)Map.ofEntries(map.entrySet().toArray(new Map.Entry[0])));
    }

自定义归约

  • reducing():使用 BinaryOperator 对流中的元素进行归约,返回一个 Optional 对象,未指定初始值。
    /**
     *  使用 BinaryOperator 对流中的元素进行归约,返回一个 Optional 对象
     */
    public static <T> Collector<T, ?, Optional<T>>
    reducing(BinaryOperator<T> op) {
        class OptionalBox implements Consumer<T> {
            /**
             *  结果值
             */
            T value = null;
            /**
             * 是否已经有结果值
             */
            boolean present = false;

            @Override
            public void accept(T t) {
                // 1)如果已经有结果值,则执行归约
                if (present) {
                    value = op.apply(value, t);
                }
                // 2)第一个元素,则直接写入结果值中
                else {
                    value = t;
                    present = true;
                }
            }
        }

        return new CollectorImpl<>(
                OptionalBox::new, OptionalBox::accept,
                (a, b) -> { if (b.present) {
                    a.accept(b.value);
                } return a; },
                a -> Optional.ofNullable(a.value), CH_NOID);
    }
  • 使用 BinaryOperator 对流中的元素进行归约,指定初始值为 identity。
    /**
     */
    public static <T> Collector<T, ?, T>
    reducing(T identity, BinaryOperator<T> op) {
        return new CollectorImpl<>(
                // 初始值生成器
                boxSupplier(identity),
                // 通过数组来暂存值
                (a, t) -> { a[0] = op.apply(a[0], t); },
                (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
                a -> a[0],
                CH_NOID);
    }

    @SuppressWarnings("unchecked")
    private static <T> Supplier<T[]> boxSupplier(T identity) {
        return () -> (T[]) new Object[] { identity };
    }
  • 使用 BinaryOperator 对流中的元素进行归约,归约前先使用 mapper 进行映射处理,指定初始值为 identity。
    /**
     */
    public static <T, U>
    Collector<T, ?, U> reducing(U identity,
            Function<? super T, ? extends U> mapper,
            BinaryOperator<U> op) {
        return new CollectorImpl<>(
                boxSupplier(identity),
                // 先使用 mapper 进行映射,然后再归约
                (a, t) -> { a[0] = op.apply(a[0], mapper.apply(t)); },
                (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
                a -> a[0], CH_NOID);
    }

统计

  • averagingInt():先将流中的元素转换为 int 值,然后统计平均值
    /**
     *  先将流中的元素转换为 int 值,然后统计平均值
     */
    public static <T> Collector<T, ?, Double>
    averagingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[2],
                // 第一个 slot 记录总和,第二个 slot 记录总个数
                (a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1], CH_NOID);
    }
  • averagingLong():先将流中的元素转换为 long 值,然后统计平均值
    /**
     *  先将流中的元素转换为 long 值,然后统计平均值
     */
    public static <T> Collector<T, ?, Double>
    averagingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[2],
                // 第一个 slot 记录总和,第二个 slot 记录总个数
                (a, t) -> { a[0] += mapper.applyAsLong(t); a[1]++; },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1], CH_NOID);
    }
  • averagingDouble():先将流中的元素转换为 double 值,然后统计平均值
    /**
     *  先将流中的元素转换为 double 值,然后统计平均值
     */
    public static <T> Collector<T, ?, Double>
    averagingDouble(ToDoubleFunction<? super T> mapper) {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, and index 2 holds the number of values seen.
         */
        return new CollectorImpl<>(
                () -> new double[4],
                (a, t) -> { final double val = mapper.applyAsDouble(t); sumWithCompensation(a, val); a[2]++; a[3]+= val;},
                (a, b) -> { sumWithCompensation(a, b[0]); sumWithCompensation(a, b[1]); a[2] += b[2]; a[3] += b[3]; return a; },
                a -> a[2] == 0 ? 0.0d : computeFinalSum(a) / a[2],
                        CH_NOID);
    }
  • summingInt():先将流中的元素转换为 int 值,然后统计总和
    /**
     *  先将流中的元素转换为 int 值,然后统计总和
     */
    public static <T> Collector<T, ?, Integer>
    summingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new int[1],
                (a, t) -> { a[0] += mapper.applyAsInt(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }
  • summingLong():先将流中的元素转换为 long 值,然后统计总和
    /**
     *  先将流中的元素转换为 long 值,然后统计总和
     */
    public static <T> Collector<T, ?, Long>
    summingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[1],
                (a, t) -> { a[0] += mapper.applyAsLong(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }
  • summingDouble():先将流中的元素转换为 double 值,然后统计总和
    /**
     *  先将流中的元素转换为 double 值,然后统计总和
     */
    public static <T> Collector<T, ?, Double>
    summingDouble(ToDoubleFunction<? super T> mapper) {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, and index 2 holds the simple sum used to compute
         * the proper result if the stream contains infinite values of
         * the same sign.
         */
        return new CollectorImpl<>(
                () -> new double[3],
                (a, t) -> { final double val = mapper.applyAsDouble(t);
                sumWithCompensation(a, val);
                a[2] += val;},
                (a, b) -> { sumWithCompensation(a, b[0]);
                a[2] += b[2];
                return sumWithCompensation(a, b[1]); },
                a -> computeFinalSum(a),
                CH_NOID);
    }
  • summarizingInt():先将流中的元素转换为 int 值,然后统计到 IntSummaryStatistics【合并了 count、sum、min、max、avg】 中。
    /**
     *  先将流中的元素转换为 int 值,然后统计到 IntSummaryStatistics【合并了 count、sum、min、max、avg】 中。
     */
    public static <T>
    Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                IntSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsInt(t)),
                (l, r) -> { l.combine(r); return l; }, CH_ID);
    }
  • summarizingLong():先将流中的元素转换为 long 值,然后统计到 LongSummaryStatistics【合并了 count、sum、min、max、avg】 中。
    /**
     *  先将流中的元素转换为 long 值,然后统计到 LongSummaryStatistics【合并了 count、sum、min、max、avg】 中。
     */
    public static <T>
    Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<>(
                LongSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsLong(t)),
                (l, r) -> { l.combine(r); return l; }, CH_ID);
    }
  • summarizingDouble():先将流中的元素转换为 double 值,然后统计到 DoubleSummaryStatistics【合并了 count、sum、min、max、avg】 中。
    /**
     *  先将流中的元素转换为 double 值,然后统计到 DoubleSummaryStatistics【合并了 count、sum、min、max、avg】 中。
     */
    public static <T>
    Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper) {
        return new CollectorImpl<>(
                DoubleSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsDouble(t)),
                (l, r) -> { l.combine(r); return l; }, CH_ID);
    }
  • counting():统计流中的总元素数
    /**
     *  统计流中的总元素数
     */
    public static <T> Collector<T, ?, Long>
    counting() {
        return summingLong(e -> 1L);
    }
- minBy():读取流中的最小值
```java
    /**
     *  读取流中的最小值
     */
    public static <T> Collector<T, ?, Optional<T>>
    minBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.minBy(comparator));
    }
  • maxBy():读取流中的最大值
    /**
     *  读取流中的最大值
     */
    public static <T> Collector<T, ?, Optional<T>>
    maxBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.maxBy(comparator));
    }

分组

  • groupingBy【串行分组】
    /**
     *  使用指定的分类器 classifier 将流中的元素进行分组,同一个组中的元素被收集到一个 ArrayList 中。
     */
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
        return groupingBy(classifier, toList());
    }

    /**
     *  使用指定的分类器 classifier 将流中的元素进行分组,同一个组中的元素被收集到一个 downstream 收集器中。
     */
    public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
            Collector<? super T, A, D> downstream) {
        return groupingBy(classifier, HashMap::new, downstream);
    }

    /**
     *  使用指定的分类器 classifier 将流中的元素进行分组,同一个组中的元素被收集到一个 downstream 收集器中。
     */
    public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
            Supplier<M> mapFactory,
            Collector<? super T, A, D> downstream) {
        // 同一组元素的结果容器
        final Supplier<A> downstreamSupplier = downstream.supplier();
        // 同一组元素的累加器
        final BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        // 分组累加器
        final BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            // 计算分组键
            final K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            // 读取分组容器
            final A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            // 执行归约
            downstreamAccumulator.accept(container, t);
        };
        // 并行分组的合并器
        final BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        // 结果容器的生成器
        final Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
        // 1)无序合并处理
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        // 2)执行合并处理
        else {
            @SuppressWarnings("unchecked")
            final
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            final Function<Map<K, A>, M> finisher = intermediate -> {
                // 对所有的分组执行最后的映射
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                final
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }
  • groupingByConcurrent【并行分组】
    /**
     * 【并行分组】使用指定的分类器 classifier 将流中的元素进行分组,同一个组中的元素被收集到一个 ArrayList 中。
     */
    public static <T, K>
    Collector<T, ?, ConcurrentMap<K, List<T>>>
    groupingByConcurrent(Function<? super T, ? extends K> classifier) {
        return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
    }

    /**
     *  【并行分组】使用指定的分类器 classifier 将流中的元素进行分组,同一个组中的元素被收集到一个 downstream 收集器中。
     */
    public static <T, K, A, D>
    Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier,
            Collector<? super T, A, D> downstream) {
        return groupingByConcurrent(classifier, ConcurrentHashMap::new, downstream);
    }

    /**
     *  【并行分组】使用指定的分类器 classifier 将流中的元素进行分组,同一个组中的元素被收集到一个 downstream 收集器中。
     */
    public static <T, K, A, D, M extends ConcurrentMap<K, D>>
    Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier,
            Supplier<M> mapFactory,
            Collector<? super T, A, D> downstream) {
        final Supplier<A> downstreamSupplier = downstream.supplier();
        final BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        final BinaryOperator<ConcurrentMap<K, A>> merger = Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
        final
        Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory;
        BiConsumer<ConcurrentMap<K, A>, T> accumulator;
        // 同一个分组的结果容器是线程安全的
        if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
            accumulator = (m, t) -> {
                final K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                final A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                downstreamAccumulator.accept(resultContainer, t);
            };
        }
        // 同一个分组的结果容器是不是线程安全的,则使用 synchronized 实现同步
        else {
            accumulator = (m, t) -> {
                final K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                final A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                synchronized (resultContainer) {
                    downstreamAccumulator.accept(resultContainer, t);
                }
            };
        }

        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            final
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            final Function<ConcurrentMap<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                final
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID);
        }
    }
  • partitioningBy【分区】
    /**
     *  使用指定的断言 predicate 对流中的元素进行二分,
     *  断言结果为 true 的所有元素同一个组,断言结果为 false 的所有元素同一个组,
     *  同一个组中的元素收集到一个 ArrayList 中。
     */
    public static <T>
    Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
        return partitioningBy(predicate, toList());
    }

    /**
     *  使用指定的断言 predicate 对流中的元素进行二分,
     *  断言结果为 true 的所有元素同一个组,断言结果为 false 的所有元素同一个组,
     *  使用指定的收集器来收集同一个组中的元素。
     */
    public static <T, D, A>
    Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
            Collector<? super T, A, D> downstream) {
        // 同一组的累加器
        final BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        // 流的累加器
        final BiConsumer<Partition<A>, T> accumulator = (result, t) ->
        downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
        // 同一组的合并器
        final BinaryOperator<A> op = downstream.combiner();
        // 流的合并器
        final BinaryOperator<Partition<A>> merger = (left, right) ->
        new Partition<>(op.apply(left.forTrue, right.forTrue),
                op.apply(left.forFalse, right.forFalse));
        // 结果容器生成器
        final Supplier<Partition<A>> supplier = () ->
        new Partition<>(downstream.supplier().get(),
                downstream.supplier().get());
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
        }
        else {
            final Function<Partition<A>, Map<Boolean, D>> finisher = par ->
            new Partition<>(downstream.finisher().apply(par.forTrue),
                    downstream.finisher().apply(par.forFalse));
            return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
        }
    }

其他

  • filtering():先对流中的元素执行过滤,满足断言的再执行收集。
    /**
     *  先对流中的元素执行过滤,满足断言的再执行收集。
     * @since 9
     */
    public static <T, A, R>
    Collector<T, ?, R> filtering(Predicate<? super T> predicate,
            Collector<? super T, A, R> downstream) {
        final BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                (r, t) -> {
                    if (predicate.test(t)) {
                        downstreamAccumulator.accept(r, t);
                    }
                },
                downstream.combiner(), downstream.finisher(),
                downstream.characteristics());
    }
  • mapping():先对流中的元素执行映射,然后再执行收集。
    /**
     *  先对流中的元素执行映射,然后再执行收集。
     */
    public static <T, U, A, R>
    Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
            Collector<? super U, A, R> downstream) {
        final BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                downstream.combiner(), downstream.finisher(),
                downstream.characteristics());
    }
  • flatMapping():通过流中的每一个元素生成一个子流,然后再对子流中的元素执行收集
    /**
     *  通过流中的每一个元素生成一个子流,然后再对子流中的元素执行收集
     * @since 9
     */
    public static <T, U, A, R>
    Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
            Collector<? super U, A, R> downstream) {
        final BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                (r, t) -> {
                    try (Stream<? extends U> result = mapper.apply(t)) {
                        if (result != null) {
                            result.sequential().forEach(u -> downstreamAccumulator.accept(r, u));
                        }
                    }
                },
                downstream.combiner(), downstream.finisher(),
                downstream.characteristics());
    }
  • joining():字符序列流的拼接
    /**
     *  对元素类型为 String 的流中的每一个元素执行拼接
     */
    public static Collector<CharSequence, ?, String> joining() {
        return new CollectorImpl<>(
                StringBuilder::new, StringBuilder::append,
                (r1, r2) -> { r1.append(r2); return r1; },
                StringBuilder::toString, CH_NOID);
    }

    /**
     *  对元素类型为 String 的流中的每一个元素执行拼接,指定分割符为 delimiter
     */
    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
        return joining(delimiter, "", "");
    }

    /**
     *  对元素类型为 String 的流中的每一个元素执行拼接,指定分割符为 delimiter,结果前缀为 prefix,结果后缀为 suffix
     */
    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
            CharSequence prefix,
            CharSequence suffix) {
        return new CollectorImpl<>(
                () -> new StringJoiner(delimiter, prefix, suffix),
                StringJoiner::add, StringJoiner::merge,
                StringJoiner::toString, CH_NOID);
    }
原文地址:https://www.cnblogs.com/zhuxudong/p/10160448.html