摘要:本文使用實(shí)現(xiàn)對(duì)定時(shí)任務(wù)的增刪改查啟用停用等功能。并把定時(shí)任務(wù)持久化到數(shù)據(jù)庫(kù)以及支持集群。決定什么時(shí)候來(lái)執(zhí)行任務(wù)。定義的是任務(wù)數(shù)據(jù),而真正的執(zhí)行邏輯是在中。封裝定時(shí)任務(wù)接口添加一個(gè)暫?;謴?fù)刪除修改暫停所有恢復(fù)所有
簡(jiǎn)介
Quartz是一款功能強(qiáng)大的任務(wù)調(diào)度器,可以實(shí)現(xiàn)較為復(fù)雜的調(diào)度功能,如每月一號(hào)執(zhí)行、每天凌晨執(zhí)行、每周五執(zhí)行等等,還支持分布式調(diào)度。本文使用Springboot+Mybatis+Quartz實(shí)現(xiàn)對(duì)定時(shí)任務(wù)的增、刪、改、查、啟用、停用等功能。并把定時(shí)任務(wù)持久化到數(shù)據(jù)庫(kù)以及支持集群。對(duì)于如何創(chuàng)建Springboot項(xiàng)目和與Mybatis整合可以參考上篇文章:[spring-boot整合Mybatis如何部署在weblogic上
][1]。
Scheduler:調(diào)度器。所有的調(diào)度都是由它控制。
Trigger: 觸發(fā)器。決定什么時(shí)候來(lái)執(zhí)行任務(wù)。
JobDetail & Job: JobDetail定義的是任務(wù)數(shù)據(jù),而真正的執(zhí)行邏輯是在Job中。使用JobDetail + Job而不是Job,這是因?yàn)槿蝿?wù)是有可能并發(fā)執(zhí)行,如果Scheduler直接使用Job,就會(huì)存在對(duì)同一個(gè)Job實(shí)例并發(fā)訪問(wèn)的問(wèn)題。而JobDetail & Job 方式,sheduler每次執(zhí)行,都會(huì)根據(jù)JobDetail創(chuàng)建一個(gè)新的Job實(shí)例,這樣就可以規(guī)避并發(fā)訪問(wèn)的問(wèn)題。
如何使用Quartz添加依賴
org.quartz-scheduler quartz 2.2.3 org.quartz-scheduler quartz-jobs 2.2.3
2.創(chuàng)建配置文件
在maven項(xiàng)目的resource目錄下創(chuàng)建quartz.properties
org.quartz.scheduler.instanceName = MyScheduler org.quartz.scheduler.instanceId = AUTO org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false org.quartz.scheduler.wrapJobExecutionInUserTransaction = false #線程池配置 org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 10 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true #持久化配置 org.quartz.jobStore.misfireThreshold = 50000 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #支持集群 org.quartz.jobStore.isClustered = true org.quartz.jobStore.useProperties:true org.quartz.jobStore.clusterCheckinInterval = 15000 #使用weblogic連接Oracle驅(qū)動(dòng) org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate #org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS #數(shù)據(jù)源連接信息,quartz默認(rèn)使用c3p0數(shù)據(jù)源可以被自定義數(shù)據(jù)源覆蓋 org.quartz.dataSource.qzDS.driver = oracle.jdbc.driver.OracleDriver org.quartz.dataSource.qzDS.URL = jdbc:oracle:thin:@localhost:1521/XE org.quartz.dataSource.qzDS.user = root org.quartz.dataSource.qzDS.password = 123456 org.quartz.dataSource.qzDS.maxConnections = 10
說(shuō)明:在使用quartz做持久化的時(shí)候需要用到quartz的11張表,可以去quartz官網(wǎng)下載對(duì)應(yīng)版本的quartz,解壓打開(kāi)docs/dbTables里面有對(duì)應(yīng)數(shù)據(jù)庫(kù)的建表語(yǔ)句。關(guān)于quartz.properties配置的詳細(xì)解釋可以查看quartz官網(wǎng)。另外新建一張表TB_APP_QUARTZ用于存放定時(shí)任務(wù)基本信息和描述等信息,定時(shí)任務(wù)的增、刪、改、執(zhí)行等功能與此表沒(méi)有任何關(guān)系。
quartz的11張表:
//TB_APP_QUARTZ表的實(shí)體類 public class AppQuartz { private Integer quartzId; //id 主鍵 private String jobName; //任務(wù)名稱 private String jobGroup; //任務(wù)分組 private String startTime; //任務(wù)開(kāi)始時(shí)間 private String cronExpression; //corn表達(dá)式 private String invokeParam;//需要傳遞的參數(shù) ...省略set get }
3.Quartz配置
/** * 創(chuàng)建job 實(shí)例工廠,解決spring注入問(wèn)題,如果使用默認(rèn)會(huì)導(dǎo)致spring的@Autowired 無(wú)法注入問(wèn)題 * @author LLQ * */ @Component public class JobFactory extends AdaptableJobFactory{ @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { //調(diào)用父類的方法 Object jobInstance = super.createJobInstance(bundle); //進(jìn)行注入 capableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
@Configuration public class SchedulerConfig implements ApplicationListener{ @Autowired private JobFactory jobFactory; @Autowired @Qualifier("dataSource") private DataSource primaryDataSource; @Override public void onApplicationEvent(ContextRefreshedEvent event) { System.out.println("任務(wù)已經(jīng)啟動(dòng)..."+event.getSource()); } @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException { //獲取配置屬性 PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); //在quartz.properties中的屬性被讀取并注入后再初始化對(duì)象 propertiesFactoryBean.afterPropertiesSet(); //創(chuàng)建SchedulerFactoryBean SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setQuartzProperties(propertiesFactoryBean.getObject()); //使用數(shù)據(jù)源,自定義數(shù)據(jù)源 factory.setDataSource(this.primaryDataSource); factory.setJobFactory(jobFactory); factory.setWaitForJobsToCompleteOnShutdown(true);//這樣當(dāng)spring關(guān)閉時(shí),會(huì)等待所有已經(jīng)啟動(dòng)的quartz job結(jié)束后spring才能完全shutdown。 factory.setOverwriteExistingJobs(false); factory.setStartupDelay(1); return factory; } /* * 通過(guò)SchedulerFactoryBean獲取Scheduler的實(shí)例 */ @Bean(name="scheduler") public Scheduler scheduler() throws IOException { return schedulerFactoryBean().getScheduler(); } @Bean public QuartzInitializerListener executorListener() { return new QuartzInitializerListener(); } }
4.創(chuàng)建定時(shí)任務(wù)服務(wù)
@Service public class JobUtil { @Autowired @Qualifier("scheduler") private Scheduler scheduler; /** * 新建一個(gè)任務(wù) * */ public String addJob(AppQuartz appQuartz) throws Exception { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date=df.parse(appQuartz.getStartTime()); if (!CronExpression.isValidExpression(appQuartz.getCronExpression())) { return "Illegal cron expression"; //表達(dá)式格式不正確 } JobDetail jobDetail=null; //構(gòu)建job信息 if("JobOne".equals(appQuartz.getJobGroup())) { jobDetail = JobBuilder.newJob(JobOne.class).withIdentity(appQuartz.getJobName(), appQuartz.getJobGroup()).build(); } if("JobTwo".equals(appQuartz.getJobGroup())) { jobDetail = JobBuilder.newJob(JobTwo.class).withIdentity(appQuartz.getJobName(), appQuartz.getJobGroup()).build(); } //表達(dá)式調(diào)度構(gòu)建器(即任務(wù)執(zhí)行的時(shí)間,不立即執(zhí)行) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(appQuartz.getCronExpression()).withMisfireHandlingInstructionDoNothing(); //按新的cronExpression表達(dá)式構(gòu)建一個(gè)新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(appQuartz.getJobName(), appQuartz.getJobGroup()).startAt(date) .withSchedule(scheduleBuilder).build(); //傳遞參數(shù) if(appQuartz.getInvokeParam()!=null && !"".equals(appQuartz.getInvokeParam())) { trigger.getJobDataMap().put("invokeParam",appQuartz.getInvokeParam()); } scheduler.scheduleJob(jobDetail, trigger); // pauseJob(appQuartz.getJobName(),appQuartz.getJobGroup()); return "success"; } /** * 獲取Job狀態(tài) * @param jobName * @param jobGroup * @return * @throws SchedulerException */ public String getJobState(String jobName, String jobGroup) throws SchedulerException { TriggerKey triggerKey = new TriggerKey(jobName, jobGroup); return scheduler.getTriggerState(triggerKey).name(); } //暫停所有任務(wù) public void pauseAllJob() throws SchedulerException { scheduler.pauseAll(); } //暫停任務(wù) public String pauseJob(String jobName, String jobGroup) throws SchedulerException { JobKey jobKey = new JobKey(jobName, jobGroup); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) { return "fail"; }else { scheduler.pauseJob(jobKey); return "success"; } } //恢復(fù)所有任務(wù) public void resumeAllJob() throws SchedulerException { scheduler.resumeAll(); } // 恢復(fù)某個(gè)任務(wù) public String resumeJob(String jobName, String jobGroup) throws SchedulerException { JobKey jobKey = new JobKey(jobName, jobGroup); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null) { return "fail"; }else { scheduler.resumeJob(jobKey); return "success"; } } //刪除某個(gè)任務(wù) public String deleteJob(AppQuartz appQuartz) throws SchedulerException { JobKey jobKey = new JobKey(appQuartz.getJobName(), appQuartz.getJobGroup()); JobDetail jobDetail = scheduler.getJobDetail(jobKey); if (jobDetail == null ) { return "jobDetail is null"; }else if(!scheduler.checkExists(jobKey)) { return "jobKey is not exists"; }else { scheduler.deleteJob(jobKey); return "success"; } } //修改任務(wù) public String modifyJob(AppQuartz appQuartz) throws SchedulerException { if (!CronExpression.isValidExpression(appQuartz.getCronExpression())) { return "Illegal cron expression"; } TriggerKey triggerKey = TriggerKey.triggerKey(appQuartz.getJobName(),appQuartz.getJobGroup()); JobKey jobKey = new JobKey(appQuartz.getJobName(),appQuartz.getJobGroup()); if (scheduler.checkExists(jobKey) && scheduler.checkExists(triggerKey)) { CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); //表達(dá)式調(diào)度構(gòu)建器,不立即執(zhí)行 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(appQuartz.getCronExpression()).withMisfireHandlingInstructionDoNothing(); //按新的cronExpression表達(dá)式重新構(gòu)建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) .withSchedule(scheduleBuilder).build(); //修改參數(shù) if(!trigger.getJobDataMap().get("invokeParam").equals(appQuartz.getInvokeParam())) { trigger.getJobDataMap().put("invokeParam",appQuartz.getInvokeParam()); } //按新的trigger重新設(shè)置job執(zhí)行 scheduler.rescheduleJob(triggerKey, trigger); return "success"; }else { return "job or trigger not exists"; } } }
@PersistJobDataAfterExecution @DisallowConcurrentExecution @Component public class JonOne implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException{ JobDataMap data=context.getTrigger().getJobDataMap(); String invokeParam =(String) data.get("invokeParam"); //在這里實(shí)現(xiàn)業(yè)務(wù)邏輯 } }
@PersistJobDataAfterExecution @DisallowConcurrentExecution @Component public class JobTwo implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException{ JobDataMap data=context.getTrigger().getJobDataMap(); String invokeParam =(String) data.get("invokeParam"); //在這里實(shí)現(xiàn)業(yè)務(wù)邏輯 } }
說(shuō)明:每個(gè)定時(shí)任務(wù)都必須有一個(gè)分組,名稱和corn表達(dá)式,corn表達(dá)式也就是定時(shí)任務(wù)的觸發(fā)時(shí)間,關(guān)于corn表達(dá)式格式以及含義可以參考一些網(wǎng)絡(luò)資源。每個(gè)定時(shí)任務(wù)都有一個(gè)入口類在這里我把類名當(dāng)成定時(shí)任務(wù)的分組名稱,例如:只要?jiǎng)?chuàng)建定時(shí)任務(wù)的分組是JobOne的都會(huì)執(zhí)行JobOne這個(gè)任務(wù)類里面的邏輯。如果定時(shí)任務(wù)需要額外的參數(shù)可以使用JobDataMap傳遞參數(shù),當(dāng)然也可以從數(shù)據(jù)庫(kù)中獲取需要的數(shù)據(jù)。@PersistJobDataAfterExecution和@DisallowConcurrentExecution注解是不讓某個(gè)定時(shí)任務(wù)并發(fā)執(zhí)行,只有等當(dāng)前任務(wù)完成下一個(gè)任務(wù)才會(huì)去執(zhí)行。
5.封裝定時(shí)任務(wù)接口
@RestController public class JobController { @Autowired private JobUtil jobUtil; @Autowired private AppQuartzService appQuartzService; //添加一個(gè)job @RequestMapping(value="/addJob",method=RequestMethod.POST) public ReturnMsg addjob(@RequestBody AppQuartz appQuartz) throws Exception { appQuartzService.insertAppQuartzSer(appQuartz); result=jobUtil.addJob(appQuartz); } //暫停job @RequestMapping(value="/pauseJob",method=RequestMethod.POST) public ReturnMsg pausejob(@RequestBody Integer[]quartzIds) throws Exception { AppQuartz appQuartz=null; if(quartzIds.length>0){ for(Integer quartzId:quartzIds) { appQuartz=appQuartzService.selectAppQuartzByIdSer(quartzId).get(0); jobUtil.pauseJob(appQuartz.getJobName(), appQuartz.getJobGroup()); } return new ReturnMsg("200","success pauseJob"); }else { return new ReturnMsg("404","fail pauseJob"); } } //恢復(fù)job @RequestMapping(value="/resumeJob",method=RequestMethod.POST) public ReturnMsg resumejob(@RequestBody Integer[]quartzIds) throws Exception { AppQuartz appQuartz=null; if(quartzIds.length>0) { for(Integer quartzId:quartzIds) { appQuartz=appQuartzService.selectAppQuartzByIdSer(quartzId).get(0); jobUtil.resumeJob(appQuartz.getJobName(), appQuartz.getJobGroup()); } return new ReturnMsg("200","success resumeJob"); }else { return new ReturnMsg("404","fail resumeJob"); } } //刪除job @RequestMapping(value="/deletJob",method=RequestMethod.POST) public ReturnMsg deletjob(@RequestBody Integer[]quartzIds) throws Exception { AppQuartz appQuartz=null; for(Integer quartzId:quartzIds) { appQuartz=appQuartzService.selectAppQuartzByIdSer(quartzId).get(0); String ret=jobUtil.deleteJob(appQuartz); if("success".equals(ret)) { appQuartzService.deleteAppQuartzByIdSer(quartzId); } } return new ReturnMsg("200","success deleteJob"); } //修改 @RequestMapping(value="/updateJob",method=RequestMethod.POST) public ReturnMsg modifyJob(@RequestBody AppQuartz appQuartz) throws Exception { String ret= jobUtil.modifyJob(appQuartz); if("success".equals(ret)) { appQuartzService.updateAppQuartzSer(appQuartz); return new ReturnMsg("200","success updateJob",ret); }else { return new ReturnMsg("404","fail updateJob",ret); } } //暫停所有 @RequestMapping(value="/pauseAll",method=RequestMethod.GET) public ReturnMsg pauseAllJob() throws Exception { jobUtil.pauseAllJob(); return new ReturnMsg("200","success pauseAll"); } //恢復(fù)所有 @RequestMapping(value="/repauseAll",method=RequestMethod.GET) public ReturnMsg repauseAllJob() throws Exception { jobUtil.resumeAllJob(); return new ReturnMsg("200","success repauseAll"); } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/77331.html
摘要:而我這里定時(shí)任務(wù)的觸發(fā)是要通過(guò)接口的方式來(lái)觸發(fā),所以只用實(shí)現(xiàn)以下的調(diào)度器即可。我這里簡(jiǎn)單說(shuō)下任務(wù)的調(diào)度器,具體的任務(wù)類,觸發(fā)器,任務(wù)什么時(shí)候執(zhí)行是由它決定的。遇到的坑解決方式這個(gè)是因?yàn)椴患嫒莸膯?wèn)題,所以使用是不會(huì)出現(xiàn)這個(gè)錯(cuò)誤的。 實(shí)現(xiàn)定時(shí)任務(wù)的幾種方式: 1.使用linux的crontab 優(yōu)點(diǎn): 1.使用方式很簡(jiǎn)單,只要在crontab中寫好 2.隨時(shí)可以修改,不需要...
閱讀 1176·2023-04-26 02:02
閱讀 2472·2021-09-26 10:11
閱讀 3622·2019-08-30 13:10
閱讀 3818·2019-08-29 17:12
閱讀 782·2019-08-29 14:20
閱讀 2248·2019-08-28 18:19
閱讀 2298·2019-08-26 13:52
閱讀 1023·2019-08-26 13:43