线程池实战

使用线程池异步处理任务,可以优化接口响应时间。

Posted by Zheng Yang on September 2, 2024

通过线程池和延时队列优化接口响应时间

业务背景

在上一节中,我通过 EasyExcel 解析百万数据量的 Excel 行数,避免了 JVM 内存占用过多问题。但是末了还有一个小问题,那就是接口响应太慢,百万数据量需要解析 5 秒,但是能优化还是要优化。在这节课我们通过线程池和 Redis 延迟队列的形式优化接口响应时间。

image-20240823203003150.png

线程池异步解析 Excel 行数

1. 创建线程池

创建一个公共线程池,因为咱们这个逻辑比较简单,所以直接定义即可。

@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

    private final ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() << 1,
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.DiscardPolicy()
    );
  
    // ......
}

线程池处理逻辑如下

1718263071002-c2c40c59-1389-4199-9555-c62837fecead.png

2. 线程池参数解析

解析下我们线程池中的参数为什么这么设置:

  • corePoolSize:因为属于后管任务,大概率不会很频繁,所以直接取服务器 CPU 核数。
  • maximumPoolSize:运行任务属于 IO 密集型,最大线程数直接服务器 CPU 核数 2 倍。
  • workQueue:理论上说我们不会有阻塞的情况,因为设置的线程数不少,所以使用不存储任务的同步队列。
  • handler:如果线程数都在运行,直接将任务丢弃即可,因为我们还有延时队列兜底。

3. 使用线程池异步处理

因为线程池和延时队列都可能会用到 Excel 解析的代码,所以我们把这一块逻辑抽象出来一个方法。因为用到了两个参数,为了避免复杂,直接使用 JSONObject 即可。

private void refreshCouponTaskSendNum(JSONObject delayJsonObject) {
    // 通过 EasyExcel 监听器获取 Excel 中所有行数
    RowCountListener listener = new RowCountListener();
    EasyExcel.read(delayJsonObject.getString("fileAddress"), listener).sheet().doRead();
    int totalRows = listener.getRowCount();

    // 刷新优惠券推送记录中发送行数
    CouponTaskDO updateCouponTaskDO = CouponTaskDO.builder()
            .id(delayJsonObject.getLong("couponTaskId"))
            .sendNum(totalRows)
            .build();
    couponTaskMapper.updateById(updateCouponTaskDO);
}

使用线程池异步解析用户上传的 Excel 文件,代码如下:

@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

    private final ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() << 1,
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.DiscardPolicy()
    );
  
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
        // 验证非空参数
        // 验证参数是否正确,比如文件地址是否为我们期望的格式等
        // 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等
        CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
        if (couponTemplate == null) {
            throw new ClientException("优惠券模板不存在,请检查提交信息是否正确");
        }
        // ......

        // 构建优惠券推送任务数据库持久层实体
        CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
        couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId());
        couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId()));
        couponTaskDO.setShopNumber(UserContext.getShopNumber());
        couponTaskDO.setStatus(
                Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())
                        ? CouponTaskStatusEnum.IN_PROGRESS.getStatus()
                        : CouponTaskStatusEnum.PENDING.getStatus()
        );

        // 保存优惠券推送任务记录到数据库
        couponTaskMapper.insert(couponTaskDO);

        // 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号
        // 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化
        JSONObject delayJsonObject = JSONObject
                .of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId());
        executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject));
    }
}

不管使用线程池执行什么类型的任务,都会有一个通用的致命问题,那就是刚投递到线程池,还没有运行完,应用宕机了怎么整?

所以就需要我们接下来讲到的延时队列兜底,避免这种宕机行为。

Redis 延时队列兜底任务

1. 使用延时队列兜底

==任务投递到线程池后,紧接着我们向延时队列也投递个任务,延迟时间设置为 20 秒==。为什么延迟时间设置 20 秒,原因是我们笃定上面线程池 20 秒之内就能结束任务。

@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

    private final RedissonClient redissonClient;

    private final ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() << 1,
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.DiscardPolicy()
    );
  
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
        // 验证非空参数
        // 验证参数是否正确,比如文件地址是否为我们期望的格式等
        // 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等
        CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
        if (couponTemplate == null) {
            throw new ClientException("优惠券模板不存在,请检查提交信息是否正确");
        }
        // ......

        // 构建优惠券推送任务数据库持久层实体
        CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
        couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId());
        couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId()));
        couponTaskDO.setShopNumber(UserContext.getShopNumber());
        couponTaskDO.setStatus(
                Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())
                        ? CouponTaskStatusEnum.IN_PROGRESS.getStatus()
                        : CouponTaskStatusEnum.PENDING.getStatus()
        );

        // 保存优惠券推送任务记录到数据库
        couponTaskMapper.insert(couponTaskDO);

        // 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号
        // 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化
        JSONObject delayJsonObject = JSONObject
                .of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId());
        executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject));
      
        // 假设刚把任务提交到线程池,突然应用宕机了,我们通过延迟队列进行兜底 Refresh
        RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque("COUPON_TASK_SEND_NUM_DELAY_QUEUE");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        // 这里延迟时间设置 20 秒,原因是我们笃定上面线程池 20 秒之内就能结束任务
        delayedQueue.offer(delayJsonObject, 20, TimeUnit.SECONDS);
    }
}

2. 定义延时队列消费者

代码逻辑比较简单, 为了避免复杂直接定义一个内部类,实现 CommandLineRunner 接口在项目启动后运行后置任务。流程如下:

  1. ==当系统启动时自动运行一个线程,无限循环获取 Redis 阻塞队列中已经到达时间的任务;==
  2. ==然后判断数据库中的分发任务 Excel 总数是否为空,为空读取 Excel 记录( refreshCouponTaskSendNum方法),然后设置;如果不为空证明线程池已经运行完了。==
@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

     /**
     * 优惠券延迟刷新发送条数兜底消费者|这是兜底策略,一般来说不会执行这段逻辑
     * 如果延迟消息没有持久化成功,或者 Redis 挂了怎么办?后续可以人工处理
     * <p>
     */
    @Service
    @RequiredArgsConstructor
    class RefreshCouponTaskDelayQueueRunner implements CommandLineRunner {

        private final CouponTaskMapper couponTaskMapper;
        private final RedissonClient redissonClient;

        @Override
        public void run(String... args) throws Exception {
            Executors.newSingleThreadExecutor(
                            runnable -> {
                                Thread thread = new Thread(runnable);
                                thread.setName("delay_coupon-task_send-num_consumer");
                                thread.setDaemon(Boolean.TRUE);
                                return thread;
                            })
                    .execute(() -> {
                        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque("COUPON_TASK_SEND_NUM_DELAY_QUEUE");
                        for (; ; ) {
                            try {
                                // 获取延迟队列已到达时间任务
                                JSONObject delayJsonObject = blockingDeque.take();
                                if (delayJsonObject != null) {
                                    // 获取优惠券推送记录,查看发送条数是否已经有值,有的话代表上面线程池已经处理完成,无需再处理
                                    CouponTaskDO couponTaskDO = couponTaskMapper.selectById(delayJsonObject.getLong("couponTaskId"));
                                    if (couponTaskDO.getSendNum() == null) {
                                        refreshCouponTaskSendNum(delayJsonObject);
                                    }
                                }
                            } catch (Throwable ignored) {
                            }
                        }
                    });
        }
    }
} 

3. 能不能直接用 Redis 消息队列?

大家一定要记得,Redis 是个缓存,由于它的持久化机制和主从同步机制,意味着可能会丢数据。为此,我们只是把它作为一个兜底方案,而不是全部方案。

  • ==持久化丢数据是因为即使 AOF 持久化也是异步的,最好的情况也会丢一条数据。==
  • ==主从同步机制,如果主节点在某些写操作尚未同步到从节点时发生故障,这些未同步的写操作将会丢失。==

添加 Spring 事务

细心的同学可能看到我们在方法上加了个 @Transactional(rollbackFor = Exception.class) 注解,==这是因为如果不加注解的话,我们执行数据库插入操作成功了,但是线程池和延时队列都没有执行。这种情况下,发送一条数据就永远不会被刷新。==

我们就将数据库的添加和这些行为绑定一起,也就不会出现这种问题了。

@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
        // ......
    }
}

文末总结

通过本篇文章让大家知道线程池的不确定性,以及如何解决。简单一句话:先执行再延迟确认。

完结,撒花 🎉