开发定时执行优惠券分发任务
1. 引入 XXL-Job Maven 依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.1</version>
</dependency>
2. 设置 XXL-Job 配置类
2.1 配置 application.yaml
xxl-job:
access-token: default_token
admin:
addresses: http://localhost:8088/xxl-job-admin
executor:
application-name: one-coupon-merchant-admin
ip: 127.0.0.1
log-retention-days: 30
port: 19999
2.2 配置 XXLJobConfiguration
之前我们说过通过 SpringBoot Starter 可以帮助我们快速引入组件库,不需要繁琐的 Spring Bean 配置。很明显,XXL-Job 就没有适配 Starter,需要我们配置相关的 SpringBean 配置。
package com.nageoffer.onecoupon.merchant.admin.config;
import cn.hutool.core.util.StrUtil;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.file.Paths;
/**
* XXL-Job 配置类
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-12
*/
@Configuration
public class XXLJobConfiguration {
@Value("${xxl-job.admin.addresses:}")
private String adminAddresses;
@Value("${xxl-job.access-token:}")
private String accessToken;
@Value("${xxl-job.executor.application-name}")
private String applicationName;
@Value("${xxl-job.executor.ip}")
private String ip;
@Value("${xxl-job.executor.port}")
private int port;
@Value("${xxl-job.executor.log-path:}")
private String logPath;
@Value("${xxl-job.executor.log-retention-days}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(applicationName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(StrUtil.isNotEmpty(accessToken) ? accessToken : null);
xxlJobSpringExecutor.setLogPath(StrUtil.isNotEmpty(logPath) ? logPath : Paths.get("").toAbsolutePath().getParent() + "/tmp");
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
2.3 查看 XXL-Job 执行器地址
完成上述配置后,我们尝试启动项目,然后访问 http://localhost:8088/xxl-job-admin/jobgroup 查看执行器地址是否有值。如果正确有数据即为创建成功。
3. 编写 XXL-Job 处理器
开发 XXL-Job 调用处理类 CouponTaskJobHandler 完成定时调用。
package com.nageoffer.onecoupon.merchant.admin.job;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskStatusEnum;
import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTaskDO;
import com.nageoffer.onecoupon.merchant.admin.dao.mapper.CouponTaskMapper;
import com.nageoffer.onecoupon.merchant.admin.mq.event.CouponTaskExecuteEvent;
import com.nageoffer.onecoupon.merchant.admin.mq.producer.CouponTaskActualExecuteProducer;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
/**
* 优惠券推送任务扫描定时发送记录 XXL-Job 处理器
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-12
*/
@Component
@RequiredArgsConstructor
public class CouponTaskJobHandler extends IJobHandler {
private final CouponTaskMapper couponTaskMapper;
private final CouponTaskActualExecuteProducer couponTaskActualExecuteProducer;
private static final int MAX_LIMIT = 100;
@XxlJob(value = "couponTemplateTask")
public void execute() throws Exception {
long initId = 0;
Date now = new Date();
while (true) {
// 获取已到执行时间待执行的优惠券定时分发任务
List<CouponTaskDO> couponTaskDOList = fetchPendingTasks(initId, now);
if (CollUtil.isEmpty(couponTaskDOList)) {
break;
}
// 调用分发服务对用户发送优惠券
for (CouponTaskDO each : couponTaskDOList) {
distributeCoupon(each);
}
// 查询出来的数据如果小于 MAX_LIMIT 意味着后面将不再有数据,返回即可
if (couponTaskDOList.size() < MAX_LIMIT) {
break;
}
// 更新 initId 为当前列表中最大 ID
initId = couponTaskDOList.stream()
.mapToLong(CouponTaskDO::getId)
.max()
.orElse(initId);
}
}
private void distributeCoupon(CouponTaskDO couponTask) {
// 修改延时执行推送任务任务状态为执行中
CouponTaskDO couponTaskDO = CouponTaskDO.builder()
.id(couponTask.getId())
.status(CouponTaskStatusEnum.IN_PROGRESS.getStatus())
.build();
couponTaskMapper.updateById(couponTaskDO);
// 通过消息队列发送消息,由分发服务消费者消费该消息
CouponTaskExecuteEvent couponTaskExecuteEvent = CouponTaskExecuteEvent.builder()
.couponTaskId(couponTask.getId())
.build();
couponTaskActualExecuteProducer.sendMessage(couponTaskExecuteEvent);
}
private List<CouponTaskDO> fetchPendingTasks(long initId, Date now) {
LambdaQueryWrapper<CouponTaskDO> queryWrapper = Wrappers.lambdaQuery(CouponTaskDO.class)
.eq(CouponTaskDO::getStatus, CouponTaskStatusEnum.PENDING.getStatus())
.le(CouponTaskDO::getSendTime, now)
.gt(CouponTaskDO::getId, initId)
.last("LIMIT " + MAX_LIMIT);
return couponTaskMapper.selectList(queryWrapper);
}
}
execute() 这是 XXL-Job 调度任务的入口方法。当定时任务触发时,XXL-Job 框架会调用此方法。
首先方法会初始化变量,initId 用于标识已经处理过的任务的最大 ID,now 用于记录当前时间。接下来会执行 while 循环,逻辑如下所示:
- 调用
fetchPendingTasks(initId, now)方法获取符合条件的待执行任务列表。 - 如果
couponTaskDOList为空,意味着没有更多的任务需要处理,循环终止。 - 遍历
couponTaskDOList,对每个任务调用distributeCoupon(each)方法,将任务修改状态变更为执行中,并发送到消息队列进行异步处理。 - 检查当前获取的任务列表大小,如果小于
MAX_LIMIT,表示已经是最后一批数据,循环终止。 - 更新
initId为当前批次中最大的任务 ID,以确保下一次循环获取到新的任务。
调用逻辑时序图如下所示:
- 开始检查:首先,系统会看看有没有任务需要做。
- 任务数量检查:如果任务的数量没有超过一个设定的最大值,那么系统会继续。
- 获取任务:系统会查找那些还没有开始的任务。
- 分配任务:找到任务后,系统会决定把这些任务分配给谁去做。
- 执行任务:分配完任务后,就会开始执行这些任务。
- 更新状态:任务开始执行后,系统会更新任务的状态,表示它正在进行中。
- 发送消息:任务开始后,系统可能会发送一些消息,比如通知其他人任务已经开始了。
- 确认消息:发送完消息后,系统会确认消息已经成功发送。
- 使用消息队列:在这个过程中,系统可能会用到一个叫做
RocketMQ的工具来帮助发送和接收消息。 - 返回任务列表:最后,系统会给出一个更新后的任务列表,显示哪些任务已经完成了。
4. 创建定时执行的分发任务
首先创建一条定时发送类型的优惠券分发任务。

