Chapter 0. 背景

0.1 同步消息和异步消息

微服务架构下存在很多服务间相互调用的情况。

我们知道可以通过 OpenFeign 的方式来获取远程服务的响应,但是 OpenFeign 的远程调用是同步的,其优点是同步调用时效强,等待结果返回。但同时会导致:

  • 代码可扩展性差。
  • 性能堪忧。相较于相同项目实现的单体架构,同步的微服务调用方式会多出网络等待时间。

于是我们需要异步调用的方式,这里使用到了发布-订阅者模式

异步调用的优势是,

  • 模块间进一步解耦(发布者和订阅者间无需知道相互之间的信息);
  • 可拓展性强(scalable),添加实例无需更改代码;
  • 异步性能有明显提升;
  • 故障隔离(最终一致性保证);
  • 缓存消息,实现流量削峰填谷;

但是缺点也很明显:

  • 异步实现无法立即得到结果,时效性差,可能导致数据不一致性;
  • 不作额外措施,则不能保证最终一致性(下游业务是否成功)。所以业务安全依赖于 broker 的可靠性;

0.2 Message Queue 选型

Opt RabbitMQ ActiveMQ RocketMQ Kafuka
Company Rabbit(专一,社区极活跃) Apache(大厂支持) Alibaba(开源冲业绩) Apache
Language Erlang(面向并发的语言) Java Java Scala&Java
Protocol Support AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 私有协议,只能被 Java 调用(微服务语言限制) 私有协议
Availability 一般
Monolith Throughput(ops) 一般(数十万上下) 高(数十万) 极高(近数百万)
Latency 微秒级 毫秒级 毫秒级 毫秒以内
Reliability 一般 一般

可以看出:

  • Kafuka 牺牲了部分可靠性(只确保最终一致性)、消息延迟,换取了极高的消息吞吐量。在对消息准确性要求不高(如日志传输)的情况下推荐;

  • RocketMQ 虽然功能丰富,但是 Alibaba 的开源项目大多属于冲业绩,文档和社区支持不佳。

    此外 RocketMQ 依赖于很多 Alibaba 技术栈,如果项目中不打算或者没有 Alibaba 的依赖,那么引入困难;

    另外 RocketMQ 不支持主流协议,只支持他们自己的一套接口,微服务语言局限于 Java;

  • ActiveMQ 是早期的项目,指标不如后辈;

因此我们选择 RabbitMQ

Chapter 1. Introduction to RabbitMQ

1.1 架构

  • queue:暂时存储消息的消息队列;队列有两类:

    • durable queue:持久化队列,信息会被定期持久化到磁盘。这种队列可以提升可靠性,但是会降低性能;

    • non-durable queue:非持久化队列,信息总是保存在内存中。这种队列的速度会快于 durable queue,但是可靠性无法保证(例如不保证掉电不丢失数据);

  • virtual host:虚拟主机,起到不同项目数据隔离作用;

  • exchange:路由消息的交换机。其作用是接受发布者发送的消息,并将接收到的消息按照交换机的配置路由到所有与其绑定的队列中。本身不具备消息暂存能力;

    交换机的配置(种类,或者说路由策略)大致有几种:

    • Fanout(广播):Fanout 交换机。会将自身接受到的消息批量路由到所有与之关联的消息队列中;

    • Direct(定向):Direct 交换机。这种交换机 与 消息队列的关联时需要额外指定一个 bindingKey,并且发布者在向该种交换机发送消息时,必须指定 routingKey

      于是这种交换机只会将接受到的消息发送给 bindingKey 与这条消息的 routingKey 相同的消息队列中;

      bindingKey 不要求对于交换机唯一。所以理论上 Direct 交换机的功能覆盖了 Fanout 交换机。

    • Topic(话题,类似 Kafuka 的按 Topic 订阅):与 Direct 交换机类似,也以 bindingKeyroutingKey 为路由依据,但:

      • bindingKey 这里是 topic,可以使用通配符:# 代表 0 到多个 topic,* 代表 1 个 topic;
      • routingKey 可以是 topic 的组合,使用 .(period)隔开;
    • Headers(请求头):Headers 交换机,

注意,交换机 和 消息队列间的关联需要显式声明 / 配置。

1.2 Spring AMQP

RabbitMQ 支持多种协议,其中就包括 AMQP(Advanced Message Queuing Protocol),其他各种语言都有各自的 AMQP 的实现库。

为了方便起见,在 Spring 项目中常常使用 Spring 框架中实现好的 AMQP 协议接口来完成任务。

spring-boot-starter-amqp 依赖内部提供了针对 AMQP 协议的实现,只需引入该依赖即可操作 RabbitMQ

引入后需要进行一些配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: ...
port: ...
virtual-host: ...
username: ...
password: ...
  • 最简单使用:RabbitTemplate.convertAndSend & @RabbitListener

1.2.1 Working Queues 模型

Working Queues 模型:多个消费者绑定到一个队列,共同消费队列中的消息;

  • 结论 1:队列中的消息最多只能被消费一次;所以多个 consumer 监听的情况下,一个消息被某个 consumer 消费后,不会存在于消费队列中被其他 consumer 消费(不存在消息重复);
  • 结论 2:队列传递给所有监听它的消费者的默认行为是绝对平均(轮询)的,没有考虑到各个机器消费消息的性能(可以用 Thread.sleep 测试);

为了改善这个模型下出现的问题,我们可以对这个默认的轮询机制调优:规定每个消费者一次只能获取一条消息,处理完成后才能获取下一条消息

1
2
3
4
5
6
# 在消费者(使用 `@RabbitListener` 注解)所在的模块中配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1

1.2.2 Fanout Exchange

为什么需要广播交换机?考虑一个问题,假设一个发布者发布消息后,我们的业务逻辑要求同时有多个其他服务需要接收这个消息并且执行相应逻辑。举个例子,consumer1consumer2 需要在 publisher 发送消息后各自接收一次消息(也就是都执行一次业务逻辑)。

如果没有广播交换,那么发布者发布的消息在全局范围内只能被一个消费者消费,这就没法实现多个服务都接受到发布者消息的需求了。

在这种需求下,我们只需要为每个微服务建立一个消息队列,并且对应监听;将这些队列与一个公共的 fanout 交换机关联,就能完成上面的需求。如下图所示:

代码中,如果使用了 fanout 交换机,那么 routingKey 可以指定为空字符串 / null

1.2.3 Direct Exchange

  • 路由交换机需要与消息队列以 bindingKey 绑定;一个交换机和一个消息队列可以绑定多个 bindingKey
  • bindingKeyroutingKey 不存在通配符;

  • 路由直接发送到 bindingKey == routingKey 的消息队列中;

1.2.4 Topic Exchange

  • 路由交换机需要与消息队列以 bindingKey 绑定;
  • bindingKey 允许通配符,# 表示任意 0 至多个 topic,* 表示任意一个;
  • 路由发送到所有匹配的消息队列中;

1.2.5 Spring AMQP 声明交换机 & 队列

  • Queue 声明队列的类,也可以使用 QueueBuilder 工厂类创建;
  • Exchange 声明交换机的类,也可以使用 ExchangeBuilder 工厂类创建;
  • Binding 声明队列和交换机的绑定关系,BindingBuilder(常用);

Method 1. Spring Bean Configuration 式声明

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* 通常在 consumer 段声明,因为 consumer 更需要关注交换机和队列的生成 */
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanout() {
/* 构造函数为名称 */
return new FanoutExchange("test.fanout");
/* 等价于: */
/* return ExchangeBuilder.fanoutExchange("test.fanout").build(); */
}
@Bean
public Queue fQueue1() {
return new Queue("test.queue1");
/* 等价于(默认 durable queue): */
/* return QueueBuilder.durable("test.queue1").build(); */
}
@Bean
public Queue fQueue2() {
return new Queue("test.queue2");
}
/* 自动注入 queue 和 exchange,从上面的 bean 寻找,采用 autowired-by-name 策略 */
@Bean
public Binding bindingQueue(Queue fQueue1, FanoutExchange exchange) {
return BindingBuilder.bind(fQueue1).to(exchange);
}
@Bean
public Binding bindingQueue(Queue fQueue2, FanoutExchange exchange) {
return BindingBuilder.bind(fQueue2).to(exchange);
}
}

Method 2. Listener 注解式声明

显然这种方法非常麻烦,主要有以下的问题:

  • 每个方法大同小异,大部分是 boilerplate code;

  • 定义步骤相当繁琐每定义一个队列、交换机或者绑定关系,就要新建一个方法;

  • 很多队列或者交换机的 bean,只能通过 autowired-by-name 的方法注入,降低了代码可读性和可维护性;

于是可以使用另一种定义方式,直接使用 @RabbitListener 提供的 bindings 参数:

1
2
3
4
5
6
7
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = <Queue Name>, [durable = "true" | "false"]),
exchange = @Exchange(value = <Exchange Name>, type = ExchangeTypes.?),
key = { /* bindingKeys [String Array] */ }
)
)

只需要声明一个注解即可定义队列、交换机、绑定关系;

但是上面的 Method 2 可能还是有问题:

  • 配置散落在业务代码中,没有与业务逻辑解耦;

  • 这么写可能存在 queue / exchange 重复定义的问题,而且需要保证每次声明同样对象的配置一致。降低了可维护性和可扩展性;

  • 在每个方法前面写这么大段注解,降低代码可读性;

有什么办法解决吗?可以综合 Method 1 使用 ~

1.2.6 Spring AMQP 消息转换器

注意,到目前为止,我们没有讨论过队列传输的对象是 POJO 或者是更复杂的 Java 对象的情况。

我们知道如果传输的是简单的 Java String,则 RabbitMQ 直接在队列上传输字符串;但是对于一般的 Java 对象,RabbitMQ 会使用 Java 内置的序列化实现将对象转为 Java Serializable Object;

我们跟踪 Spring RabbitTemplate 源码发现,内部对 Object(传递的信息)执行了 convertMessageIfNecessary 方法:

1
2
3
4
5
6
7
8
/* ... */
protected Message covertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
/* ... */

Message 类型是 AMQP 定义的接口,定义了帮助 AMQP 传输消息的方法。

我们进一步了解发现 getRequiredMessageConverter() 方法是获取 RabbitTemplate 的实例属性 messageConverter其类型是 MessageConverter,默认实现是 SimpleMessageConverter

再查看 MessageConverter 的接口实现可以发现 Spring 中存在很多实现,例如 Jackson2XmlMessageConverter / Jackson2JsonMessageConverter / ...

那么这些 converter 具体是如何将一般的 Object 转换为 Message 的呢?

我们找到 MessageConverter 接口的 toMessage() 方法,发现其在 SimpleMessageConverter 中的实现如下:

注意到,对于非空非 byte[] / String 类型的可序列化对象,SimpleMessageConverter 会直接将对象序列化(默认 Java 的对象流 ObjectOutputStream)。

这样做一般情况下没什么,但是在消息中间件中不建议使用,具体有以下几点原因:

  • JDKObjectOutputStream 本身存在安全风险(可以轻松反序列化并且进行代码注入);
  • JDK 的对象数据流大小往往很大,存放了一些并不需要传输的数据(几个 byte 的数据可能被序列化成几百个 byte 的数据,降低了传输性能,限制了消息吞吐量);

这里建议使用 Jackson2JsonMessageConverter。我们只需要在发送模块书写一个 Bean 配置类,然后让 Spring Boot 自动装配即可。

确保引入依赖 com.fasterxml.jackson.core:jackson-databind

1.3 Spring AMQP 实战:消息中间件替换 OpenFeign 同步远程调用

小贴士:如果设置了消费者确认机制并且使用 auto 模式,使用 @RabbitListener 注解的函数返回类型必须是 void。否则 RabbitMQ 会认为 consumer 执行错误。

Chapter 2. MQ 进阶:消息可靠性

使用 Spring AMQP 进行服务间异步通信可能存在一些问题:

  • 消息发布方网络丢包,导致消息丢失;
  • 消息被 MQ 正确接受到后,MQ 宕机导致消息丢失;或者消费者速度较低,MQ 产生内存中的消息积压(内存占满的刷盘期间)可能造成消息丢失;
  • 在 MQ 发送给消息消费方时,消费方宕机导致消息丢失;

因为网络的不可靠性,即便我们针对上述问题进行保护措施,仍然可能出现一些问题。我们最终需要一些兜底的机制,至少需要确保消息的最终一致性。

接下来将会以 RabbitMQ 为例,从上面 4 个角度分析 MQ 如何保证消息的可靠性。

2.1 消息发布方的可靠性:重连与确认机制

2.1.1 发送者重连 [性能警告]

发送者重连,在 MQ 与消息发布方连接后,存在连接断开的情况,这可能导致发布方网络丢包;因此需要 MQ 和服务发布方在网络丢失后进行重连,特别地,在 RabbitMQ 中需要在消息发送方进行配置:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # MQ 连接超时时间(连接等待多长时间才算失败,失败后才进行重连)
template:
retry:
enable: true # 消息重连(默认 false)
initial-interval: 1000ms # 连接失败/断开后的初识等待时间(立即尝试的成功概率小)
multiplier: 1 # 尝试等待时间倍数(下次等待时间为上次的倍数。第二次相对于 initial-interval)
max-attempts: 3 # 最大尝试次数

但是这种机制是阻塞式重连,对业务性能会造成影响,这也是为什么 retry 默认配置是禁用的。如果必须要使用,也需要合理设置超时、等待时长,以及尝试次数;

2.1.2 发送者确认 [性能警告]

发送者确认:Spring AMQP 提供了 Publish Confirm(消息确认反馈)和 Publisher Return(路由错误返回信息)两种机制。

在发送者确认机制打开后,当消息发布方向 MQ 发送一条消息,MQ 会返回确认结果给发送方,确认结果分为以下几种情况:

  • 消息成功投递到 MQ 中,但是路由失败:MQ 通过 Publisher Return 返回路由错误原因,返回 ACK 告知投递成功;

    这种情况只可能是:exchange 没有绑定队列 / routing key 没有匹配队列,是开发者原因。与网络、发送方、MQ 都没有关系,所以认为投递成功。

    这种情况重新发送消息是没有意义的,因为错误不会因为重试而修复。

  • 临时消息投递到 MQ 中,且成功入队;MQ 反馈 ACK 告知投递成功;

    临时消息对于队列是否是 durable 的没有要求,只要投递到 MQ 中,并且进入队列内存,就算成功;

  • 持久消息投递到 MQ 中,且成功入队,且成功持久化;MQ 反馈 ACK 告知投递成功;

    持久消息需要被 MQ 放入 durable 队列中,并且持久化才算投递成功,这样可以防止 MQ 宕机造成消息丢失。

    同时这可能损失一部分性能,所以应该根据业务逻辑来选择持久消息或临时消息。

  • 其他任何情况都会反馈 NACK,表示投递失败。只有在这种情况下,进行消息重发是正确的、有意义的

以 RabbitMQ 为例,开启发送者确认机制需要在消息发送方进行配置:

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启 Publish Confirm 机制,指定模式类型
publish-return: true # 开启 Publish Return 机制

所谓 Publish Confirm 只包含了 ACK / NACK 的消息,而 Publish Return 则回调上面第一种投递成功,但路由失败的失败信息

Publish Confirm 的 3 种模式分别为:

  • none:关闭 Publish Confirm 机制;
  • simple:同步阻塞等待 MQ 的确认;
  • correlated:MQ 异步回调方式确认;

那么如何配置 MQ 的异步回调(confirm callback 和 return callback)?

事实上,一个 RabbitTemplate 只能配置一个 Return Callback(需要启用 Publish Return 机制)。所以需要在 Spring 项目启动过程中配置一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
@RequiredArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;

/* 在构造函数完成后执行一次 */
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(
/* RabbitTemplate.ReturnsCallback 是一个函数接口 */
new RabbitTemplate.ReturnsCallback() {
@Override
public void returnMessage(ReturnedMessage msg) {
log.error("Return callback triggered.");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
}
);
}
}

对于单条消息而言还有 Confirm Callback(需要启用 Publish Confirm 机制),这在每条消息发送前都需要配置一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Test
void testPublisherConfirm() throws InterruptedException {
// 1. 创建 CorrelationData,包含消息的全局 ID(MQ 需要区别消息以发送 confirm 或 return)
CorrelationData cd = new CorrelationData(
UUID.randomUUID().toString() /* 注:UUID 可能存在性能问题和 MAC 地址安全问题 */
);
// 2.给 Future 添加 ConfirmCallback
cd.getFuture().addCallback(
/* ListenableFutureCallback 是一个含有 onFailure 和 onSuccess 的接口 */
new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("handle message ack fail", ex);
}
/* 请类比 JavaScript 的 Promise,思考为什么即便是投递失败也在 onSuccess 中 */
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if (result.isAck()) {
// result.isAck(),boolean类型
// true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
} else {
// result.getReason(),String类型,返回nack时的异常描述
log.error(
"发送消息失败,收到 nack, reason : {}", result.getReason()
);
}
}
}
);
// 3.发送消息
rabbitTemplate.convertAndSend("test.direct", "red1", "hello", cd);
}

还要提一点,发送确认也会对性能有较大影响。而且发送丢包的概率较低,所以只建议在亟需确保数据可靠性的极端情况下才需要如此配置。

2.2 MQ 的可靠性

如上所述,MQ 可能丢失消息的场景有两类:

  • 消息被 MQ 正确接受到后,MQ 宕机导致消息丢失;
  • 消费者速度很低,MQ 产生内存中的消息积压阻塞(内存占满的刷盘期间无法继续入队)可能造成消息丢失;

具体有两种思路可以解决,一是数据持久化策略,二是 Lazy Queue;

2.2.1 MQ 的数据持久化 [默认]

  • 交换机持久化、队列持久化(默认都是 durable);

    交换机、队列的持久化,是指交换机 / 队列自身的信息也会持久化在磁盘中;

  • 消息持久化。我们在之前提到,临时消息不会被 MQ 保证持久化到磁盘中,意味着这类消息在掉电后可能丢失。所以如果对一类消息的正确性要求很高,需要将消息设置为持久消息:

    由于 RabbitTemplate 默认的 convertAndSend 方法中的 message converter 都默认构建 Message 为持久的消息,因此我们需要手动构建才能得到临时消息:

    1
    2
    3
    4
    Message message = MessageBuilder
    .withBody("Hello, Spring AMQP".getBytes(StandardCharsets.UTF_8))
    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
    .build();

    如果使用大量数据实验会发现,大量、快速发送临时消息(不会主动刷盘)会不断在内存中积压,其触发的 Page Out(泛指内存耗尽触发的被动刷盘操作)会像数据结构 LSM Tree 的 compaction 操作一样短时间内迅速降低 MQ 的吞吐量,形成一个个性能低谷,总体性能反而小于持久数据(一开始就进行刷盘操作);

2.2.2 Lazy Queue [默认]

在 Rabbit MQ 3.12 以后,所有队列默认 Lazy Queue 且无法更改。

Lazy Queue 的特征是,

  • 接收到消息后直接存入磁盘,不再存储到内存;
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条);

2.3 消费者的可靠性

2.3.1 消费者确认机制

Spring AMQP 同样存在一种机制,即消费者确认机制(Consumer Acknowledgement)。

它是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向 Rabbit MQ 发送一个回执,告知 Rabbit MQ 自己消息处理状态:

  • ACK:成功处理消息,Rabbit MQ 从队列(内存以及磁盘)中删除该消息;
  • NACK:消息处理失败,Rabbit MQ 需要再次投递消息;
  • REJECT:消息处理失败并拒绝该消息,Rabbit MQ 从队列(内存以及磁盘)中删除该消息;

这个回执应该在消费者关于该消息所有业务逻辑处理完成后,才能返回,防止出错后无法重试。

这样的操作非常类似 事务机制。

Spring AMQP 已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式:

  • none:不处理。即消息投递给消费者后立刻 ACK,消息会立刻从 MQ 删除。非常不安全,不建议使用;

  • manual:手动模式。需要自己在业务代码中调用 API,发送 ACKREJECT,存在业务入侵,但更灵活;

  • auto:自动模式。Spring AMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ACK. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回 NACK,消息会重新进入 Ready 状态投递给对应消费者(重新处于 Ready 状态);

      消费者宕机、抛出 RuntimeException / 其他自定义异常,都认为是 NACK

    • 如果是消息处理或校验异常(需要抛出例如 MessageConversionException),自动返回 REJECT

      这就提醒我们,遇到业务逻辑中的格式异常,请不要 throw RuntimeException,不然会被 Spring 认为是业务异常而重新发送!

我们需要在消息消费方配置:

1
2
3
4
5
6
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none # 默认不处理,

2.3.2 消费者失败重试机制

在引入消费者确认机制后,还会出现一个问题:如果 MQ 中积压的消息量过大,导致消费方宕机,在消费者恢复后,没有进行请求热身,MQ 又重发了大量的消息,很有可能会再次导致消费方再次宕机。

这会给消费方和 MQ 都造成极大的压力。

为了应对这种情况,MQ 引入了另一种机制来保障消费方的消息可靠性:消费者失败重试机制。在消费者出现异常时按照配置重试,而不是无限的重复入队。

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry: # 注:发送方重试位于 template 下
enabled: true # 默认 false
initial-interval: 1000ms # 失败初始等待时间
multiplier: 1
max-attempts: 3
stateless: true # 如果业务中含有事务,说明这种消息重递是有状态的,应该 false

在重试超过 max-attempts 后,消息状态转变为 requeue-exhausted,进而转入 MessageRecoverer 中处理。

Spring AMQP 默认的 MessageRecoverer 的实现是 RejectAndDontRequeueRecoverer,其策略是直接丢弃这条消息,这样做有失消费者安全性。

除了默认的 recoverer,还有两种:

  • ImmediateRequeueMessageRecoverer:重试耗尽后,仍然认为返回 NACK,重新入队;

    这种策略和不采取失败者重试的策略相比,性能影响会小一点;

  • RepublishMessageRecoverer:重试耗尽后,将消息(包括报错信息)投递到指定交换机,以供其他处理用途;

    这种策略也比较合理,认为反复投递无效就应该换一种处理方式。

    但这种方法需要专门配置一个指定的交换机:

    • 定义接受失败队列、与其绑定的交换机;
    • 定义 RepublishMessageRecoverer 的 Bean(传入 RabbitTemplate、交换机、队列名称);

这种机制虽然相较于原先确认机制而言,降低了可靠性,但是一定程度上提升了服务的可用性,降低多次/长时间宕机造成的资源浪费风险。

2.4 业务幂等性保证

在上面的过程中,有一种情况我们没有考虑:如果消费者的接口不是幂等的,就需要保证消息发送的不重复性。

也就是说,假设消费者收到消息后并且处理结束,要给 MQ 发送 ACK 时连接断开了(或者两方有一方宕机了),就可能会导致 MQ 消息重新发送。这个问题没办法借助 MQ 来解决,因为不是 MQ 本身的问题。

那么应该如何处理这种情况?

2.4.1 唯一消息 ID

第一种思路是唯一消息 ID:给每个消息都设置一个唯一 ID,利用 ID 区分是否是重复消息。

  • 每一条消息都生成一个唯一的 ID,与消息一起投递给消费者;
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库;
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

Tips. 这和之前提到的发送者确认机制中创建的 CorrelationData 中的 ID 不一样,前者是作回执用的 ID,它和消费 ID 可以保持消费不一致;

我们可以在定义消息转换器时显式声明让 MQ 创建全局唯一的 Message ID:

1
2
3
4
5
6
@Bean
public MessageCoverter messageConverter() {
Jackson2JsonMessageConverter jmc = new Jackson2JsonMessageConverter();
jmc.setCreateMessageIds(true); /* 默认 false */
return jmc;
}

这个 Message ID 会存放在 Message 的 Properties 中(不是我们常用的 Payload 中),所以需要我们单独去取:

1
2
3
4
5
6
/* 都需要转换,所以直接使用 Message 类型接收,就能收到 properties 数据 */
@RabbitListener(/* ... */)
public void consumer(Message message) {
message.getBody(); /* 返回 byte[] */
message.getMessageProperties(); /* MessageProperties */
}
  • byte[] 中如果原来是字符串,则可以直接用字符串构造函数构造;

    如果是 自定义的对象,并且使用了 Jackson2JsonMessageConverter,直接用配套的 ObjectMapper 解析就行;

  • MessageProperties.getMessageId() 就能获取其中的 message-id 属性(如果有);

缺点:

  • 业务逻辑嵌入和耦合。给业务引入了原本不需要的逻辑,造成耦合;

  • 数据库写影响原本业务性能;

2.4.2 业务逻辑本身判断

上面的方法虽然通用,但是是侵入式的解决方案。如果业务逻辑允许,我们可以根据原有的业务逻辑判断这次消息是否被处理过。

相当于做了非幂等业务的保护流程。

例如对于订单业务,如下图,如果在交易服务中的 “标记订单为已支付”,如果用户在支付成功后,“标记订单已支付” 以及完成,正在向 MQ 发送 ACK 时断开连接。此时 MQ 认为消费者未收到。

如果用户此时发起了退款,交易服务立即更改为退款中,此后 MQ 的连接又恢复了,如果不作处理,则 “已支付” 的状态会覆盖 “退款中” 的状态。

这种情况除了使用 Message ID 的方法,还可以直接检查 “标记订单已支付” 之前的订单状态,毕竟根据业务逻辑,只有未支付的订单才会需要标记成已支付:

总结一下:

如何保证支付服务与交易服务之间的订单状态一致性?

  • 首先,支付服务会正在用户支付成功以后利用 MQ 消息通知交易服务,完成订单状态同步;

  • 其次,为了保证 MQ 消息的可靠性,我们采用了发送者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了 MQ 的持久化,避免因服务宕机导致消息丢失;

    保证消费者至少消费一次。

  • 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。

上面的策略已经比较完善了,但是还是可能存在问题:我们的机制没有问题,但是网络原因,MQ 和消费者间真的一直都没办法建立连接,能否有个兜底机制,至少确保关键的业务(例如支付)数据一致性?

2.5 延迟任务和延迟消息

延迟任务是消息一致性的一种兜底方案。

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

以下单举例:

假设交易服务和支付服务间暂时一直无法连接,是否有机制确保二者间的消息一致?

我们可以引入超时时间的概念,一段时间后,再次向支付服务查询如果成功就改变状态;如果失败则取消。

在 Rabbit MQ 中可以借助插件来完成延迟任务(默认不支持)。

2.5.1 死信交换机(dead-letter)

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用 REJECTNACK 声明消费失败并且消息的 requeue 参数设置为 false / 使用失败重试机制的 Message Recoverer 是 RejectAndDontRequeueRecoverer
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费;
  • 要投递的队列消息堆积满了,最早的消息可能成为死信;

我们可以在声明交换机时,给定一个属性 dead-letter-exchange,并且与某个队列绑定,那么该队列中的死信就会自动投递到这个交换机中;

我们可以利用死信交换机的 “超时” 特性,实现延时任务:

我们可以对某个队列声明死信交换机,直接使用 QueueBuilder 的方式定义:

1
2
3
4
QueueBuilder
.durable(<queue name>)
.deadLetterExchange(<dlx exchange name>)
.build()

最后,我们还有在发送消息时还需要指定消息的过期时间,确保最终以规定时间进入死信交换机:

1
rabbitTemplate.convertAndSend(<normal direct>, <key>, <object>, <post process>)

在最后一个参数中,传入一个 MessagePostProcessor 函数接口的实现,即可在 object 转换为 Message 对象后再进行处理,以设置超时时间。因为超时时间也位于 Message 的 Properties 中

1
2
3
4
5
6
7
new MessagePostProcessor() {
@Override
public Message postProccessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000" /* 字符串表示的 ms */);
}
}
/* 也可以换成 lambda 表达式 */

2.5.2 Rabbit MQ 延时任务插件

需要在 mq-plugins/_data 中加入插件,并且配置 enabled_plugins 文件,加入 rabbitmq_delayed_message_exchange 即可;

然后,我们需要设置某个交换机的属性为 delayed

  • 如果使用 Bean 配置,那么就用 ExchangeBuilder 添加 delayed() 方法;
  • 如果使用 @RabbitListener(bindings) 的配置,就在 @Exchange 中加入 delayed = "true"

最后给要发送的消息指定 properties x-delay,同样使用 post processor,此时对 Message Properties 调用 setDelay(<ms>) (不是 setExpiration)即可;

注意:无论是延时消息,还是死信的生成,其计时依赖 CPU 时钟,所以是 CPU 密集型任务。

如果超时时间 / 过期时间设置过长,都会导致需要计时的消息大量积压,影响 MQ 性能。

所以无论是死信交换机中的过期时间,还是延时任务的延时时间,都不宜设置过长。