《Pro Spring Boot 2》第六章:WebFlux and Reactive Data with Spring Boot

 

 

 

 

 

 

package com.apress.reactor.example;

import com.apress.reactor.example.domain.ToDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

/**
 * Spring configuration that registers a {@link CommandLineRunner} demonstrating a
 * single-value reactive pipeline built on a {@link MonoProcessor}.
 */
@Configuration
public class MonoExample {

    private static final Logger LOG = LoggerFactory.getLogger(MonoExample.class);

    /**
     * Builds a runner that pushes one {@code ToDo} through a {@link Mono} pipeline,
     * logging the value, the termination and any error, and then waits up to one
     * second for the result.
     *
     * @return the runner executing the Mono example
     */
    @Bean
    public CommandLineRunner runMonoExample() {
        return args -> {

            MonoProcessor<ToDo> promise = MonoProcessor.create();

            Mono<ToDo> result = promise
                    .doOnSuccess(toDo -> {
                        // Reactor invokes doOnSuccess with null when the Mono completes
                        // without a value, so guard before dereferencing.
                        if (toDo != null) {
                            LOG.info("MONO >> ToDo: {}", toDo.getDescription());
                        }
                    })
                    .doOnTerminate(() -> LOG.info("MONO >> Done"))
                    .doOnError(t -> LOG.error(t.getMessage(), t))
                    // Subscription work is moved to the shared single-threaded scheduler.
                    .subscribeOn(Schedulers.single());

            promise.onNext(new ToDo("Buy my ticket for SpringOne Platform 2018"));
            // To exercise the error path instead, replace the onNext call above with:
            // promise.onError(new IllegalArgumentException("There is an error processing the ToDo..."));

            // Bounded wait: give the pipeline at most 1000 ms to deliver its value.
            result.block(Duration.ofMillis(1000));
        };
    }
}

 

package com.apress.reactor.example;

import com.apress.reactor.example.domain.ToDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;

/**
 * Spring configuration that registers a {@link CommandLineRunner} demonstrating a
 * multi-value reactive pipeline built on an {@link EmitterProcessor}.
 */
@Configuration
public class FluxExample {

    private static final Logger LOG = LoggerFactory.getLogger(FluxExample.class);

    /**
     * Builds a runner that publishes several {@code ToDo} items into a stream,
     * keeps only those for which {@code isCompleted()} is true, logs each of them,
     * and blocks until the filtered items have been collected into a list.
     *
     * @return the runner executing the Flux example
     */
    @Bean
    public CommandLineRunner runFluxExample() {
        return args -> {

            EmitterProcessor<ToDo> stream = EmitterProcessor.create();

            // Keep only the completed ToDos, log each one as it passes through,
            // and gather all of them into a single list.
            Mono<List<ToDo>> promise = stream
                    .filter(ToDo::isCompleted)
                    .doOnNext(toDo -> LOG.info("FLUX >>> ToDo: {}", toDo.getDescription()))
                    .collectList()
                    .subscribeOn(Schedulers.single());

            // Publish the values. The second constructor argument is presumably the
            // "completed" flag read by isCompleted() -- confirm against ToDo.
            stream.onNext(new ToDo("Read a Book", true));
            stream.onNext(new ToDo("Listen Classical Music", true));
            stream.onNext(new ToDo("Workout in the Mornings"));
            stream.onNext(new ToDo("Organize my room", true));
            stream.onNext(new ToDo("Go to the Car Wash", true));
            stream.onNext(new ToDo("SP1 2018 is coming", true));

            // Signal completion so collectList() can emit the gathered list.
            stream.onComplete();

            // Subscribes to the pipeline and waits for the collected list.
            promise.block();

        };
    }
}

 

 

 

 

原文地址:https://www.cnblogs.com/JasonPeng1/p/12271837.html