stevenjin 2024-10-15 15:37 采纳率: 98%
浏览 10
已结题

多调度器多个实例下,共享同一个Job的情况下避免并发执行

Quartz.NET
1.多调度器多个实例共享同一个Job的情况下避免并发执行
2.假如复制了一份FeedingSchedule,各自实例化了调度器
3.特性DisallowConcurrentExecution和lock只能防止同一个调试器,如果是多个调试器,最好用哪些方法呢?

private async Task FeedingSchedule()
{
    await Task.Run(() => {
        //实例化调度器           
        NameValueCollection pars = new NameValueCollection
        {
            //线程池个数20
            ["quartz.threadPool.threadCount"] = "20",
            //scheduler名字
            ["quartz.scheduler.instanceName"] = "MyAdoJobStoreScheduler",
            //类型为JobStoreXT,事务
            ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
            //数据源名称
            ["quartz.jobStore.dataSource"] = "QuartzDb",
            //使用mysql的Ado操作代理类
            ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
            //数据源连接字符串
            ["quartz.dataSource.QuartzDb.connectionString"] = @"server=127.0.0.1;Database=quartzmanager;user id=root;password=123;SslMode=none;",
            //数据源的数据库
            ["quartz.dataSource.QuartzDb.provider"] = "MySql",
            //序列化类型
            ["quartz.serializer.type"] = "json",
            //自动生成scheduler实例ID,主要为了保证集群中的实例具有唯一标识
            ["quartz.scheduler.instanceId"] = "AUTO"
        };
 
        //实例化调度器
        ISchedulerFactory schedulefactory = new StdSchedulerFactory(pars);
        IScheduler scheduler = schedulefactory.GetScheduler().Result;
        //开启调度器
        scheduler.Start();
 
        List<Feeding> lstFeeding = new List<Feeding>();
        lstFeeding.Add(new Feeding()
        {
            PlanID = 1,
            Interval = 2,            
            PhaseTime = 2,
            PlanState = 3,
            Step = 1,
            Times = 3,
            Wait = 4,
            lstSubPlan = new List<SubPlan>() {
                  new SubPlan()
                  {
                    Interval = 1,
                    Times = 2,
                  },
                  new SubPlan()
                  {
                    Interval = 3,
                    Times = 4,
                  }
            }
        });
 
        lstFeeding.Add(new Feeding()
        {
            PlanID = 1,
            Interval = 2,          
            PhaseTime = 2,
            PlanState = 3,
            Step = 2,
            Times = 3,
            Wait = 2,
            lstSubPlan = new List<SubPlan>() {
            new SubPlan()
            {
                Interval = 2,
                Times = 2,
            },
           new SubPlan()
           {
             Interval = 3,
             Times = 4,
           }
         }
        });
 
        foreach (var feeding in lstFeeding)
        //for (int i = 0; i < 10; i++)
        {
            JobKey firstJobKey = JobKey.Create("FirstJob", "Pipeline");
            JobKey secondJobKey = JobKey.Create("SecondJob", "Pipeline");
            JobKey thirdJobKey = JobKey.Create("ThirdJob", "Pipeline");
 
            // Create job and trigger
            IJobDetail firstJob = JobBuilder.Create<FeedingJob.OuterJob>()
            .WithIdentity(firstJobKey)
            .UsingJobData("outerSays", "Hello World,I'm outSide!")
            //.StoreDurably(true)
            .Build();
 
            IJobDetail secondJob = JobBuilder.Create<FeedingJob.InnnerJob>()
            .WithIdentity(secondJobKey)
            .UsingJobData("innerSays", "Hello World,I'm inSide1!")
            .UsingJobData("executeNext", "false")
            .StoreDurably(true)//指示Quartz在Job成为“孤儿”时不要删除Job(当Job不再有Trigger引用它时),按顺序触发关键
            .Build();
 
            IJobDetail thirdJob = JobBuilder.Create<FeedingJob.InnnerJob>()
            .WithIdentity(thirdJobKey)
            .UsingJobData("innerSays", "Hello World,I'm inSide2!")
            .UsingJobData("executeNext", "true")
            .StoreDurably(true)
            .Build();
 
            ITrigger firstJobTrigger = TriggerBuilder.Create()
            .WithIdentity("Trigger", "Pipeline")
            .WithSimpleSchedule(x => x
            .WithMisfireHandlingInstructionFireNow()
            .WithIntervalInSeconds(4)
            .WithRepeatCount(0)).StartAt(DateTime.Now.AddSeconds(1))
            .Build();
 
            //创建计划链表
            JobChainingJobListener listener = new JobChainingJobListener("JobLink");
            listener.AddJobChainLink(firstJobKey, secondJobKey);
            listener.AddJobChainLink(secondJobKey, thirdJobKey);
 
            //将trigger监听器注册到调度器
            //scheduler.ListenerManager.AddTriggerListener(new CustomTriggerListener());
            scheduler.ListenerManager.AddJobListener(listener, GroupMatcher<JobKey>.GroupEquals("Pipeline"));
 
            // Run it all in chain
            scheduler.Start();
            scheduler.ScheduleJob(firstJob, firstJobTrigger);
            scheduler.AddJob(secondJob, false, true);
            scheduler.AddJob(thirdJob, false, true);
 
            int subPlanWait = feeding.lstSubPlan.Sum(o => o.Interval * o.Times);
            int totalWait = (feeding.Wait + feeding.PhaseTime) * feeding.Interval + subPlanWait;
            Thread.Sleep(totalWait * 1000);
        }
    });
  
}
 
 
public class FeedingJob
{
    [DisallowConcurrentExecution]
    public class OuterJob : IJob
    {
        private static object _lockOuterJob = new object();
        public Task Execute(IJobExecutionContext context)
        {
            lock (_lockOuterJob)
            {
                JobKey jobKey = context.JobDetail.Key;
                JobDataMap dataMap = context.JobDetail.JobDataMap;
                string jobSays = dataMap.GetString("outerSays");
                Console.WriteLine($"Outer job executed[{jobKey}]:{DateTime.Now},Content:{jobSays}");
                Thread.Sleep(1000);
                return Task.CompletedTask;
            }
 
        }
    }
 
    [DisallowConcurrentExecution]
    public class InnnerJob : IJob
    {
        private static object _lockInnnerJob = new object();
        public Task Execute(IJobExecutionContext context)
        {
            lock (_lockInnnerJob)
            {
                JobKey jobKey = context.JobDetail.Key;
                JobDataMap dataMap = context.JobDetail.JobDataMap;
                string jobSays = dataMap.GetString("innerSays");
                string executeNext = dataMap.GetString("executeNext");
                Console.WriteLine($"Innner job executed[{jobKey}]:{DateTime.Now},Content:{jobSays},executeNext:{executeNext}");
                Thread.Sleep(1000);
            }             
            return Task.CompletedTask;
        } 
    }
}

  • 写回答

1条回答 默认 最新

  • Fei Xu 2024-10-16 09:59
    关注

    本答案参考ChatGPT-3.5选择的最佳回答,若是满意,还请采纳,谢谢

    当多个调度器实例共享同一个 Job 时,DisallowConcurrentExecution 特性和 C# 的 lock 关键字只能防止同一个调度器内部的并发执行。对于多个调度器的情况,建议采用以下方法:

    方法 1:使用分布式锁(如 Redis、Zookeeper)
    使用分布式锁库(例如 Redis、Zookeeper 或 Consul)来实现跨调度器的互斥。Quartz.NET 内置支持 Redis 分布式锁,通过配置可以防止多个调度器实例同时执行同一个 Job。

    public class MyJob : IJob
    {
        public async Task Execute(IJobExecutionContext context)
        {
            // 使用分布式锁来控制互斥
            using (var distributedLock = await DistributedLockProvider.AcquireLockAsync("MyJobLock"))
            {
                if (distributedLock != null)
                {
                    // 执行任务
                    Console.WriteLine("Job executing...");
                }
                else
                {
                    Console.WriteLine("Failed to acquire lock, job skipped.");
                }
            }
        }
    }
    

    方法 2:使用 Quartz.NET 的数据库 AdoJobStore
    启用 Quartz.NET 的 AdoJobStore,将调度数据存储在共享的数据库中。配置所有调度器实例使用相同的数据库和相同的 SchedulerName,这样 Quartz.NET 可以在数据库中管理锁和执行控制。

    在 appsettings.json 配置中指定 AdoJobStore:

    {
      "quartz": {
        "jobStore": {
          "type": "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
          "driverDelegateType": "Quartz.Impl.AdoJobStore.StdAdoDelegate, Quartz",
          "dataSource": "default",
          "tablePrefix": "QRTZ_",
          "useProperties": "true",
          "lockHandler": "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz"
        },
        "dataSource": {
          "default": {
            "provider": "SqlServer",
            "connectionString": "Server=yourserver;Database=yourdb;User Id=youruser;Password=yourpassword;"
          }
        }
      }
    }
    

    方法 3:使用文件锁或其他共享资源
    可以使用文件锁或其他共享资源(如文件、数据库记录)来实现多调度器实例的互斥执行。这个方法适用于不适用分布式锁或数据库存储的情况。

    public class MyJob : IJob
    {
        private static readonly string LockFilePath = "path_to_lock_file.lock";
        public Task Execute(IJobExecutionContext context)
        {
            // 使用文件锁来控制并发
            using (FileStream fs = new FileStream(LockFilePath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None))
            {
                try
                {
                    // 加锁成功,执行任务
                    Console.WriteLine("Job executing...");
                    // 模拟长时间运行的任务
                    Thread.Sleep(5000);
                }
                catch (IOException)
                {
                    // 另一个调度器持有锁,跳过任务
                    Console.WriteLine("Failed to acquire lock, job skipped.");
                }
            }
    
            return Task.CompletedTask;
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 10月18日
  • 已采纳回答 10月18日
  • 创建了问题 10月15日

悬赏问题

  • ¥60 Matlab联合CRUISE仿真编译dll文件报错
  • ¥15 脱敏项目合作,ner需求合作
  • ¥15 脱敏项目合作,ner需求合作
  • ¥30 Matlab打开默认名称带有/的光谱数据
  • ¥50 easyExcel模板 动态单元格合并列
  • ¥15 res.rows如何取值使用
  • ¥15 在odoo17开发环境中,怎么实现库存管理系统,或独立模块设计与AGV小车对接?开发方面应如何设计和开发?请详细解释MES或WMS在与AGV小车对接时需完成的设计和开发
  • ¥15 CSP算法实现EEG特征提取,哪一步错了?
  • ¥15 游戏盾如何溯源服务器真实ip?需要30个字。后面的字是凑数的
  • ¥15 vue3前端取消收藏的不会引用collectId