环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13
非持久化订阅中,一个 Topic 可能有多个消费者,如果其中有一个消费者消费的速度很慢,会让生产者的生产的消息不断堆积在内存中,最终会导致生产者不得不放慢生产速度,其它的快消费者也只好跟着慢下来。
目前 ActiveMQ 使用Pending Message Limit Strategy 来解决这个问题。
设置消费者 prefetch buffer 的最大消息个数,当达到最大值后,如果还有有新的消息进来,那么旧的消息将被丢弃。
这样可以让内存中的消息保持“常新”,并且可以继续给“慢消费者”发送消息。
但是请注意,这个解决办法的前提是允许丢弃消息。
有两种配置方式
(1)常量
例如:<constantPendingMessageLimitStrategy limit="50"/>
(2)消息预取大小(prefetch size) 的倍数
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
以上两种方式中,设置为 0 表示除了 prefetch 之外不再缓存消息;设置为 -1 表示禁止丢弃消息。此外,你还可以配置消息的丢弃策略,目前有以下两种:
(1)oldestMessageEvictionStrategy 丢弃最旧的消息。
(2)oldestMessageWithLowestPriorityEvictionStrategy 丢弃最旧的,并且优先级最低的消息。
可以针对不同的 Topic 定义不同的策略,比如对于 PRICES. 开头的 Topic,可以设置比较小的 limit,因为报价丢了就丢了,不可惜。
对于 ORDERS. 开头的 Topic,可以设置比较大的 limit,因为订单是很重要的消息,不希望丢失。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<!-- 1 minutes worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000" />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
<!-- 10 seconds worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="10000" />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
参考文献:
1. http://activemq.apache.org/slow-consumer-handling.html
非持久化订阅中,一个 Topic 可能有多个消费者,如果其中有一个消费者消费的速度很慢,会让生产者的生产的消息不断堆积在内存中,最终会导致生产者不得不放慢生产速度,其它的快消费者也只好跟着慢下来。
目前 ActiveMQ 使用Pending Message Limit Strategy 来解决这个问题。
设置消费者 prefetch buffer 的最大消息个数,当达到最大值后,如果还有有新的消息进来,那么旧的消息将被丢弃。
这样可以让内存中的消息保持“常新”,并且可以继续给“慢消费者”发送消息。
但是请注意,这个解决办法的前提是允许丢弃消息。
有两种配置方式
(1)常量
例如:<constantPendingMessageLimitStrategy limit="50"/>
(2)消息预取大小(prefetch size) 的倍数
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
以上两种方式中,设置为 0 表示除了 prefetch 之外不再缓存消息;设置为 -1 表示禁止丢弃消息。此外,你还可以配置消息的丢弃策略,目前有以下两种:
(1)oldestMessageEvictionStrategy 丢弃最旧的消息。
(2)oldestMessageWithLowestPriorityEvictionStrategy 丢弃最旧的,并且优先级最低的消息。
可以针对不同的 Topic 定义不同的策略,比如对于 PRICES. 开头的 Topic,可以设置比较小的 limit,因为报价丢了就丢了,不可惜。
对于 ORDERS. 开头的 Topic,可以设置比较大的 limit,因为订单是很重要的消息,不希望丢失。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<!-- 1 minutes worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000" />
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
<!-- 10 seconds worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="10000" />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
参考文献:
1. http://activemq.apache.org/slow-consumer-handling.html
没有评论:
发表评论