运行环境:WebLogic Server 12.1.1 开发版 + Oracle Database 10g Express Edition 10.2.0.1。
默认情况下,如果Topic的订阅者“不在线”,那么是无法收到Topic中的消息的。
这与一些实际情况不相符合:我订阅的杂志,我不在家,就不给我了吗?
为此,WebLogic Server提供了一个Topic持久化订阅功能。
它可以帮助我们在“离线”的情况下,上线后收到之前所有没有收到的消息。
重要步骤说明:
1. 实验准备:创建和配置JMS资源
请参考实验《为JMS消息设置过期时间》。
2. 创建File Store
Topic中的消息要想做到持久化,必须设定持久化方式,这里我们使用File Store作为Persistent Store。
3. 为JMS Server设置Persistent Store
4. 为Topic 创建Durable Subscriber
点击Topic,选择Monitor-->Durable Subscribers-->New
设置Subscription Name:ChinaSubscripiton
设置Client ID:ChinaClient
单凭Client ID还不足以唯一标识一个持久化订阅者,因为一个Client ID还可以该Topic中的多种“杂志”。
Subscription Name就相当于杂志的名称。这跟一个身份证可以订多个房间的道理类似。
5. 编写Topic 发送方代码:TopicSend.java
public void init(Context ctx, String topicName) throws NamingException, JMSException {
tconFactory = (TopicConnectionFactory)ctx.lookup(JMS_FACTORY);
tcon = tconFactory.createTopicConnection();
tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)ctx.lookup(topicName);
tpublisher = tsession.createPublisher(topic);
msg = tsession.createTextMessage();
tcon.start();
}
private static void readAndSend(TopicSend ts) throws IOException, JMSException {
BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));
String line = null;
boolean quitNow = false;
do {
System.out.print("Enter message (\"quit\" to quit): \n");
line = msgStream.readLine();
if (line != null && line.trim().length() != 0) {
ts.send(line);
System.out.println("JMS Message Sent: " + line + "\n");
quitNow = line.equalsIgnoreCase("quit");
}
} while (!quitNow);
}
以上代码跟发送到普通的Topic代码没有什么不同。
区别是在Topic接收方代码。
6. 编写普通的Topic 接收方代码:TopicReceiveClient.java
public void init(Context ctx, String topicName) throws NamingException, JMSException {
tconFactory = (TopicConnectionFactory)ctx.lookup(JMS_FACTORY);
tcon = tconFactory.createTopicConnection();
tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)ctx.lookup(topicName);
tsubscriber = tsession.createSubscriber(topic);
tsubscriber.setMessageListener(this);
tcon.start();
}
public void onMessage(Message msg) {
try {
String msgText;
if (msg instanceof TextMessage) {
msgText = ((TextMessage)msg).getText();
} else {
msgText = msg.toString();
}
System.out.println("Message Received: " + msgText);
if (msgText.equalsIgnoreCase("quit")) {
synchronized (this) {
quit = true;
this.notifyAll(); // Notify main thread to quit
}
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: " + jmse.getMessage());
}
}
7. 编写持久化的Topic 接收方代码:DurableTopicReceiveClient.java
public void init(Context ctx, String topicName) throws NamingException, JMSException {
tconFactory = (TopicConnectionFactory)ctx.lookup(JMS_FACTORY);
tcon = tconFactory.createTopicConnection();
tcon.setClientID("ChinaClient");
tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)ctx.lookup(topicName);
tsubscriber = tsession.createDurableSubscriber(topic,"ChinaSubscription");
tsubscriber.setMessageListener(this);
tcon.start();
}
onMessage方法与普通的Topic 接收方代码相同。
8. 测试
8.1 测试环境设置
打开4个DOS命令窗口,均设置如下环境变量
set JAVA_HOME=C:\Oracle\Java\jdk1.6.0_25
set MW_HOME=C:\Oracle\wls1211_dev
set JAVA_VENDOR=Sun
运行脚本:%MW_HOME%\wlserver\server\bin\setWLSEnv.cmd
8.2 测试普通Topic订阅者特性:一对多的消息分发
(1)在DOS命令窗口中启动AdminServer。
(2)在DOS命令窗口中启动消息生产者:java TopicSend t3://localhost:7001
(3)在两个DOS命令窗口中分别启动两个消息消费者:java TopicReceiveClient t3://localhost:7001
(4)在消息生产者DOS窗口连续发送4条消息:msg1、msg2、msg3、msg4。
(5)观察两个消息消费者DOS窗口,发现两个窗口都“消费”了所有消息。
(6)在消息生产者DOS命令窗口输入quit,两个消息消费者DOS窗口都收到该消息,并退出。
8.3 测试普通Topic订阅者特性:离线后再上线,无法收到以前的消息
(1)在DOS命令窗口中启动AdminServer。
(2)在DOS命令窗口中启动消息生产者:java TopicSend t3://localhost:7001
(3)在1个DOS命令窗口中启动1个消息消费者:java TopicReceiveClient t3://localhost:7001
(4)在消息生产者DOS窗口连续发送2条消息:msg1、msg2。
(5)在1个DOS命令窗口中启动另外1个消息消费者:java TopicReceiveClient t3://localhost:7001
(6)在消息生产者DOS窗口连续发送2条消息:msg3、msg4。
(7)观察两个消息消费者DOS窗口,发现第1个窗口“消费”了所有消息,第2个窗口只“消费”了msg3、msg4。
(8)在消息生产者DOS命令窗口输入quit,两个消息消费者DOS窗口都收到该消息,并退出。
8.4 测试持久化Topic订阅者特性:离线后再上线,依然可以收到以前的消息
(1)在DOS命令窗口中启动AdminServer。
(2)在DOS命令窗口中启动消息生产者:java TopicSend t3://localhost:7001
(3)在1个DOS命令窗口中启动1个消息消费者:java TopicReceiveClient t3://localhost:7001
(4)在消息生产者DOS窗口连续发送2条消息:msg1、msg2。
(5)在1个DOS命令窗口中启动另外1个持久化消息消费者:java DurableTopicReceiveClient t3://localhost:7001
(6)在消息生产者DOS窗口连续发送2条消息:msg3、msg4。
(7)观察两个消息消费者DOS窗口,发现两个窗口都“消费”了所有消息,这说明持久化配置起作用了。
(8)在消息生产者DOS命令窗口输入quit,两个消息消费者DOS窗口都收到该消息,并退出。
8.5 测试持久化Topic订阅者特性:设置多个Subscription Name
(1)在DOS命令窗口中启动AdminServer。
(2)在DOS命令窗口中启动消息生产者:java TopicSend t3://localhost:7001
(3)在两个DOS命令窗口中分别启动两个消息消费者:java DurableTopicReceiveClient t3://localhost:7001
这时,在第2个消费者窗口会报错:说Client id:ChinaClient已经被另一个对象使用了。
Exception in thread "main" weblogic.jms.common.InvalidClientIDException: Client id, ChinaClient, is in use. The reason for rejection is "The JNDI name weblogic
.jms.connection.clientid.ChinaClient was found, and was bound to an object of type weblogic.jms.frontend.FEClientIDSingularAggregatable : FEClientIDSingularAggr
egatable(SingularAggregatable(<6539122430256738311 .1="">:4):ChinaClient)"
at weblogic.jms.dispatcher.DispatcherAdapter.convertToJMSExceptionAndThrow(DispatcherAdapter.java:110)
at weblogic.jms.dispatcher.DispatcherAdapter.dispatchSync(DispatcherAdapter.java:45)
at weblogic.jms.client.JMSConnection.setClientIDInternal(JMSConnection.java:660)
at weblogic.jms.client.JMSConnection.setClientID(JMSConnection.java:630)
at weblogic.jms.client.WLConnectionImpl.setClientID(WLConnectionImpl.java:662)
at DurableTopicReceiveClient.init(DurableTopicReceiveClient.java:72)
at DurableTopicReceiveClient.main(DurableTopicReceiveClient.java:104)
Caused by: weblogic.jms.common.InvalidClientIDException: Client id, ChinaClient, is in use. The reason for rejection is "The JNDI name weblogic.jms.connection.
clientid.ChinaClient was found, and was bound to an object of type weblogic.jms.frontend.FEClientIDSingularAggregatable : FEClientIDSingularAggregatable(Singula
rAggregatable(<6539122430256738311 .1="">:4):ChinaClient)"
at weblogic.jms.frontend.FEConnection.setConnectionClientId(FEConnection.java:1170)
at weblogic.jms.frontend.FEConnection.invoke(FEConnection.java:1649)
at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:961)
at weblogic.messaging.dispatcher.DispatcherServerRef.invoke(DispatcherServerRef.java:276)
at weblogic.messaging.dispatcher.DispatcherServerRef.handleRequest(DispatcherServerRef.java:141)
at weblogic.messaging.dispatcher.DispatcherServerRef.access$000(DispatcherServerRef.java:34)
at weblogic.messaging.dispatcher.DispatcherServerRef$2.run(DispatcherServerRef.java:111)
at weblogic.work.ExecuteThread.execute(ExecuteThread.java:256)
at weblogic.work.ExecuteThread.run(ExecuteThread.java:221)
这说明,Client ID只能被一个持久化Topic接收方使用,具有排他性。也就是说,多个持久化Topic接收方,需要使用多个Client ID。
(4)复制DurableTopicReceiveClient.java,生成DurableTopicReceiveClient2.java
只把China的地方改成Beijing。
public void init(Context ctx, String topicName) throws NamingException, JMSException {
tconFactory = (TopicConnectionFactory)ctx.lookup(JMS_FACTORY);
tcon = tconFactory.createTopicConnection();
tcon.setClientID("BeijingClient");
tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)ctx.lookup(topicName);
tsubscriber = tsession.createDurableSubscriber(topic,"BeijingSubscription");
tsubscriber.setMessageListener(this);
tcon.start();
}
(5)在1个DOS命令窗口中启动消息消费者:java DurableTopicReceiveClient2 t3://localhost:7001
我们意外地发现,在Console中的Topic Durable Subscribers中“自动”增加了Beijing持久化订阅者。
也就是说,不用在Console事先配置,这个特性比较好,省得为每一个持久化订阅者人工配置。
(6)在消息生产者DOS窗口连续发送2条消息:msg1、msg2。
(7)观察两个消息消费者DOS窗口,发现两个窗口都“消费”了所有消息。
(8)停掉两个持久化消费者。
(9)在消息生产者DOS窗口连续发送2条消息:msg3、msg4。
(10)观察两个消息消费者DOS窗口,发现两个窗口都“消费”了所有消息。
代码下载:DurableTopicSubscriber.7z6539122430256738311>6539122430256738311>
没有评论:
发表评论