通过线程池和延时队列优化接口响应时间
业务背景
在上一节中,我通过 EasyExcel 解析百万数据量的 Excel 行数,避免了 JVM 内存占用过多问题。但是末了还有一个小问题,那就是接口响应太慢,百万数据量需要解析 5 秒,但是能优化还是要优化。在这节课我们通过线程池和 Redis 延迟队列的形式优化接口响应时间。
线程池异步解析 Excel 行数
1. 创建线程池
创建一个公共线程池,因为咱们这个逻辑比较简单,所以直接定义即可。
@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {
private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() << 1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy()
);
// ......
}
线程池处理逻辑如下
2. 线程池参数解析
解析下我们线程池中的参数为什么这么设置:
- corePoolSize:因为属于后管任务,大概率不会很频繁,所以直接取服务器 CPU 核数。
- maximumPoolSize:运行任务属于 IO 密集型,最大线程数直接服务器 CPU 核数 2 倍。
- workQueue:理论上说我们不会有阻塞的情况,因为设置的线程数不少,所以使用不存储任务的同步队列。
- handler:如果线程数都在运行,直接将任务丢弃即可,因为我们还有延时队列兜底。
3. 使用线程池异步处理
因为线程池和延时队列都可能会用到 Excel 解析的代码,所以我们把这一块逻辑抽象出来一个方法。因为用到了两个参数,为了避免复杂,直接使用 JSONObject 即可。
private void refreshCouponTaskSendNum(JSONObject delayJsonObject) {
// 通过 EasyExcel 监听器获取 Excel 中所有行数
RowCountListener listener = new RowCountListener();
EasyExcel.read(delayJsonObject.getString("fileAddress"), listener).sheet().doRead();
int totalRows = listener.getRowCount();
// 刷新优惠券推送记录中发送行数
CouponTaskDO updateCouponTaskDO = CouponTaskDO.builder()
.id(delayJsonObject.getLong("couponTaskId"))
.sendNum(totalRows)
.build();
couponTaskMapper.updateById(updateCouponTaskDO);
}
使用线程池异步解析用户上传的 Excel 文件,代码如下:
@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {
private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() << 1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy()
);
@Transactional(rollbackFor = Exception.class)
@Override
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// 验证非空参数
// 验证参数是否正确,比如文件地址是否为我们期望的格式等
// 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等
CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
if (couponTemplate == null) {
throw new ClientException("优惠券模板不存在,请检查提交信息是否正确");
}
// ......
// 构建优惠券推送任务数据库持久层实体
CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId());
couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId()));
couponTaskDO.setShopNumber(UserContext.getShopNumber());
couponTaskDO.setStatus(
Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())
? CouponTaskStatusEnum.IN_PROGRESS.getStatus()
: CouponTaskStatusEnum.PENDING.getStatus()
);
// 保存优惠券推送任务记录到数据库
couponTaskMapper.insert(couponTaskDO);
// 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号
// 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化
JSONObject delayJsonObject = JSONObject
.of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId());
executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject));
}
}
不管使用线程池执行什么类型的任务,都会有一个通用的致命问题,那就是刚投递到线程池,还没有运行完,应用宕机了怎么整?
所以就需要我们接下来讲到的延时队列兜底,避免这种宕机行为。
Redis 延时队列兜底任务
1. 使用延时队列兜底
==任务投递到线程池后,紧接着我们向延时队列也投递个任务,延迟时间设置为 20 秒==。为什么延迟时间设置 20 秒,原因是我们笃定上面线程池 20 秒之内就能结束任务。
@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {
private final RedissonClient redissonClient;
private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() << 1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy()
);
@Transactional(rollbackFor = Exception.class)
@Override
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// 验证非空参数
// 验证参数是否正确,比如文件地址是否为我们期望的格式等
// 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等
CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
if (couponTemplate == null) {
throw new ClientException("优惠券模板不存在,请检查提交信息是否正确");
}
// ......
// 构建优惠券推送任务数据库持久层实体
CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId());
couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId()));
couponTaskDO.setShopNumber(UserContext.getShopNumber());
couponTaskDO.setStatus(
Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())
? CouponTaskStatusEnum.IN_PROGRESS.getStatus()
: CouponTaskStatusEnum.PENDING.getStatus()
);
// 保存优惠券推送任务记录到数据库
couponTaskMapper.insert(couponTaskDO);
// 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号
// 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化
JSONObject delayJsonObject = JSONObject
.of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId());
executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject));
// 假设刚把任务提交到线程池,突然应用宕机了,我们通过延迟队列进行兜底 Refresh
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque("COUPON_TASK_SEND_NUM_DELAY_QUEUE");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
// 这里延迟时间设置 20 秒,原因是我们笃定上面线程池 20 秒之内就能结束任务
delayedQueue.offer(delayJsonObject, 20, TimeUnit.SECONDS);
}
}
2. 定义延时队列消费者
代码逻辑比较简单, 为了避免复杂直接定义一个内部类,实现 CommandLineRunner 接口在项目启动后运行后置任务。流程如下:
- ==当系统启动时自动运行一个线程,无限循环获取 Redis 阻塞队列中已经到达时间的任务;==
- ==然后判断数据库中的分发任务 Excel 总数是否为空,为空读取 Excel 记录( refreshCouponTaskSendNum方法),然后设置;如果不为空证明线程池已经运行完了。==
@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {
/**
* 优惠券延迟刷新发送条数兜底消费者|这是兜底策略,一般来说不会执行这段逻辑
* 如果延迟消息没有持久化成功,或者 Redis 挂了怎么办?后续可以人工处理
* <p>
*/
@Service
@RequiredArgsConstructor
class RefreshCouponTaskDelayQueueRunner implements CommandLineRunner {
private final CouponTaskMapper couponTaskMapper;
private final RedissonClient redissonClient;
@Override
public void run(String... args) throws Exception {
Executors.newSingleThreadExecutor(
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("delay_coupon-task_send-num_consumer");
thread.setDaemon(Boolean.TRUE);
return thread;
})
.execute(() -> {
RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque("COUPON_TASK_SEND_NUM_DELAY_QUEUE");
for (; ; ) {
try {
// 获取延迟队列已到达时间任务
JSONObject delayJsonObject = blockingDeque.take();
if (delayJsonObject != null) {
// 获取优惠券推送记录,查看发送条数是否已经有值,有的话代表上面线程池已经处理完成,无需再处理
CouponTaskDO couponTaskDO = couponTaskMapper.selectById(delayJsonObject.getLong("couponTaskId"));
if (couponTaskDO.getSendNum() == null) {
refreshCouponTaskSendNum(delayJsonObject);
}
}
} catch (Throwable ignored) {
}
}
});
}
}
}
3. 能不能直接用 Redis 消息队列?
大家一定要记得,Redis 是个缓存,由于它的持久化机制和主从同步机制,意味着可能会丢数据。为此,我们只是把它作为一个兜底方案,而不是全部方案。
- ==持久化丢数据是因为即使 AOF 持久化也是异步的,最好的情况也会丢一条数据。==
- ==主从同步机制,如果主节点在某些写操作尚未同步到从节点时发生故障,这些未同步的写操作将会丢失。==
添加 Spring 事务
细心的同学可能看到我们在方法上加了个 @Transactional(rollbackFor = Exception.class) 注解,==这是因为如果不加注解的话,我们执行数据库插入操作成功了,但是线程池和延时队列都没有执行。这种情况下,发送一条数据就永远不会被刷新。==
我们就将数据库的添加和这些行为绑定一起,也就不会出现这种问题了。
@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {
@Transactional(rollbackFor = Exception.class)
@Override
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// ......
}
}
文末总结
通过本篇文章让大家知道线程池的不确定性,以及如何解决。简单一句话:先执行再延迟确认。
完结,撒花 🎉

