spring-cloud-gateway(二)es代理功能需求

实际是该项目的延申 cclient/elasticsearch-multi-cluster-compat-proxy: 网关代理兼容ES6 es7 proxy and compat elasticsearch version 7 and elasticsearch version 6's _search and _bulk request api (github.com)

项目用spring-boot-starter-webflux 实现了es6和es7的兼容层网关,但只是做可行性验证

真正生产使用的网关还需要更多功能,最基本的,限流,熔断,负载均衡,这些都独立往项目里加,每个点都需要花精力整合

实际这方面应用已经有了很成熟的方案,服务治理相关,k8s,ingress,istio,kong,nginx...

但对es这类bare服务,套用云服务的方案并不合适,kong/nginx因为是c+lua的技术栈,定制的成本较高

nodejs已经过气了,go和java,考虑生态,选择java

java方面的网关,很少单独提及,更多是做为java服务治理的一个组件来使用

java类网关,早些年大家都使用netty/mina原生实现

但后期有基于netty的各种封装好的http框架,再完全用netty开发http类网关就比较少见了,当然tcp/udp类,自定义rpc的还是免不了直接和netty打交道

整合netty的 http高性能网关类服务早些年个人用过 jersey,后来用vert.x,再后来就直接上spring-boot-starter-webflux了,抽空把历史代码扒出来

现在为了省去在webflux自行添加限流,熔断,降级等功能,直接使用spring-boot-gateway

实际spring-boot-gateway,本身集成mvn/webflux的两种方案,webflux的底层就是netty

本地代理其实可以理解为替代nginx,并实现一些和业务产品深度结合的功能
因为nginx c+lua的技术栈和开发成本较高

首先spring-cloud-gateway 原生是 spring-cloud的组件,应用场景和spring-cloud深耦合

例如,loadbanlance,依赖spring-cloud的服务发现组件,consule,nacos等

https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#spring-cloud-loadbalancer

Spring Cloud Commons provides the @EnableDiscoveryClient annotation. This looks for implementations of the DiscoveryClient and ReactiveDiscoveryClient interfaces with META-INF/spring.factories. Implementations of the discovery client add a configuration class to spring.factories under the org.springframework.cloud.client.discovery.EnableDiscoveryClient key. Examples of DiscoveryClient implementations include Spring Cloud Netflix Eureka, Spring Cloud Consul Discovery, and Spring Cloud Zookeeper Discovery.

spring-cloud-gateway 基础支持

Spring Cloud Gateway

spring:
  cloud:
    gateway:
      routes:
      - id: before_route
        uri: https://example.org
        predicates:
        - Before=2017-01-20T17:42:47.789-07:00[America/Denver]

以该项为例,关键是uri 这个参数

网关单点1:1 uri 可以写一个http/https的访问地址

如果需要实现负载均衡gateway: server 1:n 则需要实现 uri("lb://backing-service:8088")

@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("circuitbreaker_route", r -> r.path("/consumingServiceEndpoint")
            .filters(f -> f.circuitBreaker(c -> c.name("myCircuitBreaker").fallbackUri("forward:/inCaseOfFailureUseThis").addStatusCode("INTERNAL_SERVER_ERROR"))
                .rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")).uri("lb://backing-service:8088")
        .build();
}
es3

这里实际依赖了spring-cloud生态的服务发现组件,注册服务 backing-service 至注册中心,spring-cloud从注册中心获取真实的服务地址host:port,再通过客户端负载均衡lb 路由至真实服务

这是服务治理的基本原理流程

但是对目前的场景不适用,目前的场景,基本可以理解为把spring-cloud-gateway 当nginx用

路由到后端静态的几个地址即可

以es为例 3个client节点

es-client-01 192.168.10.11:9200

es-client-02 192.168.10.12:9200

es-client-03 192.168.10.13:9200

并不存在一个有效的注册中心,实际多一个注册中心组件,项目的复杂度就更高了,对es 这类服务组件,并不需要和spring的生态完全结合

我们需要定制lb://的解析,即,使lb:// 不通过注册中心,而是完全静态配置在本地(先搞静态吧,以后再考虑搞动态,此动态非彼动态)

需要注意withHealthChecks() 顾名思义 添加了withHealthChecks() 则会对 后端server进行健康检查,检查方式为验证对应的后端server /actuator/health 是否可达。这个路径是spring-boot/spring-cloud 组件的默认路径,但后端的es服务,这个路径并不可达。.withHealthChecks() 会返回Service Unavailable

{"timestamp":"2021-06-02T09:04:00.596+00:00","path":"/","status":503,"error":"Service Unavailable","requestId":"d3d01e2e-1"}
	@Bean
	public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
			ConfigurableApplicationContext context) {
		return ServiceInstanceListSupplier.builder()
				.withBase(new CustomServiceInstanceListSupplier())
//				.withHealthChecks()
				.build(context);
	}

健康检查部分代码,关键注意/actuator/health

package org.springframework.cloud.loadbalancer.core;

public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier implements InitializingBean, DisposableBean {
    private static final Log LOG = LogFactory.getLog(HealthCheckServiceInstanceListSupplier.class);
    private final HealthCheck healthCheck;
    private final String defaultHealthCheckPath;
    private final Flux<List<ServiceInstance>> aliveInstancesReplay;
    private Disposable healthCheckDisposable;
    private final BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction;

    public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, HealthCheck healthCheck, BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
        super(delegate);
        this.defaultHealthCheckPath = (String)healthCheck.getPath().getOrDefault("default", "/actuator/health");
        this.aliveFunction = aliveFunction;
        this.healthCheck = healthCheck;
        Repeat<Object> aliveInstancesReplayRepeat = Repeat.onlyIf((repeatContext) -> {
            return this.healthCheck.getRefetchInstances();
        }).fixedBackoff(healthCheck.getRefetchInstancesInterval());
        Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate).repeatWhen(aliveInstancesReplayRepeat).switchMap((serviceInstances) -> {
            return this.healthCheckFlux(serviceInstances).map((alive) -> {
                return Collections.unmodifiableList(new ArrayList(alive));
            });
        });
        this.aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1).refCount(1);
    }
}    

我们先不采用withHealthChecks,等后期有时间再自定义或配置withHealthChecks实现

package org.springframework.cloud.loadbalancer.core;
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
    public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";
    private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
    private Duration timeout = Duration.ofSeconds(30L);
    private final String serviceId;
    private final Flux<List<ServiceInstance>> serviceInstances;

    public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
        this.serviceId = environment.getProperty("loadbalancer.client.name");
        this.resolveTimeout(environment);
        this.serviceInstances = Flux.defer(() -> {
            return Flux.just(delegate.getInstances(this.serviceId));
        }).subscribeOn(Schedulers.boundedElastic()).timeout(this.timeout, Flux.defer(() -> {
            this.logTimeout();
            return Flux.just(new ArrayList());
        })).onErrorResume((error) -> {
            this.logException(error);
            return Flux.just(new ArrayList());
        });
    }

    public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
        this.serviceId = environment.getProperty("loadbalancer.client.name");
        this.resolveTimeout(environment);
        this.serviceInstances = Flux.defer(() -> {
            return delegate.getInstances(this.serviceId).collectList().flux().timeout(this.timeout, Flux.defer(() -> {
                this.logTimeout();
                return Flux.just(new ArrayList());
            })).onErrorResume((error) -> {
                this.logException(error);
                return Flux.just(new ArrayList());
            });
        });
    }

    public String getServiceId() {
        return this.serviceId;
    }

    public Flux<List<ServiceInstance>> get() {
        return this.serviceInstances;
    }

    private void resolveTimeout(Environment environment) {
        String providedTimeout = environment.getProperty("spring.cloud.loadbalancer.service-discovery.timeout");
        if (providedTimeout != null) {
            this.timeout = DurationStyle.detectAndParse(providedTimeout);
        }

    }

    private void logTimeout() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Timeout occurred while retrieving instances for service %s.The instances could not be retrieved during %s", this.serviceId, this.timeout));
        }

    }

    private void logException(Throwable error) {
        LOG.error(String.format("Exception occurred while retrieving instances for service %s", this.serviceId), error);
    }
}


原文地址:https://www.cnblogs.com/zihunqingxin/p/14916177.html