环境:OS X EI Capitan 10.11.3 + ActiveMQ 5.13.0
Composite Queue/Topic 和 Virtual Topic 功能差不多,唯一的区别是Composite Queue/Topic需要事先定义在配置文件中,而 Virtual Topic 不需要(除了修改默认规则)。
1. 修改配置文件activemq.xml,增加如下内容:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="CompositeQueue.Orders" >
<forwardTo>
<topic physicalName="Topic.A" />
<queue physicalName="Queue.B"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
2. CompositeQueue_Sender_AutoAck_NonPeresistent.java
package com.travelsky.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class CompositeQueue_Sender_AutoAck_NonPeresistent {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地
Destination destination;
// MessageProducer: 消息生产者
MessageProducer producer;
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("CompositeQueue.Orders");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq发送的消息" + i);
//Thread.sleep(5000);
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
Composite Queue/Topic 和 Virtual Topic 功能差不多,唯一的区别是Composite Queue/Topic需要事先定义在配置文件中,而 Virtual Topic 不需要(除了修改默认规则)。
1. 修改配置文件activemq.xml,增加如下内容:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="CompositeQueue.Orders" >
<forwardTo>
<topic physicalName="Topic.A" />
<queue physicalName="Queue.B"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
2. CompositeQueue_Sender_AutoAck_NonPeresistent.java
package com.travelsky.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class CompositeQueue_Sender_AutoAck_NonPeresistent {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地
Destination destination;
// MessageProducer: 消息生产者
MessageProducer producer;
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("CompositeQueue.Orders");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq发送的消息" + i);
//Thread.sleep(5000);
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
没有评论:
发表评论