Loading... # 消息中间件之ActiveMQ(二) ## 事务 在创建session时,通过 `connection.createSession(true, Session.SESSION_TRANSACTED);`第一个参数传入true表示开启事务,开启事务之后,发送的消息只有在调用 `session.commit();`提交之后,才能被消费,可以逐条提交,也可以批量提交。当发生异常时,可以使用 `session.rollback();`将未提交的事务回滚。 ## 签收模式 签收代表接收端的session已收到消息的一次确认,反馈给broker。ActiveMQ支持自动签收与手动签收。 通过 `connection.createSession(false, Session.AUTO_ACKNOWLEDGE);`的第二个参数来设置签收模式,注意不能开启事务,如果开启了事务,即第一个参数是true,那么第二个参数无论是什么都会被强制替换成Session.SESSION_TRANSACTED。 **Session.AUTO_ACKNOWLEDGE** 当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息。 **Session.CLIENT_ACKNOWLEDGE** 客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session已消费的所有消息。 **Session.DUPS_OK_ACKNOWLEDGE** Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。 ## 持久化 默认持久化是开启的,通过设置DeliveryMode.NON_PERSISTENT关闭 ``` producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) ``` ## 优先级 可以打乱消费顺序 ``` producer.setPriority ``` 设置一个从1-9的值,默认是4,值越大就越会优先被消费。需要配置开启优先级的Destination。 ``` <policyEntry queue="queue1" prioritizedMessages="true" /> ``` ## 消息超时/过期 ``` producer.setTimeToLive ``` 设置消息超时,单位毫秒,消息超时后,消费端无法消费这些消息。 设置了超时时间的消息,超时之后,会进入死信队列,可以从死信队列中重新消费。 ## 死信 设置了超时时间的消息,超时后会进入到**ActiveMQ.DLQ**队列,并且不会自动清除,称为死信。该队列有消息堆积的风险。 ### 修改死信队列名称 ``` <policyEntry queue="f" prioritizedMessages="true" > <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" /> </deadLetterStrategy> </policyEntry> ``` useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信。 ### 让非持久化消息也进入死信队列 非持久化消息默认是不会进入死信队列的,可以通过修改配置来改变它 ``` <individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" processNonPersistent="true" /> ``` `processNonPersistent="true"`表示非持久化也会进入死信队列。 ### 过期消息不进死信队列 ``` <individualDeadLetterStrategy processExpired="false" /> ``` 就算消息超时了,也不会进入死信队列,这里我改完之后,发现消息发送出去直接丢失,并且没有任何记录,改了好多东西都没有搞好,将 `producer.setTimeToLive(6000);`注释掉,发现发送成功了,苦思冥想,突然想起可能是时间不匹配,查看服务器时间,发现果然与本地时间不同步,导致消息发送的时候就已经过期了,由于设置了不进入死信,所以就直接蒸发了。 ## 独占消费者 ``` Queue queue = session.createQueue("user?consumer.exclusive=true"); ``` 设置 `consumer.exclusive=true`参数,表示该消费者为独占消费者,也就是在该消费者消费这个Destination的消息时,其他消费者都不能消费,除非中途该消费者下线或者挂掉,主动让出消费权,其他消费者才可以消费。 还可以设置优先级 ``` Queue queue = session.createQueue("user?consumer.exclusive=true&consumer.priority=10"); ``` ## 消息类型 ### object 发送端 ``` Girl girl = new Girl("qiqi",25,398.0); Message message = session.createObjectMessage(girl); ``` 接收端 ``` if(message instanceof ActiveMQObjectMessage) { Girl girl = (Girl)((ActiveMQObjectMessage)message).getObject(); System.out.println(girl); System.out.println(girl.getName()); } ``` 如果遇到此类报错 ``` Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes. at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36) at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213) at com.mashibing.mq.Receiver.main(Receiver.java:65) Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes. at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112) at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211) ... 1 more ``` 需要添加信任 在消费者中添加 ``` connectionFactory.setTrustedPackages(Collections.singletonList(Girl.class.getPackage().getName())); ``` 因为ActiveMQ默认不相信对象实现了Serializable接口,必须手动添加信任。 ### bytesMessage 字节流,常用来发送图片以及小文件 发送端 ``` BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.writeBytes("str".getBytes()); bytesMessage.writeUTF("哈哈"); ``` 接收端 ``` if(message instanceof BytesMessage) { BytesMessage bm = (BytesMessage)message; byte[] b = new byte[1024]; int len = -1; while ((len = bm.readBytes(b)) != -1) { System.out.println(new String(b, 0, len)); } } ``` 还可以使用ActiveMQ提供的便捷方法,但要注意读取和写入顺序,否则会发生不可预知的错误 ``` bm.readBoolean(); bm.readUTF(); ``` 写入文件 ``` FileOutputStream out = null; try { out = new FileOutputStream("d:/aa.txt"); } catch (FileNotFoundException e2) { e2.printStackTrace(); } byte[] by = new byte[1024]; int len = 0 ; try { while((len = bm.readBytes(by))!= -1){ out.write(by,0,len); } } catch (Exception e1) { e1.printStackTrace(); } ``` ### MapMessage key/value形式的消息 发送端 ``` MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name","lucy"); mapMessage.setBoolean("yihun",false); mapMessage.setInt("age", 17); producer.send(mapMessage); ``` 接收端 ``` Message message = consumer.receive(); MapMessage mes = (MapMessage) message; System.out.println(mes); System.out.println(mes.getString("name")); ``` ## 消息发送原理 ### 同步与异步 | | 开启事务 | 关闭事务 | | :-: | :-: | :-: | | 持久化 | 异步 | 同步 | | 非持久化 | 异步 | 异步 | 开启事务的时候,因为有commit确认机制,不太需要给发送端反馈,所以,无论持久化还是非持久化都是异步的。关闭事务的时候,因为开启持久化,表示尽量不想让消息丢失,所以,需要落地之后给发送端一个反馈,因此是同步的。非持久化时,表示消息可能不那么重要,发送端只管异步发出去,也就不需要考虑是否收到的问题了。 我们可以通过以下几种方式来设置改变默认的同步异步发送 ``` ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://localhost:61616" ); // 通过connectionFactory 设置 connectionFactory.setUseAsyncSend(true); // 通过connection设置,因为createConnection()默认返回的是jms的connection,因此需要强转成ActiveMQ的 ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.setUseAsyncSend(true); ``` `setUseAsyncSend();`或 `setUseAsyncSend();`传入true表示异步执行,false表示同步执行。 同步: 发送端发送消息后,必须等待消息队列返回ACK指令后,才能进行后续操作。 异步:发送端发送消息后,无需等待消息队列返回ACK指令,直接可以进行后续操作,需要手动实现SendCallback,在回调中接收消息队列的响应。 ### 消息堆积 ProducerWindowSize是在异步发送消息时,收到broker确认之前,允许发送的最大字节数。producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。 使用窗口尺寸来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,memoryUsage尺寸减少(producerAck.size,此size表示先前发送消息的大小)。 brokerUrl中设置: `tcp://localhost:61616?jms.producerWindowSize=1048576` destinationUri中设置: `myQueue?producer.windowSize=1048576` ### 延迟消息投递 schedulerSupport="true" 首先在配置文件中开启延迟和调度 ``` <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true"> ``` ### 延迟发送 ``` message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000); ``` ### 带间隔的重复发送 ``` // 延迟 10s 发送 long delay = 10 * 1000; // 每 2s 重复发送一次 long period = 2 * 1000; // 重复发送9次 不包括第一次 该值为一个int值 int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); createProducer.send(message); ``` ### Cron表达式定时发送 Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式: Seconds Minutes Hours DayofMonth Month DayofWeek Year或 Seconds Minutes Hours DayofMonth Month DayofWeek 每一个域可出现的字符如下: Seconds: 可出现", - * /"四个字符,有效范围为0-59的整数 Minutes: 可出现", - * /"四个字符,有效范围为0-59的整数 Hours: 可出现", - * /"四个字符,有效范围为0-23的整数 DayofMonth: 可出现", - * / ? L W C"八个字符,有效范围为0-31的整数 Month: 可出现", - * /"四个字符,有效范围为1-12的整数或JAN-DEC DayofWeek: 可出现", - * / ? L C # "八个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推 Year:可出现", - * /"四个字符,有效范围为1970-2099年 每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是: (1)*: 表示匹配该域的任意值,假如在Minutes域使用*, 即表示每分钟都会触发事件。 (2)?: 只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。我的理解是,*表示任意值,而?表示我不关心它是什么值,每月的20号,我不关心它星期几。 (3)-: 表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次 (4)/: 表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分触发一次,然后25,45分各触发一次。 (5),: 表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分各触发一次。 (6)L: 表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。 (7)W: 表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在 DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日(周一)触发;如果5日在星期一到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份。 (8)LW: 这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。 (9)# : 用于确定每个月第几个星期几,只能出现在DayofWeek域。例如在4# 2,表示某月的第二个星期三。 举几个例子: 0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务 0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业 0 15 10 ? * 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作 一个cron表达式有至少6个(也可能7个)有空格分隔的时间元素。 按顺序依次为 秒(0\~59) 分钟(0\~59) 小时(0\~23) 天(月)(0\~31,但是你需要考虑你月的天数) 月(0\~11) 天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,FRI,SAT) 年份(1970-2099) 其中每个元素可以是一个值(如6),一个连续区间(9-12),一个间隔时间(8-18/4)(/表示每隔4小时),一个列表(1,3,5),通配符。由于"月份中的日期"和"星期中的日期"这两个元素互斥的,必须要对其中一个设置? 0 0 10,14,16 * * ? 每天上午10点,下午2点,4点 0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时 0 0 12 ? * WED 表示每个星期三中午12点 0 0 12 * * ? 每天中午12点触发 0 15 10 ? * * 每天上午10:15触发 0 15 10 * * ? 每天上午10:15触发 0 15 10 * * ? * 每天上午10:15触发 0 15 10 * * ? 2005 2005年的每天上午10:15触发 0 * 14 * * ? 在每天下午2点到下午2:59期间的每1分钟触发 0 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发 0 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发 0 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发 0 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发 0 15 10 ? * MON-FRI 周一至周五的上午10:15触发 0 15 10 15 * ? 每月15日上午10:15触发 0 15 10 L * ? 每月最后一日的上午10:15触发 0 15 10 ? * 6L 每月的最后一个星期五上午10:15触发 0 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发 0 15 10 ? * 6# 3 每月的第三个星期五上午10:15触发 ## 监听器 可以使用监听器来处理消息接收 ``` consumer.setMessageListener(new MyListener()); ``` 需要实现接口MessageListener ``` public class MyListener implements MessageListener { public void onMessage(Message message) { // TODO Auto-generated method stub TextMessage textMessage = (TextMessage)message; try { System.out.println("message" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } ``` 当收到消息后会调起onMessage方法 ## 消息过滤 **消息发送** ``` MapMessage msg1 = session.createMapMessage(); msg1.setString("name", "qiqi"); msg1.setString("age", "18"); msg1.setStringProperty("name", "qiqi"); msg1.setIntProperty("age", 18); MapMessage msg2 = session.createMapMessage(); msg2.setString("name", "lucy"); msg2.setString("age", "18"); msg2.setStringProperty("name", "lucy"); msg2.setIntProperty("age", 18); MapMessage msg3 = session.createMapMessage(); msg3.setString("name", "qianqian"); msg3.setString("age", "17"); msg3.setStringProperty("name", "qianqian"); msg3.setIntProperty("age", 17); ``` **消息接收** ``` String selector1 = "age > 17"; String selector2 = "name = 'lucy'"; MessageConsumer consumer = session.createConsumer(queue,selector2); ``` 通过发送消息时,设置property头信息,接收时,在MessageConsumer中传入selector选择器,会自动读取头信息中配置的属性,selector符合sql规范。 Last modification:November 27th, 2020 at 09:58 pm © 允许规范转载