2016年4月9日星期六

ActiveMQ_028:使用临时队列实现 Request-Response 通信

环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13

实际情况中,很多应用需要请求-问答方式的通讯方式。请求-应答方式并不是JMS规范系统默认提供的通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
下图是典型的请求-应答方式的交互过程:

1. TemporaryQueue_Client_AutoAck_NonPeresistent.java

package com.travelsky.activemq;

import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TemporaryQueue_Client_AutoAck_NonPeresistent implements MessageListener {

    private static final int SEND_NUMBER = 5;

    // ConnectionFactory:连接工厂,JMS用它创建连接
    private ConnectionFactory connectionFactory;
    // Connection:JMS客户端到JMS Provider的连接
    private Connection connection = null;
    // Session:一个发送或接收消息的线程
    private Session session;
    // Destination:消息的目的地
    private Destination destination;
    // Destination: 临时消息目的地
    private Destination tempDest;
    // MessageProducer: 消息生产者
    private MessageProducer producer;
    // MessageConsumer: 响应消息消费者
    private MessageConsumer responseConsumer;

    public TemporaryQueue_Client_AutoAck_NonPeresistent() {

        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("Client.MessageQueue");
           
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
           
            // This class will handle the messages to the temp queue as well
            tempDest = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(tempDest);
            responseConsumer.setMessageListener(this);
            sendMessage(session, producer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("MyProtocolMessage");
            message.setJMSReplyTo(tempDest);
            String correlationId = this.createRandomString();
            message.setJMSCorrelationID(correlationId);
            //Thread.sleep(5000);
            System.out.println("发送消息:" + "MyProtocolMessage");
            producer.send(message);
        }
    }

    @Override
    public void onMessage(Message message) {
        String messageText = null;
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                messageText = textMessage.getText();
                System.out.println("messageText = " + messageText);
            }
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }
   
    private String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }

    public static void main(String[] args) {
        new TemporaryQueue_Client_AutoAck_NonPeresistent();
    }
}

2. TemporaryQueue_Server_AutoAck_NonPeresistent.java

package com.travelsky.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TemporaryQueue_Server_AutoAck_NonPeresistent implements MessageListener {

    private static final int SEND_NUMBER = 5;

    // ConnectionFactory:连接工厂,JMS用它创建连接
    private ConnectionFactory connectionFactory;
    // Connection:JMS客户端到JMS Provider的连接
    private Connection connection = null;
    // Session:一个发送或接收消息的线程
    private Session session;
    // Destination:消息的目的地
    private Destination destination;
    // Destination: 临时消息目的地
    private Destination tempDest;
    // MessageProducer: 反馈消息生产者
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;

    public TemporaryQueue_Server_AutoAck_NonPeresistent() {

        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
       
        try {
            this.messageProtocol = new MessageProtocol();

            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("Client.MessageQueue");

            replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(destination);
            consumer.setMessageListener(this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }

            //Set the correlation ID from the received message to be the correlation id of the response message
            //this lets the client identify which message this is a response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            //Send the response to the Destination specified by the JMSReplyTo field of the received message,
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
            System.out.println("replyMessageText = " + response.getText());
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String[] args) {
        new TemporaryQueue_Server_AutoAck_NonPeresistent();
    }
}

3. MessageProtocol.java

package com.travelsky.activemq;

public class MessageProtocol {
    public String handleProtocolMessage(String messageText) {
        String responseText;
        if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
            responseText = "I recognize your protocol message";
        } else {
            responseText = "Unknown protocol message: " + messageText;
        }
        
        return responseText;
    }
}

参考文献:
1. http://www.csdn123.com/html/exception/567/567525_567522_567528.htm
2. http://www.360doc.com/content/09/0712/21/18042_4241975.shtml
3. http://stackoverflow.com/questions/4722022/activemq-sending-a-message-to-a-temporary-queue-specified-using-a-string-nms
4. http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

没有评论: