2009年3月8日星期日

WLS_017:WebLogic Server基本管理之十三:为JMS消息设置顺序

运行环境:WebLogic Server 12.1.1 开发版 + Oracle Database 10g Express Edition 10.2.0.1。

Queue中的消息默认遵循先进先出原则,如果一个Queue有多个消费者,那么消息可能被不同的消费者“消费”。
对于那些有关联性的消息,需要按照顺序被同一个消费者“消费”。
这时,我们需要“人为干预”的Queue中消息排列顺序,WebLogic Server支持为JMS消息设置顺序。
设计图如下:

1. 实验准备:创建和配置JMS资源
请参考实验《为JMS消息设置过期时间》。

2. 实验准备:设置环境变量
打开4个DOS命令窗口,均设置如下环境变量
set JAVA_HOME=C:\Oracle\Java\jdk1.6.0_25
set MW_HOME=C:\Oracle\wls1211_dev
set JAVA_VENDOR=Sun
运行脚本:%MW_HOME%\wlserver\server\bin\setWLSEnv.cmd

3. 编写Java生产者:QueueSend.java
public void init(Context ctx, String queueName) throws NamingException, JMSException {
qconFactory = (QueueConnectionFactory)ctx.lookup(JMS_FACTORY);
qcon = qconFactory.createQueueConnection();
qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue)ctx.lookup(queueName);
qsender = qsession.createSender(queue);
msg = qsession.createTextMessage();
qcon.start();
}

private static void readAndSend(QueueSend qs) throws IOException, JMSException {
BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));
String line = null;
boolean quitNow = false;
do {
System.out.print("Enter message (\"quit\" to quit): \n");
line = msgStream.readLine();
if (line != null && line.trim().length() != 0) {
qs.send(line);
System.out.println("JMS Message Sent: " + line + "\n");
quitNow = line.equalsIgnoreCase("quit");
}
} while (!quitNow);
}

4. 编写消费者代码:QueueReceiveClient.java
public void init(Context ctx, String queueName) throws NamingException, JMSException {
qconFactory = (QueueConnectionFactory)ctx.lookup(JMS_FACTORY);
qcon = qconFactory.createQueueConnection();
qsession = qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = (Queue)ctx.lookup(queueName);
qreceiver = qsession.createReceiver(queue);
qreceiver.setMessageListener(this);
qcon.start();
}
上面的这行代码qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);表明消费者必须在代码中明确acknowledge,Queue中的消息才会消失。
如果换成qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);表明消费者将自动acknowledge接收的每一条消息,无需在代码中明确acknowledge。
要想让多个JMS消息被一个消费者“消费”,需要使用Session.CLIENT_ACKNOWLEDGE,而不是Session.AUTO_ACKNOWLEDGE。

public void onMessage(Message msg) {
try {
String msgText;
if (msg instanceof TextMessage) {
msgText = ((TextMessage)msg).getText();
} else {
msgText = msg.toString();
}

System.out.println("Message Received: " + msgText);

if (msgText.equalsIgnoreCase("quit")) {
synchronized (this) {
quit = true;
msg.acknowledge();
this.notifyAll(); // Notify main thread to quit
}
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: " + jmse.getMessage());
}
}

上面这个方法表明:将一直接收消息,知道收到了“quit”消息,才acknowledge之前所有收到的信息。

5. 测试不使用Unit-of-Order的情况
(1)在DOS命令窗口中启动AdminServer。
(2)在DOS命令窗口中启动消息生产者:java QueueSend t3://localhost:7001
(3)在两个DOS命令窗口中分别启动两个消息消费者:java QueueReceiveClient t3://localhost:7001
(4)在消息生产者DOS窗口连续发送4条消息:msg1、msg2、msg3、msg4。
(5)观察两个消息消费者DOS窗口,发现一个消息消费者“消费”了msg1、msg3,另一个“消费”了msg2、msg4。
这说明默认情况下,Queue中消息可能被多个消费者“消费”,无法保证消息只被一个消费者“消费”。
(6)在消息生产者DOS命令窗口输入quit,会发现某个消息消费者DOS窗口接收了该消息,并acknowledge了该消费者收到的所有Message。
(7)此时,观察Queue中的消息,发现还有一些消息存在,这是因为另外一个消息消费者并没有收到quit消息,所以还没有acknowledge了该消费者收到的所有Message。
(8)再次在DOS命令窗口中启动消息生产者:java QueueSend t3://localhost:7001,发送quit消息,发现Queue所有的消息都不在了。

6. 测试使用Unit-of-Order的情况
修改Connection Factory中的Default Delivery中的参数:
(1)Default Unit-of-Order for Producer:User-generated
(2)User-generated Unit-of-Order Name:uoo-test (这个值可以任意给)
这个修改需要重启Server。

其余测试步骤与上面的(2)-(4)相同,我们发现所有的消息都被一个消费者“消费”了。
点击查看Queue中的消息,发现多了一个key:JMS_BEA_UnitOfOrder:uoo-test。

7. 总结与思考
要想实现让多个JMS消息被一个消费者“消费”,需要
(1)在Connection Factory上设置Default Delivery中的参数。
(2)消息消费者的代码必须使用Session.CLIENT_ACKNOWLEDGE方式。
(3)Queue中或代码中不需要指定Unit-of-Order Name,这一点与我原先想的不一样。
(4)在消息生产者和消费消费者代码中,均使用了特定的Connection Factory,看来能够起作用的原因就在于此。

代码下载:QueueSendandReceive.7z

没有评论: