spring webFlux的认识

Spring  WebFlux是随Spring 5推出的响应式Web框架。

一、服务端技术栈

1、从上图纵向可看出,spring-webflux支持两种开发模式:

        (1)类似于Spring WebMVC的基于注解(@Controller、@RequestMapping)的开发模式;

        (2)Java 8 lambda风格的函数式开发模式。

2、WebFlux是基于响应式流的,可以用来建立异步、非阻塞、事件驱动的服务。默认采用Reactor作为响应式流的实现库,也提供对RxJava的支持。

3、由于响应式编程的特性,webFlux底层也需要支持异步的运行环境:

        (1)Netty、Undertow;

        (2)支持异步I/O的Servlet 3.1的容器,如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上)。

4、WebFlux也支持响应式的WebSocket服务端开发

5、SpringBoot2.0以上版本整合了webFlux,直接通过Spring Initializ就可以很方便的创建WebFlux应用

二、响应式Http客户端

Spring WebFlux提供了一个响应式的Http客户端API  WebClient。它可以用函数式的方式异步非阻塞地发起Http请求并处理响应,底层也是由Netty提供的异步

支持。WebClient与RestTemplate作对比,前者的优势:

        (1)是非阻塞的,可以基于少量线程处理更高并发;

        (2)可以使用Java 8 lambda表达式;

        (3)支持异步的同时也可以支持同步的使用方式;

        (4)可以通过数据流的方式与服务端进行双向通信。

为访问WebSocket,Spring WebFlux也提供了响应式的WebSocket客户端API  WebSocketClient。

三、响应式Spring Data

开发基于响应式流的应用,就像是搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。大多数系统免不了与数据库交互,所以我们也需要响应

式的持久层API和支持异步的数据库驱动。一条管道,如果任何一个环节发生阻塞,可能造成整体吞吐量的下降。

各个数据库都开始陆续推出异步驱动,目前支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。

四、WebFlux性能测试(和SpringMVC对比)

测试内容:分别基于WebMVC和WebFlux创建两个项目:mvc-with-latency和webFlux-with-latency,来对比观察异步非阻塞能带来多大的性能提升,我们通

过sleep和delayElement来模拟一个简单的带有延迟的场景,然后启动服务使用gatling进行测试并且分析。

1、首先测试mvc-with-latency,测试数据如下(Tomcat最大线程数200,延迟100ms):

由以上数据可知:

        (1)用户量在接近3000的时候,线程数达到默认的最大值200;

        (2)线程数达到200之前,95%的请求响应时长是正常的(比100ms多一点点),之后呈直线上升的态势;

        (3)线程数达到200后,吞吐量增幅逐渐放缓。

2.最高200的线程数是Tomcat的默认设置,我们将其设置为400再次测试,测试结果如下:

由于工作线程数扩大一倍,因此请求排队的情况缓解一半。但是增加线程是有成本的,更多的线程意味着更多的内存、线程上下文切换成本更高。

3.然后测试webFlux-with-latency,测试结果如下:

(1)这里没有统计线程数量,因为对于运行在异步I/O的netty上的webFlux应用来说,其工作线程数量始终维持在一个固定的数量,通常这个固定的数量

         等于CPU核数(reactor-http-nio-x和parallel-x的线程,四核八线程的i7的x为1-8),因为异步非阻塞条件下,程序逻辑是由事件驱动的,并不需要多

         线程并发。

(2)随着用户数的增多,吞吐量基本呈线性增多的趋势;

(3)95%的响应都在100ms+的可控范围内返回了,并未出现延时的情况。

可见,非阻塞的处理方式规避了线程排队等待的情况,从而可以用少量而固定的线程处理应对大量请求的处理。

此外,还直接测试了20000用户的情况:

(1)对mvc-with-latency的测试由于出现了许多的请求fail而以失败告终;

(2)webFlux-with-latency应对20000用户,吞吐量达到7228 req/sec,95%响应时长仅117ms。

mvc-with-latency(200线程)与webFlux-with-latency对比图:

五、WebClient和RestTemplate性能对比:

测试内容:创建两个服务A的项目:restTemplate-as-caller和webClient-as-caller。提供同样的url,通过http请求这个url,返回的数据作为自己的响应。区别在

于:restTemplate-as-caller使用一个基于http连接池构造的RestTemplate作为Http客户端,webClient-as-caller使用WebClient作为Http客户端。

1、首先用RestTemplate直接测试一下6000用户,测试结果:吞吐量为1651 req/sec,95%响应时长为1622ms,和上面测试的mvc-with-latency差不多,可见

      RestTemplate是会阻塞的。

2、利用elastic的调度器将阻塞的调用转化为异步非阻塞的,再次测试RestTemplate,发现结果有了明显改善,测试结果:吞吐量2169 req/sec,95%响应时

     长为121ms。但是,使用schedulers.elastic()其实就相当于将每一次阻塞的RestTemplate调用调度到不同线程里去执行,因为不仅有处理请求的200个线程

     ,还有elastic给分配的工作线程,所以总线程数量达到了1000多个。不过在生产环境中,我们通常不会直接使用弹性线程池,而是使用线程数量可控的线

     程池,RestTemplate用完所有的线程后,依然会造成排队的情况。

3、接下来我们创建一个有最大400个线程的线程池,然后调度到这个线程池上,测试结果:吞吐量2169 req/sec,与弹性线程池的相同,95%响应时长为

     236ms,虽然达不到弹性线程池的效果,但是比完全同步阻塞的方式(1中的方式)要好多了。

4、最后测试一下非阻塞的WebClient,跑一下6000用户的测试,测试结果:吞吐量2195 req/sec,95响应时长109ms。值得注意的是,WebClient不需要大量

     并发的线程就可以轻松的完成了。

总结:WebClient能够以少量而固定的线程数处理高并发的http请求,在基于http的服务间通信方面,可以取代RestTemplate以及AsyncRestTemplate。

六、基于WebFlux的高性能REST API网关设计

API网关可以对外部系统提供统一的服务访问入口,对请求进行鉴权、限流等访问控制处理,通过后将请求路由转发给后端服务。在高并发和潜在的高延迟场

景下,API网关要实现高性能高吞吐量的一个基本要求是全链路异步,不要阻塞线程。支持异步非阻塞的WebFlux就满足上述条件。

基于WebFlux的网关主要组件是WebFilter过滤器(WebFlux中的WebFilter和Servlet Filter是两回事,虽然概念类似,但其底层的运行是基于netty,不是

tomcat,虽然也可适配tomcat或jetty这些传统容器。WebFlux不用Servlet那一套编程模型,如HttpServletRequest、HttpServletResponse,而是用的如

ServerWebExchange、ServerHttpRequest等),多个过滤器分别实现认证授权、限流、请求路由转发、正常响应回写(把后端服务的响应回写给调用网关的

前端系统)等。

参考架构

服务路由

后端服务的url前缀是固定的,可以在application.properties文件中来配置,例如:backend.service.url.prefix=http://127.0.0.1:8080;

同时在配置文件当中可以配置服务超时时间,如backend.service.timeout.inmillis=10000;网关url前缀,如gateway.url.prefix=/api.gateway.demo

然后写个控制器获取上面自定义参数做一些判断处理,forward到相应的url,然后将正常响应或者异常响应回写给调用方等。

这样的话假设前端应用(服务消费者)用如下url调用网关(网关端口为9988)

http://api.gateway.demo:9988/orders/1234

如果请求过滤通过,网关最后会将请求路由转发到http://127.0.0.1:8080/orders/1234。

application.properties参数设置:

#网关服务端口

server.port=9988

#网关url前缀

gateway.url.prefix=/api.gateway.demo

#调用后端服务的超时时间,单位毫秒。实际项目应该是根据具体的服务取对应的超时配置

backend.service.timeout.inmillis=10000

#后端服务endpoint,实际项目通常应该从注册中心获取服务和它对应的endpoint关系

backend.service.url.prefix=http://127.0.0.1:8080

鉴权、限流

写两个过滤器RequestAuthFilter和RateLimitFilter都继承WebFilter,在过滤器中做一些业务处理判断,分别用于鉴权和限流。

七、WebClient的基本使用

创建WebClient对象

1.使用create()创建WebClient的实例:

WebClient webClient = WebClient.create("https://api.github.com");

2.使用WebClient构建器创建WebClient

//使用WebClient构建器,可以自定义选项:包括过滤器、默认标题、cookie、客户端连接器等
WebClient webClient = WebClient.builder()
        .baseUrl("https://api.github.com")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/vnd.github.v3+json")
        .defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
        .build()

WebClient发GET请求并用retriveve()检索响应

//retrieve():异步获取response信息;bodyToFlux():将response body解析为字符串
webClient.get()
        .uri("/user/repos")
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToFlux(GithubRepo.class);

使用exchange()方法检索响应

//retrieve方法是获取response信息的最简单方法,如果想对response拥有更多控制权,可使用exchange访问整个response标题和正文
//使用flatMap来将ClientResponse映射为Flux;
webClient.get()
        .uri("/user/repos")
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .exchange()
		.flatMapMany(clientResponse -> clientResponse.bodyToFlux(GithubRepo.class));

在请求URI中使用参数

//uri()中使用参数,参数都被花括号包围,分别传递值。在提出请求之前,这些参数将被WebClient自动替换
webClient.get()
        .uri("/user/repos?sort={sortField}&direction={sortDirection}","updated","desc")
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToFlux(GithubRepo.class);

使用URIBuilder构造请求URI

webClient.get()
        .uri(uriBuilder -> uriBuilder.path("/user/repos")
								.queryParam("sort","updated")
								.queryParam("direction","desc")
								.build())
        .header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToFlux(GithubRepo.class);

在WebClient请求中传递Request Body

1.如果有一个Mono或Flux请求体,那么可以直接传递给body()中,否则需要创建响应式类型传递

webClient.post()
        .uri("/user/repos")
		.body(Mono.just(createRepoRequest),RepoRequest.class)
 		.header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToMono(GithubRepo.class);

2.如果具有实际值而不是Publisher(Flux/Mono),则可以使用syncBody()快捷方式传递请求正文

webClient.post()
        .uri("/user/repos")
		.syncBody(createRepoRequest)
 		.header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToMono(GithubRepo.class);

3.也可以使用BodyInserters类提供的各种工厂方法来构造一个BodyInserter对象并将其传递给body()方法

webClient.post()
        .uri("/user/repos")
		.body(BodyInserters.fromObject(createRepoRequest))
 		.header("Authorization", "Basic " + Base64Utils.
                encodeToString((username + ":" + token).getBytes(Charset.forName("utf-8"))))
        .retrieve()
        .bodyToMono(GithubRepo.class);

处理WebClient错误

只要接收到状态码为4xx或5xx的响应retrieve(),WebClient中的方法WebClientResponseException就会抛出一个。可以使用onStatus方法自定义

public Flux<GithubRepo> listGithubRepositories(){
	return webClient.get()
        	.uri("/user/repos?sort={sortField}&direction={sortDirection}","updated","desc")
        	.retrieve()
			.onStatus(HttpStatus::is4xxClientError,clientResponse -> 
				Mono.error(new MyCustomClientException()))
			.onStatus(HttpStatus::is5xxClientError,clientResponse -> 
				Mono.error(new MyCustomClientException()))
 			.bodyToFlux(GithubRepo.class);
}

此外也可以用@ExceptionHandler在控制权内部使用这种方式来处理WebClientResponseException并返回适当的响应给客户端

@ExceptionHandler(WebClientResponseException.class)
public ResponseEntity<String> handleException(WebClientResponseException ex){
	return ResponseEntity.status(ex.getRawStatusCode()).body(ex.getResponseBodyAsString());
}

 

           

原文地址:https://www.cnblogs.com/wjp1122/p/9628503.html