java基础-Steam[1]-接口

UML

image-20210228184632801

操作分类

  • 中间操作
    • 有状态
    • 无状态
  • 终结操作
    • 短路操作
    • 非短路操作

image-20210316090314720

uml

Stream原理解析

Stream接口

public interface Stream<T> extends BaseStream<T, Stream<T>> {

    //返回一个包含所有符合predicate的元素的Stream
    //是一个StreamOps,立即操作:intermediate operation
    //statelessn 无状态的操作
    Stream<T> filter(Predicate<? super T> predicate);

    //返回一个对当前stream执行给定Function后生成的元素的列表
    //是一个StreamOps,立即操作:intermediate operation
    //statelessn 无状态的操作
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);

    //返回一个针对当前stream执行Function后生成的IntStream,
    //是一个StreamOps,立即操作:intermediate operation
    //statelessn 无状态的操作
    IntStream mapToInt(ToIntFunction<? super T> mapper);
    LongStream mapToLong(ToLongFunction<? super T> mapper);
    DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);

    //针对每个元素执行Function生成一个Stream。在最后会把所有Stream合并返回一个Stream
    //是一个StreamOps,立即操作:intermediate operation
    //statelessn 无状态的操作
    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
    IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
    LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
    DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);

    //针对当前stream去重,进而返回一个由不同元素组成的Stream
    //对于一个排序的Steam,第一个重复的元素会被保留;对于一个未排序的Stream,没有稳定的保证
    //有状态的立即操作stateful intermediate operation
    //在并行的parallel pipelines中,未排序的或者移除排序约束ordering constraint,会由更高的执行效率。或者转换为序列化执行sequential()
    Stream<T> distinct();

    //对当前stream的元素排序,并返回排序后的stream。元素必须实现Comparable接口,否则报java.lang.ClassCastException(执行terminal operation时)
    Stream<T> sorted();
    Stream<T> sorted(Comparator<? super T> comparator);

    //返回当前Stream,只是多个对每个元素执行Consumer()方法
    //此方法主要用于debug,比如打印中间状态时由哪些元素
    Stream<T> peek(Consumer<? super T> action);

    //截取不大于maxSize长度的元素Stream
    //short-circuiting stateful intermediate operation
    //在sequential stream上执行代价小,而在ordered parallel pipelines代价大
    Stream<T> limit(long maxSize);

    //丢弃最开始的n哥元素,生成新的Stream
    //cheap operation on sequential stream pipelines,expensive on ordered parallel pipelines
    Stream<T> skip(long n);

    // takeWhile() 方法使用一个断言作为参数,返回给定 Stream 的子集直到断言语句第一次返回 false。如果第一个值不满足断言条件,将返回一个空的 Stream。
    // takeWhile() 方法在有序的 Stream 中,takeWhile 返回从开头开始的尽量多的元素;在无序的 Stream 中,takeWhile 返回从开头开始的符合 Predicate 要求的元素的子集。
    default Stream<T> takeWhile(Predicate<? super T> predicate) {
        Objects.requireNonNull(predicate);
        // Reuses the unordered spliterator, which, when encounter is present,
        // is safe to use as long as it configured not to split
        return StreamSupport.stream(
                new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate),
                isParallel()).onClose(this::close);
    }

    // dropWhile 方法和 takeWhile 作用相反的,使用一个断言作为参数,直到断言语句第一次返回 false 才返回给定 Stream 的子集。
    default Stream<T> dropWhile(Predicate<? super T> predicate) {
        Objects.requireNonNull(predicate);
        // Reuses the unordered spliterator, which, when encounter is present,
        // is safe to use as long as it configured not to split
        return StreamSupport.stream(
                new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate),
                isParallel()).onClose(this::close);
    }

    // 为每个元素执行Consumer操作
    void forEach(Consumer<? super T> action);

    // 按元素顺序执行Consumer,比如parallel条件下,forEach无法保证顺序
    void forEachOrdered(Consumer<? super T> action);

    Object[] toArray();
    <A> A[] toArray(IntFunction<A[]> generator);

    // 对所有元素执行二元操作,如 Sum, min, max, average, and string concatenation都是reduce特殊情况
    T reduce(T identity, BinaryOperator<T> accumulator);

    //同上,避免返回空值
    Optional<T> reduce(BinaryOperator<T> accumulator);

    /**
     * 设置了初始值
     * <pre>{@code
     *     U result = identity;
     *     for (T element : this stream)
     *         result = accumulator.apply(result, element)
     *     return result;
     * }</pre>
     */
    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

    /**伪代码如下
     * <pre>{@code
     *     R result = supplier.get();
     *     for (T element : this stream)
     *         accumulator.accept(result, element);
     *     return result;
     * }</pre>
     * <pre>{@code
     *     List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add,
     *                                                ArrayList::addAll);
     * }</pre>
     * Supplier:生产者,也是返回的结果类型
     * accumulator:将流中的元素添加到Supplier中
     * BiConsumer:合并两个Supplier,
     */
    <R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

    /**
     * Collector封装了#collect(Supplier, BiConsumer, BiConsumer)参数
     * terminal operation
     * 即使使用非线程安全的数据结构,也没有线程安全问题,如ArrayList
     * List<String> asList = stringStream.collect(Collectors.toList());
     * Map<String, List<Person>> peopleByCity
     *         = personStream.collect(Collectors.groupingBy(Person::getCity));
     * Map<String, Map<String, List<Person>>> peopleByStateAndCity
     *         = personStream.collect(Collectors.groupingBy(Person::getState,                                                    Collectors.groupingBy(Person::getCity)));
     */
    <R, A> R collect(Collector<? super T, A, R> collector);


    Optional<T> min(Comparator<? super T> comparator);
    Optional<T> max(Comparator<? super T> comparator);
    long count();

    /**
     * 返回流的元素是否满足 predicate,如果满足直接返回
     * short-circ足uiting terminal operation
     */
    boolean anyMatch(Predicate<? super T> predicate);

    /**
     * 是否所有元素都满足predicate
     * 如果stream为空,直接返回true
     */
    boolean allMatch(Predicate<? super T> predicate);

    //是否没有元素满足predicate,stream为空直接返回true
    //short-circuiting terminal operation
    boolean noneMatch(Predicate<? super T> predicate);

    //返回流的第一个元素,没有则返回空
    Optional<T> findFirst();

    //不保证返回的结果每次都相同
    Optional<T> findAny();

    // Static factories
    public static<T> Builder<T> builder() {
        return new Streams.StreamBuilderImpl<>();
    }

    //Returns an empty sequential {@code Stream}.
    public static<T> Stream<T> empty() {
        return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
    }

    // Returns a sequential {@code Stream} containing a single element.
    public static<T> Stream<T> of(T t) {
        return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
    }
 
    // Returns a sequential {@code Stream} containing a single element, if non-null, otherwise returns an empty {@code Stream}.
    public static<T> Stream<T> ofNullable(T t) {
        return t == null ? Stream.empty()
                         : StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
    }

    // Returns a sequential ordered stream whose elements are the specified values.
    @SafeVarargs
    @SuppressWarnings("varargs") // Creating a stream from an array is safe
    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }

    // 返回一个无穷连续由UnaryOperator产生的以seed为初始值的流
    //第一个元素就是seed,第n个元素由第n-1的元素执行UnaryOperator生成
    public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
               Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                T t;
                if (started)
                    t = f.apply(prev);
                else {
                    t = seed;
                    started = true;
                }
                action.accept(prev = t);
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);
    }
    public static<T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
        Objects.requireNonNull(next);
        Objects.requireNonNull(hasNext);
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
               Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started, finished;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (finished)
                    return false;
                T t;
                if (started)
                    t = next.apply(prev);
                else {
                    t = seed;
                    started = true;
                }
                if (!hasNext.test(t)) {
                    prev = null;
                    finished = true;
                    return false;
                }
                action.accept(prev = t);
                return true;
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (finished)
                    return;
                finished = true;
                T t = started ? next.apply(prev) : seed;
                prev = null;
                while (hasNext.test(t)) {
                    action.accept(t);
                    t = next.apply(t);
                }
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    // 返回无穷连续的未排序的流,由Supplier生成,适用于random elements
    public static<T> Stream<T> generate(Supplier<? extends T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }

    //拼接两个流
    public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
        Objects.requireNonNull(a);
        Objects.requireNonNull(b);

        @SuppressWarnings("unchecked")
        Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
                (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
        Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
        return stream.onClose(Streams.composedClose(a, b));
    }


    public interface Builder<T> extends Consumer<T> {

        // Adds an element to the stream being built.
        @Override
        void accept(T t);

        default Builder<T> add(T t) {
            accept(t);
            return this;
        }

        // Builds the stream, transitioning this builder to the built state.
        Stream<T> build();

    }
}

原文地址:https://www.cnblogs.com/froggengo/p/14669840.html