WebFlux源码之DispatcherHandler

WebFlux之DispatcherHandler

DispatcherHandler

处理服务器请求

	public Mono<Void> handle(ServerWebExchange exchange) {
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		return Flux.fromIterable(this.handlerMappings)
				.concatMap(mapping -> mapping.getHandler(exchange))
				.next()
				.switchIfEmpty(createNotFoundError())
				.flatMap(handler -> invokeHandler(exchange, handler))
				.flatMap(result -> handleResult(exchange, result));
	}

	private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
		if (this.handlerAdapters != null) {
			for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
				if (handlerAdapter.supports(handler)) {
					return handlerAdapter.handle(exchange, handler);
				}
			}
		}
		return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
	}

	private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
		return getResultHandler(result).handleResult(exchange, result)
				.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
						getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
	}

在DispatcherHandler中三个依次调用的核心接口是HandlerMapping、HandlerAdapter、HandlerResultHandler。DispatcherHandler实现了ApplicationContextAware接口,会自动调用setApplicationContext方法,然后在initStrategies中注入三个接口的所有实例。

	protected void initStrategies(ApplicationContext context) {
		Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
				context, HandlerMapping.class, true, false);

		ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
		AnnotationAwareOrderComparator.sort(mappings);
		this.handlerMappings = Collections.unmodifiableList(mappings);

		Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
				context, HandlerAdapter.class, true, false);

		this.handlerAdapters = new ArrayList<>(adapterBeans.values());
		AnnotationAwareOrderComparator.sort(this.handlerAdapters);

		Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
				context, HandlerResultHandler.class, true, false);

		this.resultHandlers = new ArrayList<>(beans.values());
		AnnotationAwareOrderComparator.sort(this.resultHandlers);
	}

HandlerMapping

HandlerMapping负责路径到handler的映射

public interface HandlerMapping {

	/**
	 * Return a handler for this request.
	 * @param exchange current server exchange
	 * @return a {@link Mono} that emits one value or none in case the request
	 * cannot be resolved to a handler
	 */
	Mono<Object> getHandler(ServerWebExchange exchange);

}

实现类有

RequestMappingHandlerMapping 生成@Controller和@RequestMapping封装的接口
RouterFunctionMapping 支持RouterFunction
SimpleUrlHandlerMapping 支持ResourceWebHandler

RequestMappingHandlerMapping

  • 注册bean

    org.springframework.web.reactive.config.WebFluxConfigurationSupport#requestMappingHandlerMapping

    	@Bean
    	public RequestMappingHandlerMapping requestMappingHandlerMapping() {
    		RequestMappingHandlerMapping mapping = createRequestMappingHandlerMapping();
    		mapping.setOrder(0);
    		mapping.setContentTypeResolver(webFluxContentTypeResolver());
    		mapping.setCorsConfigurations(getCorsConfigurations());
    
    		PathMatchConfigurer configurer = getPathMatchConfigurer();
    		Boolean useTrailingSlashMatch = configurer.isUseTrailingSlashMatch();
    		if (useTrailingSlashMatch != null) {
    			mapping.setUseTrailingSlashMatch(useTrailingSlashMatch);
    		}
    		Boolean useCaseSensitiveMatch = configurer.isUseCaseSensitiveMatch();
    		if (useCaseSensitiveMatch != null) {
    			mapping.setUseCaseSensitiveMatch(useCaseSensitiveMatch);
    		}
    		Map<String, Predicate<Class<?>>> pathPrefixes = configurer.getPathPrefixes();
    		if (pathPrefixes != null) {
    			mapping.setPathPrefixes(pathPrefixes);
    		}
    		return mapping;
    	}
    
  • 初始化mapping

    	@Override
    	public void afterPropertiesSet() {
    
    		initHandlerMethods();
    
    		// Total includes detected mappings + explicit registrations via registerMapping..
    		int total = this.getHandlerMethods().size();
    
    		if ((logger.isTraceEnabled() && total == 0) || (logger.isDebugEnabled() && total > 0) ) {
    			logger.debug(total + " mappings in " + formatMappingName());
    		}
    	}
    

    initHandlerMethods遍历所有bean,如果bean标注了Controller或RequestMapping的注解,则调用detectHandlerMethods

    	protected void initHandlerMethods() {
    		String[] beanNames = obtainApplicationContext().getBeanNamesForType(Object.class);
    
    		for (String beanName : beanNames) {
    			if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
    				Class<?> beanType = null;
    				try {
    					beanType = obtainApplicationContext().getType(beanName);
    				}
    				catch (Throwable ex) {
    					// An unresolvable bean type, probably from a lazy bean - let's ignore it.
    					if (logger.isTraceEnabled()) {
    						logger.trace("Could not resolve type for bean '" + beanName + "'", ex);
    					}
    				}
    				if (beanType != null && isHandler(beanType)) {
    					detectHandlerMethods(beanName);
    				}
    			}
    		}
    		handlerMethodsInitialized(getHandlerMethods());
    	}
    

    detectHandlerMethods遍历所有标注了RequestMapping的方法,注册到MappingRegistry. 当请求匹配路径是,lookupHandlerMethod会遍历MappingRegistry中的mapping.

    	protected void detectHandlerMethods(final Object handler) {
    		Class<?> handlerType = (handler instanceof String ?
    				obtainApplicationContext().getType((String) handler) : handler.getClass());
    
    		if (handlerType != null) {
    			final Class<?> userType = ClassUtils.getUserClass(handlerType);
    			Map<Method, T> methods = MethodIntrospector.selectMethods(userType,
    					(MethodIntrospector.MetadataLookup<T>) method -> getMappingForMethod(method, userType));
    			if (logger.isTraceEnabled()) {
    				logger.trace(formatMappings(userType, methods));
    			}
    			methods.forEach((key, mapping) -> {
    				Method invocableMethod = AopUtils.selectInvocableMethod(key, userType);
    				registerHandlerMethod(handler, invocableMethod, mapping);
    			});
    		}
    	}
    

RouterFunctionMapping

同RequestMappingHandler类似,在WebFluxConfigurationSupport中注册RouterFunctionMapping的bean,实例化过程中导入所有的RouterFunction

	public void afterPropertiesSet() throws Exception {
		if (CollectionUtils.isEmpty(this.messageReaders)) {
			ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
			this.messageReaders = codecConfigurer.getReaders();
		}

		if (this.routerFunction == null) {
			initRouterFunctions();
		}
	}

initRouterFunctions方法从ApplicationContext中查找所有的RouterFunction

	protected void initRouterFunctions() {
		List<RouterFunction<?>> routerFunctions = routerFunctions();
		this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
		logRouterFunctions(routerFunctions);
	}

	private List<RouterFunction<?>> routerFunctions() {
		List<RouterFunction<?>> functions = obtainApplicationContext()
				.getBeanProvider(RouterFunction.class)
				.orderedStream()
				.map(router -> (RouterFunction<?>)router)
				.collect(Collectors.toList());
		return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
	}

一个简单的RouterFunction示例

    @Bean
    public RouterFunction<ServerResponse> routes(PostHandler postController) {
        return route(GET("/posts"), postController::all)
            .andRoute(POST("/posts"), postController::create)
            .andRoute(GET("/posts/{id}"), postController::get)
            .andRoute(PUT("/posts/{id}"), postController::update)
            .andRoute(DELETE("/posts/{id}"), postController::delete);
    }

SimpleUrlHandlerMapping

SimpleUrlHandlerMapping可以提供静态资源的访问

例如org.springframework.web.reactive.config.WebFluxConfigurationSupport#resourceHandlerMapping方法返回的是SimpleUrlHandlerMapping的实例对象

	@Bean
	public HandlerMapping resourceHandlerMapping() {
		ResourceLoader resourceLoader = this.applicationContext;
		if (resourceLoader == null) {
			resourceLoader = new DefaultResourceLoader();
		}
		ResourceHandlerRegistry registry = new ResourceHandlerRegistry(resourceLoader);
		registry.setResourceUrlProvider(resourceUrlProvider());
		addResourceHandlers(registry);

		AbstractHandlerMapping handlerMapping = registry.getHandlerMapping();
		if (handlerMapping != null) {
			PathMatchConfigurer configurer = getPathMatchConfigurer();
			Boolean useTrailingSlashMatch = configurer.isUseTrailingSlashMatch();
			Boolean useCaseSensitiveMatch = configurer.isUseCaseSensitiveMatch();
			if (useTrailingSlashMatch != null) {
				handlerMapping.setUseTrailingSlashMatch(useTrailingSlashMatch);
			}
			if (useCaseSensitiveMatch != null) {
				handlerMapping.setUseCaseSensitiveMatch(useCaseSensitiveMatch);
			}
		}
		else {
			handlerMapping = new EmptyHandlerMapping();
		}
		return handlerMapping;
	}

addResourceHandlers通过WebFluxConfigurer的addResourceHandlers配置静态资源(每个路径对应ResourceHandlerRegistration对象)

WebFluxConfig继承WebFluxConfigurer,并默认注入了几个静态资源访问路径,分别是

  • /webjars/**
  • /** (WebFluxProperties的默认值)
		public void addResourceHandlers(ResourceHandlerRegistry registry) {
			if (!this.resourceProperties.isAddMappings()) {
				logger.debug("Default resource handling disabled");
				return;
			}
			if (!registry.hasMappingForPattern("/webjars/**")) {
				ResourceHandlerRegistration registration = registry.addResourceHandler("/webjars/**")
						.addResourceLocations("classpath:/META-INF/resources/webjars/");
				configureResourceCaching(registration);
				customizeResourceHandlerRegistration(registration);
			}
			String staticPathPattern = this.webFluxProperties.getStaticPathPattern();
			if (!registry.hasMappingForPattern(staticPathPattern)) {
				ResourceHandlerRegistration registration = registry.addResourceHandler(staticPathPattern)
						.addResourceLocations(this.resourceProperties.getStaticLocations());
				configureResourceCaching(registration);
				customizeResourceHandlerRegistration(registration);
			}
		}

ResourceHandlerRegistry.getHandleerMapping将ResourceHandlerRegistration转成ResourceWebHandler,然后封装成SimpleUrlHandlerMapping

	protected AbstractUrlHandlerMapping getHandlerMapping() {
		if (this.registrations.isEmpty()) {
			return null;
		}
		Map<String, WebHandler> urlMap = new LinkedHashMap<>();
		for (ResourceHandlerRegistration registration : this.registrations) {
			for (String pathPattern : registration.getPathPatterns()) {
				ResourceWebHandler handler = registration.getRequestHandler();
				handler.getResourceTransformers().forEach(transformer -> {
					if (transformer instanceof ResourceTransformerSupport) {
						((ResourceTransformerSupport) transformer).setResourceUrlProvider(this.resourceUrlProvider);
					}
				});
				try {
					handler.afterPropertiesSet();
				}
				catch (Throwable ex) {
					throw new BeanInitializationException("Failed to init ResourceHttpRequestHandler", ex);
				}
				urlMap.put(pathPattern, handler);
			}
		}
		SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
		handlerMapping.setOrder(this.order);
		handlerMapping.setUrlMap(urlMap);
		return handlerMapping;
	}

HandlerAdapter

public interface HandlerAdapter {

	boolean supports(Object handler);


	Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);

}

HandlerAdapter负责适配HandlerMapping返回的handler,调用handle方法生成HandlerResult

HandlerMapping getHandler HandlerAdapter
RequestMappingHandlerMapping HandlerMethod RequestMappingHandlerAdapter
SimpleUrlHandlerMapping ResourceWebHandler SimpleHandlerAdapter
RouterFunctionMapping HandlerFunction HandlerFunctionAdapter

WebFluxConfigurationSupport

WebFluxConfigurationSupport注册了上面三个HandlerAdapter的bean。

	@Bean
	public RequestMappingHandlerAdapter requestMappingHandlerAdapter() {
		RequestMappingHandlerAdapter adapter = createRequestMappingHandlerAdapter();
		adapter.setMessageReaders(serverCodecConfigurer().getReaders());
		adapter.setWebBindingInitializer(getConfigurableWebBindingInitializer());
		adapter.setReactiveAdapterRegistry(webFluxAdapterRegistry());

		ArgumentResolverConfigurer configurer = new ArgumentResolverConfigurer();
		configureArgumentResolvers(configurer);
		adapter.setArgumentResolverConfigurer(configurer);

		return adapter;
	}

	@Bean
	public HandlerFunctionAdapter handlerFunctionAdapter() {
		return new HandlerFunctionAdapter();
	}

	@Bean
	public SimpleHandlerAdapter simpleHandlerAdapter() {
		return new SimpleHandlerAdapter();
	}

HandlerResultHandler

HandlerResultHandler
ResponseBodyResultHandler @ResponseBody
ResponseEntityResultHandler HttpEntity、RequestEntity、HttpHeaders
ServerResponseResultHandler ServerResponse
ViewResolutionResultHandler @ModelAttribute、CharSequence、Render、Model、Map、Void、View、"Simple" property(a primitive, a String or other CharSequence, a Number, a Date, a URI, a URL, a Locale, a Class, or a corresponding array)

ReactiveAdapter

将泛型类映射为实体类,Reactor支持Mono、Flux、Publisher、CompletableFuture

例如如下示例,返回值为Flux时,Response自动转为成Message对象的json数组

    @GetMapping
    Flux<Message> allMessages(){
        return Flux.just(
            Message.builder().body("hello Spring 5").build(),
            Message.builder().body("hello Spring Boot 2").build()
        );
    }

ReactiveAdapterRegistry

负责加载ClassPath中的ReactiveAdapter

	public ReactiveAdapterRegistry() {
		ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();

		// Reactor
		boolean reactorRegistered = false;
		if (ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader)) {
			new ReactorRegistrar().registerAdapters(this);
			reactorRegistered = true;
		}
		this.reactorPresent = reactorRegistered;

		// RxJava1
		if (ClassUtils.isPresent("rx.Observable", classLoader) &&
				ClassUtils.isPresent("rx.RxReactiveStreams", classLoader)) {
			new RxJava1Registrar().registerAdapters(this);
		}

		// RxJava2
		if (ClassUtils.isPresent("io.reactivex.Flowable", classLoader)) {
			new RxJava2Registrar().registerAdapters(this);
		}

		// Java 9+ Flow.Publisher
		if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) {
			new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
		}
		// If not present, do nothing for the time being...
		// We can fall back on "reactive-streams-flow-bridge" (once released)
	}

以Reactor为例,通过ReactorRegistrar注册支持的ReactiveType

void registerAdapters(ReactiveAdapterRegistry registry) {
			// Register Flux and Mono before Publisher...

			registry.registerReactiveType(
					ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
					source -> (Mono<?>) source,
					Mono::from
			);

			registry.registerReactiveType(
					ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
					source -> (Flux<?>) source,
					Flux::from);

			registry.registerReactiveType(
					ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
					source -> (Publisher<?>) source,
					source -> source);

			registry.registerReactiveType(
					ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
						CompletableFuture<?> empty = new CompletableFuture<>();
						empty.complete(null);
						return empty;
					}),
					source -> Mono.fromFuture((CompletableFuture<?>) source),
					source -> Mono.from(source).toFuture()
			);
		}
原文地址:https://www.cnblogs.com/huiyao/p/14444280.html