Message Queue
Last updated
Last updated
消息队列是分布式系统中的重要组件,主要用于解决什么问题呢?后者说为什么要使用消息队列?
解耦:如果一个系统A,调用了多个系统BCD。可能有新的系统E需要调用,老的系统D不再需要调用等情况。系统A还需要考虑C挂掉怎么办?需不需要重新调用?维护起来很麻烦。如果这个调用是不需要同步调用的,那么可以用MQ给他异步化解耦。
异步:引入消息队列,将不是必须的业务逻辑异步处理,可以显著提高系统吞吐量。
削峰:可以缓解短时间内高流量压垮应用,比如秒杀活动。
引入消息队列可以有上述这么多作用,但是也会带来一些问题。
可用性降低:系统引入的外部依赖越多,越容易挂掉。本来系统A调用BCD即可,现在加入MQ,若MQ挂了,整个系统就崩溃了。
系统复杂度提高:引入MQ就需要考虑MQ的问题。怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
一致性问题:系统A处理完,发送消息后就返回成功的结果给用户。但是BCD收到消息后不一定能保证执行成功,若有一个系统执行失败了,整个处理结果是失败的。那么和用户得到的结果是不一致的。
市面上常用的消息队列产品有ActiveMQ、RabbitMQ、RocketMQ、Kafka等。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
topic | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 | topic从几十个到几百个的时候,吞吐量会大幅度下降 | ||
时效性 | ms级 | us级 | ms级 | ms级 |
可用性 | 高,主从 | 高,主从 | 非常高,分布式 | 非常高,分布式 |
消息可靠性 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 | |
功能支持 | MQ领域的功能极其完备 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低 | 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
综上所述:
ActiveMQ:早期都用这个,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,不推荐。
RabbitMQ:Erlang语言阻止了大量的Java工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态,但是确实人是开源的,比较稳定的支持,活跃度也高。
RocketMQ:有阿里品牌保障,日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便。Java编写的,我们可以自己阅读源码,定制自己公司的MQ。
Kafka:仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。如果是大数据领域的实时计算、日志采集等场景,Kafka用是业内标准的,社区活跃度很高,何况几乎是全世界这个领域的事实性规范。
主从模式的MQ产品有ActiveMQ、RabbitMQ等。以RabbitMQ为例,有三种部署模式,单机模式、普通集群模式、镜像集群模式。
单机模式:Demo级别,本地启动玩玩,生产环境没人用。
普通集群模式:默认的集群模式,对于Queue来说,消息实体只存在于其中一个节点A,其它节点BC仅有相同的元数据。当Consumer从B拉取消息时,B会从A先拉取数据,再从B转发给Consumer。可用性没有保障,若A宕机了,没有持久化时消息直接丢失,持久化时也得等A重新启动才能工作。这个模式仅用来提高吞吐量。
镜像集群模式:把队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案。其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在Consumer取数据时临时拉取。缺点是性能开销太大,没有扩展性可言。
分布式模式的MQ产品主要有RocketMQ、Kafka等。
以Kafka为例,每个节点有一个Broker进程,每创建一个Topic会分成多个Partition,同一个Topic的Partition会分布在不同的Broker上,每个Partition有多个Replica,多个Replica会选举一个Leader。
Producer和Consumer只与Leader交互。
RabbitMQ、RocketMQ、Kafka等都会出现重复消费消息的情况,但是这不是MQ去控制的,也不能控制的,而是应该由使用者自己去处理,即保证重复消费消息的幂等性。
以 Kafka 为例,来说明为何会出现重复消费消息? Kafka 每条消息有一个 Offset,代表他的序号,Consumer 消费了数据之后,每隔一段时间,会把自己消费过的消息的 Offset 提交给 Kafka,下次重启的时候,就从上次消费到的 Offset 继续。若消费者是不正常的重启,在重启之前没有提交 Offset 给 Kafka,那么重启后,就会出现重复消费数据。
保证消费消息幂等性的常见方法:
数据库写入时,先根据主键查一下,若存在,则不插入,而是 Update。
消息写入时,在每条消息里加入一个全局唯一的 ID,消费者拿到之后,先去 Redis 查询是否消费过。
MQ 的基本原则是数据不能多也不能少,不能多就是上面讲的消息重复消费,不能少就是消息不能丢失。
消息丢失会出现在三个环节:生产者丢失数据、MQ 丢失数据、消费者丢失数据。下面分别介绍 RabbitMQ 和 Kafka 怎么解决消息丢失的问题。
生产者在把数据发送给 RabbitMQ 时,可能由于网络原因在半路丢失了。
事务方式:发消息前开启事务channel.txSelect
,然后发送消息,若消息没有被 RabbitMQ 收到,则会收到异常,此时可以回滚消息channel.txRollback
,然后重新发送,若收到了消息,则可以提交事务channel.txCommit
。
Confirm 方式:生产者端开启 Confirm 模式,每次写消息会分配一个 ID,RabbitMQ 处理后会发送一个回调(ack、nack
)给生产者。那么生产者可以在内存中维护每个消息的 ID 状态,若长时间没有收到回调,则可以再次发送。
事务方式是同步的,MQ 的吞吐量会下降。Confirm 是异步的。所以一般用 Confirm 的方式。
开启持久化功能。创建 Queue 的时候设置为持久化,发送消息的时候也将消息设置为持久化(deliveryMode = 2
),必须两个都设置才能持久化。
若在持久化还没完成,RabbitMQ 挂了,消息也会丢失,这种情况概率极低。可以和 Confirm 方式结合,只有在持久化完成之后才发送回调给生产者。
若消费者拿到消息后,还没来得及处理就挂了,但是 RabbitMQ 认为你已经消费完成了,这样就出现了消息丢失的情况。
关闭 RabbitMQ 的自动 ACK,通过 API 来调用,只有当消费者处理完之后,再发送给 RabbitMQ ACK。
acks=all
,要求每条数据,必须写入所有 Replica 之后,才能认为是写成功了。
retries = Integer.MAX
,这个是要求一旦写入失败,就无限重试。
若某个 Broker 挂了,上面的 Leader 也挂了,此时需要重新选举 Leader,而此时 Follower 刚好还有些数据没有同步完成,那么就会丢失一些数据。
设置四个参数:
topic参数:replication.factor > 1
,Topic 的每个 Partition 至少有2个副本。
Kafka服务端参数:min.insync.replicas > 1
,要求一个 Leader 至少感知到有至少一个 Follower 还跟自己保持联系,这样才能在 Leader 挂了之后,还有 Follower 可以选举。
Producer端参数:acks = all
。
Producer端参数:retries = Integer.MAX
。
和 RabbitMQ 类似,消费者会自动提交 Offset ,只需要关闭自动提交即可,当处理完消息后才手动提交 Offset。
一个 Queue 多个 Consumer,那么消息顺序性就会被打乱。RabbitMQ 不保证消费顺序,因为消耗太大,所以应该在应用层面保证顺序性。
方法一:拆分多个 Queue,每个 Queue 对应一个 Consumer。
方法二:一个Queue 对应一个 Consumer,然后这个 Consumer 内部做排序后,再分发给下面的 Worker。
Kafka 有个特性,一个 Partition 只能被一个固定的 Consumer 消费。
全局有序:比如 MySQL BinLog 传输,通常采用一个 Producer,一个 Partition, 一个 Consumer。当然不同的表可以使用不同的 Topic 或者 Partition。
局部有序:大部分业务仅需要局部有序,可以在 Producer 写入数据的时候控制统一个 Key 写入同一个 Partition,所以对同一个 Key 的消息是有序的。