2016年3月22日星期二

ActiveMQ_023:当 Topic 遇到慢消费者

环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13

非持久化订阅中,一个 Topic 可能有多个消费者,如果其中有一个消费者消费的速度很慢,会让生产者的生产的消息不断堆积在内存中,最终会导致生产者不得不放慢生产速度,其它的快消费者也只好跟着慢下来。

目前 ActiveMQ 使用Pending Message Limit Strategy 来解决这个问题。
设置消费者 prefetch buffer 的最大消息个数,当达到最大值后,如果还有有新的消息进来,那么旧的消息将被丢弃。
这样可以让内存中的消息保持“常新”,并且可以继续给“慢消费者”发送消息。
但是请注意,这个解决办法的前提是允许丢弃消息。

有两种配置方式
(1)常量
例如:<constantPendingMessageLimitStrategy limit="50"/>
(2)消息预取大小(prefetch size) 的倍数
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

以上两种方式中,设置为 0 表示除了 prefetch 之外不再缓存消息;设置为 -1 表示禁止丢弃消息。此外,你还可以配置消息的丢弃策略,目前有以下两种:
(1)oldestMessageEvictionStrategy 丢弃最旧的消息。
(2)oldestMessageWithLowestPriorityEvictionStrategy 丢弃最旧的,并且优先级最低的消息。

可以针对不同的 Topic 定义不同的策略,比如对于 PRICES. 开头的 Topic,可以设置比较小的 limit,因为报价丢了就丢了,不可惜。
对于 ORDERS. 开头的 Topic,可以设置比较大的 limit,因为订单是很重要的消息,不希望丢失。

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="ORDERS.>">
        <dispatchPolicy>
          <strictOrderDispatchPolicy />
        </dispatchPolicy>
        <!--  1 minutes worth -->
        <subscriptionRecoveryPolicy>
          <timedSubscriptionRecoveryPolicy recoverDuration="60000" />
        </subscriptionRecoveryPolicy>
      </policyEntry>
      <policyEntry topic="PRICES.>">
        <!-- lets force old messages to be discarded for slow consumers -->
        <pendingMessageLimitStrategy>
          <constantPendingMessageLimitStrategy limit="10"/>
        </pendingMessageLimitStrategy>
        <!--  10 seconds worth -->
        <subscriptionRecoveryPolicy>
          <timedSubscriptionRecoveryPolicy recoverDuration="10000" />
        </subscriptionRecoveryPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

参考文献:
1. http://activemq.apache.org/slow-consumer-handling.html

没有评论: