Loading... # 消息中间件之RocketMQ(二) ## 消息消费模式 消息消费模式由消费者来决定,可以由消费者设置MessageModel来决定消费模式。 消费模式默认为集群模式 ``` consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式 consumer.setMessageModel(MessageModel.CLUSTERING); //集群模式 ``` 对于同一个Topic必须保证所有消费者的消费模式是一致的,否则会出现问题。 ### 集群消息 ![160707_kSpS_1469576.png][1] 集群消息是指**集群化部署消费者** 当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。相当于ActiveMQ的Queue,即P2p模式,但是与ActiveMQ不同的是,它有一个集群的概念,也就是Group,RocketMQ是按组消费,一个Group中可以有多个Consumer。同时又可以有多个Group都订阅一个Topic,broker会向每一个Group都推送一次消息,每个Group中只能有一个Consumer消费成功,如果消费失败,重投时,有可能会投给相同Group下的另一个Consumer。也就是说,所有Group都可以消费一次Topic中的消息,因为每个Group对应不同的业务,在实际使用中,需要避免因Group不同导致的重复消费。 #### 特点 - 每条消息只需要被处理一次,Broker只会把消息发送给消费集群中的一个消费者 - 在消息重投时,不能保证路由到同一台机器上 - 消息状态由Broker维护 ### 广播消息 ![160902_4AOI_1469576.png][2] 当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。相当于ActiveMQ的Topic也就是PUB/SUB模式。 #### 特点 - 消费状态由Consumer维护 - 保证每个消费者消费一次消息 - 消费失败的消息不会重投 ## 消息发送方式 ### 同步消息 消息发送时进入同步等待状态,直到Broker返回确认消息,可以保证消息投递一定到达 ``` SendResult sendResult = producer.send(message); ``` 线程会在这里阻塞等待SendResult的返回。 ### 异步消息 想要快速发送消息,又不想消息丢失,可以使用异步消息,也叫做异步可靠消息 ``` producer.send(message,new SendCallback() { public void onSuccess(SendResult sendResult) { // TODO Auto-generated method stub System.out.println("ok"); } public void onException(Throwable e) { // TODO Auto-generated method stub e.printStackTrace(); System.out.println("err"); } }); ``` 采用事件监听的方式,SendResult会在异步回调中返回。 ### 单向消息 只发送消息,不等待服务器的响应。此方式发送消息的过程耗时非常短,一般在微秒级别,但是不保证消息的到达性。 ## 批量发送消息 可以多条消息打包一起发送,减少网络传输次数,提高效率。 `producer.send(Collection c)`方法可以接受一个集合,实现批量发送 ``` List<Message> messages = Arrays.asList(message1, message2, message3); SendResult sendResult = producer.send(messages); ``` - 批量消息要求必须要具有同一Topic与相同的消息配置 - 不支持延时消息 - 建议一个批量消息最好不要超过1MB大小 - 如果不确定是否超过限制,可以手动计算大小分批发送 ## TAG 可以使用tag来过滤消息。 在Producer中添加Tag: ``` Message msg = new Message("TopicTest","TagA" ,("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET)); ``` 在Consumer中订阅Tag: ``` consumer.subscribe("TopicTest", "TagA||TagB");// * 代表订阅Topic下的所有消息 ``` 注意,这里被过滤掉的消息会被Broker标记为`CONSUMED_BUT_FILTERED`状态,即被消费,但是被过滤。也就是说,就算消息被当前消费者过滤,没有处理,它的状态也是被消费的。 ## SQL表达式过滤 应用于更复杂的筛选过滤场景,利用SQL表达式,筛选出感兴趣的消息 ### 配置 在`conf/broker.conf`中添加配置 ``` enablePropertyFilter=true ``` 启动broker加载指定配置文件 ``` ./mqbroker -n localhost:9876 -c ../conf/broker.conf ``` 随后在集群配置中可以看到 ![image-20200219174859476.png][3] ### 实例 生产者 ``` message.putUserProperty("order", "7"); ``` 消费者 ``` MessageSelector selector = MessageSelector.bySql("order > 5"); consumer.subscribe("testSelector", selector); ``` ### 语法 RocketMQ只定义了一些基本的语法来支持这个功能: 1. 数字比较,像'>','>=','<','<=',BETWEEN,'=' 2. 字符比较,像'=','<>','IN' 3. `IS NULL`或者`IS NOT NULL` 4. 逻辑运算AND,OR,NOT 常量类型是: 1. 数字,像123,3.1415 2. 字符串,像'abc',必须使用单引号 3. NULL,特殊常数 4. 布尔常量,TRUE或FALSE 这里被过滤掉的消息也会被Broker标记为`CONSUMED`状态,即已经被消费。 ## 延迟消息 RocketMQ可以使用**messageDelayLevel**来设置延迟投递 ### 默认配置为 ``` messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ``` 可以在`conf/broker.conf`中添加配置 ``` messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ``` 这个配置项配置了从1级开始,各级延时的时间,可以修改指定级别的延时时间,时间单位支持:s、m、h、d,分别表示秒、分、时、天。 ### 使用 ``` message.setDelayTimeLevel(1); ``` 入参为配置中对应的延时级别。 ## 顺序消费 队列先天支持FIFO模型,即先进先出,单一生产者消费者下,本身就是有序的。但是在RocketMQ中,Topic是一个逻辑上的概念,其中包含多个Queue(默认是4个),一个队列可以保证顺序,多个队列就无法保证了。因此,要实现顺序性,在生产者层面要保证同一个Topic,同一个Queue,并且保证发送的有序性(同一个线程)。所以,发送时需要指定具体发送到哪一个Queue中。 RocketMQ提供了如下的send()实现 ``` producer.send(message,messageQueueSelector,arg,time); ``` 其中第二个参数需要传入一个`MessageQueueSelector`实现,即队列选择器,指定将消息发送到哪个队列中。 它有三个默认实现 ``` producer.send(message,new SelectMessageQueueByHash(),1); // 根据传入的arg做hash运算并取模,返回一个队列 producer.send(message,new SelectMessageQueueByRandom(),1); // 随机返回一个队列 producer.send(message,new SelectMessageQueueByMachineRoom(),1); // 按机房实现,不完整实现,需要自己实现 ``` 也可以手动实现MessageQueueSelector接口 ``` SendResult sendResult = producer.send(message, // queue 选择器,向topic中的哪个queue去写消息 new MessageQueueSelector() { // 手动选择一个Queue @Override public MessageQueue select( // 当前topic 里面包含的所有Queue List<MessageQueue> mqs, // 具体要发的那条消息 Message message, // 对应send()方法的arg Object arg) { // 向固定的一个queue里写消息 // 选好的queue return mqs.get((int) arg); } }, // 自定义参数 传入MessageQueue 回调的arg 0, // 发送超时时间 3000); ``` 在不开启事务的情况下,Producer默认就是单线程发送,开启事务的时候,需要手动指定线程池。 ``` producer.setExecutorService(); ``` 对于消费者,需要保证同一个Queue,同一个线程消费。 消费者提供了两种监听器: 1. MessageListenerConcurrently: 并发消费监听器,开启多个线程,并行消费,不能保证有序 2. MessageListenerOrderly: 对每个Queue开启一个线程,可以保证每个Queue有序 MessageListenerOrderly只能保证每个Queue的顺序,但是如果同时消费多个Queue,依然会开启多个线程,这样就会导致在每个Queue下是有序的,但是全局是乱序的。强制单线程是否可以保证全局顺序? ``` consumer.setConsumeThreadMax(1); // 最大消费线程数 consumer.setConsumeThreadMin(1); // 最小消费线程数 ``` 设置了之后,是可以保证挨个Queue去消费,但是不能保证Queue之间的顺序。 所以,如果正在处理全局顺序是强制性的场景,需要确保主题只使用一个消息队列。 跟普通消息相比,顺序消息的使用需要在Producer的send()方法中添加MessageQueueSelector接口的实现类,并重写select选择使用的队列,因为顺序消息是局部顺序,需要将所有消息指定发送到同一队列中。 ### 保证有序参与因素 - FIFO - 队列内保证有序 - 消费线程,单线程消费 ## 重试机制 ### Producer 默认超时时间 ``` /** * Timeout for sending messages. */ private int sendMsgTimeout = 3000; ``` ``` // 异步发送时 重试次数,默认 2 producer.setRetryTimesWhenSendAsyncFailed(1); // 同步发送时 重试次数,默认 2 producer.setRetryTimesWhenSendFailed(1); // 是否向其他broker发送请求 默认false producer.setRetryAnotherBrokerWhenNotStoreOK(true); ``` `producer.setRetryAnotherBrokerWhenNotStoreOK`当消息发送失败时,向其它broker发送。默认是false。 ### Consumer 消费超时,单位分钟 ``` consumer.setConsumeTimeout() ``` 如果超时不发送ACK,按消费失败处理。等同于`RECONSUME_LATER`即过一段时间尝试重投 ### Broker投递 只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重投,广播消息不重投,重投使用`messageDelayLevel` 默认值 ``` messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ``` 默认使用的是级别3,也就是延迟10S。 ## 事务消息 分布式系统中通常使用TCC(Try,Confirm,Cancel)、2PC来解决事务问题,RocketMQ4.3+提供分布式功能,通过RocketMQ事务,消息能达到最终一致。RocketMQ采取2PC(两阶段提交)来处理分布式事务。 ### RocketMQ实现方式 **Half Message:** 预处理消息,当Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,该消息会持久化到本地磁盘,但是不能被消费者消费到。 **检查事务状态:** Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回查。 **超时:** 如果消息超过回查次数,默认回滚消息。 ### TransactionListener的两个方法 #### executeLocalTransaction 半消息(Half Message)发送成功触发此方法来执行本地事务,也就是事务需要全部在此方法中处理。 #### checkLocalTransaction Broker将发送检查消息,来检查事务状态,检查时会调用此方法,需要在此方法中实现返回给Broker的事务状态逻辑。 #### 本地事务执行状态 `LocalTransactionState.COMMIT_MESSAGE`: 执行事务成功,确认提交。 `LocalTransactionState.ROLLBACK_MESSAGE`: 回滚消息,Broker会删除Half Message。 `LocalTransactionState.UNKNOW`: 暂时未知状态,等待Broker下一次回查。 ``` producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { try { // 具体事务逻辑 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return LocalTransactionState.UNKNOW; } }); ``` [1]: https://www.princelei.club/usr/uploads/2020/06/1114748961.png [2]: https://www.princelei.club/usr/uploads/2020/06/591407523.png [3]: https://www.princelei.club/usr/uploads/2020/06/3357979539.png Last modification:June 30th, 2020 at 09:26 pm © 允许规范转载