环境:OS X EI Capitan 10.11.3 + ActiveMQ 5.13.0
Topic中的消息,消息消费者必须在线才能消费生产者发送的消息。
如果消费者想消费离线时发送的消息,必须使用持久化订阅。但持久化订阅的配置有些麻烦,需要指定ClientID,并且效率很差,只能单线程消费消息。
这意味着,持久化订阅不能实现消息的负载均衡和高可用。
有没有一种Topic,可以消费离线后发送的消息,同时消费者可以多线程消费。
回答是肯定的,就是 ActiveMQ 的 Virtual Topic。
Virtual Topic是一个逻辑的Topic,不是真实存在的物理Topic,经由它,可以把发送到Virtual Topic转发到Queue,这些Queue是真实存在的物理Queue。
默认情况下,以 VirtualTopic打头的Topic就是Virtual Topic,比如 VirtualTopic.T;以Consumer.*.VirtualTopic.T 命名的就是和VirtualTopic.T连接的Queue。
并且 VirtualTopic 和对应的 Queue 无需事先创建,运行时动态创建即可。
比如 VirtualTopic.Orders 就是一个Virtual Topic;Consumer.A.VirtualTopic.Orders 和 Consumer.B.VirtualTopic.Orders 就是两个和VirtualTopic.Orders 连接的Queue。
发送到VirtualTopic.Orders的消息会自动转发到Consumer.A.VirtualTopic.Orders和Consumer.B.VirtualTopic.Orders中。
你也可以修改这个默认规则,比如下面这个例子就是把所有发送到VirtualTopic.T的消息,转发到型如 VirtualQueueConsumer.*. 的Queue中。
<broker>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<!-- deliver traffic from virtual topic T to all subscribers to
destinations matching the prefix "VirtualQueueConsumer.*" (queue or
topic) -->
<virtualTopic name="VirtualTopic.T"
prefix="VirtualQueueConsumer.*." />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
1. VirtualTopic_Sender_AutoAck_NonPeresistent.java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopic_Sender_AutoAck_NonPeresistent {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地
Destination destination;
// MessageProducer: 消息生产者
MessageProducer producer;
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("VirtualTopic.Orders");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq发送的消息" + i);
//Thread.sleep(5000);
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}
2. VirtualTopic_Receiver_AutoAck_Async.java
package com.travelsky.activemq;
import java.io.IOException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopic_Receiver_AutoAck_Async {
public static void main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地;消息发送给谁.
Destination destination;
//消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destinationA = session.createQueue("Consumer.A.VirtualTopic.Orders");
MessageConsumer consumerA = session.createConsumer(destinationA);
consumerA.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Consumer A Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Destination destinationB = session.createQueue("Consumer.B.VirtualTopic.Orders");
MessageConsumer consumerB = session.createConsumer(destinationB);
consumerB.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Consumer B Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
}
特别要注意的是, 消费者必须先连接到 Broker,这样才可以创建型如 Consumer.*.VirtualTopic.Orders 的Queue,然后发送到 VirtualTopic.Orders 的消息,Consumer.*.VirtualTopic.Orders 才能收到。
如果是在发送消息到 VirtualTopic.Orders 之后再运行消费者程序,则之前的消息不会收到。
这是 Topic 的固有特性决定的。
参考文献:
1. https://www.packtpub.com/books/content/using-virtual-destinations-advanced
2. http://activemq.apache.org/virtual-destinations.html
3. http://blog.csdn.net/kimmking/article/details/9773085