DelayTest

Project Reactor - Adding Delay to Mono and Flux

https://www.woolha.com/tutorials/project-reactor-adding-delay-to-mono-and-flux

package com.test.reactor;

import java.util.ArrayList;
import reactor.core.publisher.Mono;

/**
 * Demonstrates chaining several {@code delayUntil} triggers on a {@code Mono}.
 *
 * <p>Each trigger receives the list carried by the {@code Mono}, appends one
 * value to it and returns an empty {@code Mono<Void>}. The list is printed by
 * the subscriber only after all three triggers have run, so the program prints
 * "add 1", "add 2", "add 3" and then "[1, 2, 3]".
 */
public class DelayTest {

  public static void main(String[] args) {
    new DelayTest().testDelay();
  }

  /**
   * Builds a {@code Mono} around an empty list, registers the three triggers in
   * order and prints the list once it is finally emitted.
   */
  public void testDelay() {
    Mono.just(new ArrayList<String>())
        .delayUntil(this::before)
        .delayUntil(this::doing)
        .delayUntil(this::after)
        .subscribe(System.out::println);
  }

  /**
   * First trigger: logs and appends "1".
   *
   * @param arrayList the list carried by the delayed {@code Mono}
   * @return an empty {@code Mono} that signals completion of this step
   */
  Mono<Void> before(ArrayList<String> arrayList) {
    return append(arrayList, "1");
  }

  /**
   * Second trigger: logs and appends "2".
   *
   * @param arrayList the list carried by the delayed {@code Mono}
   * @return an empty {@code Mono} that signals completion of this step
   */
  Mono<Void> doing(ArrayList<String> arrayList) {
    return append(arrayList, "2");
  }

  /**
   * Third trigger: logs and appends "3".
   *
   * @param arrayList the list carried by the delayed {@code Mono}
   * @return an empty {@code Mono} that signals completion of this step
   */
  Mono<Void> after(ArrayList<String> arrayList) {
    return append(arrayList, "3");
  }

  /**
   * Shared body of the three triggers: prints "add &lt;value&gt;", appends the
   * value to the list and returns an already-completed, empty {@code Mono}.
   */
  private Mono<Void> append(ArrayList<String> arrayList, String value) {
    System.out.println("add " + value);
    arrayList.add(value);
    // Mono.empty() is already typed as Mono<Void> here; no extra then() is needed.
    return Mono.empty();
  }
}

add 1
add 2
add 3
[1, 2, 3]

package com.test.reactor;

import java.time.Duration;
import reactor.core.publisher.Flux;

/**
 * Demonstrates {@code delayElements} combined with a time-based {@code buffer}
 * on a {@code Flux} of eight integers.
 */
public class DelayTest2 {

  public static void main(String[] args) {
    new DelayTest2().testDelay();
  }

  /**
   * Delays every element by 1000 ms, groups the delayed elements into
   * 2000 ms buffers and prints each buffer as it is emitted.
   *
   * <p>The calling thread then sleeps for 20 seconds so that the output can be
   * observed before {@code main} returns.
   */
  public void testDelay() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .delayElements(Duration.ofMillis(1000))
        .buffer(Duration.ofMillis(2000), Duration.ofMillis(2000))
        .subscribe(System.out::println);

    try {
      // Keep the calling thread alive while the delayed pipeline emits.
      Thread.sleep(20000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still see it.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    }
  }

}

[1]
[2, 3]
[4, 5]
[6, 7]
[8]

After removing the buffer() call, one element is emitted every 1000 ms:

1
2
3
4
5
6
7
8

package com.test.reactor;

import java.time.Duration;
import reactor.core.publisher.Flux;

/**
 * Demonstrates {@code delayUntil} on a {@code Flux}: every source element is
 * held back until a companion publisher, created for that element, completes.
 */
public class DelayTest2 {

  public static void main(String[] args) {
    new DelayTest2().testDelay();
  }

  /**
   * Delays each of the eight source elements until a three-element companion
   * {@code Flux}, whose elements are each delayed by 1000 ms, has completed,
   * and prints every source element as it is released.
   *
   * <p>The calling thread then sleeps for 20 seconds. At roughly 3000 ms per
   * element the eight elements need about 24 seconds, so only the first six
   * are printed before the sleep ends.
   */
  public void testDelay() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .delayUntil(a -> Flux.just(11, 21, 31).hide().delayElements(Duration.ofMillis(1000)))
        .subscribe(System.out::println);

    try {
      // Keep the calling thread alive while the delayed pipeline emits.
      // NOTE(review): 20 s is shorter than the ~24 s needed for all eight
      // elements; raise it if the full sequence should be shown.
      Thread.sleep(20000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still see it.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    }
  }

}

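One element is emitted every 3000 ms, because each companion Flux takes 3 × 1000 ms to complete. Only the first six elements are printed, since the program stops waiting after the 20-second sleep: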

1
2
3
4
5
6

原文地址:https://www.cnblogs.com/tonggc1668/p/13999877.html