gRPC 重试策略

前言


.Net Core gRPC常见的重试策略。

gRPC RetryPolicy


RetryPolicy 是微软官方提供的一种重试策略。允许在创建gRPC的时候配置一次重试策略。


var defaultMethodConfig = new MethodConfig
{
    Names = { MethodName.Default },
    RetryPolicy = new RetryPolicy
    {
        MaxAttempts = 5,
        InitialBackoff = TimeSpan.FromSeconds(1),
        MaxBackoff = TimeSpan.FromSeconds(5),
        BackoffMultiplier = 1.5,
        RetryableStatusCodes = { StatusCode.Unavailable }
    }
};

var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
{
    ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
});

创建一个 RetryPolicy 重试配置,在创建 gRPC 的指定重试配置,重试策略可以按方法配置,而方法可以使用 Names 属性进行匹配 MethodName.Default 将应用于此通道调用的所有 gRPC 方法。RetryPolicy 应该是最简单的方式来实现重试了,但是它也有弊端,它没有留下扩展的入口,想加个日志查看不可以。

  • MaxAttempts:最大调用尝试次数,包括原始尝试次数,值必须大于1。
  • InitialBackoff:重试尝试之间初始的延迟。每次尝试后,当前值将乘以 BackoffMultiplier。这个值是必须的,且值必须大于 0。
  • MaxBackoff:重试尝试之间的最大延迟,这个值是必须的,且值必须大于 0。
  • BackoffMultiplier:每次重试尝试后,InitialBackoff 会乘以该值,并将在乘数大于 1 的情况下以指数方式增加。这个值是必须的,且值必须大于 0。
  • RetryableStatusCodes:状态代码的集合。 匹配的状态将会重试,状态代码为 Unavailable 的会进行重试。

gRPC hedging


var defaultMethodConfig = new MethodConfig
{
    Names = { MethodName.Default },
    HedgingPolicy = new HedgingPolicy
    {
        HedgingDelay = TimeSpan.FromSeconds(1),
        MaxAttempts = 5,
        NonFatalStatusCodes = { StatusCode.Unavailable }
    }
};

var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
{
    ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
});

Hedging 是一种微软提供的一种备选重试策略。 Hedging 允许在不等待gRPC响应的情况下,发送多个调用请求,并且只会取第一个请求的返回结果,所以你的这个gRPC服务必须具有幂等性(幂等性的实质是一次或多次请求同一个资源,其结果是相同的。其关注的是对资源产生的影响(副作用)而不是结果,结果可以不同)。

  • MaxAttempts: 发送的调用数量上限。 MaxAttempts 表示所有尝试的总数,包括原始尝试。 值受 GrpcChannelOptions.MaxRetryAttempts(默认值为 5)的限制。 必须为该选项提供值,且值必须大于 2。
  • HedgingDelay:除去第一次调用,后续 hedging 调用将按该值延迟发送。 如果延迟设置为零或 null,那么所有所有调用都将立即发送。 默认值为 0。
  • NonFatalStatusCodes:如果返回非致命状态代码将会继续发送请求, 否则,将取消未完成的请求,并将错误返回到应。

HttpClientFactory 组合 Polly


gRPC 的调用请求最终还是由 HttpClient 来发送,所以我们可以结合 HttpClientFactory 和 Polly 来实现重试功能,引用 Microsoft.Extensions.Http.Polly 包。

AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
services.AddGrpcClient<UserClient>(options =>
{
    options.Address = new Uri("http://localhost:5005");
}).AddPolicyHandler(
    HttpPolicyExtensions.HandleTransientHttpError()
        .OrResult(res => res.StatusCode != HttpStatusCode.OK)
        .RetryAsync(5, (result, count) =>
        {
            Console.WriteLine($"StatusCode:{result.Result?.StatusCode},Exception:{result.Exception?.Message},正在进行第{count}次重试"); 
            
        }));

通过 AddPolicyHandler 来扩展需要处理的异常和处理机制。

图片名称

还可以通过 WaitANdRetryAsync 来指定重试的时间

AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
services.AddGrpcClient<UserClient>(options => { options.Address = new Uri("http://localhost:5005"); })
    .AddPolicyHandler(
        HttpPolicyExtensions.HandleTransientHttpError()
            .OrResult(res => res.StatusCode != HttpStatusCode.OK)
            .WaitAndRetryAsync(
                5,
                retryAttempt => TimeSpan.FromSeconds(3),
                (result, time, count, context) =>
                {
                    Console.WriteLine($"正在进行第{count}次重试,间隔{time.TotalSeconds}秒");
                }
                
            ));

图片名称

Interceptor

Interceptor 是gRPC中的拦截器,类似 MVC 中的中间件和过滤器。

public class ClientLoggerInterceptor : Interceptor
{
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        return Policy<AsyncUnaryCall<TResponse>> // Return type for single request - single response call.
            .Handle<RpcException>(s => s.StatusCode != StatusCode.OK)
            .OrResult(asyncCall =>
            {
                if (asyncCall.GetAwaiter().IsCompleted)
                {
                    return asyncCall.GetStatus().StatusCode == StatusCode.OK;
                }
                try
                {
                    asyncCall.ResponseAsync.Wait();
                }
                catch (AggregateException)
                {
                    return true;
                }
                return false;
            })
            .WaitAndRetryAsync(3, m => TimeSpan.FromSeconds(2), (result, time, count, context) =>
            {
                Console.WriteLine($"正在进行第{count}次重试。间隔{time.TotalSeconds}秒");
            })
            .ExecuteAsync(() => Task.FromResult(continuation(request, context))).Result;
    }
}
private static async Task Main(string[] args)
{
    AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
    var channel = GrpcChannel.ForAddress("http://localhost:5000");
    var intercept = channel.Intercept(new ClientLoggerInterceptor());
    var client = new Greeter.GreeterClient(intercept);
    var response = await client.SayHelloAsync(
        new HelloRequest
        {
            Name = "World"
        });

    Console.WriteLine(response.Message);
}
图片名称

总结


简单的实现了gRPC的重试策略,在开发过程可以根据自己的需求进行拓展。

原文地址:https://www.cnblogs.com/linhuiy/p/14972507.html