ABP4.6旧版 集成 MassTransit rabbitMQ 重试,熔断

废话不多说!!!上代码:

代码粗略!本人本地演示====

1提前安装 RabbitMQ 服务等

2:配置方法图:

 代码:

/// <summary>
/// 配置 Masstransit
/// </summary>
/// <param name="services"></param>
private void ConfigureDistributedEventBus(IServiceCollection services)
{
// var options = _appConfiguration.GetSection("RabbitMQConfigInfo");//.Get<MassTransitEventBusOptions>();

services.AddHealthChecks();

services.Configure<HealthCheckPublisherOptions>(options =>
{
options.Delay = TimeSpan.FromSeconds(2);
options.Predicate = (check) => check.Tags.Contains("ready");
});
var mqConnectionString = "rabbitmq://127.0.0.1"; // + options.ConnectionString;
services.AddMassTransit(mtConfig =>
{
//inject consumers into IOC from assembly
// mtConfig.AddConsumersFromContainer(Assembly.GetExecutingAssembly());

mtConfig.AddConsumers(Assembly.GetExecutingAssembly());
//mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));
//mtConfig.AddConsumers(typeof(SmsTokenValidationCreatedEvent));
mtConfig.SetKebabCaseEndpointNameFormatter();
mtConfig.AddBus(provider =>
{
var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
{
mqConfig.Host(new Uri(mqConnectionString), h =>
{
//h.Username(options.UserName);
//h.Password(options.Password);
h.Username("guest");
h.Password("guest");
});
// set special message serializer
mqConfig.UseBsonSerializer();

// integrated existed logger compontent
// mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());

mqConfig.ReceiveEndpoint("BangFaoffsite.Web", q =>
{
q.UseRetry(ret =>
{
ret.Interval(3, TimeSpan.FromSeconds(10)); // 每隔10s重试一次,最多重试3次
});

//set rabbitmq prefetch count
q.PrefetchCount = 20000000;

//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));
q.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1); // 跟踪周期:1min
cb.TripThreshold = 15; // 失败比例达到15%后才会打开熔断器
cb.ActiveThreshold = 5; // 至少发生5次请求后才会打开熔断器
cb.ResetInterval = TimeSpan.FromMinutes(3); // 熔断时间间隔:5mins
});
// q.Consumer<SmsTokenValidationCreatedEvent>(provider);
//EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);

});

//mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
//{
// //set rabbitmq prefetch count
// q.PrefetchCount = 50;
// //q.UseRateLimit(100, TimeSpan.FromSeconds(1));
// //q.UseConcurrencyLimit(2);

// //set message retry policy
// q.UseMessageRetry(r => r.Interval(3, 100));

// q.Consumer<UserSyncEventConsumer>(provider);
// EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
//});

mqConfig.ConfigureEndpoints(provider);

//mqConfig.UseAuditingFilter(provider, o =>
//{
// o.ReplaceAuditing = true;
//});

});
// set authtication middleware for user identity
// bus.ConnectAuthenticationObservers(provider);
bus.Start();
return bus;
});
});
}

调用:

 代码:

 ConfigureDistributedEventBus(services);

3:appsettings.json配置:

 定义消费者类:

 =内部:

 代码:

public class AuthCenterEventBusHostModule : IConsumer<IOrder>
{
public Task Consume(ConsumeContext<IOrder> context)
{
Console.WriteLine($" {DateTime.Now}失败的成功===AuthCenterEventBusHostModule{ context.Message.Name}");
context.NotifyConsumed(TimeSpan.FromSeconds(1), typeof(IUser).Name);
return context.ConsumeCompleted;
}


}

三:协议对象:

代码:

/// <summary>
/// 传输协议
/// </summary>
public interface IData
{
string Name { get; set; }
string Url { get; set; }
}
/// <summary>
/// 订单协议
/// </summary>
public interface IOrder
{
string Name { get; set; }
}

/// <summary>
/// 用户协议
/// </summary>
public interface IUser
{
string Name { get; set; }
}

 四:其他两个消费者类似;

五:生产者定义:在控制器中使用

 

 代码就不贴了;比较简单,亲自写写加深印象!!哈哈哈

接下来启动项目====测试!!!

5结果:

原文地址:https://www.cnblogs.com/tianxujun/p/14788812.html