2016年3月22日星期二

ActiveMQ_024:生产者流控

环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13

默认情况下,无论持久化还是不持久化消息,每个队列都是有生产者流量控制的,即 producerFlowControl="true"。
针对同步发送消息和异步发送消息,生产者流控的限制方式有些区别。

1. 同步发送消息方式(useAsynSend=false)
当队列的内存超过 memoryLimit="1mb" 设定值时,会阻塞生产者。
该特性类似于队列深度的特性,只是限制的是队列使用的内存,而不是消息的个数。
你也许觉得 1mb 太小了,这是因为对于非持久化消息,消息会储存到 tempUsage;对于持久化消息,消息会存储到 storeUsage。
所以,理论上,只要  tempUsage 和 storeUsage 没有爆掉,这个 1mb 内存是不会超过的。

<destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
          <pendingSubscriberPolicy>
            <vmCursor />
          </pendingSubscriberPolicy>
        </policyEntry>
        <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
          <!-- Use VM cursor for better latency
               For more information, see:
               http://activemq.apache.org/message-cursors.html
          <pendingQueuePolicy>
            <vmQueueCursor/>
          </pendingQueuePolicy>
          -->
        </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>

如果希望客户端抛出异常 javax.jms.ResourceAllocationException,而不是只是 Hung 住,可以在 systemUsage 配置上设置sendFailIfNoSpace="true" 或 sendFailIfNoSpaceAfterTimeout="3000"。

<systemUsage>
  <systemUsage sendFailIfNoSpace="true" >
    <memoryUsage>
      <memoryUsage percentOfJvmHeap="70"/>
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="100 gb"/>
    </storeUsage>
    <tempUsage>
      <tempUsage limit="50 gb"/>
    </tempUsage>
  </systemUsage>
</systemUsage>

2. 异步发送消息方式(useAsynSend=true)
异步发送消息时,由于不等待 broker 收到消息的确认信息,即使内存超出使用限制,也不会得到通知,因此也就无法阻塞生产者。
设置 connctionFactory.setProducerWindowSize(1024000); 可以控制 broker 确认收到消息前生产者能发送的最大字节数。
默认情况下,非持久化消息是以异步方式发送的,如果希望以同步方式发送,可以在 ConnctionFactory 上设置 alwaysSyncSend。

参考文献:
1. http://www.cnblogs.com/zhengyun_ustc/archive/2012/08/25/flowcontrol.html
2. http://activemq.apache.org/producer-flow-control.html
3. http://www.huaishao8.com/config/activemq/198.html
4. http://activemq.apache.org/async-sends.html

没有评论: