Loading... # 消息中间件之RocketMQ(五) ## RocketMQ常见问题 ### 为什么要使用MQ? 因为项目比较大,做了分布式系统,所有远程服务调用请求都是**同步执行**,经常出现问题,所以引入了MQ。 #### 解耦 系统耦合度降低,没有强依赖关系,面向MQ编程,即使某个服务挂掉了,也不会影响其他服务。 #### 异步 不需要同步执行远程调用,可以有效提高响应时间。 #### 削峰 请求达到峰值后,后端service还可以保持固定消费速率,不会被压垮。 ### 多个MQ如何选型? #### RabbitMQ erlang开发,语言先天优势,延迟会比较低,但是语言比较冷门,定制化难度高。 #### RocketMQ java开发,面向互联网集群化功能丰富。 #### kafka Scala开发,面向日志功能丰富 #### ActiveMQ java开发,简单,稳定,集群支持度低,大数据量容易造成消息堆积。 小项目: ActiveMQ 大项目: RocketMQ、RabbitMQ或Kafka ### RocketMQ由哪些角色组成,每个角色作用和特点是什么? Nameserver、Producer、Broker、Consumer,着重Nameserver无状态,动态列表,互相之间不通信,数据不持久化,靠客户端(Broker)心跳主动上报到所有Nameserver节点,来实现Nameserver信息的同步,如果心跳超时,Nameserver会剔除该Broker。 ### RocketMQ中的Topic和ActiveMQ有什么区别? #### ActiveMQ 有Destination的概念,即消息目的地,Destination分为两类 - Topic - 广播消息 - Queue - 队列消息 #### RocketMQ RocketMQ的Topic是一组Message Queue的集合,是一个逻辑上的概念,一条消息是广播消息还是队列消息由Consumer来决定。 ### RocketMQ Broker中的消息被消费后会立即删除吗? 不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后,只是当前Consumer的消费进度(CommitLog的offset)更新了。 #### 那么消息会堆积吗?什么时候清理过期消息? 4.6+版本默认48小时后会删除不再使用的CommitLog文件 - 检查这个文件最后访问时间 - 判断是否大于过期时间 - 大于过期时间,在指定时间点删除,默认凌晨4点 ### RocketMQ的消费模式有几种? 两种,集群消费与广播消费,消费模型由Consumer决定,消费维度为Topic #### 集群消费 一组Consumer同时消费一个Topic,可以分配消费负载均衡策略,分配Consumer消费Topic下的哪些Queue。多个Group同时消费一个Topic时,每个Group都会消费到数据。一条消息会推给每个Group,但每个Group中只能有一个Consumer消费。 #### 广播消费 消息将对一个Consumer Group下的各个Consumer实例都消费一遍,即使这些Consumer属于同一个Consumer Group。 ### 消费消息时使用的是push还是pull? 在刚开始的时候就要决定使用哪种方式消费 有两种消费模式 `DefaultLitePullConsumerImpl`拉,`DefaultMQPushConsumerImpl`推。两个都实现了`MQConsumerInner`接口,名称看上去一个推,一个拉,但实际底层实现都是采用的**长轮询机制**,即拉取方式,Broker端属性longPollingEnable标记是否开启长轮询,默认开启。 #### 为什么主动拉取消息而不使用事件监听方式? 事件驱动方式是建立好长连接,由事件的方式来实时推送。如果Broker主动推送消息的话,有可能push速度快,消费速度慢的情况,那么就会造成消息在Consumer端堆积过多,同时又不能被其他Consumer消费的情况。 #### 几种常见的消息消费机制 push: 如果Broker主动推送消息的话,有可能push速度快,消费速度慢的情况,那么就会造成消息在Consumer端堆积过多,同时又不能被其他Consumer消费的情况。 pull: 轮询时间间隔,固定值的话会造成资源浪费 长轮询: 基于长连接,没有数据时,将请求挂起,有数据再返回。 #### Broker如何处理拉取请求? Consumer首次请求Broker - Broker中是否有符合条件的消息 - 有: - 响应Consumer - 等待下次Consumer请求 - 没有: - 挂起Consumer的请求,既不断开连接,也不返回数据 - 挂起时间长短,长轮询写死在代码里(5S),短轮询可以配置,默认1s - 使用Consumer的offset - DefaultMessageStore# ReputMessageService# run方法 - 每隔1ms检查CommitLog中是否有新消息,有的话写入到pullRequestTable - 当有新消息的时候返回请求 - PullRequestHoldService来Hold连接,每隔5s执行一次,检查pullRequestTable有没有消息,有的话立即推送。 ### RocketMQ如何做负载均衡 通过Topic在多个Broker中分布式存储实现。 #### Producer端 发送端指定Target Message Queue,发送消息到相应的Broker,来达到写入时负载均衡。 - 提升写入吞吐量,当多个Producer同时向一个Broker写入数据的时候,性能会下降 - 消息分布在多Broker中,为负载消费做准备 每30s从NameServer获取Topic与Broker的映射关系,获取最新的数据存储单元,Queue落地在哪个Broker中。 在使用api中send方法的时候,可以指定Target Message Queue来写入,或者使用MessageQueueSelector。 #### 默认策略是随机选择 - Producer维护一个index - 每次取节点会自增 - index向所有Broker个数取模 - 自带容错策略 #### 其它实现 - SelectMessageQueueByHash - hash的是传入的args - SelectMessageQueueByRandom - SelectMessageQueueByMachineRoom 没有实现 也可以自定义**MessageQueueSelector**的实现 ``` MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); ``` 可以自定义规则来选择mqs。 #### 如何知道mqs的,mqs数据从哪儿来的? producer.start()方法 - 启动Producer的时候会向NameServer发送心跳包 - 获取NameServer中的Topic列表 - 使用Topic向NameServer获取TopicRouteData **TopicRouteData**对象表示与某一个Topic有关系的Broker节点信息,内部包含多个QueueData对象(可以有多个Broker集群支持该Topic)和多个BrokerData信息(多个集群节点信息都在该列表中) Producer加工TopicRouteData对应的多节点信息后返回mqs。 #### Consumer端 客户端负载均衡 - 获取集群其他节点 - 当前节点消费哪些Queue - 负载粒度直到Message Queue - Consumer的数量最好和MessageQueue的数量对等或者是倍数,不然可能会有消费倾斜 - 每个Consumer通过balanced维护processQueueTable - processQueueTable为当前Consumer的消费Queue - processQueueTable中有 - ProcessQueue: 维护消费进度,从Broker中拉取回来的消息缓冲 - MessageQueue: 用来定位查找Queue DefaultMQPushConsumer默认使用AllocateMessageQueueAveragely(平均分配) #### 当消费负载均衡Consumer和Queue不对等的时候会发生什么? 平均分配 ![image-20200313171617553.png][1] 环形分配 ![image-20200313171645109.png][2] #### 负载均衡算法 平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby) ### 消息丢失 #### sendResult Producer在发送同步/异步可靠消息后,会收到SendResult,表示发送成功,SendResult其中属性sendStatus表示了Broker是否真正完成了消息存储,当sendStatus="ok"的时候,应该重新发送消息,避免丢失。当producer.setRetryAnotherBrokerWhenNotStoreOK为true的时候,消息没有存储成功会发送给其他Broker。 ### 消息重复消费 影响消息正常发送和消费的重要原因是网络的不确定性。可能是因为Consumer首次启动引起重复消费,需要设置`consumer.setConsumeFromWhere`,只对一个新的ConsumeGroup第一次启动时有效,设置从头消费还是从维护开始消费。 #### 怎么保证投递出去的消息有且只有一条,不会出现重复数据? 绑定业务key #### 引起重复消费的原因 ACK 正常情况下在Consumer真正消费完消息后,应该发送ACK,通知Broker该消息已正常消费,从Queue中剔除,当ACK因为网络原因无法发送到Broker,Broker会认为这条消息没有被消费,此后会开启消息重投机制,把消息再次投递到Consumer。 Group 在CLUSTERING模式下,消息在Broker中会保证相同Group的Consumer消费一次,但是针对不同Group会推送多次。 #### 解决方案 数据库表 处理消息前,使用消息主键在表中带有约束的字段中insert Map 单机时,可以使用ConcurrentHashMap# putIfAbsent或GuavaCache做幂等 Redis 使用主键做set操作 ### 如何让RocketMQ保证消息的顺序消费 - 同一Topic - 同一Queue - 发消息的时候一个线程发 - 消费的时候一个线程消费一个Queue里的消息,或者使用MessageListenerOrderly - 多个Queue只能保证每个Queue里的顺序 #### 应用场景是什么? 应用场景和现实的生产业务绑定,避免在分布式系统中多端消费业务消息造成顺序混乱,比如需要严格按照顺序处理的数据或业务。 数据包装/清洗 ``` import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Iterator; import java.util.Map; ``` 1. 去掉import 2. 统计某个字符出现次数 业务流程处理 1、返修过程 1. 收件录入信息 2. 信息核对 3. 送入检修系统处理 2、电商订单 1. 创建订单 2. 检查库存预扣库存 3. 支付 4. 真正扣库存 binlog的同步,sql执行顺序不能混乱 ### RocketMQ如何保证消息不丢失? 生产端如何保证投递出去的消息不丢失?消息在半路丢失或者在MQ内存中宕机丢失,此时如何基于MQ的功能保证消息不要丢失?MQ自身如何保证消息不丢失?消费端如何保证消费到的消息不丢失?如果处理到一半消费端宕机,导致消息丢失,此时怎么办? #### 解耦的思路 发送方: 发送消息时做消息备份(记日志或同步到数据库),判断SendResult是否正常返回 Broker: 节点保证 - master接受到消息后同步刷盘。保证了数据持久化到了本机磁盘中 - 同步写入Slave - 写入完成后返回SendResult Consumer: - 记日志 - 同步执行业务逻辑,最后返回ACK - 异常控制 磁盘保证: 使用Raid磁盘阵列保证数据磁盘安全 网络数据篡改: 内置TLS可以开启,默认使用crc32校验数据 ### 消息刷盘机制底层实现 每间隔10ms,执行一次数据持久化操作,两种方式:同步、异步。 ``` public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } ``` ### RocketMQ的消息堆积如何处理 下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理? 对于大规模消息发送接收可以使用pull模式,手动处理消息拉取速度,消费的时候统计消费时间以供参考。保证消息消费速度固定,即可通过上线更多Consumer临时解决消息堆积问题。 #### 如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办? - 准备一个临时的Topic - Queue的数量是堆积的几倍 - Queue分布到多个Broker中 - 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去 - 上线N台Consumer同时消费临时Topic中的数据 - 恢复原来的Consumer,继续消费之前的Topic #### 堆积时间过长会不会超时? RocketMQ中的消息只会在CommitLog被删除的时候才会消失,不会超时。 #### 堆积的消息会不会进死信队列 不会,消息在消费失败后会进入重试队列(%RETRY%+consumergroup),多次(默认16)重试失败才会进入死信队列(%DLQ%+consumergroup) ### Rocket的底层架构原理,磁盘上的数据如何存储的,整体分布式架构是如何实现的? 有四个角色,Broker提供数据存储,Producer用来发送数据,Consumer用来消费数据,由于Broker是集群化的,具体向哪个Broker写入或消费,还需要NameServer进行服务发现, 底层原理: Broker: 磁盘存储,所有消息都是写到Broker的磁盘上,发消息再从磁盘中读出来,并不是通过内存。通过内存映射和NIO来保证效率,读写效率接近于内存。写入时顺序写入,效率很高,读取时是随机读取,效率偏低,可以使用SSD,也可以配置Page Cache提高读取效率。为什么不从内存直接推?首先,维护内存数据会造成额外开销,实际测试RocketMQ的磁盘效率并不比内存低太多,而且推消息的次数并不多(一次拉取多条),写入很频繁。最重要的是,多Group,没有消息超时概念,造成可能有很多CommitLog,不可能全部存到内存里。 所有消息都会写入到CommitLog里,每个Topic会单独生成index和consumequeue,通过consumequeue去定位消息在CommitLog中的位置。 ### 零拷贝技术如何运用的? 使用nio的MappedByteBuffer调起数据输出 ### RocketMQ分布式事务支持的底层原理 分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2PC来解决。RocketMQ4.3+提供分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致 #### RocketMQ实现方式 Half Message: 预处理消息,当Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中 检查事务状态: Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回查。 超时: 如果超过回查次数,默认回滚消息 TransactionListener的两个方法: executeLocalTransaction: 半消息发送成功触发此方法来执行本地事务 checkLocalTransaction: broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态 本地事务执行状态: LocalTransactionState.COMMIT_MESSAGE: 执行事务成功,确认提交 LocalTransactionState.ROLLBACK_MESSAGE: 回滚消息,broker端会删除半消息 LocalTransactionState.UNKNOW: 暂时为未知状态,等待broker回查 ### 高吞吐量下如何优化生产者和消费者性能? #### 消费 - 同一Group下,多机部署,并行消费 - 单个Consumer提高消费线程个数 - 批量消费 - 消息批量拉取`consumer.setPullBatchSize()` - 业务逻辑批量处理`consumer.setConsumeMessageBatchMaxSize()` #### 运维 - 网卡调优 - jvm调优 - 多线程与cpu调优 - Page Cache ### RocketMQ是如何保证数据的高容错性的? - 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker - 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用 - 如果上次失败的Broker可用,那么还是会选择该Broker的队列 - 如果上述情况失败,则随机选择一个进行发送 - 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测Broker的可用时间 [1]: https://www.princelei.club/usr/uploads/2020/07/1097837202.png [2]: https://www.princelei.club/usr/uploads/2020/07/618832060.png Last modification:July 7th, 2020 at 01:24 am © 允许规范转载