热门IT资讯网

Spring Boot集成Spring Scheduler和Quartz Scheduler

发表于:2024-11-21 作者:热门IT资讯网编辑
编辑最后更新 2024年11月21日,本文介绍了Spring Boot集成Spring Scheduler和Quartz Scheduler的基础知识,利用ShedLock解决Spring Scheduler多实例运行冲突,介绍了Quar

本文介绍了Spring Boot集成Spring Scheduler和Quartz Scheduler的基础知识,利用ShedLock解决Spring Scheduler多实例运行冲突,介绍了Quartz ScheduleBuilder、Calendar,介绍了动态创建Quartz Job的方法。

GitHub源码

Spring Scheduler

Spring Framework提供了简单、易用的Job调度框架Spring Scheduler。

示例

在Spring Boot中,只需两步即可启用Scheduler:

  1. 启用Scheduling
package org.itrunner.heroes.scheduling;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableScheduling;@Configuration@EnableSchedulingpublic class ScheduleConfig {}
  1. 定义Schedule方法
package org.itrunner.heroes.scheduling;import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;@Component@Slf4jpublic class HelloSpring {    @Scheduled(cron = "0 */10 * * * *")    public void sayHello() {        log.info("Hello Spring Scheduler");    }}

@Scheduled支持cron、fixedDelay、fixedRate三种定义方式,方法必须没有参数,返回void类型。

ShedLock

默认情况下,Spring无法同步多个实例的调度程序,而是在每个节点上同时执行作业。我们可以使用shedlock-spring解决这一问题,确保在同一时间仅调度一次任务。

    net.javacrumbs.shedlock    shedlock-spring    4.1.0    net.javacrumbs.shedlock    shedlock-provider-jdbc-template    4.1.0

ShedLock是利用数据库锁机制实现的,当前支持DynamoDB、Hazelcast、Mongo、Redis、ZooKeeper和任何JDBC Driver。为了使用JDBC,增加下面依赖:

  net.javacrumbs.shedlock  shedlock-provider-jdbc-template  4.1.0

创建Shedlock Entity:

package org.itrunner.heroes.domain;import lombok.Data;import javax.persistence.Column;import javax.persistence.Entity;import javax.persistence.Id;import javax.persistence.Table;import java.time.LocalDateTime;@Entity@Table(name = "shedlock")@Datapublic class Shedlock {    @Id    @Column(name = "name", length = 64)    private String name;    @Column(name = "lock_until")    private LocalDateTime lockUntil;    @Column(name = "locked_at")    private LocalDateTime lockedAt;    @Column(name = "locked_by")    private String lockedBy;}

启用ShedLock:

package org.itrunner.heroes.scheduling;import net.javacrumbs.shedlock.core.LockProvider;import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableScheduling;import javax.sql.DataSource;@Configuration@EnableScheduling@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")public class ScheduleConfig {    @Bean    public LockProvider lockProvider(DataSource dataSource) {        return new JdbcTemplateLockProvider(dataSource);    }}
package org.itrunner.heroes.scheduling;import lombok.extern.slf4j.Slf4j;import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;@Component@Slf4jpublic class HelloSpring {    @Scheduled(cron = "0 */10 * * * *")    @SchedulerLock(name = "helloSpringScheduler", lockAtLeastFor = "PT30S", lockAtMostFor = "PT3M")    public void sayHello() {        log.info("Hello Spring Scheduler");    }}

其中lockAtLeastFor和lockAtMostFor设置lock的最短和最长时间,上例分别为30秒、3分钟。

Quartz Scheduler

Quartz Scheduler是功能强大的任务调度框架,在Spring Scheduler不能满足需求时可以使用Quartz。

集成Quartz

  1. POM依赖

Spring Boot项目中仅需引入依赖spring-boot-starter-quartz:

    org.springframework.boot    spring-boot-starter-quartz
  1. 配置Quartz

默认,使用内存JobStore,在生产环境中应配置使用数据库:

spring:  quartz:    auto-startup: true    job-store-type: jdbc    jdbc:      initialize-schema: always    overwrite-existing-jobs: true    properties:      org.quartz.threadPool.threadCount: 5

在spring.quartz.properties中可以配置Quartz高级属性。

  1. 定义Quartz Job
package org.itrunner.heroes.scheduling;import lombok.extern.slf4j.Slf4j;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.scheduling.quartz.QuartzJobBean;@Slf4jpublic class HelloQuartz extends QuartzJobBean {    @Override    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {        log.info("Hello Quartz Scheduler");    }}
  1. 注册Job

集成Quartz后,一个Scheduler被自动配置。SchedulerFactoryBean负责创建和配置Quartz Scheduler,作为Spring application context的一部分管理其生命周期。scheduler可以在其它组件中注入。

所有的JobDetail、Calendar和Trigger Bean自动与scheduler关联,在Spring Boot初始化时自动启动scheduler,并在销毁时将其关闭。

静态注册Job
仅需在启动时静态注册Job的情况下,只需声明Bean,无需在程序中访问scheduler实例本身,如下:

package org.itrunner.heroes.scheduling;import org.itrunner.heroes.util.DateUtils;import org.quartz.*;import org.quartz.impl.calendar.HolidayCalendar;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.time.LocalDate;@Configurationpublic class QuartzConfig {    private static final String CRON_EXPRESSION = "0 0/5 * * * ?";    private static final String GROUP = "iTRunner";    @Bean    public Trigger helloJobTrigger(JobDetail helloJob) {        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(CRON_EXPRESSION);        return TriggerBuilder.newTrigger().forJob(helloJob).withIdentity(getTriggerKey(helloJob.getKey())).withSchedule(scheduleBuilder).modifiedByCalendar("holidayCalendar").build();    }    @Bean    public JobDetail helloJob() {        return JobBuilder.newJob(HelloQuartz.class).withIdentity(getJobKey(HelloQuartz.class)).storeDurably().build();    }    @Bean    public Calendar holidayCalendar() {        HolidayCalendar calendar = new HolidayCalendar();        LocalDate date = LocalDate.of(2020, 1, 1);        calendar.addExcludedDate(DateUtils.toDate(date));        return calendar;    }    private static  JobKey getJobKey(Class cls) {        return new JobKey(cls.getSimpleName(), GROUP);    }    private static TriggerKey getTriggerKey(JobKey jobKey) {        return new TriggerKey(jobKey.getName(), GROUP);    }}

ScheduleBuilder

上例,我们使用了CronScheduleBuilder,Quartz还支持SimpleScheduleBuilder、DailyTimeIntervalScheduleBuilder、CalendarIntervalScheduleBuilder。

  • SimpleScheduleBuilder 按指定的时间间隔执行任务,单位可以为毫秒、秒、分或小时;可以指定执行任务的总次数。
  • DailyTimeIntervalScheduleBuilder 每天按指定的时间间隔执行任务,单位可以为秒、分或小时;可以指定每天执行任务的起止时间;可以指定在星期几执行;可以指定每天执行任务的总次数
  • CalendarIntervalScheduleBuilder 按指定的时间间隔执行任务,单位可以为秒、分、时、日、周、月、年
  • CronScheduleBuilder 使用Cron表达式定义任务执行时间,也支持dailyAtHourAndMinute()、atHourAndMinuteOnGivenDaysOfWeek()、weeklyOnDayAndHourAndMinute()、monthlyOnDayAndHourAndMinute()等便利方法。

Cron-Expression
Cron 表达式由 6 个必选字段和一个可选字段组成,字段间由空格分隔。

Field NameAllowed ValuesAllowed Special Characters
0-59, - * /
0-59, - * /
0-23, - * /
1-31, - * ? / L W
0-11 或 JAN-DEC, - * /
1-7 或 SUN-SAT, - * ? / L #
年 (可选)空 或 1970-2199, - * /
* 可用在所有字段,例如,在分钟字段表示每分钟? 允许应用在日和周字段,用于指定"非特定值",相当于占位符- 用于指定范围,如在小时字段,"10-12"表示10,11,12, 指定列表值,如在周字段,"MON,WED,FRI"表示周一,周三,周五/ 指定步长,如在秒字段,"0/15"表示0,15,30,45;"5/15"表示5, 20, 35, 50。如使用*/x,相当于0/x。L 只用于日和周字段,意为"last"。在日字段,如一月的31号,非闰年二月的28号;在周字段,表示7或"SAT",若在L前还有一个值,如6L,表示这个月最后的周五。W 仅用于日字段,表示离指定日期最近的工作日(周一至周五),如15W,表示离该月15号最近的工作日,注意不能跨月。LW组合, 表示当月最后一个工作日# 仅用于周字段,表示第几,如"6#3",表示本月第3个周五

示例:

0 0/5 * * * ?    每5分钟10 0/5 * * * ?  每5分钟,10秒时执行,如10:00:10, 10:05:100 30 10-13 ? * WED,FRI  每周三和周五的10:30, 11:30, 12:30 和 13:300 0/30 8-9 5,20 * ?  每月5号和20号的8:00, 8:30, 9:00 和 9:30

Calendar

Calendar不定义实际的触发时间,而是与Trigger结合使用,用于排除特定的时间。

AnnualCalendar 排除每年中的一天或多天
CronCalendar 使用Cron表达式定义排除的时间,如"* * 0-7,18-23 ? * *",排除每天的8点至17点
DailyCalendar 排除每天指定的时间段
HolidayCalendar 排除节假日,需要指定确切的日期
MonthlyCalendar 排除每月的一天或多天
WeeklyCalendar 排除每周的一天或多天,默认排除周六、周日

动态Job

在很多情况下我们需要动态创建或启停Job,比如Job数据是动态的、Job间有依赖关系、根据条件启停Job等。

下面示例简单演示了动态创建Job、添加calendar、添加listener、启停job的方法:

package org.itrunner.heroes.scheduling;import lombok.extern.slf4j.Slf4j;import org.quartz.*;import org.quartz.impl.calendar.WeeklyCalendar;import org.quartz.impl.matchers.GroupMatcher;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import static org.itrunner.heroes.scheduling.Constants.*;@Service@Slf4jpublic class ScheduleService {    private final Scheduler scheduler;    @Autowired    public ScheduleService(Scheduler scheduler) { // 注入scheduler        this.scheduler = scheduler;        try {            addJobListener();            addCalendar();            scheduleJob();        } catch (SchedulerException e) {            log.error(e.getMessage(), e);        }    }    public void unscheduleJob(String jobName) throws SchedulerException {        scheduler.pauseJob(JobKey.jobKey(jobName, GROUP_NAME));        scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, GROUP_NAME));    }     /**     * 立即触发job     */    public void triggerJob(String jobName) throws SchedulerException {        scheduler.triggerJob(JobKey.jobKey(jobName, GROUP_NAME));    }    private void addJobListener() throws SchedulerException {        UnscheduleJobListener jobListener = new UnscheduleJobListener();        GroupMatcher groupMatcher = GroupMatcher.jobGroupEquals(GROUP_NAME);        this.scheduler.getListenerManager().addJobListener(jobListener, groupMatcher);    }    private void addCalendar() throws SchedulerException {        WeeklyCalendar calendar = new WeeklyCalendar();        calendar.setDayExcluded(1, true); // 排除周日        calendar.setDayExcluded(7, false);        this.scheduler.addCalendar("weekly", calendar, false, false);    }    private void scheduleJob() throws SchedulerException {        JobDetail jobDetail = createJobDetail();        Trigger trigger = createTrigger(jobDetail);        scheduler.scheduleJob(jobDetail, trigger);    }    private JobDetail createJobDetail() {        JobDataMap jobDataMap = new JobDataMap(); // 添加Job数据        jobDataMap.put(JOB_NAME, "getHeroes");        jobDataMap.put(JOB_REST_URI, "http://localhost:8080/api/heroes");        jobDataMap.put(JOB_REQUEST_METHOD, "GET");        return JobBuilder.newJob(RestJob.class).withIdentity("getHeroes", GROUP_NAME).usingJobData(jobDataMap).storeDurably().build();    }    private Trigger createTrigger(JobDetail jobDetail) {        DailyTimeIntervalScheduleBuilder scheduleBuilder = DailyTimeIntervalScheduleBuilder.dailyTimeIntervalSchedule().withIntervalInMinutes(1).onEveryDay();        return TriggerBuilder.newTrigger().forJob(jobDetail).withIdentity("getHeroes", GROUP_NAME).withSchedule(scheduleBuilder).modifiedByCalendar("weekly").build();    }}

Job定义

下面Job调用了REST服务,调用成功后在JobExecutionContext中添加stop标志:

package org.itrunner.heroes.scheduling;import lombok.extern.slf4j.Slf4j;import org.quartz.JobDataMap;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpMethod;import org.springframework.http.ResponseEntity;import org.springframework.scheduling.quartz.QuartzJobBean;import java.util.List;import static org.itrunner.heroes.scheduling.Constants.JOB_REST_URI;import static org.itrunner.heroes.scheduling.Constants.JOB_STOP_FLAG;@Slf4jpublic class RestJob extends QuartzJobBean {    @Autowired    private RestService restService;    @Override    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {        JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();        String restUri = jobDataMap.getString(JOB_REST_URI);        ResponseEntity responseEntity = restService.requestForEntity(restUri, HttpMethod.GET, List.class);        log.info(responseEntity.getBody().toString());        // set stop flag        jobExecutionContext.put(JOB_STOP_FLAG, true);    }}

JobListener
UnscheduleJobListener检查JobExecutionContext中是否有stop标志,如有则停止Job:

package org.itrunner.heroes.scheduling;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.quartz.JobListener;import org.quartz.SchedulerException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class UnscheduleJobListener implements JobListener {    private static Logger log = LoggerFactory.getLogger(UnscheduleJobListener.class);    @Override    public String getName() {        return "HERO_UnscheduleJobListener";    }    @Override    public void jobToBeExecuted(JobExecutionContext context) {        log.info(getJobName(context) + " is about to be executed.");    }    @Override    public void jobExecutionVetoed(JobExecutionContext context) {        log.info(getJobName(context) + " Execution was vetoed.");    }    @Override    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {        log.info(getJobName(context) + " was executed.");        Boolean stop = (Boolean) context.get(Constants.JOB_STOP_FLAG);        if (stop == null || !stop) {            return;        }        String jobName = getJobName(context);        log.info("Unschedule " + jobName);        try {            context.getScheduler().unscheduleJob(context.getTrigger().getKey());        } catch (SchedulerException e) {            log.error("Unable to unschedule " + jobName, e);        }    }    private String getJobName(JobExecutionContext context) {        return "Hero job " + context.getJobDetail().getKey().getName();    }}
0