2015年10月14日星期三

AMQ_001:A-MQ 功能演示1:安装、启动

环境:JBoss A-MQ 6.0.0

1. 安装与启动
(1)unzip jboss-a-mq-6.0.0.redhat-024.zip
(2)vim jboss-a-mq-6.0.0.redhat-024/etc/users.properties 去掉admin注释
(3)vim jboss-a-mq-6.0.0.redhat-024/etc/system.properties 去掉admin注释
    activemq.jmx.user=admin
    activemq.jmx.password=admin
    说明:在 jboss-a-mq-6.2.0.redhat-133 中没有找到这两个属性。
(4)cd jboss-a-mq-6.0.0.redhat-024/bin
(5)./amq,启动Server和交互模式Console
    或者 ./amq server,只启动Server
    或者 ./amq client,只启动交互模式Console
    或者 ./start,后台启动 broker
    或者 ./stop,停止后台 broker
(6)log:display | grep fuse
(7)按下tab键,查看所有命令,或者 <command> —-help查看使用帮助
(8)http://localhost:8181/
(9)连接本地或远程的instances:./client —help

2. 基于Queue的消息发送与接收
(1)cd extras
(2)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 10
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 10
说明:
(1)Queue 是一对一通讯。
(2)Queue 允许生产者和消费者异步通讯,即两者通讯时不必都在运行。
(3)failover Transport 当连接断掉的时候会自动重连。

3. Queue的round robin 负载均衡特性
(1)cd extras
(2)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 50
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 50
(4)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 100 --sleep 100
说明:
(1)Queue默认轮流发送消息到各个消费者,所以一个消费者接收奇数消息,另一个接收偶数消息。

4. 基于Topic的消息发送与接收
(1)cd extras
(2)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
(4)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
说明:
(1)Topic 是一对多通讯。
(2)Topic 默认情况下要求生产者和消费者通讯时必须都在运行。
(3)failover Transport 当连接断掉的时候会自动重连。

5. etc/activemq.xml 内容说明

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties and fabric as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="properties">
            <bean class="org.fusesource.mq.fabric.ConfigurationProperties"/>
        </property>     
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="${broker-name}"
            dataDirectory="${data}"
            start="false">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${data}/kahadb"/>
        </persistenceAdapter>

        <plugins>
            <jaasAuthenticationPlugin configuration="karaf" />
        </plugins>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="64 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
       
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
        </transportConnectors>
    </broker>

</beans>

说明:
(1)activemq.xml 是一个Spring的配置文件,使用Spring的依赖注入机制。
(2)DestinationPolicy – 使用匹配符配置 Queue/Topic 的限制策略。
比如这里的配置要求
• 所有的Topic 的未决消息不能超过1000条,如果到达了此值,将会停止生产者继续向该Topic发送消息。
• 所有的Queue 不能超过1mb的内存大小,如果到达了此值,将会停止生产者继续向该Queue发送消息。
(3)ManagementContext – 配置使用JMX连接ActiveMQ,默认情况下,使用父容器的JMX连接。
(4)PersistenceAdapter – 控制持久化信息如何保存。默认情况下,ActiveMQ使用KahaDB作为持久化存储,在给生产者发送确认之前将消息持久化。
(5)JaasAuthenticationPlugin – ActiveMQ 使用 JAAS 认证和授权。默认情况下,使用文件 etc/users.properties 保存用户的身份和权限。
(6)SystemUsage – 限制 ActiveMQ 使用的内存和硬盘空间,当超过限制时,ActiveMQ “友善地”停止从生产者接收消息,当消费者取走足够的消息后,会重新从生产者接收消息。
(7)TransportConnectors – 允许客户端使用哪些协议连接ActiveMQ。ActiveMQ支持AMQP 1.0, MQTT, STOMP, and OpenWire,并且可以组合SSL。

没有评论: