环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13
实际情况中,很多应用需要请求-问答方式的通讯方式。请求-应答方式并不是JMS规范系统默认提供的通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
下图是典型的请求-应答方式的交互过程:
1. TemporaryQueue_Client_AutoAck_NonPeresistent.java
package com.travelsky.activemq;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TemporaryQueue_Client_AutoAck_NonPeresistent implements MessageListener {
private static final int SEND_NUMBER = 5;
// ConnectionFactory:连接工厂,JMS用它创建连接
private ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
private Connection connection = null;
// Session:一个发送或接收消息的线程
private Session session;
// Destination:消息的目的地
private Destination destination;
// Destination: 临时消息目的地
private Destination tempDest;
// MessageProducer: 消息生产者
private MessageProducer producer;
// MessageConsumer: 响应消息消费者
private MessageConsumer responseConsumer;
public TemporaryQueue_Client_AutoAck_NonPeresistent() {
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Client.MessageQueue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// This class will handle the messages to the temp queue as well
tempDest = session.createTemporaryQueue();
responseConsumer = session.createConsumer(tempDest);
responseConsumer.setMessageListener(this);
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("MyProtocolMessage");
message.setJMSReplyTo(tempDest);
String correlationId = this.createRandomString();
message.setJMSCorrelationID(correlationId);
//Thread.sleep(5000);
System.out.println("发送消息:" + "MyProtocolMessage");
producer.send(message);
}
}
@Override
public void onMessage(Message message) {
String messageText = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
messageText = textMessage.getText();
System.out.println("messageText = " + messageText);
}
} catch (JMSException e) {
//Handle the exception appropriately
}
}
private String createRandomString() {
Random random = new Random(System.currentTimeMillis());
long randomLong = random.nextLong();
return Long.toHexString(randomLong);
}
public static void main(String[] args) {
new TemporaryQueue_Client_AutoAck_NonPeresistent();
}
}
2. TemporaryQueue_Server_AutoAck_NonPeresistent.java
package com.travelsky.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TemporaryQueue_Server_AutoAck_NonPeresistent implements MessageListener {
private static final int SEND_NUMBER = 5;
// ConnectionFactory:连接工厂,JMS用它创建连接
private ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
private Connection connection = null;
// Session:一个发送或接收消息的线程
private Session session;
// Destination:消息的目的地
private Destination destination;
// Destination: 临时消息目的地
private Destination tempDest;
// MessageProducer: 反馈消息生产者
private MessageProducer replyProducer;
private MessageProtocol messageProtocol;
public TemporaryQueue_Server_AutoAck_NonPeresistent() {
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
this.messageProtocol = new MessageProtocol();
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Client.MessageQueue");
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//Set the correlation ID from the received message to be the correlation id of the response message
//this lets the client identify which message this is a response to if it has more than
//one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID());
//Send the response to the Destination specified by the JMSReplyTo field of the received message,
//this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
System.out.println("replyMessageText = " + response.getText());
} catch (JMSException e) {
//Handle the exception appropriately
}
}
public static void main(String[] args) {
new TemporaryQueue_Server_AutoAck_NonPeresistent();
}
}
3. MessageProtocol.java
package com.travelsky.activemq;
public class MessageProtocol {
public String handleProtocolMessage(String messageText) {
String responseText;
if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
responseText = "I recognize your protocol message";
} else {
responseText = "Unknown protocol message: " + messageText;
}
return responseText;
}
}
参考文献:
1. http://www.csdn123.com/html/exception/567/567525_567522_567528.htm
2. http://www.360doc.com/content/09/0712/21/18042_4241975.shtml
3. http://stackoverflow.com/questions/4722022/activemq-sending-a-message-to-a-temporary-queue-specified-using-a-string-nms
4. http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
实际情况中,很多应用需要请求-问答方式的通讯方式。请求-应答方式并不是JMS规范系统默认提供的通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
下图是典型的请求-应答方式的交互过程:
1. TemporaryQueue_Client_AutoAck_NonPeresistent.java
package com.travelsky.activemq;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TemporaryQueue_Client_AutoAck_NonPeresistent implements MessageListener {
private static final int SEND_NUMBER = 5;
// ConnectionFactory:连接工厂,JMS用它创建连接
private ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
private Connection connection = null;
// Session:一个发送或接收消息的线程
private Session session;
// Destination:消息的目的地
private Destination destination;
// Destination: 临时消息目的地
private Destination tempDest;
// MessageProducer: 消息生产者
private MessageProducer producer;
// MessageConsumer: 响应消息消费者
private MessageConsumer responseConsumer;
public TemporaryQueue_Client_AutoAck_NonPeresistent() {
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Client.MessageQueue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// This class will handle the messages to the temp queue as well
tempDest = session.createTemporaryQueue();
responseConsumer = session.createConsumer(tempDest);
responseConsumer.setMessageListener(this);
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("MyProtocolMessage");
message.setJMSReplyTo(tempDest);
String correlationId = this.createRandomString();
message.setJMSCorrelationID(correlationId);
//Thread.sleep(5000);
System.out.println("发送消息:" + "MyProtocolMessage");
producer.send(message);
}
}
@Override
public void onMessage(Message message) {
String messageText = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
messageText = textMessage.getText();
System.out.println("messageText = " + messageText);
}
} catch (JMSException e) {
//Handle the exception appropriately
}
}
private String createRandomString() {
Random random = new Random(System.currentTimeMillis());
long randomLong = random.nextLong();
return Long.toHexString(randomLong);
}
public static void main(String[] args) {
new TemporaryQueue_Client_AutoAck_NonPeresistent();
}
}
2. TemporaryQueue_Server_AutoAck_NonPeresistent.java
package com.travelsky.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TemporaryQueue_Server_AutoAck_NonPeresistent implements MessageListener {
private static final int SEND_NUMBER = 5;
// ConnectionFactory:连接工厂,JMS用它创建连接
private ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
private Connection connection = null;
// Session:一个发送或接收消息的线程
private Session session;
// Destination:消息的目的地
private Destination destination;
// Destination: 临时消息目的地
private Destination tempDest;
// MessageProducer: 反馈消息生产者
private MessageProducer replyProducer;
private MessageProtocol messageProtocol;
public TemporaryQueue_Server_AutoAck_NonPeresistent() {
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
this.messageProtocol = new MessageProtocol();
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Client.MessageQueue");
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//Set the correlation ID from the received message to be the correlation id of the response message
//this lets the client identify which message this is a response to if it has more than
//one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID());
//Send the response to the Destination specified by the JMSReplyTo field of the received message,
//this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
System.out.println("replyMessageText = " + response.getText());
} catch (JMSException e) {
//Handle the exception appropriately
}
}
public static void main(String[] args) {
new TemporaryQueue_Server_AutoAck_NonPeresistent();
}
}
3. MessageProtocol.java
package com.travelsky.activemq;
public class MessageProtocol {
public String handleProtocolMessage(String messageText) {
String responseText;
if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
responseText = "I recognize your protocol message";
} else {
responseText = "Unknown protocol message: " + messageText;
}
return responseText;
}
}
参考文献:
1. http://www.csdn123.com/html/exception/567/567525_567522_567528.htm
2. http://www.360doc.com/content/09/0712/21/18042_4241975.shtml
3. http://stackoverflow.com/questions/4722022/activemq-sending-a-message-to-a-temporary-queue-specified-using-a-string-nms
4. http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
没有评论:
发表评论