项目中同时用到了 Kafka 和 RabbitMq,因此需要作比较
Kafka 是一个分布式的消息流处理平台,拥有着极致的吞吐量,支持消息重新消费。非常适合应用在大数据领域。因此我们项目中的收数服务应用了 Kafka 来处理每天 10E 级的数据。
RabbitMq 则胜在拥有更灵活的交换器与队列的匹配规则(基于 topic 交换器 + # * 匹配关键字),还有 TTL+死信队列 实现的延时队列,及简单易上手的可靠性保障,因此在吞吐没有达到每秒几十万的而必须用 kafka 时,RabbitMq 是个很好的选择

? 排他队列仅对首次声明他的连接可见,并在连接断开时自动删除。该连接下所有的信道(Channel) 都可以使用它
? 多租户场景,对外部而言各个虚拟主机是完全独立的,A主机的交换器不能绑定B主机的队列,权限也是隔离的
? 消费者消费速度限制,须配合手动 ack 一起使用,此时当 mq 检测到某个 channel 未 ack 的消息达到阈值后,就不会推送消息到该 cahnnel
可以结合死信队列 + 队列过期时间,模拟延时队列

假设一种场景:业务A的 clientA 位于北京,需要往位于广州的 exchangeA 发送一条消息,那网络延迟对吞吐量的影响是不容小觑的,如果设置了事务或者开启了消息发送确认,就更慢了
此时可以通过 Federation 插件解决,在 broker3 中为交换器 exchageA 与 broker1 建立一个单向的 Federation 连接,此时 F 会在 broker1 中创建一个同名交换器 exchangeA,同时创建一个内部交换器 exchageA -> broker3 B,还有一个队列 f:eA -> b3 B。并与交换器 eA -> b3 B 绑定,F 插件会在队列: f:eA -> b3 B 与 broker3 中的 exchangeA 建立 AMQP 连接来实时的消费队列中的消息。
这样部署在北京的生产者可以直接向 exchageA 发送消息,可以低延迟的收到回复。而后消息通过 F link 转发到 broker3 的 exchangeA 中,由消费者进行消费

联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供负载均衡的能力。一个联邦队列可以连接一个或多个上游队列(upstream queue),并从上游队列中获取消息以满足本地消费者的消费需求
如图所示:队列 queue1 和 queue2 原本在 broker2 中,由于某种需求将其配置为 fedarated queue 并将 broker1 设置为 upstream queue 。此时 federation 插件会在 broker1 上创建同名的队列 queue1 和 queue2,当有消费者 clientA 连接 broker2 并消费 queue1 (queue2) 时,若队列中有消息则会直接消费,如果队列中没有消息,那么它会通过 Federation 从 broker1 中的 queue1(queue2) 中拉取消息,然后存储到本地。最后被 clientA 消费
同时 Federate Queue 支持双向联邦,一条消息可以在队列中被转发多次,以达到消息最终被转发到某一个消费力更强的 broker 中从而被消费

与 Federation 具备的数据转发功能类似,Shovel 能够持续可靠的从一个 Broker 中的 queue 将消息转发到当前或另一个 Broker 中的 exchange 中(与 F 不同的是它将消息由队列转发至交换机,而 F 类似于在 B1 创建了一个代理,B1 一开始什么都不需要有)
其原理是通过消费队列中的数据同时将数据发送给交换器来实现数据转发, Shovel 同时也支持源数据为交换器或者目标数据为队列。实际上两者都是通过补足虚拟的队列或者交换器实现的
消息堆积严重时,可以选择清空队列,或者添加空消费者丢弃部分消息。但对于重要的数据而言,此举不可行
另一种方案是增加下游的消费能力,但是这种优化代码的方案在紧急时刻缺失“远水解不了近渴”
那么合理的优化方案是(一备一):
如果需要一备多的场景,可以使用镜像队列或 Federation
不管是持久化还是非持久化的消息都可以被写入到硬盘。持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也在内存中保存一份备份,当内存吃紧的时候从内存中清除。非持久化的消息一般只保存在内存中,当内存吃紧的时候会被换入磁盘中,以节省内存空间。这两种消息的落盘处理都在 RabbitMq 的“持久层”完成
“持久层”实际上是一个逻辑概念,实际包含两个部分:队列索引 rabbit_queue_index 和消息存储 rabbit_msg_store
rabbit_queue_index 负责维护队列中落盘的消息,包括消息的存储地点、是否已交付给消费者、是否已 Ack,每个队列与之对应的 rabbit_queue_index
rabbit_msg_store 以键值对的形式存储消息,它被所有队列共享,在每个节点有且只有一个
消息(包括消息体、属性和 headers)可以存储在两者中的任意一个。一般通过 queue_index_embed_msg_below 配置一个大小阈值,较小的消息存储在 rabbit_queue_index 中,较大的消息存储在 rabbit_msg_store 中
通常队列由 rabbit_amqqueue_process 和 backing_queue 这两部分组成,rabbit_amqqueue_process 负责协议的相关消息处理,backing_queue 是消息存储的具体形式和引擎,消息入队列之后,不是固定不变的,它会随着系统的负载不断的流动,有以下四种状态
对于持久化的消息,消息内容和消息索引必须先保存在磁盘上,才会处于上述状态的一种,而 gamma 状态的消息是只有持久化的消息才会有的状态。对于 durable 为 true 的消息,在开启 publish confirm 机制后,只有到了 gamma 状态才会确认消息已被接收
如图所示:Q1 和 Q4 仅存储 alpha 状态的消息,Q2 和 Q3 存储 beta 和 gamma 状态的消息,Detla 存储 detla 状态的消息,当消费者消费消息时,会先从 Q4 从获取,如果成功则返回,如果 Q4 为空则按照一定的规则从上面的队列中转移消息到 Q4 后获取
通常负载正常时,对于不需要保证消息可靠不丢失的情况,极有可能消息只处于 alpha 状态。对于需要持久化的消息,只有当消息处于 gamma 状态时才会确认消息已接收。

惰性队列会尽可能的将消息存储在硬盘之中,而在消费者消费到相应的消息才会加载到内存中。惰性队列会将接收到的消息直接存储到文件系统中,而不管消息是持久化的还是非持久化的,这样可以减少内存的损耗

RabbitMq 的流控链如上图所示
当 connection 处于 flow 状态,而 connection 没有一个 channel 处于 flow 状态,说明 channel 出现了性能瓶颈,一般是因为处理大量较小的非持久化消息时出现
当 connection 处于 flow 状态,并且若干个 channel 处于 flow 状态,但是没有任何一个对应的队列处于 flow 状态。说明一个或多个队列出现了性能瓶颈,这可能是将消息存入队列时 CPU 占用过高,或者将消息持久化到磁盘时 I/O 过高,这种情况一般会在处理大量较小的持久化消息时出现
当 connection、channel、若干队列都是 flow 状态时,意味着在消息持久化时出现了性能瓶颈,这种情况一般在发送大量的较大持久化消息时最容易出现
向一个队列中推送消息时,往往会在 rabbit_amqqueue_process(即队列进程中)产生性能瓶颈。那如何破局,提高 rabbit 的性能呢

如图所示,因为 rabbit_amqqueue_process 是队列独享的,而在代码层面实现多个队列会增加业务的复杂度,因此可以通过封装拆分队列的逻辑来解决
如果 RabbitMq 只有一个 Broker 节点,那么该节点的失效将会导致整体服务的暂时不可用,并且有可能导致消息的丢失。可以将消息设置为持久化,并且将消息所属的队列 durable 属性设置为 true,但这仍无法避免缓存导致的问题,因为消息在发送之后到存盘之前有一个短暂的时间窗。通过 publish confirm 机制可以保证消息落盘后确认(前文有提到,broker 会在消息进入 gamma 阶段也即消息体存盘、消息索引磁盘和内存都有的时候,通知生产者消息发送成功),尽管如此,我们仍不希望 Broker 单点导致的服务不可用问题
镜像队列机制可以将队列镜像到集群中的其他 broker 上,如果集群中的一个 broker 失效了,队列能自动的切换到镜像中的另外一个节点保证服务的可用性,每一个镜像队列都包含一个主节点 master,和若干个从节点 slave,相应的结构图如下

slave 会准确按照 master 的执行命令的顺序进行动作,如果 master 宕机,"资历最老"(加入时间最长)的 slave 会提升成 master,发送到镜像队列的消息会同时发送给 master 和 slave(图中实线),除发送消息外的所有动作只会和 master 打交道,然后由 master 同步给 slave(图中虚线)。同步采用的是一种称为组播 GM(Guaranteed Multicast) 的方式,GM模块的实现是一种可靠的组播通讯协议,该协议能保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到,它的实现大致如图上所示,所有节点形成一个循环链表,master 发出的消息最终会再次收到,以此确认组中所有节点都收到。
可能有人会觉得,消费者都是从 master 读取消息的,broker 之间是不是没有得到有效的负载均衡?其实不然,负载均衡是对整个 broker 而言,对整个机器而言的,而消费者消费的是队列,只要确保队列的 master 节点均匀的散落在不同的 broker 上,即可确保很大程度的负载均衡

RabbitMq 的镜像队列机制同时支持事物和 publisher confirm 两种机制,在事物机制中,只有当前事物在所有节点中都执行之后,才会返回 OK,同样的在 publisher confirm 机制,只有当所有镜像都接收该消息并处于 gamma 状态时,才会通知生产者