stevenjin 2024-10-15 11:28 采纳率: 98%
浏览 3
已结题

Quartz.NET多个计划各自实例化调度器时,怎样指向同一个数据库

Quartz.NET
由于要同时执行多个计划。复制了一份FeedingSchedule,各自实例化了调度器,执行时报错说数据库哪重复了
1.多个计划,怎样指向同一数据库,并可以重启后各自恢复
2.由于多个计划共享同一个job,怎样避免并发执行同一个job(我用了锁+DisallowConcurrentExecution,不知这样可行吗?)


 
private async Task FeedingSchedule()
{
    await Task.Run(() => {
        //实例化调度器           
        NameValueCollection pars = new NameValueCollection
        {
            //线程池个数20
            ["quartz.threadPool.threadCount"] = "20",
            //scheduler名字
            ["quartz.scheduler.instanceName"] = "MyAdoJobStoreScheduler",
            //类型为JobStoreXT,事务
            ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
            //数据源名称
            ["quartz.jobStore.dataSource"] = "QuartzDb",
            //使用mysql的Ado操作代理类
            ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
            //数据源连接字符串
            ["quartz.dataSource.QuartzDb.connectionString"] = @"server=127.0.0.1;Database=quartzmanager;user id=root;password=123;SslMode=none;",
            //数据源的数据库
            ["quartz.dataSource.QuartzDb.provider"] = "MySql",
            //序列化类型
            ["quartz.serializer.type"] = "json",
            //自动生成scheduler实例ID,主要为了保证集群中的实例具有唯一标识
            ["quartz.scheduler.instanceId"] = "AUTO"
        };
 
        //实例化调度器
        ISchedulerFactory schedulefactory = new StdSchedulerFactory(pars);
        IScheduler scheduler = schedulefactory.GetScheduler().Result;
        //开启调度器
        scheduler.Start();
 
        List<Feeding> lstFeeding = new List<Feeding>();
        lstFeeding.Add(new Feeding()
        {
            PlanID = 1,
            Interval = 2,            
            PhaseTime = 2,
            PlanState = 3,
            Step = 1,
            Times = 3,
            Wait = 4,
            lstSubPlan = new List<SubPlan>() {
                  new SubPlan()
                  {
                    Interval = 1,
                    Times = 2,
                  },
                  new SubPlan()
                  {
                    Interval = 3,
                    Times = 4,
                  }
            }
        });
 
        lstFeeding.Add(new Feeding()
        {
            PlanID = 1,
            Interval = 2,          
            PhaseTime = 2,
            PlanState = 3,
            Step = 2,
            Times = 3,
            Wait = 2,
            lstSubPlan = new List<SubPlan>() {
            new SubPlan()
            {
                Interval = 2,
                Times = 2,
            },
           new SubPlan()
           {
             Interval = 3,
             Times = 4,
           }
         }
        });
 
        foreach (var feeding in lstFeeding)
        //for (int i = 0; i < 10; i++)
        {
            JobKey firstJobKey = JobKey.Create("FirstJob", "Pipeline");
            JobKey secondJobKey = JobKey.Create("SecondJob", "Pipeline");
            JobKey thirdJobKey = JobKey.Create("ThirdJob", "Pipeline");
 
            // Create job and trigger
            IJobDetail firstJob = JobBuilder.Create<FeedingJob.OuterJob>()
            .WithIdentity(firstJobKey)
            .UsingJobData("outerSays", "Hello World,I'm outSide!")
            //.StoreDurably(true)
            .Build();
 
            IJobDetail secondJob = JobBuilder.Create<FeedingJob.InnnerJob>()
            .WithIdentity(secondJobKey)
            .UsingJobData("innerSays", "Hello World,I'm inSide1!")
            .UsingJobData("executeNext", "false")
            .StoreDurably(true)//指示Quartz在Job成为“孤儿”时不要删除Job(当Job不再有Trigger引用它时),按顺序触发关键
            .Build();
 
            IJobDetail thirdJob = JobBuilder.Create<FeedingJob.InnnerJob>()
            .WithIdentity(thirdJobKey)
            .UsingJobData("innerSays", "Hello World,I'm inSide2!")
            .UsingJobData("executeNext", "true")
            .StoreDurably(true)
            .Build();
 
            ITrigger firstJobTrigger = TriggerBuilder.Create()
            .WithIdentity("Trigger", "Pipeline")
            .WithSimpleSchedule(x => x
            .WithMisfireHandlingInstructionFireNow()
            .WithIntervalInSeconds(4)
            .WithRepeatCount(0)).StartAt(DateTime.Now.AddSeconds(1))
            .Build();
 
            //创建计划链表
            JobChainingJobListener listener = new JobChainingJobListener("JobLink");
            listener.AddJobChainLink(firstJobKey, secondJobKey);
            listener.AddJobChainLink(secondJobKey, thirdJobKey);
 
            //将trigger监听器注册到调度器
            //scheduler.ListenerManager.AddTriggerListener(new CustomTriggerListener());
            scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.GroupEquals("Pipeline"));
 
            // Run it all in chain
            scheduler.Start();
            scheduler.ScheduleJob(firstJob, firstJobTrigger);
            scheduler.AddJob(secondJob, false, true);
            scheduler.AddJob(thirdJob, false, true);
 
            int subPlanWait = feeding.lstSubPlan.Sum(o => o.Interval * o.Times);
            int totalWait = (feeding.Wait + feeding.PhaseTime) * feeding.Interval + subPlanWait;
            Thread.Sleep(totalWait * 1000);
        }
 
 
        //Console.ReadLine();
        //scheduler.Shutdown();
        //Console.WriteLine("Scheduler shutdown.");
        //Console.WriteLine(history);
        //Console.ReadLine();
    });
  
}
 
 
public class FeedingJob
{
    [DisallowConcurrentExecution]
    public class OuterJob : IJob
    {
        private static object _lockOuterJob = new object();
        public Task Execute(IJobExecutionContext context)
        {
            lock (_lockOuterJob)
            {
                JobKey jobKey = context.JobDetail.Key;
                JobDataMap dataMap = context.JobDetail.JobDataMap;
                string jobSays = dataMap.GetString("outerSays");
                Console.WriteLine($"Outer job executed[{jobKey}]:{DateTime.Now},Content:{jobSays}");
                Thread.Sleep(1000);
                return Task.CompletedTask;
            }
 
        }
    }
 
    [DisallowConcurrentExecution]
    public class InnnerJob : IJob
    {
        private static object _lockInnnerJob = new object();
        public Task Execute(IJobExecutionContext context)
        {
            lock (_lockInnnerJob)
            {
                JobKey jobKey = context.JobDetail.Key;
                JobDataMap dataMap = context.JobDetail.JobDataMap;
                string jobSays = dataMap.GetString("innerSays");
                string executeNext = dataMap.GetString("executeNext");
                Console.WriteLine($"Innner job executed[{jobKey}]:{DateTime.Now},Content:{jobSays},executeNext:{executeNext}");
                Thread.Sleep(1000);
            }             
            return Task.CompletedTask;
        } 
    }
}
  • 写回答

1条回答 默认 最新

  • 月下独码 Java领域新星创作者 2024-10-15 11:51
    关注

    本答案参考 多次询问 ChatGPT-3.5 后选择的最佳回答,若是满意,还请采纳,谢谢

    1. 多个计划指向同一数据库并在重启后恢复
      • 问题分析
        • 在Quartz.NET中,当多个调度器实例试图使用相同的数据库时,可能会出现配置冲突。每个调度器实例都试图创建或访问相同的数据库表,导致报错。
      • 解决方案
        • 共享数据库配置
          • 确保所有的调度器实例使用完全相同的数据库连接字符串和配置参数。从你提供的代码来看,你已经在每个FeedingSchedule方法中实例化了调度器并设置了数据库相关的配置。为了共享配置,可以将数据库配置提取出来作为一个公共的配置对象。例如:
            NameValueCollection commonPars = new NameValueCollection
            {
            //线程池个数20
            ["quartz.threadPool.threadCount"] = "20",
            //scheduler名字
            ["quartz.scheduler.instanceName"] = "MyAdoJobStoreScheduler",
            //类型为JobStoreTX,事务
            ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
            //数据源名称
            ["quartz.jobStore.dataSource"] = "QuartzDb",
            //使用mysql的Ado操作代理类
            ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
            //数据源连接字符串
            ["quartz.dataSource.QuartzDb.connectionString"] = @"server=127.0.0.1;Database=quartzmanager;user id=root;password=123;SslMode=none;",
            //数据源的数据库
            ["quartz.dataSource.QuartzDb.provider"] = "MySql",
            //序列化类型
            ["quartz.serializer.type"] = "json",
            //自动生成scheduler实例ID,主要为了保证集群中的实例具有唯一标识
            ["quartz.scheduler.instanceId"] = "AUTO"
            };
            
          • 然后在每个FeedingSchedule方法中使用这个公共的配置对象来实例化调度器:
            ISchedulerFactory schedulefactory = new StdSchedulerFactory(commonPars);
            
        • 确保数据库表结构正确
          • 检查数据库中的Quartz相关表是否已经正确创建并且没有被破坏。如果表结构不正确,可能会导致调度器在访问数据库时出现问题。可以使用Quartz.NET提供的数据库脚本(针对不同的数据库类型,如MySQL)来重新创建或修复表结构。
        • 处理实例ID
          • 在多调度器实例共享数据库的情况下,quartz.scheduler.instanceId的设置需要谨慎。如果设置为"AUTO",确保它在多实例环境下能够生成唯一的实例标识,并且不会导致数据库冲突。你可能需要根据实际情况调整这个设置,例如可以使用机器名或其他唯一标识来确保每个实例在数据库中有唯一的标识。
    2. 避免并发执行同一个job
      • 关于DisallowConcurrentExecution和锁的使用
        • **DisallowConcurrentExecution**:
          • 这个特性是Quartz.NET提供的一种机制,用于确保在同一个调度器实例内,同一个Job不会并发执行。当一个Job被标记为[DisallowConcurrentExecution],并且调度器尝试在该Job正在执行时再次触发它,调度器会阻止新的执行直到当前执行完成。
          • 在你的代码中,你在OuterJobInnnerJob类上使用了这个特性,这在单个调度器实例内是有效的,可以防止同一个Job的并发执行。
        • 手动锁的使用
          • 在你的代码中,你还在Execute方法内部使用了手动锁(lock语句)。这种做法在一定程度上是多余的,因为DisallowConcurrentExecution已经提供了类似的功能。如果在单个调度器实例中,DisallowConcurrentExecution应该足以防止并发执行。然而,如果在多调度器实例共享数据库的情况下,并且这些调度器可能会同时触发同一个Job,手动锁可能会有一些问题。因为不同调度器实例中的锁对象(如_lockOuterJob_lockInnnerJob)是不同的实例,它们不能跨调度器实例进行互斥操作。
      • 多调度器实例下的并发控制
        • 全局锁机制(数据库级或分布式锁)
          • 如果要在多调度器实例共享同一个Job的情况下避免并发执行,可以考虑使用数据库级别的锁或者分布式锁机制。例如,在Job执行之前,尝试在数据库中插入一条记录来表示该Job正在执行,如果插入成功则执行Job,执行完成后删除记录;如果插入失败(表示另一个调度器实例已经在执行该Job),则等待一段时间后重试。
          • 对于分布式锁,可以使用一些成熟的分布式锁框架,如Redis的分布式锁或者Zookeeper的分布式锁机制。这些机制可以确保在多个调度器实例(甚至是跨机器的调度器实例)之间对同一个Job的执行进行互斥控制。
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 10月15日
  • 已采纳回答 10月15日
  • 修改了问题 10月15日
  • 修改了问题 10月15日
  • 展开全部

悬赏问题

  • ¥60 Matlab联合CRUISE仿真编译dll文件报错
  • ¥15 脱敏项目合作,ner需求合作
  • ¥15 脱敏项目合作,ner需求合作
  • ¥30 Matlab打开默认名称带有/的光谱数据
  • ¥50 easyExcel模板 动态单元格合并列
  • ¥15 res.rows如何取值使用
  • ¥15 在odoo17开发环境中,怎么实现库存管理系统,或独立模块设计与AGV小车对接?开发方面应如何设计和开发?请详细解释MES或WMS在与AGV小车对接时需完成的设计和开发
  • ¥15 CSP算法实现EEG特征提取,哪一步错了?
  • ¥15 游戏盾如何溯源服务器真实ip?需要30个字。后面的字是凑数的
  • ¥15 vue3前端取消收藏的不会引用collectId