Consul和Ocelot和polly服务降级熔断

1.Consul

启动命令 consul agent -dev -client=0.0.0.0    //注意,一定要加上 -client=0.0.0.0,否则 Consul 默认只监听 127.0.0.1,其他机器上的服务和网关无法访问

startup.cs

// Health probe endpoint: requests whose path is /Api/Health/Index are answered
// directly by this branch with 200 "OK" (this is the URL the Consul check calls).
app.MapWhen(
    httpContext => httpContext.Request.Path.Equals("/Api/Health/Index"),
    healthBranch => healthBranch.Run(async httpContext =>
    {
        Console.WriteLine("This server is health.");
        httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
        await httpContext.Response.WriteAsync("OK");
    }));
#region Consul
// Register this service instance with the Consul agent.
app.RegisterConsul(lifetime, option, httpFactory, configuration);
#endregion

    /// <summary>
    /// Extension methods for registering the running service instance with a Consul agent.
    /// </summary>
    public static class ConsulExtension
    {
        /// <summary>
        /// Registers this service instance (with an HTTP health check) in Consul and
        /// installs a shutdown hook that deregisters it again.
        /// </summary>
        /// <param name="app">The application builder; returned unchanged for chaining.</param>
        /// <param name="lifetime">Application lifetime used to hook the stopped event.</param>
        /// <param name="option">Consul settings (agent address, datacenter, service name, weight).</param>
        /// <param name="httpClientFactory">Factory for the HTTP client that sends the deregister request.</param>
        /// <param name="configuration">Configuration providing the "ip" and "port" of this instance.</param>
        /// <returns>The same <paramref name="app"/> instance.</returns>
        /// <exception cref="InvalidOperationException">The "ip" configuration value is missing.</exception>
        /// <exception cref="FormatException">The "port" configuration value is missing or not an integer.</exception>
        public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IHostApplicationLifetime lifetime, ConsulOption option,
            IHttpClientFactory httpClientFactory,
            IConfiguration configuration) {
            // Validate the instance address up front: a missing ip would otherwise
            // silently produce an unusable health-check URL such as "http://:2001/...".
            string ip = configuration["ip"];
            if (string.IsNullOrWhiteSpace(ip)) {
                throw new InvalidOperationException("Configuration value 'ip' is required to register the service in Consul.");
            }

            string portText = configuration["port"];
            int port;
            if (!int.TryParse(portText, out port)) {
                throw new FormatException($"Configuration value 'port' must be an integer but was '{portText}'.");
            }

            ConsulClient consulClient = new ConsulClient(c => {
                c.Address = new Uri(option.ConsulAddreess);
                c.Datacenter = option.ConsulDoc;
            });

            AgentServiceRegistration registration = new AgentServiceRegistration()
            {
                // A fresh GUID per process so several instances of the same service can coexist.
                ID = "Service" + Guid.NewGuid(),
                Name = option.ServiceName,
                Address = ip,
                Port = port,
                Tags = new string[] { option.Weight },
                Check = new AgentServiceCheck()
                {
                    Interval = TimeSpan.FromSeconds(120),
                    HTTP = $"http://{ip}:{port}/Api/Health/Index",
                    Timeout = TimeSpan.FromSeconds(5),
                    // NOTE(review): Consul documents a 1-minute minimum for this timeout,
                    // so 20 seconds may be rounded up by the agent — confirm.
                    DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(20)
                }
            };

            // ServiceRegister is asynchronous. Keep startup non-blocking, but observe the
            // task so a failed registration is reported instead of silently lost, and
            // release the client once it is no longer needed.
            consulClient.Agent.ServiceRegister(registration).ContinueWith(registerTask =>
            {
                if (registerTask.IsFaulted)
                {
                    Console.WriteLine($"注册服务失败:{registerTask.Exception.GetBaseException().Message}");
                }
                consulClient.Dispose();
            });

            lifetime.ApplicationStopped.Register(() =>
            {
                Console.WriteLine("触发停止事件");
                // TrimEnd avoids a "//v1/..." path when the configured address ends with '/'.
                string url = $"{option.ConsulAddreess.TrimEnd('/')}/v1/agent/service/deregister/{registration.ID}";
                Console.WriteLine($"发送url:{url}");
                try
                {
                    var httpClient = httpClientFactory.CreateClient();
                    using (var requestMessage = new HttpRequestMessage(HttpMethod.Put, url))
                    using (var response = httpClient.SendAsync(requestMessage).GetAwaiter().GetResult())
                    {
                        // Only claim success when Consul actually accepted the request.
                        if (response.IsSuccessStatusCode)
                        {
                            Console.WriteLine($"删除服务成功");
                        }
                        else
                        {
                            Console.WriteLine($"删除服务失败,状态码:{(int)response.StatusCode}");
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Deregistration is best effort; never let it break host shutdown.
                    Console.WriteLine($"删除服务失败:{ex.Message}");
                }
            });

            return app;
        }
    }

appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "Consul": {
    "ServiceName": "Tom",
    "IP": "192.168.0.107",
    "Port": "2001",
    "Weight": "4",
    "HealthCheckAddess": "http://192.168.0.107:2001/Api/Health/Index",
    "ConsulAddreess": "http://192.168.0.107:8500",
    "ConsulDoc": "dc1"
  }
}

2.Ocelot

可以做请求转发(类似nginx),限流等

// In ConfigureServices: add Ocelot and its Consul service-discovery provider to the container.
services.AddOcelot().AddConsul();

 #region  ocelot
   // UseOcelot() is asynchronous and returns a Task. Block until it completes so the
   // Ocelot middleware is fully configured before Configure returns and so that
   // configuration errors surface at startup instead of being silently dropped.
   app.UseOcelot().Wait();

  #endregion

添加配置文件configuration.json


{
//************************************本地单点负载均衡******************
//"Routes": [
// {
// "DownstreamPathTemplate": "/{url}",
// "DownstreamScheme": "http",
// "DownstreamHostAndPorts": [
// {
// "Host": "192.168.0.107",
// "Port": 2001
// },
// {
// "Host": "192.168.0.107",
// "Port": 2002
// }
// ],
// "LoadBalancerOptions": {
// "Type": "RoundRobin"
// },
// "UpstreamPathTemplate": "/{url}",
// "UpstreamHttpMethod": [ "Get", "Post" ]
// }
//],
//"GlobalConfiguration": {
// "BaseUrl": "https://localhost:5000"
//},
//***********************Ocelot + Consul**********************
"Routes": [
{
"DownstreamPathTemplate": "/{url}",
"DownstreamScheme": "http",
"UseServiceDiscovery": true,
"ServiceName": "Tom",
"LoadBalancerOptions": {
"Type": "RoundRobin"
},
"UpstreamPathTemplate": "/{url}",
"UpstreamHttpMethod": [ "Get", "Post", "Put" ],
"RateLimitOptions": {
"EnableRateLimiting": true,
"Period": "3m",
"PeriodTimespan": 30, //多少秒后重试
"Limit": 5 //统计时间范围内可以请求的最大数量
},
"FileCacheOptions": { //缓存
"TtlSeconds": 15, //Second
"Region": "UserCache"
}
}
],
"GlobalConfiguration": {
"BaseUrl": "http://192.168.0.107:6008",
"ServiceDiscoveryProvider": {
"Host": "192.168.0.107",
"Port": 8500,
"Type": "Consul"
},
"RateLimitOptions": {
"QuotaExceededMessage": "Too many requests, maybe late?",
"HttpStatusCode": 666
}
}


}

 

program.cs

   /// <summary>
   /// Builds the generic host: default configuration plus the optional, hot-reloaded
   /// configuration.json file, with <c>Startup</c> as the web startup class.
   /// </summary>
   /// <param name="args">Command-line arguments forwarded to the default host builder.</param>
   /// <returns>The configured host builder.</returns>
   public static IHostBuilder CreateHostBuilder(string[] args)
   {
       return Host.CreateDefaultBuilder(args)
           .ConfigureAppConfiguration(configBuilder =>
               configBuilder.AddJsonFile("configuration.json", optional: true, reloadOnChange: true))
           .ConfigureWebHostDefaults(web => web.UseStartup<Startup>());
   }

 缓存 

    1.引用Ocelot.Cache.CacheManager

 2.startup.cs中配置

 // Register Ocelot together with Consul discovery, Polly and CacheManager;
 // the cache uses the default dictionary handle.
 services
     .AddOcelot()
     .AddConsul()
     .AddPolly()
     .AddCacheManager(cacheSettings => cacheSettings.WithDictionaryHandle());

 3.若要扩展缓存,添加一个类实现接口IOcelotCache<CachedResponse>,并在IOC容器中注册为单例

using Ocelot.Cache;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace OcelotDemo.Model
{
    /// <summary>
    /// Custom in-memory implementation of Ocelot's response cache.
    /// Entries are kept in a process-wide dictionary keyed by cache key; each entry
    /// remembers its region and absolute expiry time. All access is serialized with a
    /// lock because the cache is registered as a singleton and hit by concurrent requests.
    /// </summary>
    public class CustomeCache :IOcelotCache<CachedResponse>
    {
        /// <summary>One cached response plus the metadata needed to validate it.</summary>
        private class CacheDataModel {
            public CachedResponse CachedResponse { set; get; }
            // Absolute expiry time, stored in UTC.
            public DateTime TimeOut {set;get;}
            public string Region { set; get; }
        }

        private static readonly Dictionary<string, CacheDataModel> CustomeCacheDictionary = new Dictionary<string, CacheDataModel>();

        // Guards every read and write of CustomeCacheDictionary (Dictionary is not thread-safe).
        private static readonly object SyncRoot = new object();

        public CustomeCache() {

        }

        /// <summary>Stores <paramref name="value"/> under <paramref name="key"/>, replacing any existing entry.</summary>
        /// <param name="key">Cache key.</param>
        /// <param name="value">Response to cache.</param>
        /// <param name="ttl">How long the entry stays valid.</param>
        /// <param name="region">Region the entry belongs to.</param>
        public void Add(string key, CachedResponse value, TimeSpan ttl, string region) {
            Console.WriteLine($"This is {nameof(CustomeCache)}.{nameof(Add)}");
            Store(key, value, ttl, region);
        }

        /// <summary>
        /// Returns the cached response for <paramref name="key"/> when it exists, has not
        /// expired and belongs to <paramref name="region"/>; otherwise returns null.
        /// </summary>
        public CachedResponse Get(string key, string region) {
            Console.WriteLine("CustomeCache get cache*************");
            lock (SyncRoot) {
                CacheDataModel entry;
                if (!CustomeCacheDictionary.TryGetValue(key, out entry) || entry == null) {
                    return null;
                }

                if (entry.TimeOut <= DateTime.UtcNow) {
                    // Evict expired entries on access so the dictionary does not grow without bound.
                    CustomeCacheDictionary.Remove(key);
                    return null;
                }

                // Ordinal, null-safe comparison: a null region must not throw.
                return string.Equals(entry.Region, region, StringComparison.Ordinal)
                    ? entry.CachedResponse
                    : null;
            }
        }

        /// <summary>Removes every entry that belongs to <paramref name="region"/>.</summary>
        public void ClearRegion(string region) {
            Console.WriteLine("CustomeCache ClearRegion **********");
            lock (SyncRoot) {
                // Materialize the keys first: removing while lazily enumerating the same
                // dictionary is not safe on every runtime.
                var keyList = CustomeCacheDictionary
                    .Where(kv => kv.Value != null && string.Equals(kv.Value.Region, region, StringComparison.Ordinal))
                    .Select(kv => kv.Key)
                    .ToList();
                foreach (var key in keyList) {
                    CustomeCacheDictionary.Remove(key);
                }
            }
        }

        /// <summary>Replaces any existing entry for <paramref name="key"/> with <paramref name="value"/>.</summary>
        /// <param name="key">Cache key.</param>
        /// <param name="value">Response to cache.</param>
        /// <param name="ttl">How long the entry stays valid.</param>
        /// <param name="region">Region the entry belongs to.</param>
        public void AddAndDelete(string key, CachedResponse value, TimeSpan ttl, string region) {
            Console.WriteLine($"This is {nameof(CustomeCache)}.{nameof(AddAndDelete)}");
            Store(key, value, ttl, region);
        }

        // Shared by Add and AddAndDelete: overwrites the entry for the key under the lock.
        private static void Store(string key, CachedResponse value, TimeSpan ttl, string region) {
            var entry = new CacheDataModel()
            {
                CachedResponse = value,
                Region = region,
                TimeOut = DateTime.UtcNow.Add(ttl)
            };
            lock (SyncRoot) {
                CustomeCacheDictionary[key] = entry;
            }
        }
    }
}

在IOC容器中注册单例

            // Register CustomeCache as the singleton IOcelotCache<CachedResponse> implementation.
            services.AddSingleton<IOcelotCache<CachedResponse>, CustomeCache>();

3.Polly服务降级和熔断

降级和熔断

// In startup.cs: AccountServiceImpl must be registered as a singleton; the Polly policies are built inside AccountServiceImpl.
// NOTE(review): presumably so the circuit-breaker state is shared across requests — confirm.
services.AddSingleton<IAccountService, AccountServiceImpl>();

  AccountServiceImpl.cs

            // Timeout policy: the callback below logs when the timeout fires.
            var timeout = Polly.Policy.TimeoutAsync(1, TimeoutStrategy.Optimistic, (con, t, task) => {   // time out after more than 1 second
                Console.WriteLine("超时*************");
                return Task.CompletedTask;
            });

            // Circuit-breaker policy: handles any Exception and logs every state transition.
            var circuitBreaker = Polly.Policy.Handle<Exception>().CircuitBreakerAsync(
                    exceptionsAllowedBeforeBreaking: 5,  // open the circuit after 5 failures
                    durationOfBreak: TimeSpan.FromSeconds(60),
                    onBreak: (exception, breakDelay) => {
                        Console.WriteLine($"熔断******{breakDelay.TotalMilliseconds} ms, {exception.Message}");
                    },
                    onReset: () => {
                        Console.WriteLine("熔断器关闭***********");
                    },
                    onHalfOpen: () => {
                        Console.WriteLine("进入半开状态***************");
                    }
                );
            // Fallback policy combined with the circuit breaker and then the timeout policy.
            // NOTE(review): AccountServiceFallback() is invoked right here, while the policy is
            // being built, and only its return value is handed to FallbackAsync — so its log
            // line prints once at construction rather than on each fallback. Confirm intent.
            // NOTE(review): with WrapAsync the fallback is presumably the outermost policy and
            // the timeout the innermost — confirm against Polly's PolicyWrap documentation.
            _asyncPolicy = Policy<bool>.Handle<Exception>().FallbackAsync(AccountServiceFallback(), x =>
            {
                Console.WriteLine("降级处理************");
                return Task.CompletedTask;
            }).
            WrapAsync(circuitBreaker).
            WrapAsync(timeout);
        }

        /// <summary>
        /// Produces the degraded result used by the fallback policy and logs that the
        /// fallback was triggered.
        /// </summary>
        /// <returns>Always <c>false</c>.</returns>
        public bool AccountServiceFallback()
        {
            const bool degradedResult = false;
            Console.WriteLine("触发降级策略********************");
            return degradedResult;
        }

超时

重试:若重试和熔断策略都要执行,可用 Policy.Wrap() 方法把多个策略组合起来

        /// <summary>
        /// Calls the WeatherForecast/GetData endpoint on an instance of the given service,
        /// retrying up to 10 times when any exception is thrown.
        /// </summary>
        /// <param name="serviceName">Name passed to AgentBalanceHelper.GetService to pick an instance.</param>
        /// <returns>The <c>Result</c> of the GET response.</returns>
        public string GetRetryData(string serviceName) {
            // Only the scheme and path of this template are used; host and port come from
            // the resolved service instance. Parsed once because it never changes per attempt.
            var templateUri = new Uri("http://tom/WeatherForecast/GetData");

            PolicyBuilder builder = Policy.Handle<Exception>();
            RetryPolicy retryPolicy = builder.Retry(retryCount: 10, onRetry: (ex, count) => {
                Console.WriteLine($"重试{count}次");
                Console.WriteLine($"错误信息:{ex.Message}");
            });

            return retryPolicy.Execute<string>(() => {
                // Resolve the instance inside the retried delegate so each attempt can
                // be routed to a (possibly different) instance.
                var agentService = AgentBalanceHelper.GetService(serviceName);
                string url = $"{templateUri.Scheme}://{agentService.Address}:{agentService.Port}{templateUri.PathAndQuery}";
                Console.WriteLine($"{url} invoke");

                // Named "response" rather than "result" so it does not shadow an outer local
                // (shadowing is a compile error before C# 8). The former 10-second
                // Thread.Sleep after the call was removed: it only blocked the caller.
                var response = HttpClient.InvokeGet(url);

                Console.WriteLine($"GetData Api *******************Status:{response.Status}");
                return response.Result;
            });
        }
原文地址:https://www.cnblogs.com/kingsmart/p/15270441.html