Quartz.Net + mysql 高可用模式

     当下定时服务组件,相信很多大佬都很熟悉了 ,比如hangfire、quartz.net、以及xxljob,公司级别个人感觉还是xxljob最好用,开源简单,还有集群,故障恢复,定时准确,对比其他定时组件有很大的优势,但总有它实现不了的功能......

     之前做的定时业务中,都是使用xxljob来实现定时的,非常方便快捷,使用cron表达式就可以实现定时或者循环定时的功能。

这个时候就有一个比较尴尬的问题了,定时的时间是不规则的,比如每47秒执行一次,这个时候使用cron表达式就无法准确的定时了,就会出现这种场面:

 可以看出来假设我们设置的是每47秒执行一次,会出现每00秒也执行一次,达不到业务的要求,所以这个时候在想其他方式解决这个问题,最终还是选择了quartz.net 。


回到正题,使用quartz.net + mysql 首先需要建立数据库表:

官方建表的脚本:

 https://github.com/quartznet/quartznet/blob/main/database/tables/tables_mysql_innodb.sql

 通过官方给的脚本一键建表非常的方便,接下来我们就是引入Quartz.net 包,引入之后就是初始化了:

我们需要写个初始化方法,这里很重要,一定需要携带数据源参数(这个坑踩了好久,不然启动不了):

    /// <summary>
    /// 初始化quartnet
    /// </summary>
    public static class QuartzSchedulerFatory
    {
        public async static Task Init(string connection, long count)
        {
            try
            {
                //1.首先创建一个作业调度池
                var properties = new NameValueCollection();
                //schedule名称
                properties["quartz.scheduler.instanceName"] = "MyClusteredScheduler";

                properties["quartz.scheduler.instanceId"] = "AUTO";

                properties["quartz.threadPool.type"] = "Quartz.Simpl.DefaultThreadPool, Quartz";

                properties["quartz.threadPool.threadCount"] = $"{count}";

                properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX,Quartz";
                //表明前缀
                properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
                // 序列化类型,必须添加,不添加无法注册数据库
                properties["quartz.serializer.type"] = "binary";
                //驱动类型
                properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate,Quartz";
                //数据源名称
                properties["quartz.jobStore.dataSource"] = "myDS";

                properties["quartz.jobStore.clustered"] = "true";

                properties["quartz.jobStore.clusterCheckinInterval"] = "20000";
                //连接字符串             
                properties["quartz.dataSource.myDS.connectionString"] = connection;
                //版本
                properties["quartz.dataSource.myDS.provider"] = "MySql";
                //最大链接数
                properties["quartz.dataSource.myDS.maxConnections"] = "100";
                // First we must get a reference to a scheduler
                var schedulerFactory = new StdSchedulerFactory(properties);
                var scheduler = await schedulerFactory.GetScheduler();
                scheduler.Start();
            }
            catch (Exception ex)
            {
                Log.Logger.Error($"{DateTime.UtcNow}: 初始化quartz.net异常:{ex.Message}");
            }
        }
    }

然后在startup类中的ConfigureServices方法中注入:

QuartzSchedulerFatory.Init(_configuration.GetValue<string>("Mysql:Default"), _configuration.GetValue<long>("JobMaxCount"));

然后就是写我们执行的方法了:

    /// <summary>
    /// Quartz 执行类
    /// </summary>
    public class JobExcuteService : IJob
    {
        /// <summary>
        /// 触发器触发之后执行的方法
        /// </summary>
        /// <param name="context"></param>
        /// <returns></returns>
        public async Task Execute(IJobExecutionContext context)
        {
            var id = Convert.ToInt64(context.JobDetail.Key.Name);
            Log.Logger.Information($"{id} 任务执行,执行时间:" + DateTime.Now.ToString());

            // 依赖注入
            var provider = ServiceLocator.ServiceProvider;
            var mongo = provider.GetService<IMongoService>();   // 获取实例
        }
    }

这里需要注意的一点,因为执行类是不支持构造方法中注入实例的,所以我们需要写个获取实例的方法:

    public static class ServiceLocator
    {
        public static IServiceProvider ServiceProvider { get; private set; }
        public static void SetService(IServiceProvider service)
        {
            ServiceProvider = service;
        }
    }

然后需要在startup注入IServiceProvider 对象:

ServiceLocator.SetService(service: services.BuildServiceProvider());

这样就解决了无法构造注入的问题了。写好执行类之后就需要编写创建类:

 /// <summary>
    /// Quartzjob操作类
    /// </summary>
    public static class QuartzJobHelper
    {
        /// <summary>
        /// job 组
        /// </summary>
        public static readonly string group = "Operation_BroadCast";


        /// <summary>
        /// 创建任务
        /// </summary>
        /// <param name="name"></param>
        /// <param name="startTime"></param>
        /// <param name="endTime"></param>
        /// <param name="frequency"></param>
        public static async Task<bool> CreateJob(string name, long startTime, long endTime, int frequency)
        {
            var result = false;
            try
            {
                Log.Logger.Information($"{DateTime.UtcNow}: 新建定时任务({name})开始---------------------- 
");
                //IContainer container = JobModule.ConfigureContainer(new ContainerBuilder()).Build();
                ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
                IScheduler scheduler = await schedulerFactory.GetScheduler("MyClusteredScheduler");
                //IScheduler scheduler = (IScheduler)StdSchedulerFactory.GetDefaultScheduler().Result;

                IJobDetail job = JobBuilder.Create<JobExcuteService>()
                                 .WithIdentity(name, group)
                                 .Build();

                long delayTime = endTime + (frequency * 1000 * 3) + 1000;
                var end = DateTimeOffset.FromUnixTimeMilliseconds(delayTime);
                ITrigger trigger;
                if (startTime != 0)
                {
                    var start = DateTimeOffset.FromUnixTimeMilliseconds(startTime);
                    trigger = TriggerBuilder.Create()
                                               .WithIdentity(name, group)
                                               .StartAt(start)
                                               .EndAt(end)
                                               .WithSimpleSchedule(s => s
                                               .WithIntervalInSeconds(frequency)
                                               .RepeatForever())
                                               .Build();
                }
                else
                {
                    trigger = TriggerBuilder.Create()
                                               .WithIdentity(name, group)
                                               .StartNow()
                                               .EndAt(end)
                                               .WithSimpleSchedule(s => s
                                               .WithIntervalInSeconds(frequency)
                                               .RepeatForever())
                                               .Build();
                }

                //var cts = new CancellationTokenSource();
                //var scheduler = container.Resolve<IScheduler>();
                await scheduler.ScheduleJob(job, trigger);
                await scheduler.Start();
                Log.Logger.Information($"
 {DateTime.UtcNow}: 新建定时任务({name}) 成功---------------------- 
");
                result = true;
            }
            catch (Exception ex)
            {
                Log.Logger.Error($"{DateTime.UtcNow}: 新建定时任务异常:{ex.Message}");
            }

            return result;
}

然后就可以根据业务场景进行创建任务删除任务,达到我们的目的,但是官方宣称使用mysql或者其他数据库持久化的时候,是有可能出现死锁的情况,尽管出现的概率不高,所以需要监控使用,有问题及时处理。

如有错误,欢迎指正,互相学习。谢谢!
原文地址:https://www.cnblogs.com/Ivan-Wu/p/14989135.html