webFlux 学习(一)

在学习webflux 之前我们先要了解一个概念

什么是背压(back press)

对我们以前使用的发布订阅模式来说.我们的以前的模式是 消费只能通过提供者提供的数据来持续消费 如果一直发送消息,那么就只能一直消费

我们对背压做一个详细的比喻

比如我们每家每户,都有一个水龙头.自来水公司相当于发布者者,我们家庭就是个订阅者,水就是数据,在之前的模式我们的订阅者是一个被动接受的概念

背压就是相当于我们家里安装了一个水龙头,我们需要的时候就把他打开.不需要的时候就把他关闭

reactive stream

响应式流.这是jdk9 引入的一套标准,他能够很好的实现背压,但是我去官网的时候,发现jdk9已经结束.我们看看jdk11吧

jdk11有一个flow接口 里面有4个方法 

1.publisher 就是发布者

  subscribe:就是和订阅者产生一个关系

2.subscribe 就是订阅者

  onSubscribe:签署一个订阅关系传入subscription

  onNext(): 接受到一条数据

  onError(): 就是出错

  onComplete(): 完成

3.Subscription接口中就是其中实现背压的关键 其中request方法就是告诉发布者我需要多少资源,发布者那里 就会发布多少资源

4.Processor  既可以做发布者,也可以做订阅者,具体是用来中间环节的数据处理工作

 简单的例子我们来运行下

每次处理完之后告诉发布者我还可以处理的数据是多少

  public static void main(String[] args) throws InterruptedException {
      //1.定义发布者

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //2. 定义订阅者

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {

            private Flow.Subscription subscription;
            int total = 0;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //保存订阅关系
                this.subscription = subscription;
                //请求一个数据
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("接受到: "+ item);
                total++;
                System.out.println("接受的条数为  : "+ total);
                this.subscription.request(1);
                //或者到达一定数量告诉发布者不接受数据了
                if(total ==10){
                    this.subscription.cancel();
                    System.out.println("接受数据已经足够");

                }





            }

            @Override
            public void onError(Throwable throwable) {
            throwable.printStackTrace();
            //抛出异常就返回
            this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("数据处理完了.");
            }
        };

        //3发布和订阅 建立订阅关系

        publisher.subscribe(subscriber);


        //4.生产数据
        for (int i = 0; i < 100; i++) {
            publisher.submit(i);
        }

        //5.关闭发布者
        publisher.close();

        Thread.currentThread().join(5000);

    }

processor

  public static void main(String[] args) throws InterruptedException {
      //1.定义发布者

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        //2.定义一个处理器 对数据进行过滤,并转化为string的类型
        MyProcessor myProcessor = new MyProcessor();
        //3.发布者与处理器建立关系
        publisher.subscribe(myProcessor);

        //4. 定义最终订阅者

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //保存订阅关系
                this.subscription = subscription;
                //请求一个数据
                subscription.request(1);
            }
            LinkedList<String > list = new LinkedList<>();
            @Override
            public void onNext(String item) {
                list.add(item);
                this.subscription.request(1);
                //或者到达一定数量告诉发布者不接受数据了
                System.out.println(item);
                if(list.size() == 10){
                    this.subscription.cancel();
                    System.out.println("接受数据已经足够");
                    this.onComplete();

                }





            }

            @Override
            public void onError(Throwable throwable) {
            throwable.printStackTrace();
            //抛出异常就返回
            this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("数据处理完了."+list.toString());
            }
        };

        //5 处理器和最终的订阅者建立关系

       myProcessor.subscribe(subscriber);


        //4.生产数据
        for (int i = 0; i < 100; i++) {
            publisher.submit(i);
        }

        //5.关闭发布者
        publisher.close();

        Thread.currentThread().join(5000);



    }
    static   class  MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {
        private Flow.Subscription subscription;
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;

            subscription.request(1);

        }

        @Override
        public void onNext(Integer item) {
//            System.out.println("processor-> 处理器接收到的数据.."+item);
            if(item % 2 ==0){
                this.submit("转->" +item);

            }
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
        throwable.printStackTrace();
        this.subscription.cancel();
        }

        @Override
        public void onComplete() {
            System.out.println("processor 处理器已经处理完成!");
        }
    }

里面的运行机制

publiser.submit():是一个阻塞方法

订阅者有一个缓冲池.当缓冲池满了之后 submit()方法就会被阻塞.这样就不会再去生产数据了

subscription 缓冲的capacity默认是256个.

原文地址:https://www.cnblogs.com/bj-xiaodao/p/11044732.html