Quartz早集群模式下,当其中的某个实例停掉后,Job并不会转移到其他实例上面去,这到底是什么原因呢
配置如下:
instanceId: AUTO
isClustered: true
clusterCheckinInterval: 10000
Quartz早集群模式下,当其中的某个实例停掉后,Job并不会转移到其他实例上面去,这到底是什么原因呢
配置如下:
instanceId: AUTO
isClustered: true
clusterCheckinInterval: 10000
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
/**
* @author zhanglifeng
* @Description Quartz调度配置类
*/
@Configuration
public class QuartzSchedulerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(QuartzSchedulerConfig.class);
private static final String QUARTZ_YML_NAME = "/quartz.yml";
@Resource
private DataSource dataSource;
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, CronTrigger[] cronTrigger, JobDetail[] jobDetails) {
SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
try {
factoryBean.setQuartzProperties(initQuartzYml());
factoryBean.setDataSource(dataSource);
factoryBean.setJobFactory(jobFactory);
factoryBean.setTriggers(cronTrigger);
factoryBean.setJobDetails(jobDetails);
factoryBean.setOverwriteExistingJobs(true);
} catch (Exception e) {
LOGGER.error("加载{}配置文件失败.", QUARTZ_YML_NAME, e);
throw new RuntimeException("加载配置文件失败", e);
}
return factoryBean;
}
@Bean
public Properties initQuartzYml() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_YML_NAME));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
}
到这里,quartz的基本配置完成了。还需要代码中具体的job,task来实现定时任务。贴出项目结构来具体看下
5.定时任务触发器 TradeOrderCompleteJob.java
import org.quartz.JobDetail;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.stereotype.Component;
import tp.task.center.zlf.task.TradeOrderCompleteTask;
/**
* [定时任务触发器配置]
* @author zhanglifeng
* @date 20196-11-13
*/
@Component
public class TradeOrderCompleteJob {
@Bean(name = "jobOrderCompleteTrigger")
public CronTriggerFactoryBean jobOrderCompleteTrigger(@Qualifier("jobOrderCompleteDetail") JobDetail jobDetail) {
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(jobDetail);
cronTriggerFactoryBean.setCronExpression("0 0/3 * * * ?");
return cronTriggerFactoryBean;
}
@Bean(name = "jobOrderCompleteDetail")
public JobDetailFactoryBean jobOrderCompleteDetail(){
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setJobClass(TradeOrderCompleteTask.class);
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.setRequestsRecovery(true);
return jobDetailFactoryBean;
}
}
关系图:
6.任务调度中心
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
/**
* @author zhanglifeng
*/
@Component
public class TradeOrderCompleteTask extends QuartzJobBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TradeOrderCompleteTask.class);
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) {
try {
LOGGER.info("------------------任务开始--------");
/**
* 具体业务处理逻辑调用地方
*/
LOGGER.info("------------------任务结束--------");
} catch (Exception e) {
LOGGER.error("----------出错:" + e.getMessage(), e);
}
}
}
然后启动项目。即可看到定时任务日志的打印。
根据提供的配置和参考资料,可以看出Quartz集群模式是通过将任务的元数据和状态存储在共享数据库中来实现的。当一个Quartz节点宕机时,集群中的其他节点不能够自动感知这种变化。这就需要一个Quartz Manager来监测任务实例节点的状态,并适当地重新分配任务。而在Quartz的早期集群模式中,并没有提供一个可用的Quartz Manager来监测节点状态,所以Job实例不会自动转移到其他实例上。 解决这个问题的方法是在集群中配置一个Quartz Manager来负责任务调度,并且开启节点的监测功能。以下是具体的步骤:
1.配置Quartz Manager
Quartz Manager是Quartz集群模式的一个关键组件,它主要负责监测节点状态和分配任务。Quartz Manager可以通过各种方式实现,例如使用JMX、使用RMI等。这里以使用JMX为例,展示Quartz Manager的配置步骤。
首先,在application.yml中增加以下配置:
spring: quartz: properties: org: quartz: scheduler: jmx: #启用JMX监控 enable: true
然后,根据需要在启动参数中增加以下JVM参数:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
这些参数可以开启JMX,并使其监听1099端口,并关闭认证和SSL加密。
2.开启节点的监测功能
要让Quartz Manager能够正确地监测节点状态,集群中的每个节点都需要开启节点监测功能。在application.yml中增加以下配置:
spring: quartz: properties: org: quartz: scheduler: instanceId: AUTO instanceName: MyClusteredScheduler #设置节点监测间隔 jobStore.clusterCheckinInterval: 15000 #启用节点监测 jobStore.isClustered: true threadPool: class: org.quartz.simpl.SimpleThreadPool threadCount: 10 #配置Quartz Manager quartz.scheduler.jmx.proxy: false quartz.scheduler.rmi.export: true quartz.scheduler.rmi.createRegistry: true quartz.scheduler.rmi.registryHost: localhost quartz.scheduler.rmi.registryPort: 1099 quartz.scheduler.rmi.serverPort: 0
这里需要注意几点:
instanceId必须设置为AUTO来允许Quartz自动生成节点Id。不同节点的Id必须不同,否则会导致集群状态混乱。
需要设置jobStore.isClustered为true来开启节点监测功能。
需要设置jobStore.clusterCheckinInterval来指定节点监测的间隔时间。默认值是15000毫秒,表示每隔15秒就会检查一次节点状态。
需要配置Quartz Manager以允许它与各个节点通信。
3.使用PersistJobDataAfterExecution和DisallowConcurrentExecution注解
为了避免任务重复执行,我们需要在任务类上加上PersistJobDataAfterExecution和DisallowConcurrentExecution注解。这样,当一个任务实例在执行时,其他实例就不能并发地执行同一个任务。
@PersistJobDataAfterExecution @DisallowConcurrentExecution @Component("pickNewsJob") public class PickNewsJob implements Job {
@Override
public void execute(JobExecutionContext context) {
//具体的业务逻辑
System.out.println("业务逻辑");
System.out.println(LocalDateTime.now());
}
}
4.使用JobStoreTX
最后,在application.yml中我们需要使用JobStoreTX来实现Quartz数据库持久化。JobStoreTX提供了事务支持,并且可以减少数据库锁定的次数,从而提高性能。
spring: quartz: jdbc: job-store-type: jdbc properties: org: quartz: scheduler: jobStore: class: org.quartz.impl.jdbcjobstore.JobStoreTX driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate tablePrefix: QRTZ_ isClustered: true clusterCheckinInterval: 15000 useProperties: true threadPool: class: org.quartz.simpl.SimpleThreadPool threadCount: 10 threadPriority: 5
总结
Quartz的早期集群模式并不提供节点状态监测功能,因此停止运行的实例上的任务不会自动转移到其他实例上。要解决这个问题,我们需要在集群中配置一个Quartz Manager来监测节点状态,并开启节点监测功能。此外,我们还可以使用PersistJobDataAfterExecution和DisallowConcurrentExecution注解来避免任务重复执行,并使用JobStoreTX来实现Quartz数据库持久化。