环境:JBoss A-MQ 6.0.0
1. 安装与启动
(1)unzip jboss-a-mq-6.0.0.redhat-024.zip
(2)vim jboss-a-mq-6.0.0.redhat-024/etc/users.properties 去掉admin注释
(3)vim jboss-a-mq-6.0.0.redhat-024/etc/system.properties 去掉admin注释
activemq.jmx.user=admin
activemq.jmx.password=admin
说明:在 jboss-a-mq-6.2.0.redhat-133 中没有找到这两个属性。
(4)cd jboss-a-mq-6.0.0.redhat-024/bin
(5)./amq,启动Server和交互模式Console
或者 ./amq server,只启动Server
或者 ./amq client,只启动交互模式Console
或者 ./start,后台启动 broker
或者 ./stop,停止后台 broker
(6)log:display | grep fuse
(7)按下tab键,查看所有命令,或者 <command> —-help查看使用帮助
(8)http://localhost:8181/
(9)连接本地或远程的instances:./client —help
2. 基于Queue的消息发送与接收
(1)cd extras
(2)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 10
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 10
说明:
(1)Queue 是一对一通讯。
(2)Queue 允许生产者和消费者异步通讯,即两者通讯时不必都在运行。
(3)failover Transport 当连接断掉的时候会自动重连。
3. Queue的round robin 负载均衡特性
(1)cd extras
(2)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 50
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 50
(4)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 100 --sleep 100
说明:
(1)Queue默认轮流发送消息到各个消费者,所以一个消费者接收奇数消息,另一个接收偶数消息。
4. 基于Topic的消息发送与接收
(1)cd extras
(2)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
(4)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
说明:
(1)Topic 是一对多通讯。
(2)Topic 默认情况下要求生产者和消费者通讯时必须都在运行。
(3)failover Transport 当连接断掉的时候会自动重连。
5. etc/activemq.xml 内容说明
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties and fabric as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties">
<bean class="org.fusesource.mq.fabric.ConfigurationProperties"/>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="${broker-name}"
dataDirectory="${data}"
start="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${data}/kahadb"/>
</persistenceAdapter>
<plugins>
<jaasAuthenticationPlugin configuration="karaf" />
</plugins>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
</transportConnectors>
</broker>
</beans>
说明:
(1)activemq.xml 是一个Spring的配置文件,使用Spring的依赖注入机制。
(2)DestinationPolicy – 使用匹配符配置 Queue/Topic 的限制策略。
比如这里的配置要求
• 所有的Topic 的未决消息不能超过1000条,如果到达了此值,将会停止生产者继续向该Topic发送消息。
• 所有的Queue 不能超过1mb的内存大小,如果到达了此值,将会停止生产者继续向该Queue发送消息。
(3)ManagementContext – 配置使用JMX连接ActiveMQ,默认情况下,使用父容器的JMX连接。
(4)PersistenceAdapter – 控制持久化信息如何保存。默认情况下,ActiveMQ使用KahaDB作为持久化存储,在给生产者发送确认之前将消息持久化。
(5)JaasAuthenticationPlugin – ActiveMQ 使用 JAAS 认证和授权。默认情况下,使用文件 etc/users.properties 保存用户的身份和权限。
(6)SystemUsage – 限制 ActiveMQ 使用的内存和硬盘空间,当超过限制时,ActiveMQ “友善地”停止从生产者接收消息,当消费者取走足够的消息后,会重新从生产者接收消息。
(7)TransportConnectors – 允许客户端使用哪些协议连接ActiveMQ。ActiveMQ支持AMQP 1.0, MQTT, STOMP, and OpenWire,并且可以组合SSL。
1. 安装与启动
(1)unzip jboss-a-mq-6.0.0.redhat-024.zip
(2)vim jboss-a-mq-6.0.0.redhat-024/etc/users.properties 去掉admin注释
(3)vim jboss-a-mq-6.0.0.redhat-024/etc/system.properties 去掉admin注释
activemq.jmx.user=admin
activemq.jmx.password=admin
说明:在 jboss-a-mq-6.2.0.redhat-133 中没有找到这两个属性。
(4)cd jboss-a-mq-6.0.0.redhat-024/bin
(5)./amq,启动Server和交互模式Console
或者 ./amq server,只启动Server
或者 ./amq client,只启动交互模式Console
或者 ./start,后台启动 broker
或者 ./stop,停止后台 broker
(6)log:display | grep fuse
(7)按下tab键,查看所有命令,或者 <command> —-help查看使用帮助
(8)http://localhost:8181/
(9)连接本地或远程的instances:./client —help
2. 基于Queue的消息发送与接收
(1)cd extras
(2)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 10
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 10
说明:
(1)Queue 是一对一通讯。
(2)Queue 允许生产者和消费者异步通讯,即两者通讯时不必都在运行。
(3)failover Transport 当连接断掉的时候会自动重连。
3. Queue的round robin 负载均衡特性
(1)cd extras
(2)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 50
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 50
(4)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination queue://summit.test --count 100 --sleep 100
说明:
(1)Queue默认轮流发送消息到各个消费者,所以一个消费者接收奇数消息,另一个接收偶数消息。
4. 基于Topic的消息发送与接收
(1)cd extras
(2)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
(3)java -jar mq-client.jar consumer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
(4)java -jar mq-client.jar producer --brokerUrl failover://tcp://localhost:61616 --user admin --password admin --destination topic://summit.test --count 10
说明:
(1)Topic 是一对多通讯。
(2)Topic 默认情况下要求生产者和消费者通讯时必须都在运行。
(3)failover Transport 当连接断掉的时候会自动重连。
5. etc/activemq.xml 内容说明
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties and fabric as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties">
<bean class="org.fusesource.mq.fabric.ConfigurationProperties"/>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="${broker-name}"
dataDirectory="${data}"
start="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${data}/kahadb"/>
</persistenceAdapter>
<plugins>
<jaasAuthenticationPlugin configuration="karaf" />
</plugins>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
</transportConnectors>
</broker>
</beans>
说明:
(1)activemq.xml 是一个Spring的配置文件,使用Spring的依赖注入机制。
(2)DestinationPolicy – 使用匹配符配置 Queue/Topic 的限制策略。
比如这里的配置要求
• 所有的Topic 的未决消息不能超过1000条,如果到达了此值,将会停止生产者继续向该Topic发送消息。
• 所有的Queue 不能超过1mb的内存大小,如果到达了此值,将会停止生产者继续向该Queue发送消息。
(3)ManagementContext – 配置使用JMX连接ActiveMQ,默认情况下,使用父容器的JMX连接。
(4)PersistenceAdapter – 控制持久化信息如何保存。默认情况下,ActiveMQ使用KahaDB作为持久化存储,在给生产者发送确认之前将消息持久化。
(5)JaasAuthenticationPlugin – ActiveMQ 使用 JAAS 认证和授权。默认情况下,使用文件 etc/users.properties 保存用户的身份和权限。
(6)SystemUsage – 限制 ActiveMQ 使用的内存和硬盘空间,当超过限制时,ActiveMQ “友善地”停止从生产者接收消息,当消费者取走足够的消息后,会重新从生产者接收消息。
(7)TransportConnectors – 允许客户端使用哪些协议连接ActiveMQ。ActiveMQ支持AMQP 1.0, MQTT, STOMP, and OpenWire,并且可以组合SSL。
没有评论:
发表评论