定时分发优惠卷

这是一个基于XXL-Job框架的系统,它会定期扫描并执行已经安排好的优惠券发送任务,确保优惠券能按时发送给用户。

Posted by Zheng Yang on September 16, 2024

开发定时执行优惠券分发任务

1. 引入 XXL-Job Maven 依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.1</version>
</dependency>

2. 设置 XXL-Job 配置类

2.1 配置 application.yaml

xxl-job:
  access-token: default_token
  admin:
    addresses: http://localhost:8088/xxl-job-admin
  executor:
    application-name: one-coupon-merchant-admin
    ip: 127.0.0.1
    log-retention-days: 30
    port: 19999

2.2 配置 XXLJobConfiguration

之前我们说过通过 SpringBoot Starter 可以帮助我们快速引入组件库,不需要繁琐的 Spring Bean 配置。很明显,XXL-Job 就没有适配 Starter,需要我们配置相关的 SpringBean 配置。

package com.nageoffer.onecoupon.merchant.admin.config;

import cn.hutool.core.util.StrUtil;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.file.Paths;

/**
 * XXL-Job 配置类
 * <p>
 * 作者:马丁
 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
 * 开发时间:2024-07-12
 */
@Configuration
public class XXLJobConfiguration {

    @Value("${xxl-job.admin.addresses:}")
    private String adminAddresses;

    @Value("${xxl-job.access-token:}")
    private String accessToken;

    @Value("${xxl-job.executor.application-name}")
    private String applicationName;

    @Value("${xxl-job.executor.ip}")
    private String ip;

    @Value("${xxl-job.executor.port}")
    private int port;

    @Value("${xxl-job.executor.log-path:}")
    private String logPath;

    @Value("${xxl-job.executor.log-retention-days}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(applicationName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(StrUtil.isNotEmpty(accessToken) ? accessToken : null);
        xxlJobSpringExecutor.setLogPath(StrUtil.isNotEmpty(logPath) ? logPath : Paths.get("").toAbsolutePath().getParent() + "/tmp");
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

2.3 查看 XXL-Job 执行器地址

完成上述配置后,我们尝试启动项目,然后访问 http://localhost:8088/xxl-job-admin/jobgroup 查看执行器地址是否有值。如果正确有数据即为创建成功。

3. 编写 XXL-Job 处理器

开发 XXL-Job 调用处理类 CouponTaskJobHandler 完成定时调用。

package com.nageoffer.onecoupon.merchant.admin.job;

import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTaskStatusEnum;
import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTaskDO;
import com.nageoffer.onecoupon.merchant.admin.dao.mapper.CouponTaskMapper;
import com.nageoffer.onecoupon.merchant.admin.mq.event.CouponTaskExecuteEvent;
import com.nageoffer.onecoupon.merchant.admin.mq.producer.CouponTaskActualExecuteProducer;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;

/**
 * 优惠券推送任务扫描定时发送记录 XXL-Job 处理器
 * <p>
 * 作者:马丁
 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
 * 开发时间:2024-07-12
 */
@Component
@RequiredArgsConstructor
public class CouponTaskJobHandler extends IJobHandler {

    private final CouponTaskMapper couponTaskMapper;
    private final CouponTaskActualExecuteProducer couponTaskActualExecuteProducer;

    private static final int MAX_LIMIT = 100;

    @XxlJob(value = "couponTemplateTask")
    public void execute() throws Exception {
        long initId = 0;
        Date now = new Date();

        while (true) {
            // 获取已到执行时间待执行的优惠券定时分发任务
            List<CouponTaskDO> couponTaskDOList = fetchPendingTasks(initId, now);

            if (CollUtil.isEmpty(couponTaskDOList)) {
                break;
            }

            // 调用分发服务对用户发送优惠券
            for (CouponTaskDO each : couponTaskDOList) {
                distributeCoupon(each);
            }

            // 查询出来的数据如果小于 MAX_LIMIT 意味着后面将不再有数据,返回即可
            if (couponTaskDOList.size() < MAX_LIMIT) {
                break;
            }

            // 更新 initId 为当前列表中最大 ID
            initId = couponTaskDOList.stream()
                    .mapToLong(CouponTaskDO::getId)
                    .max()
                    .orElse(initId);
        }
    }

    private void distributeCoupon(CouponTaskDO couponTask) {
        // 修改延时执行推送任务任务状态为执行中
        CouponTaskDO couponTaskDO = CouponTaskDO.builder()
                .id(couponTask.getId())
                .status(CouponTaskStatusEnum.IN_PROGRESS.getStatus())
                .build();
        couponTaskMapper.updateById(couponTaskDO);
        // 通过消息队列发送消息,由分发服务消费者消费该消息
        CouponTaskExecuteEvent couponTaskExecuteEvent = CouponTaskExecuteEvent.builder()
                .couponTaskId(couponTask.getId())
                .build();
        couponTaskActualExecuteProducer.sendMessage(couponTaskExecuteEvent);
    }

    private List<CouponTaskDO> fetchPendingTasks(long initId, Date now) {
        LambdaQueryWrapper<CouponTaskDO> queryWrapper = Wrappers.lambdaQuery(CouponTaskDO.class)
                .eq(CouponTaskDO::getStatus, CouponTaskStatusEnum.PENDING.getStatus())
                .le(CouponTaskDO::getSendTime, now)
                .gt(CouponTaskDO::getId, initId)
                .last("LIMIT " + MAX_LIMIT);
        return couponTaskMapper.selectList(queryWrapper);
    }
}

execute() 这是 XXL-Job 调度任务的入口方法。当定时任务触发时,XXL-Job 框架会调用此方法。

首先方法会初始化变量initId 用于标识已经处理过的任务的最大 ID,now 用于记录当前时间。接下来会执行 while 循环,逻辑如下所示:

  • 调用 fetchPendingTasks(initId, now) 方法获取符合条件的待执行任务列表。
  • 如果 couponTaskDOList 为空,意味着没有更多的任务需要处理,循环终止。
  • 遍历 couponTaskDOList,对每个任务调用 distributeCoupon(each) 方法,将任务修改状态变更为执行中,并发送到消息队列进行异步处理。
  • 检查当前获取的任务列表大小,如果小于 MAX_LIMIT,表示已经是最后一批数据,循环终止。
  • 更新 initId 为当前批次中最大的任务 ID,以确保下一次循环获取到新的任务。

调用逻辑时序图如下所示:

image-20240825173306175.png

  1. 开始检查:首先,系统会看看有没有任务需要做。
  2. 任务数量检查:如果任务的数量没有超过一个设定的最大值,那么系统会继续。
  3. 获取任务:系统会查找那些还没有开始的任务。
  4. 分配任务:找到任务后,系统会决定把这些任务分配给谁去做。
  5. 执行任务:分配完任务后,就会开始执行这些任务。
  6. 更新状态:任务开始执行后,系统会更新任务的状态,表示它正在进行中。
  7. 发送消息:任务开始后,系统可能会发送一些消息,比如通知其他人任务已经开始了。
  8. 确认消息:发送完消息后,系统会确认消息已经成功发送。
  9. 使用消息队列:在这个过程中,系统可能会用到一个叫做RocketMQ的工具来帮助发送和接收消息。
  10. 返回任务列表:最后,系统会给出一个更新后的任务列表,显示哪些任务已经完成了。

4. 创建定时执行的分发任务

首先创建一条定时发送类型的优惠券分发任务。

image-20240825174509479.png