reactive和reactor

Reactive是函数式编程(Functional),管道流(pipeline,stream),异步非阻塞的,事件驱动的.

org.reactivestreams包主要有4个接口

发布者 Publisher

public interface Publisher<T>{
      public void subscribe(Subscriber<? super T> s);
}

订阅者 Subscriber
当接收到Publisher的数据时,会调用响应的回调方法.注册完成时,首先会调用
onSubscribe方法,参数Subscription s包含了注册信息.
为什么注册?注册这个订阅者到发布的信息上

public interface Subscriber<T>{
      //注册完成后,首先被调用
      public void onSubscribe(Subscription s);

      public void onNext(T t);
      
      public void onError(Throwable t);

      public void onComplete();

}

订阅Subscription
这个不是er而是tion,很明显一个是器具一个是motive
1.通过订阅,订阅者Subscriber可以请求数据request,或者取消订阅cancel
2.在请求数据时,参数long n表示希望接收的数据量,防止发布者Publisher发送过多的数据.
3.一旦开始请求,数据就会在流stream中传输.每接收一个,就会调用onNext(T t);发生错误
时,onError(Throwable t)被调用;传输完成后,onComplete()被调用

public interface Subscription{
      //请求数据,参数n为请求的数据量,不是超时时间or other anythings
      public void request(long n);
      //取消订阅
      public void cancel();
}
  • Processor
    可以看出,Processor接口继承了Subscriber和Publisher,是流的中间环节.in the middle
public interface Processor<T,R> extends Subscriber<T>,Publisher<R>{

}

Reactive Stream中数据从Publisher开始,经过若干个Processor,最终到达Subscriber,即完整的Pipeline.


Project Reactor
依赖

<dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
</dependency>

Mono和Flux

  1. 抽象类Mono和Flux实现了Publisher接口,他们是发布者.
  2. Mono表示少于等于1个数据(即0个,或1个数据)或错误;Flux表示一连串多个数据.

操作

  1. 创建Flux或Mono,调用subscribe()后,数据开始流动
    主要方法有: just,fromArray,fromStream,fromIterable,range
    /**
     * 创建Flux或Mono,调用subscribe()后,数据开始流动
     * 主要方法有: just,fromArray,fromStream,fromIterable,range
     */
    @Test
    public void create(){
        //just方法
        String[] arr = new String[]{"hello","world"};
        Flux<String> flux1 = Flux.just(arr);
        flux1.subscribe(System.out::println);

        //Mono支持单个或0个   
        Mono<String> mono = Mono.just("hi world");
        mono.subscribe(System.out::println);

        //fromArray
        List<String> list = Arrays.asList("welcome", "single one");
        Flux<String> flux2 = Flux.fromIterable(list);
        flux2.subscribe(System.out::println);

        //fromIterable方法
        List<String> fruitList = new ArrayList<>();
        fruitList.add("Apple");
        fruitList.add("Orange");
        Flux<String> flux3 = Flux.fromIterable(fruitList);
        flux3.subscribe(System.out::println);

        //fromStream方法
        Stream<String> stream = Stream.of("hi","hello");
        Flux<String> flux4 = Flux.fromStream(stream);

        //range方法
        Flux<Integer> range = Flux.range(0,5);

        //interval方法,take方法限制个数为5个
        Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
        longFlux.subscribe(System.out::println);
    }

输出:

hello
world
hi world
welcome
single one
Apple
Orange

2.合并mergeWith

@Test
public void mergeFlux(){
      Flux<String> source1 = Flux.just("hello","world");
      Flux<String> source2 = Flux.just("hi","G");

      Flux<String> merge = source1.mergeWith(source2);
      merge.subscribe(System.out::println);
}

原文 还有一些其它方法没有加入博客代码中
https://www.jianshu.com/p/9d3a2a28976a

原文地址:https://www.cnblogs.com/ukzq/p/13460324.html