环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13
默认情况下,无论持久化还是不持久化消息,每个队列都是有生产者流量控制的,即 producerFlowControl="true"。
针对同步发送消息和异步发送消息,生产者流控的限制方式有些区别。
1. 同步发送消息方式(useAsynSend=false)
当队列的内存超过 memoryLimit="1mb" 设定值时,会阻塞生产者。
该特性类似于队列深度的特性,只是限制的是队列使用的内存,而不是消息的个数。
你也许觉得 1mb 太小了,这是因为对于非持久化消息,消息会储存到 tempUsage;对于持久化消息,消息会存储到 storeUsage。
所以,理论上,只要 tempUsage 和 storeUsage 没有爆掉,这个 1mb 内存是不会超过的。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
如果希望客户端抛出异常 javax.jms.ResourceAllocationException,而不是只是 Hung 住,可以在 systemUsage 配置上设置sendFailIfNoSpace="true" 或 sendFailIfNoSpaceAfterTimeout="3000"。
<systemUsage>
<systemUsage sendFailIfNoSpace="true" >
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
2. 异步发送消息方式(useAsynSend=true)
异步发送消息时,由于不等待 broker 收到消息的确认信息,即使内存超出使用限制,也不会得到通知,因此也就无法阻塞生产者。
设置 connctionFactory.setProducerWindowSize(1024000); 可以控制 broker 确认收到消息前生产者能发送的最大字节数。
默认情况下,非持久化消息是以异步方式发送的,如果希望以同步方式发送,可以在 ConnctionFactory 上设置 alwaysSyncSend。
参考文献:
1. http://www.cnblogs.com/zhengyun_ustc/archive/2012/08/25/flowcontrol.html
2. http://activemq.apache.org/producer-flow-control.html
3. http://www.huaishao8.com/config/activemq/198.html
4. http://activemq.apache.org/async-sends.html
默认情况下,无论持久化还是不持久化消息,每个队列都是有生产者流量控制的,即 producerFlowControl="true"。
针对同步发送消息和异步发送消息,生产者流控的限制方式有些区别。
1. 同步发送消息方式(useAsynSend=false)
当队列的内存超过 memoryLimit="1mb" 设定值时,会阻塞生产者。
该特性类似于队列深度的特性,只是限制的是队列使用的内存,而不是消息的个数。
你也许觉得 1mb 太小了,这是因为对于非持久化消息,消息会储存到 tempUsage;对于持久化消息,消息会存储到 storeUsage。
所以,理论上,只要 tempUsage 和 storeUsage 没有爆掉,这个 1mb 内存是不会超过的。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
如果希望客户端抛出异常 javax.jms.ResourceAllocationException,而不是只是 Hung 住,可以在 systemUsage 配置上设置sendFailIfNoSpace="true" 或 sendFailIfNoSpaceAfterTimeout="3000"。
<systemUsage>
<systemUsage sendFailIfNoSpace="true" >
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
2. 异步发送消息方式(useAsynSend=true)
异步发送消息时,由于不等待 broker 收到消息的确认信息,即使内存超出使用限制,也不会得到通知,因此也就无法阻塞生产者。
设置 connctionFactory.setProducerWindowSize(1024000); 可以控制 broker 确认收到消息前生产者能发送的最大字节数。
默认情况下,非持久化消息是以异步方式发送的,如果希望以同步方式发送,可以在 ConnctionFactory 上设置 alwaysSyncSend。
参考文献:
1. http://www.cnblogs.com/zhengyun_ustc/archive/2012/08/25/flowcontrol.html
2. http://activemq.apache.org/producer-flow-control.html
3. http://www.huaishao8.com/config/activemq/198.html
4. http://activemq.apache.org/async-sends.html
没有评论:
发表评论