Loading... # 消息中间件之RocketMQ(三) ## Offset message queue是无限长的数组,一条消息进来下标就会长1,下标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后处理。 ### minOffset 当前下标最小值,也就是从哪里开始读。 ### maxOffset 当前下标最大值,也就是可以读到哪。maxOffset并不是最新的那条消息的offset,而是最新消息的offset+1。 ### consumerOffset 消费者消费的进度 ### diffTotal 消息积压/未被消费的消息数量 ## 消费者 在消费端,我们可以视情况来控制消费过程 - **DefaultMQPushConsumer**: 由系统自动控制过程 - **DefaultMQPullConsumer**: 大部分功能需要手动控制 注意,虽然一个叫push,一个叫pull,但是并不表示推送或拉取,本质上底层都是Consumer主动拉取的,Broker并不会主动推送。 ### 集群消息的负载均衡 在集群模式(clustering)下,相同的Group中的每个消费者只消费Topic中的一部分内容,Group中的所有消费者都参与消费过程,每个消费者消费的内容不重复,从而达到负载均衡的效果。使用DefaultMQPushConsumer,新启动的消费者自动参与负载均衡。 ### ProcessQueue ProcessQueue,是一个消息快照,即消息消费端保存的消息队列镜像,提供很多功能,负载均衡、消息拉取、消费状态处理、offset提交,控制着整个消费的脉搏,尤其在顺序消费中参与更多。 ### Consumer与Borker的连接方式 RocketMQ采用长轮询的方式进行连接 原因: - Consumer的处理能力Broker不知道 - 直接推送消息Broker端压力较大 - 采用长连接有可能Consumer不能及时处理推送过来的数据 - pull主动权在Consumer手里 **短轮询** cient不断发送请求到server,每次都需要重新建立连接 **长轮询** clent发送请求到server,server有数据就返回,没有就将请求挂起,等到有数据再返回。 **长连接** 连接一旦建立,就不会断开,server可以主动推送消息到client。 ## 消息存储 ![rocketmq消息存储.jpg][1] RocketMQ使用文件系统来持久化消息,性能要比使用DB产品高,**省去DB层提高性能**,性能瓶颈在磁盘I/O上,可以更换更好的磁盘接口和协议,如**M.2接口 NVME磁盘存储协议** ### 数据零拷贝 很多使用文件系统存储的高性能中间件都使用了零拷贝技术来发送文件数据,比如Nginx。传统I/O操作,需要JVM向内核去请求数据,内核需要从磁盘将数据拷贝到内核中,然后再发送给JVM,然后JVM再传给网卡发送出去,数据经历了磁盘-->内核-->本地进程-->网卡三次拷贝。而零拷贝技术是,进程直接发一个信号给内核,内核从磁盘将数据读取出来,直接交给网卡,避免了拷贝到进程内存中,提高了性能。 #### 内存映射MappedByteBuffer API 1. MappedByteBuffer使用虚拟内存,因此分配(map)的内存大小不受JVM的-Xmx等参数限制,但是也是有大小限制的。 2. 如果当文件超出1.5G限制时,可以通过position参数重新map文件后面的内容。 3. MappedByteBuffer在处理大文件时的确性能很高,但也存在一些问题,如内存占用,文件关闭不确定,被其打开的文件只有在垃圾回收时才会被关闭,而且这个时间点是不确定的。javadoc中也提到: **A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected.** 所以为了使用零拷贝技术,RocketMQ的文件存储大小默认每个1G,超过1G会重新建立一个新文件。 ### 存储结构 #### CommitLog 存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量,默认配置`MessageStoreConfig`类中。 核心方法 - putMessage 写入消息 CommitLog内部结构 一个MappedFlieQueue对应N个MappedFile,因为MappedFile的大小限制,注定一个文件不能保存所有数据,因此需要MappedFlieQueue来管理这些MappedFile,每次写入时,都会从队尾拿到最后一个MappedFile进行。 MappedFile的默认大小是1G ``` // CommitLog file size,default is 1G private int mappedFileSizeCommitLog = 1024 * 1024 * 1024; ``` #### ConsumerQueue 通过消息偏移量建立的消息索引,针对每个Topic创建,消费逻辑队列,存储位置信息,用来快速定位CommitLog中的数据位置。启动后会被加载到内存中,加快查找消息速度。以Topic作为文件名称,每个Topic下又以Queue ID作为文件夹分组。 默认大小 ``` // ConsumeQueue extend file size, 48M private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024; ``` #### indexFile 消息的Key和时间戳索引 #### 存储路径配置 默认文件会存储在家目录下`/root/store/` ![01.png][2] #### config 以json格式存储消费信息 consumerFilter.json: 消息过滤器 consumerOffset.json: 客户端消费进度 delayOffset.json: 延迟消息进度 subscriptionGroup.json: group的订阅数据 topics.json: Topic的配置信息 ### 刷盘机制 在CommitLog初始化时,判断配置文件,加载相应的service ``` if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 异步刷盘 this.flushCommitLogService = new GroupCommitService(); } else { // 同步刷盘 this.flushCommitLogService = new FlushRealTimeService(); } ``` 写入时消息会不会分隔到两个MappedFile中? ``` // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE 不够放下一个消息的时候,用魔术字符代替 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } ``` 可以看到,当message长度加上`END_FILE_MIN_BLANK_LENGTH(8字节)`大于`maxBlank(当前MappedFile剩余空间时)`,会在结尾写入maxBlank和魔术字符,每个MappedFile都需要确保预留8字节的大小,来保存文件文件剩余空间(4字节)和魔术字符(4字节),魔术字符表示文件结尾,当读到这个字符,表示文件已结束,返回结果中状态为`END_OF_FILE`,表示文件已结束,上级方法case到该状态会创建新的文件,保证一个Message不会跨越两个Commitlog。 #### 同步刷盘 消息被Broker写入磁盘后再给Producer响应。 #### 异步刷盘 消息被Broker写入内存后立即给Producer响应,当内存中消息堆积到一定程度的时候,一次性写入磁盘。 #### 配置选项 默认是异步刷盘,可以修改为同步 ![02.png][3] [1]: https://www.princelei.club/usr/uploads/2020/07/237526071.jpg [2]: https://www.princelei.club/usr/uploads/2020/07/2621585810.png [3]: https://www.princelei.club/usr/uploads/2020/07/1552871376.png Last modification:July 3rd, 2020 at 11:54 pm © 允许规范转载