2016年2月19日星期五

ActiveMQ_013:Virtual Topic 使用例子

环境:OS X EI Capitan 10.11.3 + ActiveMQ 5.13.0

Topic中的消息,消息消费者必须在线才能消费生产者发送的消息。
如果消费者想消费离线时发送的消息,必须使用持久化订阅。但持久化订阅的配置有些麻烦,需要指定ClientID,并且效率很差,只能单线程消费消息。
这意味着,持久化订阅不能实现消息的负载均衡和高可用。
有没有一种Topic,可以消费离线后发送的消息,同时消费者可以多线程消费。
回答是肯定的,就是 ActiveMQ 的 Virtual Topic。
Virtual Topic是一个逻辑的Topic,不是真实存在的物理Topic,经由它,可以把发送到Virtual Topic转发到Queue,这些Queue是真实存在的物理Queue。
 


默认情况下,以 VirtualTopic打头的Topic就是Virtual Topic,比如 VirtualTopic.T;以Consumer.*.VirtualTopic.T 命名的就是和VirtualTopic.T连接的Queue。
并且 VirtualTopic 和对应的 Queue 无需事先创建,运行时动态创建即可。

比如 VirtualTopic.Orders 就是一个Virtual Topic;Consumer.A.VirtualTopic.Orders 和 Consumer.B.VirtualTopic.Orders 就是两个和VirtualTopic.Orders 连接的Queue。
发送到VirtualTopic.Orders的消息会自动转发到Consumer.A.VirtualTopic.Orders和Consumer.B.VirtualTopic.Orders中。

你也可以修改这个默认规则,比如下面这个例子就是把所有发送到VirtualTopic.T的消息,转发到型如 VirtualQueueConsumer.*. 的Queue中。

<broker> 
 <destinationInterceptors>
   <virtualDestinationInterceptor>
     <virtualDestinations>
        <!-- deliver traffic from virtual topic T to all subscribers to destinations matching the prefix "VirtualQueueConsumer.*" (queue or topic) -->
        <virtualTopic name="VirtualTopic.T"
                    prefix="VirtualQueueConsumer.*." />
     </virtualDestinations>
    </virtualDestinationInterceptor>
   </destinationInterceptors>
 </broker> 

1. VirtualTopic_Sender_AutoAck_NonPeresistent.java

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class VirtualTopic_Sender_AutoAck_NonPeresistent {

    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination:消息的目的地
        Destination destination;
        // MessageProducer: 消息生产者
        MessageProducer producer;
        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("VirtualTopic.Orders");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq发送的消息" + i);
            //Thread.sleep(5000);
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
}

2. VirtualTopic_Receiver_AutoAck_Async.java

package com.travelsky.activemq;

import java.io.IOException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class VirtualTopic_Receiver_AutoAck_Async {

    public static void main(String[] args) {

        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        //消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destinationA = session.createQueue("Consumer.A.VirtualTopic.Orders");
            MessageConsumer consumerA = session.createConsumer(destinationA);
            consumerA.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage tm = (TextMessage) message;
                    try {
                        System.out.println("Consumer A Received message: " + tm.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
           
            Destination destinationB = session.createQueue("Consumer.B.VirtualTopic.Orders");
            MessageConsumer consumerB = session.createConsumer(destinationB);
            consumerB.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage tm = (TextMessage) message;
                    try {
                        System.out.println("Consumer B Received message: " + tm.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
            }
        }
    }
}

特别要注意的是, 消费者必须先连接到 Broker,这样才可以创建型如 Consumer.*.VirtualTopic.Orders 的Queue,然后发送到 VirtualTopic.Orders 的消息,Consumer.*.VirtualTopic.Orders 才能收到。
如果是在发送消息到 VirtualTopic.Orders 之后再运行消费者程序,则之前的消息不会收到。
这是 Topic 的固有特性决定的。

参考文献:
1. https://www.packtpub.com/books/content/using-virtual-destinations-advanced
2. http://activemq.apache.org/virtual-destinations.html
3. http://blog.csdn.net/kimmking/article/details/9773085

没有评论: