import java.io.Serializable;
import java.util.HashMap;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JmsSynConsumerClient {
private Context getContext() throws NamingException {
final Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
env.put(Context.SECURITY_PRINCIPAL, "democlient");
env.put(Context.SECURITY_CREDENTIALS, "password1!");
return new InitialContext(env);
}
private Context context;
private void execute() throws Exception {
context = getContext();
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
Destination destination = null;
try {
connectionFactory = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory");
System.out.println("Acquiring connection factory success, " + connectionFactory);
destination = (Destination) context.lookup("jms/queue/TransactionQueue");
System.out.println("Acquiring destination success, " + destination);
connection = connectionFactory.createConnection("democlient", "password1!");
System.out.println("Creating connection success, " + connection);
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
System.out.println("Creating session success, " + session);
consumer = session.createConsumer(destination);
System.out.println("Creating consumer success, " + consumer);
connection.start();
Message message = consumer.receive();
int count = 1;
ObjectMessage objectMessage = (ObjectMessage) message;
Serializable object = objectMessage.getObject();
@SuppressWarnings("unchecked")
HashMap map = (HashMap) object;
String text = (String) map.get("message");
count = (Integer) map.get("count");
long delay = (Long) map.get("delay");
System.out.println(count + ": " + text);
Thread.sleep(delay);
// consumer.setMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message) {
// try {
// ObjectMessage objectMessage = (ObjectMessage) message;
// Serializable object = objectMessage.getObject();
// @SuppressWarnings("unchecked")
// HashMap map = (HashMap) object;
// String text = (String) map.get("message");
// int count = (Integer) map.get("count");
// long delay = (Long) map.get("delay");
// System.out.println(count + ": " + text);
// try {
// Thread.sleep(delay);
// } catch (InterruptedException ex) {
// Logger.getLogger(JmsSynConsumerClient.class.getName()).log(Level.SEVERE, null, ex);
// }
// } catch (JMSException ex) {
// Logger.getLogger(JmsSynConsumerClient.class.getName()).log(Level.SEVERE, null, ex);
// }
// }
// });
//
// Thread.sleep(30000);
session.commit();
System.out.println("Commit the session transaction");
System.out.println("JMSClient exit");
} catch (Exception e) {
throw e;
} finally {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
new JmsSynConsumerClient().execute();
}
}
3. 仔细翻阅了HornetQ文档,我发现HornetQ的工作方式就是这样的
具体请参看下文,我把重点的地方翻译一下:
HornetQ无法确定message的acknowledgments在fail-over时是否已经丢失,所以当执行session.commit()时,一律rollback事务。
这就是为什么consumer却无法透明地继续消费fail over到 backup server 上的消息的原因。
我觉得这是有道理的,因为处于安全性的考虑,出现异常时,rollback是正确的,这样可以防止消息丢失或被重复消费。
如果一定要实现该用户的场景,可以捕捉javax.jms.TransactionRolledBackException异常,然后进行重试。
try
{
session.commit();
}
catch (TransactionRolledBackException e)
{
System.err.println("transaction has been rolled back: " + e.getMessage());
// TODO,写你的重试逻辑代码
}
也就是说,出现异常后,HornetQ回滚消息,然后把如何继续处理的方式交给了开发人员。
这里,要提醒的是,如果Queue上有多个消费者,被回滚的消息有可能被其它消费者收到了。
4. HornetQ 有关fail over With Transactions的文档:
37.2.1.2. Handling fail-over With Transactions
If the session is transactional and messages have already been sent or acknowledged in the current transaction, then the server cannot be sure that messages sent or acknowledgments have not been lost during the fail-over.
Consequently the transaction will be marked as rollback-only, and any subsequent attempt to commit will throw a javax.jms.TransactionRolledBackException
(if using JMS), or a HornetQException
with error code HornetQException.TRANSACTION_ROLLED_BACK
if using the core API.
It is up to the user to catch the exception, and perform any client side local rollback code as necessary. There is no need to manually rollback the session - it is already rolled back. The user can then just retry the transactional operations again on the same session.
If fail-over occurs when a commit call is being executed, the server, as previously described, will unblock the call to prevent a hang, since no response will come back. In this case it is not easy for the client to determine whether the transaction commit was actually processed on the live server before failure occurred.
To remedy this, the client can enable duplicate detection (
Chapter 35, Duplicate Message Detection) in the transaction, and retry the transaction operations again after the call is unblocked. If the transaction had indeed been committed on the live server successfully before fail-over, then when the transaction is retried, duplicate detection will ensure that any durable messages resent in the transaction will be ignored on the server to prevent them getting sent more than once.
By catching the rollback exceptions and retrying, catching unblocked calls and enabling duplicate detection, once and only once delivery guarantees for messages can be provided in the case of failure, guaranteeing 100% no loss or duplication of messages.
1. https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/failover.html
2. http://hornetq.sourceforge.net/docs/hornetq-2.0.0.GA/user-manual/en/html/ha.html