Loading... # 消息中间件之ActiveMQ(三) ## NIO配置 ActiveMQ默认配置的是tcp,使用的是bio: ``` <transportConnector name="openwire" uri="tcp://0.0.0.0:616166 maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> ``` nio是隐藏选项,是基于TCP的 http://activemq.apache.org/configuring-version-5-transports 修改activemq.xml配置,在 `<transportConnectors>`标签下添加: ``` <transportConnector name="nio" uri="nio://0.0.0.0:61617"/> ``` 客户端就可以使用nio方式连接了。 ``` ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "nio://localhost:61617" ); ``` 但是这样做的话,只有nio协议使用了nio,它替代了openwire的tcp,但实际生产中,不能保证所有人都使用nio协议连接,其他网络协议(例如AMQP,MQTT,Stomp等)也具有自己的NIO传输实现。比如: ``` <transportConnector name="mqtt+nio" uri="mqtt+nio://0.0.0.0:1883> ``` 在原来的协议上面+nio就开启了nio传输,但是activemq支持很多种协议,都需要配置上+nio? 实际上active提供了一种auto自动选择协议的方式: ``` <transportConnector name="auto" uri="auto://0.0.0.0:5671"/> ``` 这样无论客户端使用哪种协议,都可以连接到5671端口,borker都会解析成对应的协议,有了自动协议检测,我们只需要配置 ``` <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:5671"/> ``` 就为所有协议添加了nio实现。 ## OpenWire 可用配置选项 | Option | Default | Description | | :-: | :-: | :-: | | cacheEnabled | true | Should commonly repeated values be cached so that less marshaling occurs? | | cacheSize | 1024 | When`cacheEnabled=true` then this parameter is used to specify the number of values to be cached. | | maxInactivityDuration | 30000 | The maximum[inactivity](http://activemq.apache.org/activemq-inactivitymonitor) duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to die. Therefore allow the broker to kill connections when they have been inactive for the configured period of time. Used by some transports to enable a keep alive heart beat feature. Inactivity monitoring is disabled when set to a value `<= 0`. | | maxInactivityDurationInitalDelay | 10000 | The initial delay before starting[inactivity](http://activemq.apache.org/activemq-inactivitymonitor) checks. Yes, the word `'Inital'` is supposed to be misspelled like that. | | maxFrameSize | MAX_LONG | Maximum allowed frame size. Can help help prevent OOM DOS attacks. | | sizePrefixDisabled | false | Should the size of the packet be prefixed before each packet is marshaled? | | stackTraceEnabled | true | Should the stack trace of exception that occur on the broker be sent to the client? | | tcpNoDelayEnabled | true | Does not affect the wire format, but provides a hint to the peer that`TCP_NODELAY` should be enabled on the communications Socket. | | ightEncodingEnabled | true | Should wire size be optimized over CPU usage? | ## Trancport 可用配置选项 | Option Name | Default Value | Description | | :-: | :-: | :-: | | backlog | 5000 | Specifies the maximum number of connections waiting to be accepted by the transport server socket. | | closeAsync | true | If`true`the socket close call happens asynchronously. This parameter should be set to **`false`** for protocols like STOMP, that are commonly used in situations where a new connection is created for each read or write. Doing so ensures the socket close call happens synchronously. A synchronous close prevents the broker from running out of available sockets owing to the rapid cycling of connections. | | connectionTimeout | 30000 | If`>=1` the value sets the connection timeout in milliseconds. A value of **`0`** denotes no timeout. Negative values are ignored. | | daemon | false | If`true` the transport thread will run in daemon mode. Set this parameter to **`true`** when embedding the broker in a Spring container or a web container to allow the container to shut down correctly. | | dynamicManagement | false | If`true` the **`TransportLogger`** can be managed by JMX. | | ioBufferSize | 8*1024 | Specifies the size of the buffer to be used between the TCP layer and the OpenWire layer where`wireFormat` based marshaling occurs.* | | jmxPort | 1099 | (Client Only) Specifies the port that will be used by the JMX server to manage the`TransportLoggers`. This should only be set, via URI, by either a client producer or consumer as the broker creates its own JMX server. Specifying an alternate JMX port is useful for developers that test a broker and client on the same machine and need to control both via JMX. | | keepAlive | false | If`true`, enables [TCP KeepAlive](http://tldp.org/HOWTO/TCP-Keepalive-HOWTOoverview) on the broker connection to prevent connections from timing out at the TCP level. This should *not* be confused with **`KeepAliveInfo`** messages as used by the **`InactivityMonitor`.** | | logWriterName | default | ets the name of the`org.apache.activemq.transport.LogWriter` implementation to use. Names are mapped to classes in the **`resources/META-INF/services/org/apache/activemq/transport/logwriters`** directory. | | maximumConnections | nteger.MAX_VALUE | he maximum number of sockets allowed for this broker. | | minmumWireFormatVersion | 0 | The minimum remote`wireFormat` version that will be accepted (note the misspelling). Note: when the remote **`wireFormat`** version is lower than the configured minimum acceptable version an exception will be thrown and the connection attempt will be refused. A value of **`0`** denotes no checking of the remote **`wireFormat`** version. | | socketBufferSize | *64*1024 | Sets the size, in bytes, for the accepted socket’s read and write buffers. | | soLinger | Integer.MIN_VALUE | Sets the socket’s option`soLinger` when the value is **`> -1`**. When set to **`-1`** the **`soLinger`** socket option is disabled. | | soTimeout | 0 | Sets the socket’s read timeout in milliseconds. A value of`0` denotes no timeout. | | soWriteTimeout | 0 | Sets the socket’s write timeout in milliseconds. If the socket write operation does not complete before the specified timeout, the socket will be closed. A value of**0** denotes no timeout. | | stackSize | 0 | Set the stack size of the transport’s background reading thread. Must be specified in multiples of`128K`. A value of **`0`** indicates that this parameter is ignored. | | startLogging | true | If`true` the **`TransportLogger`** object of the Transport stack will initially write messages to the log. This parameter is ignored unless **`trace=true`**. | | tcpNoDelay | false | If`true` the socket’s option **`TCP_NODELAY`** is set. This disables Nagle’s algorithm for small packet transmission. | | threadName | N/A | When this parameter is specified the name of the thread is modified during the invocation of a transport. The remote address is appended so that a call stuck in a transport method will have the destination information in the thread name. This is extremely useful when using thread dumps for degugging. | | trace | false | Causes all commands that are sent over the transport to be logged. To view the logged output define the`Log4j` logger: **`log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG`**. | | trafficClass | 0 | The Traffic Class to be set on the socket. | | diffServ | 0 | (Client only) The preferred Differentiated Services traffic class to be set on outgoing packets, as described in RFC 2475. Valid integer values:**`[0,64]`**. Valid string values: **`EF`, `AF[1-3][1-4]`** or **`CS[0-7]`**. With JDK 6, only works when the JVM uses the IPv4 stack. To use the IPv4 stack set the system property **`java.net.preferIPv4Stack=true`**. Note: it’s invalid to specify both ‘**diffServ** and **typeOfService**’ at the same time as they share the same position in the TCP/IP packet headers | | typeOfService | 0 | (Client only) The preferred Type of Service value to be set on outgoing packets. Valid integer values:**`[0,256]`**. With JDK 6, only works when the JVM is configured to use the IPv4 stack. To use the IPv4 stack set the system property **`java.net.preferIPv4Stack=true`**. Note: it’s invalid to specify both ‘**diffServ** and **typeOfService**’ at the same time as they share the same position in the TCP/IP packet headers. | | useInactivityMonitor | true | When`false` the **`InactivityMonitor`** is disabled and connections will never time out. | | useKeepAlive | true | When`true` `KeepAliveInfo` messages are sent on an idle connection to prevent it from timing out. If this parameter is **`false`** connections will still timeout if no data was received on the connection for the specified amount of time. | | useLocalHost | false | When`true` local connections will be made using the value **`localhost`** instead of the actual local host name. On some operating systems, such as **`OS X`**, it’s not possible to connect as the local host name so **`localhost`** is better. | | useQueueForAccept | true | When`true` accepted sockets are placed onto a queue for asynchronous processing using a separate thread. | | wireFormat | default | The name of the`wireFormat` factory to use. | | wireFormat.* | N/A | Properties with this prefix are used to configure the`wireFormat`. | ## ActiveMQ服务监控 Hawtio ### 官方网站 > https://hawt.io/ ### 部署 **独立jar包的形式运行** java -jar hawtio单程序运行,可以对多个远程ActiveMQ服务器进行监控 **嵌入ActiveMQ** - 下载war包 - 复制到activemq的webapps文件夹下 jetty.xml `<bean id="secHandlerCollection" class="org.eclipse.jetty.server.handler.HandlerCollection">`标签下添加 ``` <bean class="org.eclipse.jetty.webapp.WebAppContext"> <property name="contextPath" value="/hawtio" /> <property name="war" value="${activemq.home}/webapps/hawtio.war" /> <property name="logUrlOnStart" value="true" /> </bean> ``` 接下来根据ActiveMQ所在操作系统不同,修改不同文件。如果是Linux或者Mac系统,修改ActiveMQ安装目录bin/env脚本文件,修改如下: ``` if [ -z "$ACTIVEMQ_OPTS" ] ; then ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS_MEMORY -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=$ACTIVEMQ_CONF/login.config" fi ``` windows系统修改ActiveMQ.bat文件,添加: ``` if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xms1G -Xmx1G -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config="%ACTIVEMQ_CONF%\login.config" ``` 注意,windows修改之后不能直接双击ActiveMQ.bat启动,要打开cmd窗口,输入 `ActiveMQ start`启动,否则修改的配置不生效。 启动ActiveMQ之后,就会自动加载Hawtio程序,会在 `groups.properties`和 `users.properties`中自动创建用户分组和密码,默认账号密码是admin,admin。 可以在groups.properties为admins添加新的用户 ``` admins = admin,user ``` 然后在users.properties创建用户,或者修改密码 ``` admin=123 user=123456 ``` 添加用户user,密码123456。 ## JMS消息结构(Message) Message主要由三部分组成,分别是Header,Properties,Body,详细如下: - Header: 消息头,所有类型的这部分格式都是一样的 - Properties: 属性,按类型可以分为应用设置的属性,标准属性和消息中间件定义的属性 - Body: 消息正文,指我们具体需要消息传输的内容。 **Header** JMS消息头使用的所有方法: ``` public interface Message { public Destination getJMSDestination() throws JMSException; public void setJMSDestination(Destination destination) throws JMSException; public int getJMSDeliveryMode() throws JMSException public void setJMSDeliveryMode(int deliveryMode) throws JMSException; public String getJMSMessageID() throws JMSException; public void setJMSMessageID(String id) throws JMSException; public long getJMSTimestamp() throws JMSException' public void setJMSTimestamp(long timestamp) throws JMSException; public long getJMSExpiration() throws JMSException; public void setJMSExpiration(long expiration) throws JMSException; public boolean getJMSRedelivered() throws JMSException; public void setJMSRedelivered(boolean redelivered) throws JMSException; public int getJMSPriority() throws JMSException; public void setJMSPriority(int priority) throws JMSException; public Destination getJMSReplyTo() throws JMSException; public void setJMSReplyTo(Destination replyTo) throws JMSException; public String getJMScorrelationID() throws JMSException; public void setJMSCorrelationID(String correlationID) throws JMSException; public byte[] getJMSCorrelationIDAsBytes() throws JMSException; public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException; public String getJMSType() throws JMSException; public void setJMSType(String type) throws JMSException; } ``` 消息头分为自动设置和手动设置的内容 **自动头信息** 有一部分可以在创建Session和MessageProducer时设置 | 属性名称 | **说明** | 设置者 | | :-: | :-: | :-: | | JMSDeliveryMode | 消息的发送模式,分为**NON_PERSISTENT**和**PERSISTENT**,即非持久性模式的和持久性模式。默认设置为**PERSISTENT(持久性)。**一条**持久性消息**应该被传送一次(就一次),这就意味着如果JMS提供者出现故障,该消息并不会丢失; 它会在服务器恢复正常之后再次传送。一条**非持久性消息**最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失。在持久性和非持久性这两种传送模式中,消息服务器都不会将一条消息向同一消息者发送一次以上(成功算一次)。 | send | | JMSMessageID | 消息ID,需要以ID:开头,用于唯一地标识了一条消息 | send | | JMSTimestamp | 消息发送时的时间。这条消息头用于确定发送消息和它被消费者实际接收的时间间隔。时间戳是一个以毫秒来计算的Long类型时间值(自1970年1月1日算起)。 | send | | JMSExpiration | 消息的过期时间,以毫秒为单位,用来防止把过期的消息传送给消费者。任何直接通过编程方式来调用setJMSExpiration()方法都会被忽略。 | | | JMSRedelivered | 消息是否重复发送过,如果该消息之前发送过,那么这个属性的值需要被设置为true, 客户端可以根据这个属性的值来确认这个消息是否重复发送过,以避免重复处理。 | Provider | | JMSPriority | 消息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先发送。任何直接通过编程方式调用setJMSPriority()方法都将被忽略。 | send | | JMSDestination | 消息发送的目的地,是一个Topic或Queue | send | JMSDeliveryMode ``` MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ``` JMSExpiration ``` //将过期时间设置为1小时(1000毫秒 *60 *60) producer.setTimeToLive(1000 * 60 * 60); ``` JMSPriority ``` producer.setPriority(9); ``` **手动头信息** | 属性名称 | **说明** | 设置者 | | :-: | :-: | :-: | | JMSCorrelationID | 关联的消息ID,这个通常用在需要回传消息的时候 | client | | JMSReplyTo | 消息回复的目的地,其值为一个Topic或Queue, 这个由发送者设置,但是接收者可以决定是否响应 | client | | JMSType | 由消息发送者设置的消息类型,代表消息的结构,有的消息中间件可能会用到这个,但这个并不是是批消息的种类,比如TextMessage之类的 | client | 从上表中我们可以看到,系统提供的标准头信息一共有10个属性,其中有6个是由send方法在调用时设置的,有三个是由客户端(client)设置的,还有一个是由消息中间件(Provider)设置的。 ## 高级使用 ### queue browser ``` QueueBrowser browser = session.createBrowser(queue); Enumeration<TextMessage> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage textMessage = enumeration.nextElement(); String text = textMessage.getText(); System.out.println(text); } ``` 可以查看队列中的消息而不消费,没有订阅的功能 ### JMSReplyTo 发送方可以接受到消息消费确认的地址 ``` Queue replay = session.createTextMessage("replay"); textMessage.setJMSReplyTo(replay); producer.send(textMessage); ``` 发送方发送时会创建一个队列replay,并监听该队列,当消费方监听到消息的时候通过 `Destination replay = textMessage.getJMSReplyTo();`获取到replay队列,并向其中发送一个消息,发送方就会从replay队列中收到消息,就可以得知消费方是否真的收到了这个消息,当然,消费方有权选择是否给发送方回复。 ### JMSCorrelationID 用于消息之间的关联,给人一种会话的感觉 ``` message.setJMSCorrelationID("123456"); ``` 当存在replay的时候,发送方接收到消费方的确认消息就会执行,但是要如果有多个消费者和生产者,返回的确认消息无法得知具体属于哪个生产者。由于确认消息是一条新的消息,无法使用之前的messageID,因此,可以使用CorrelationID,发送时,设置一个id,接收方收到消息时,可以获取到这个id,然后在返回的确认消息中再设置上该id,发送方设置上对于CorrelationID的过滤器,这样就可以实现点对点的交互了。 ### QueueRequestor同步消息 可以发送同步消息,本质违背了mq的异步通讯原则,但是mq还是能够提供应用解耦,异构系统的特性,因为使用QueueRequestor发送消息后,会等待接收端回复,如果收不到回复就会造成死等的现象!而且该方法没有设置超时等待的功能。 ``` Queue queue = session.createQueue("user"); QueueRequestor requestor = new QueueRequestor(session, queue); TextMessage message = session.createTextMessage("message"); TextMessage response = (TextMessage) requestor.request(message); ``` 本质上底层就是使用的ReplyTo实现,当调用 `requestor.request();`方法时回去请求,并阻塞等待返回,实际上是创建一个replay临时队列,并阻塞监听该队列 `consumer.receive();`当接收方收到消息时,必须给replay队列一个确认消息,发送发收到这个消息才会解除阻塞,继续执行,实现了发送与接收方之间的同步。 ### TemporaryQueue临时队列 ``` TemporaryQueue temporaryQueue = session.createTemporaryQueue(); ``` 这种队列的生命周期是与connection绑定的,connection断开时,会被删除,QueueRequestor用的就是这种方式,配合ReplyTo使用,避免创建的队列过多。 ## ActiveMQ会不会丢消息?如何防止消息丢失? 默认情况下,消息一般不会丢失,当开启了异步发送、非持久化或者设置了超时时间时,有可能造成丢失。解决方式有: 1. broker做高可用,当一个broker宕机或者网络延迟时,有备用的可以顶上。 2. 设置消息超时进入死信队列,然后重投。 3. 开启持久化,broker宕机,消息依然存在。 4. 生产者同步投递,消费者手动ACK。 5. 消息重投:每条消息多投几次,需要做幂等。 6. 记录日志,生产者将发送的消息记录日志,以便于消息丢失时做补偿处理。 7. 接收/消费确认: JMSReplayTo、QueueRequestor同步消息。 8. 查看是否有独占消费者,或者设置了分组过滤器。 ## 如何防止消息重复 消息做幂等处理,使用ConcurrentHashMap的 `putIfAbsent`,或者使用Google的guava cache,后者比前者多了超时机制,一般消费者不会重启,消息会越来越多,Map也会越来越大,需要有超时机制,使用ConcurrentHashMap需要自己实现。 ## 如何保证消费顺序 消息队列本身就是有序的,在不设置优先级的情况下一般不会乱序。发生乱序的情况一般出现在消费者做了负载均衡,有两个消费者A和B,A拿到消息1,B拿到消息2,但是要求1要在2之前执行,A与B不能保证同步,引发乱序。或者开启了prefetchSize,A和B都直接拉取了prefetchSize数量的消息,如果B没处理完挂了,那么没处理的消息会被broker重新投放,造成乱序。一般解决方法是,创建多个Destination,每个Destination只对应一个Consumer。 ## 生产环境中影响性能的几个因素 ### Out of memory 配置ActiveMQ的堆内存大小 windows在activemq.bat中 ``` %ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xms1G -Xmx1G ``` linux在bin/env中 ``` ACTIVEMQ_OPTS_MEMORY="-Xms1G -Xmx1G" ``` 以及配置activemq.xml文件中的内存百分比 ``` <memoryUsage percentOfJvmHeap="70" /> ``` SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,ActiveMQ会"slow down producer"。 ### 是否开启了持久化 使用持久化,消息每次都要落地到数据库,也会消耗性能。 ### 消息异步发送 建议使用默认,强制开启有可能丢失消息 异步发送丢失消息的场景是: 生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果服务器突然宕机,此时生产者内存中尚未被发送至MQ的消息都会丢失。 开启异步的三种方式: ``` new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true"); ``` ``` ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true); ``` ``` ((ActiveMQConnection)connection).setUseAsyncSend(true) ``` ### 批量确认 ActiveMQ缺省支持批量确认消息,批量确认可以提高系统性能 关闭方法: ``` new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false"); ``` ``` ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase); ``` ``` ((ActiveMQConnection)connection).setOptimizeAcknowledge(true); ``` ## 消费缓冲与消息积压prefetchSize 消费者端,一般来说消费的越快越好,broker的积压越小越好。但是考虑到事务性和客户端确认的情况,如果一个消费者一次获取了很多消息却都不确认,这会造成事务上下文变大,broker端这种“半消费状态”的数据变多,所以ActiveMQ有一个prefetchSize参数来控制未确认情况下,最多可以预获取多少条记录。 **Pre-fetch默认值** | consumer type | default value | | :-: | :-: | | queue | 1000 | | queue browser | 500 | | topic | 32766 | | durable topic | 1000 | ### 可以通过3种方式修改prefetchSize **创建连接时整体设置** ``` ActiveMQConnectionFactory connectio nFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://localhost:5671?jms.prefetchPolicy.all=50" ); ``` **创建连接时对topic和queue单独设置** ``` ctiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://localhost:5671?jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1" ); ``` **针对destination单独设置** ``` Destination topic = session.createTopic("user?consumer.prefetchSize=10"); ``` 注意: 对destination设置prefetchSize后会覆盖连接时的设置值。 ## 消息到底是推还是拉 发送消息时是推向broker。 获取消息时: - 默认是一条一条推 - 当consumer的prefetchSize满的时候停止推消息 - 当consumer的prefetchSize==0时,拉取消息 ## 调优总结 ### Topic加强 可追溯消息 > http://activemq.apache.org/retroactive-consumer.html 默认情况下,topic是向订阅者推送消息,如果订阅者不在线,那么它将错过这些消息,使用追溯可以避免订阅者错过消息。 **消费者设置** ``` Destination topic = session.createTopic("tpk?consumer.retroactive=true"); ``` 添加 `consumer.retroactive=true`参数 接下来需要修改activemq.xml配置文件 **可用的追溯策略** | Policy Name | Sample Configuration | Description | | :-: | - | :-: | | FixedSizedSubscriptionRecoveryPolicy | `<fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>` | Keep a fixed amount of memory in RAM for message history which is evicted in time order. | | FixedCountSubscriptionRecoveryPolicy | `<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>` | Keep a fixed count of last messages. | | LastImageSubscriptionRecoveryPolicy | `<lastImageSubscriptionRecoveryPolicy/>` | Keep only the last message. | | NoSubscriptionRecoveryPolicy | `<noSubscriptionRecoveryPolicy/>` | Disables message recovery. | | QueryBasedSubscriptionRecoveryPolicy | `<queryBasedSubscriptionRecoveryPolicy query="JMSType = 'car' AND color = 'blue'"/>` | Perform a user specific query mechanism to load any message they may have missed. Details on message selectors are available[here](http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html) | | TimedSubscriptionRecoveryPolicy | `<timedSubscriptionRecoveryPolicy recoverDuration="60000" />` | Keep a timed buffer of messages around in memory and use that to recover new subscriptions. Recovery time is in milliseconds. | | RetainedMessageSubscriptionRecoveryPolicy | `<retainedMessageSubscriptionRecoveryPolicy/>` | Keep the last message with ActiveMQ.Retain property set to true | **保留固定字节的消息** ``` <policyEntry topic=">"> <subscriptionRecoveryPolicy> <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/> </subscriptionRecoveryPolicy> </policyEntry> ``` **保留固定数量的消息** ``` <policyEntry topic=">"> <subscriptionRecoveryPolicy> <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> </subscriptionRecoveryPolicy> </policyEntry> ``` **保留时间** ``` <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> </subscriptionRecoveryPolicy> ``` **保留最后一条** ``` <subscriptionRecoveryPolicy> <lastImageSubscriptionRecoveryPolicy/> </subscriptionRecoveryPolicy> ``` ### 慢速消费 **SlowConsumerStrategy** 对于消费者,broker会启动一个后台线程用来检测所有的慢速消费者,并定期关闭慢消费者。 AbortSlowConsumerStrategy abortConnection: 中断慢速消费者,慢速消费者将会被关闭。 ``` <slowConsumerStrategy> <abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 --> </slowConsumerStrategy> ``` AbortSlowConsumerStrategy maxTimeSinceLastAck: 如果慢速消费者最后一个ACK距离现在的时间间隔超过阈值,则中断慢速消费者。 ``` <slowConsumerStrategy> <abortSlowConsumerStrategy maxTimeSinceLastAck="30000"/><!-- 30秒之后 --> </slowConsumerStrategy> ``` **PendingMessageLimitStrategy:消息限制策略(面向慢消费者)** > http://activemq.apache.org/slow-consumer-handling 此策略只对Topic有效,只对未持久化订阅者有效,当通道中有大量的消息积压时,broker可以保留的消息量。为了防止Topic中有慢消费者,导致整个通道消息积压。 ConstantPendingMessageLimitStrategy: 保留固定条数的消息,如果消息量超过limit,将使用**消息剔除策略**移除消息。 ``` <policyEntry topic="ORDERS.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="50"/> </pendingMessageLimitStrategy> </policyEntry> ``` PrefetchRatePendingMessageLimitStrategy: 保留preFetchSize倍数的消息、 ``` <!-- 若prefetchSize为100,则保留2.5 * 100条消息 --> <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/> ``` ### 消息堆积内存上涨 - 检查消息是否持久化 - 检查消息消费速度与生产速度 - 调整xms、xmx参数 ### 磁盘满 当非持久化消息堆积到一定程度,ActiveMQ会将非持久化消息写入临时文件,但是在重启的时候不会恢复,当存储持久化数据的磁盘满了的时候: - 持久化消息 - 生产者阻塞,消费正常,当消费一部分后,腾出空间,生产者继续 - 非持久化消息 - 由于临时文件造成磁盘满了,生产者阻塞,消费异常,无法提供服务 ### 开启事务 在发送非持久化消息的时候,可以有效地防止消息丢失。 ### prefetchSize影响消费倾斜 当有多个消费者,每个消费者的prefetchSize都比较大,但是消费者处理速度比较慢,它可能一次取出1000条,但是短时间根本消费不完,导致其他消费者没有消息可以取,当他处理完时,broker依然会优先给它推消息,结果就是这个消费者类似独占消费,其他消费者基本取不到消息,造成消费倾斜。 慢速消费的时候可以将prefetchSize设为1,每次取一条。 ### prefetchSize造成消费者内存溢出 消息的大小比较大,prefetchSize也比较大,造成消费者预读取preFetchSize条数据,一下子撑爆内存。 可以将prefetchSize适当调小,或者增加xms、xmx大小,或者调节 `percentOfJvmHeap`的百分比。 ### AUTO_ACKNOWLEDGE造成消息丢失 receive()方法接收到消息后立即确认 listener的onmessage方法执行完毕后才会确认 消息消费失败后,无法复原消息,可以手动ack 避免broker把消息自动确认删除。 手动ack的时候要等connection断开,才会重新推送给其他consumer,所以有可能会导致消费乱序。 ### exclusive和selector有可能造成消息堆积 当独占消费者消费过慢或者selector设置错误或者消费过慢时,会造成broker中的消息堆积。 Last modification:November 27th, 2020 at 10:26 pm © 允许规范转载