FlatMapTest

package com.test;

import java.time.Duration;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import reactor.core.publisher.Flux;

/**
 * @author: ffzs
 * @Date: 2020/8/7 下午8:18
 */

@Slf4j
public class FlatMapTest {
//->a->f->b->f->c->z->d->s
  @Test
  public void flatMap () throws InterruptedException {
    Flux.just("abcd", "ffzs")
        .flatMap(i -> Flux.fromArray(i.split("")).delayElements(Duration.ofMillis(10)))
        .subscribe(i -> System.out.print("->"+i));
    Thread.sleep(100);
  }
//->a->b->c->d->f->f->z->s
  @Test
  public void flatMapSequential () throws InterruptedException {
    Flux.just("abcd", "ffzs")
        .flatMapSequential(i -> Flux.fromArray(i.split("")).delayElements(Duration.ofMillis(10)))
        .subscribe(i -> System.out.print("->"+i));
    Thread.sleep(1000);
  }

 /* @Test
  public void testConcurrencyAndPrefetch() {
    int concurrency = 3;
    int prefetch = 6;
    Flux.range(1, 1)
        //.log()
        .flatMap(i -> Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
               .log()
            ,
            concurrency, prefetch)
        .subscribe(i -> System.out.print("->"+i));
  }*/
//->a->b->c->d->f->f->z->s
    @Test
  public void flatMapIterable () {
    Flux.just("abcd", "ffzs")
        .flatMapIterable(i -> Arrays.asList(i.split("")))
        .subscribe(i -> System.out.print("->"+i));
  }
//->a->b->c->d->f->f->z->s
  @Test
  public void concatMap () throws InterruptedException {
    Flux.just("abcd", "ffzs")
        .concatMap(i -> Flux.fromArray(i.split("")).delayElements(Duration.ofMillis(10)))
        .subscribe(i -> System.out.print("->"+i));
    Thread.sleep(110);
  }
//->a->b->c->d->f->f->z->s
 @Test
  public void concatMapIterable () {
    Flux.just("abcd", "ffzs")
        .concatMapIterable(i -> Arrays.asList(i.split("")))
        .subscribe(i -> System.out.print("->"+i));
  }
}
原文地址:https://www.cnblogs.com/tonggc1668/p/13999917.html