消息队列

什么场景使用消息队列,为什么用它,讲讲它的优势,以及在项目中使用RocketMQ延时消息修改优惠券结束状态。

Posted by Zheng Yang on August 22, 2024

RocketMQ5.x延时消息修改优惠券结束状态

业务背景

在之前的章节里,我们完成了优惠券模板的创建、结束等功能,但是还遗留了一个小功能,那就是如果说优惠券模板有效期结束了之后,我们的模板状态依然是生效中。基于这个功能点考虑,我们需要在定时任务和定时消息中进行选择,以此来满足精准关闭优惠券模板功能。

p433720.png

使用 RocketMQ 定时消息有如下优势:

  • 定时精度高、开发门槛低:可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。

image-20240821181935677.png

消息队列介绍

1. 什么是消息队列?

消息队列是一种用于异步通信的机制,它的主要作用是将消息从发送者传递到接收者,同时解耦这两个组件的直接依赖。

4d5bb40416a75b115476647e6613a3d2.png ==一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息==。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。

1.1 消息队列顺序消费

image-20240825102302842.png rocketmq、kafka等mq均有分区的概念。

一个 Topic 可以包含多个队列(Queue),这些队列分布在不同的 Broker 节点上。这样做的目的和 Kafka 类似,主要是为了实现以下几个目标:

  1. 扩展性:通过将消息分散到多个队列,可以提高系统的处理能力,因为每个队列可以独立地处理消息。

  2. 负载均衡:消息可以均匀地分配到不同的队列,避免单个队列或 Broker 过载。

  3. 容错性:分布式的队列可以提高系统的稳定性,即使某个队列或 Broker 出现故障,其他队列仍然可以继续工作。

f58246dc97284c6cc8f9c9dd423b7802.png ==生产者发送的时候可以指定一个key选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)==

例如:电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

既保证业务的顺序,同时又能保证业务的高性能。

1.2 消息队列不丢失消息

生产者端重试

向broker发送消息时,如果由于网络抖动等原因导致消息发送失败,可以设置失败重试次数让消息重发

消费者端重试

由于网络等原因导致消息没法从broker发送到消费者端,此时MQ会重试直到发送成功(集群模式)

==如果消费者端在执行后续消息处理后因为网络原因,队列未收到ack,为了保证消息是肯定被至少消费成功一次,RocketMQ 会进行重试,把这批消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个 ConsumerGroup。==而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预

9ce36ffa1e3564247e70d44e57b3e700.png

1.3 如何解决重复消费

生产者成功发送消息给队列时,队列会返回ack给生产者,但是当网络出现问题,队列成功收到消息,但是ack出现问题。生产者一般会重发消息,所以会导致队列中存在多条重复消息。

==使用分布式锁保证接口的幂等性==

==或者通过去重表保证消息队列的幂等性==

使用自定义注解+AOP解决接口防重复提交与消息队列重复消费的幂等问题。 (yuque.com)

2. 什么是 RocketMQ?

RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给 Apache 软件基金会。它主要用于==高吞吐量、低延迟==的消息传递需求。

  • 高吞吐量和低延迟:RocketMQ 设计用于处理大量的消息,并提供低延迟的消息传递服务,适合需要高性能的场景。

  • 消息可靠性:RocketMQ 支持消息持久化和多副本机制,确保在系统故障时不会丢失消息。这使得消息的可靠性和一致性得到了保障。

3. 消息队列都有哪些作用?

3.1 异步解耦

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法有以下两种:

串行方式:

imgimg

数据流动如下所述:

  1. 注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
  2. 注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。
  3. 邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。

以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。假设每个任务耗时分别为 50ms,则用户需要在注册页面等待总共 150ms 才能登录。

并行形式:

==对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,后续的注册短信和邮件不是即时需要关注的步骤。==

对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的 RocketMQ 中然后马上返回用户结果,由 RocketMQ 异步地进行这些操作。

p429333.png 数据流动如下所述:

  1. 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
  2. 注册信息写入注册系统成功后,再发送消息至 RocketMQ。 RocketMQ 会马上返回响应给注册系统,注册完成。用户可立即登录。
  3. 下游的邮件和短信通知系统订阅 RocketMQ 的此类注册请求消息,即可向用户发送邮件和短信通知,完成所有的注册流程。

用户只需在注册页面等待注册数据写入注册系统和 RocketMQ 的时间,即等待 55ms 即可登录。

3.2 削峰填谷

流量削峰也是 RocketMQ 的常用场景,一般在秒杀或团队抢购活动中使用广泛。

在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入 RocketMQ。

p429350.png 秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统。
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送 RocketMQ。
  3. 下游的通知系统订阅 RocketMQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
  4. 用户收到秒杀成功的通知。

3.3 分布式定时/延时调度

RocketMQ 提供精确度到秒级的分布式定时消息能力(5.0架构后),可广泛应用于订单超时中心处理、分布式延时调度系统等场景。

使用 RocketMQ 定时消息有如下优势:

  • 定时精度高、开发门槛低:消息定时时间精确到秒级,可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。

项目实战

1. 添加 Maven 依赖

<!-- 消息队列相关依赖 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

2. application.yaml 添加配置

application.yaml 中添加 RocketMQ 相关配置。

rocketmq:
  name-server: 127.0.0.1:9876 # NameServer 地址,如果 VM 参数里设置了星球云服务器 RocketMQ 地址,运行时会替换
  producer:
    # 通用生产者组,其中的 ${unique-name:} 是为了避免大家公用一个 Topic,造成你发的消息被其他同学消费,其他同学发的消息被你消费等问题
    group: oneCoupon_merchant-admin${unique-name:}-service_common-message-execute_pg
    send-message-timeout: 2000 # 发送超时时间
    retry-times-when-send-failed: 1 # 同步发送重试次数
    retry-times-when-send-async-failed: 1 # 异步发送重试次数

3. 发送 RocketMQ 任意延迟消息

3.1 生产者代码

这一块相当于是生产者,业务代码如下:

private final RocketMQTemplate rocketMQTemplate;
private final ConfigurableEnvironment configurableEnvironment;

@Override
public void createCouponTemplate(CouponTemplateSaveReqDTO requestParam) {
    // ......
    // 使用 RocketMQ5.x 发送任意时间延时消息
    // 定义 Topic
    String couponTemplateDelayCloseTopic = "one-coupon_merchant-admin-service_coupon-template-delay_topic${unique-name:}";

    // 通过 Spring 上下文解析占位符,也就是把咱们 VM 参数里的 unique-name 替换到字符串中
    couponTemplateDelayCloseTopic = configurableEnvironment.resolvePlaceholders(couponTemplateDelayCloseTopic);

    // 定义消息体
    JSONObject messageBody = new JSONObject();
    messageBody.put("couponTemplateId", couponTemplateDO.getId());
    messageBody.put("shopNumber", UserContext.getShopNumber());

    // 设置消息的送达时间,毫秒级 Unix 时间戳
    Long deliverTimeStamp = couponTemplateDO.getValidEndTime().getTime();

    // 构建消息体
    String messageKeys = UUID.randomUUID().toString();
    Message<JSONObject> message = MessageBuilder
            .withPayload(messageBody)
            .setHeader(MessageConst.PROPERTY_KEYS, messageKeys)
            .build();

    // 执行 RocketMQ5.x 消息队列发送&异常处理逻辑
    SendResult sendResult;
    try {
        sendResult = rocketMQTemplate.syncSendDeliverTimeMills(couponTemplateDelayCloseTopic, message, deliverTimeStamp);
        log.info("[生产者] 优惠券模板延时关闭 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys);
    } catch (Exception ex) {
        log.error("[生产者] 优惠券模板延时关闭 - 消息发送失败,消息体:{}", couponTemplateDO.getId(), ex);
    }
}

==在创建优惠券模板的同时,设置了一条消息,这条消息会在优惠券模板快到期的时候被发送出去,以便进行一些操作(比如提醒用户)。这个过程是通过RocketMQ这个消息队列工具来实现的。==

3.2 日志打印

可以看到这里日志参数打印的非常详细,尤其是打印了消息 ID 和 Keys,这两个参数大有用途,可以帮助我们排查生产问题。

// 执行 RocketMQ5.x 消息队列发送&异常处理逻辑
SendResult sendResult;
try {
    sendResult = rocketMQTemplate.syncSendDeliverTimeMills(couponTemplateDelayCloseTopic, message, deliverTimeStamp);
    log.info("[生产者] 优惠券模板延时关闭 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys);
} catch (Exception ex) {
    log.error("[生产者] 优惠券模板延时关闭 - 消息发送失败,消息体:{}", couponTemplateDO.getId(), ex);
}

举个例子,你发送个消息,下游服务说没收到,你说这是谁的问题?如果我们打印了这个消息 ID 和 Keys 就可以去控制台查询消息的详细信息。以下面这个发送举例:

2024-08-21T22:45:26.280+08:00  INFO 78983 --- [io-10010-exec-1] c.n.o.m.a.s.i.CouponTemplateServiceImpl  : [生产者] 优惠券模板延时关闭 - 发送结果SEND_OK消息ID2408820760D4CCC06CC04DD27B33332C3487251A69D76BE1483A0000消息Keysa88bb1e1-e932-429e-bca6-fbe6fa52c

4. 定义消息消费者

4.1 消费者代码

优惠券模板到期结束消费者代码定义如下所示:

package com.nageoffer.onecoupon.merchant.admin.mq.consumer;

import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.nageoffer.onecoupon.merchant.admin.common.enums.CouponTemplateStatusEnum;
import com.nageoffer.onecoupon.merchant.admin.dao.entity.CouponTemplateDO;
import com.nageoffer.onecoupon.merchant.admin.service.CouponTemplateService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 优惠券推送延迟执行-变更记录发送状态消费者
 * <p>
 * 作者:马丁
 * 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
 * 开发时间:2024-08-21
 */
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = "one-coupon_merchant-admin-service_coupon-template-delay_topic${unique-name:}",
        consumerGroup = "one-coupon_merchant-admin-service_coupon-template-delay-status_cg${unique-name:}"
)
@Slf4j(topic = "CouponTemplateDelayExecuteStatusConsumer")
public class CouponTemplateDelayExecuteStatusConsumer implements RocketMQListener<JSONObject> {

    private final CouponTemplateService couponTemplateService;

    @Override
    public void onMessage(JSONObject message) {
        // 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
        log.info("[消费者] 优惠券模板定时执行@变更模板表状态 - 执行消费逻辑,消息体:{}", message.toString());

        // 修改指定优惠券模板状态为已结束
        LambdaUpdateWrapper<CouponTemplateDO> updateWrapper = Wrappers.lambdaUpdate(CouponTemplateDO.class)
                .eq(CouponTemplateDO::getShopNumber, message.getLong("shopNumber"))
                .eq(CouponTemplateDO::getId, message.getLong("couponTemplateId"))
                .set(CouponTemplateDO::getStatus, CouponTemplateStatusEnum.ENDED.getStatus());
        couponTemplateService.update(updateWrapper);
    }
}

代码比较简答,两步就可以:

  • 添加 @RocketMQMessageListener 注解,其中加上 Topic 和消费者组定义。

  • 实现 RocketMQListener 消息监听接口,泛型的类型是我们生产者发送消息的类定义。

代码作用:监听特定的消息队列(RocketMQ中的topic),当接收到消息时,它会更新数据库中优惠券模板的状态。

4.2 是否需要幂等?

其实不需要,幂等的逻辑是多次执行结果一致,不过我们是修改为结束状态,变更多少次都是一样的。

4.3 @Slf4j(topic=xxx) 什么意思?

如果没有 topic 这个属性,那么你的日志打印是这样的:

2024-08-22T19:26:15.172+08:00  INFO 90884 --- [io-10010-exec-1] c.n.o.m.a.s.i.CouponTemplateServiceImpl  : [生产者] 优惠券模板延时关闭 - 发送结果SEND_OK消息ID2408820760D4CCC0901EE0E538FD681A6304251A69D7705148480000消息Keysd904fbe7-f8c6-4e77-997c-6b08f83868a3

添加了 Topic 后,就会将日志打印引用类规范化。

2024-08-22T19:23:17.456+08:00  INFO 78983 --- [cg-mading0924_2] CouponTemplateDelayExecuteStatusConsumer : [消费者] 优惠券模板定时执行@变更模板表状态 - 执行消费逻辑消息体{"couponTemplateId":1826580899668439042,"shopNumber":1810714735922956666}