环境:MAC OS X 10.11.6 + ActiveMQ 5.13.4
发送消息时,message.setStringProperty() 方法可以设置某个属性及其值。
接收消息时,可以在创建消费者时传入消息选择器(selector,例如 MessageGroupID='A'),只接收属性值符合条件的消息。
1. Queue_Selector_Sender_AutoAck_Peresistent.java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Sends a batch of persistent text messages to the queue "FirstQueue".
 *
 * <p>Each message carries a string property "MessageGroupID" whose value alternates between
 * "A" and "B", so that consumers can pick messages by that property with a JMS selector.
 */
public class Queue_Selector_Sender_AutoAck_Peresistent {
    /**
     * Number of messages sent by one run. A previous setting was 1209600 (2 messages per
     * second for 7 days); note that this class sends without any delay between messages.
     */
    private static final int SEND_NUMBER = 200;

    /**
     * Connects to the local broker, creates a persistent producer for "FirstQueue" and sends
     * {@link #SEND_NUMBER} messages. Failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // ConnectionFactory: JMS uses it to create connections; here the ActiveMQ implementation.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
        // Connection: the link from this JMS client to the JMS provider.
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("FirstQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            sendMessage(session, producer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Creates and sends {@link #SEND_NUMBER} text messages, numbered from 1.
     *
     * <p>Even-numbered messages get "MessageGroupID" = "A", odd-numbered ones get "B".
     * A line is printed to standard output before each send.
     *
     * @param session  session used to create the messages
     * @param producer producer used to send the messages
     * @throws Exception if creating a message, setting its property or sending it fails
     */
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("#### ActiveMq发送的消息" + i);
            message.setStringProperty("MessageGroupID", i % 2 == 0 ? "A" : "B");
            System.out.println(System.currentTimeMillis() + " 发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }

    /**
     * Closes the connection if one was opened. A failure to close is reported on standard
     * error instead of being dropped silently; it is not rethrown because the program is
     * already shutting down at this point.
     */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            System.err.println("Failed to close JMS connection: " + e);
        }
    }
}
2. Queue_Selector_Receiver_AutoAck_Async.java
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Queue_Selector_Receiver_AutoAck_Async {
public static void main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地;消息发送给谁.
Destination destination;
// MessageConsumer:消息消费者
MessageConsumer consumerA, consumerB;
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
consumerA = session.createConsumer(destination, "MessageGroupID='A'");
consumerA.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("consumerA Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
consumerB = session.createConsumer(destination, "MessageGroupID='B'");
consumerB.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("consumerB Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
}
参考文献:
1. http://activemq.apache.org/selectors.html
2. http://blog.csdn.net/e_wsq/article/details/50053981
3. http://timjansen.github.io/jarfiller/guide/jms/selectors.xhtml
发送消息时,message.setStringProperty() 方法可以设置某个属性及其值。
接收消息时,可以在创建消费者时传入消息选择器(selector,例如 MessageGroupID='A'),只接收属性值符合条件的消息。
1. Queue_Selector_Sender_AutoAck_Peresistent.java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Sends a batch of persistent text messages to the queue "FirstQueue".
 *
 * <p>Each message carries a string property "MessageGroupID" whose value alternates between
 * "A" and "B", so that consumers can pick messages by that property with a JMS selector.
 */
public class Queue_Selector_Sender_AutoAck_Peresistent {
    /**
     * Number of messages sent by one run. A previous setting was 1209600 (2 messages per
     * second for 7 days); note that this class sends without any delay between messages.
     */
    private static final int SEND_NUMBER = 200;

    /**
     * Connects to the local broker, creates a persistent producer for "FirstQueue" and sends
     * {@link #SEND_NUMBER} messages. Failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // ConnectionFactory: JMS uses it to create connections; here the ActiveMQ implementation.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
        // Connection: the link from this JMS client to the JMS provider.
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("FirstQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            sendMessage(session, producer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Creates and sends {@link #SEND_NUMBER} text messages, numbered from 1.
     *
     * <p>Even-numbered messages get "MessageGroupID" = "A", odd-numbered ones get "B".
     * A line is printed to standard output before each send.
     *
     * @param session  session used to create the messages
     * @param producer producer used to send the messages
     * @throws Exception if creating a message, setting its property or sending it fails
     */
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("#### ActiveMq发送的消息" + i);
            message.setStringProperty("MessageGroupID", i % 2 == 0 ? "A" : "B");
            System.out.println(System.currentTimeMillis() + " 发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }

    /**
     * Closes the connection if one was opened. A failure to close is reported on standard
     * error instead of being dropped silently; it is not rethrown because the program is
     * already shutting down at this point.
     */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            System.err.println("Failed to close JMS connection: " + e);
        }
    }
}
2. Queue_Selector_Receiver_AutoAck_Async.java
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Queue_Selector_Receiver_AutoAck_Async {
public static void main(String[] args) {
// ConnectionFactory:连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection:JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination:消息的目的地;消息发送给谁.
Destination destination;
// MessageConsumer:消息消费者
MessageConsumer consumerA, consumerB;
connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"failover:(tcp://127.0.0.1:61616)");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
consumerA = session.createConsumer(destination, "MessageGroupID='A'");
consumerA.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("consumerA Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
consumerB = session.createConsumer(destination, "MessageGroupID='B'");
consumerB.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("consumerB Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
}
参考文献:
1. http://activemq.apache.org/selectors.html
2. http://blog.csdn.net/e_wsq/article/details/50053981
3. http://timjansen.github.io/jarfiller/guide/jms/selectors.xhtml
没有评论:
发表评论