2014年5月7日星期三

ActiveMQ_002:使用Queue发送和接收TextMessage

环境: MAC OS X 10.9.2 + JDK 1.7.0_55 + NetBeans 8.0 + ActiveMQ 5.9.1

1. TextMessageProducer.java
消息生产者。

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TextMessageProducer {

    private String user = ActiveMQConnection.DEFAULT_USER;

    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private String subject = "Text.Message.Queue";

    private Destination destination = null;

    private Connection connection = null;

    private Session session = null;

    private MessageProducer producer = null;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    // 发送消息
    public void produceMessage(String message) throws JMSException, Exception {
        initialize();
        TextMessage msg = session.createTextMessage(message);
        connection.start();
        System.out.println("Producer:->Sending message: " + message);
        producer.send(msg);
        System.out.println("Producer:->Message sent complete!");
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Producer:->Closing connection");
        if (producer != null)
            producer.close();
        if (session != null)
            session.close();
        if (connection != null)
            connection.close();
    }
}

2. TextMessageSynConsumer.java
消息同步消费者,调用consumer.receive()后,如果没有消息,会一直阻塞在这里。

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TextMessageSynConsumer {

    private String user = ActiveMQConnection.DEFAULT_USER;

    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private String subject = "Text.Message.Queue";

    private Destination destination = null;

    private Connection connection = null;

    private Session session = null;

    private MessageConsumer consumer = null;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);

    }

    // 消费消息
    public void consumeMessage() throws JMSException, Exception {
        initialize();
        connection.start();

        System.out.println("Consumer:->Begin receiving...");

        Message message = consumer.receive();
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Consumer:->Received: " + msg);
            } else {
                System.out.println("Consumer:->Received: " + message);
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null) {
            consumer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

}

3. TextMessageAsynConsumer.java
消息异步消费者,实现了MessageListener接口,监听指定的Queue,得到消息通知后,会回调onMessage()方法处理消息。

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TextMessageAsynConsumer implements MessageListener {

    private String user = ActiveMQConnection.DEFAULT_USER;

    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private String subject = "Text.Message.Queue";

    private Destination destination = null;

    private Connection connection = null;

    private Session session = null;

    private MessageConsumer consumer = null;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
         
    }

    // 消费消息
    public void consumeMessage() throws JMSException, Exception {
        initialize();
        connection.start();
         
        System.out.println("Consumer:->Begin listening...");
        // 开始监听
        consumer.setMessageListener(this);
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null)
            consumer.close();
        if (session != null)
            session.close();
        if (connection != null)
            connection.close();
    }

    // 消息处理函数
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Consumer:->Received: " + msg);
            } else {
                System.out.println("Consumer:->Received: " + message);
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

4. 使用netstat监控有多少个消费者连接到61616端口
如果把同步和异步的消息消费者都运行起来,然后在终端中敲入: netstat -an | grep 61616
会输出如下内容:
tcp4       0      0  127.0.0.1.61616        127.0.0.1.51298        ESTABLISHED
tcp4       0      0  127.0.0.1.51298        127.0.0.1.61616        ESTABLISHED
tcp4       0      0  127.0.0.1.61616        127.0.0.1.51296        ESTABLISHED
tcp4       0      0  127.0.0.1.51296        127.0.0.1.61616        ESTABLISHED
tcp46      0      0  *.61616                *.*                    LISTEN

可以看出,虽然有4个TCP连接已经建立,但实际上只有2个消费者。
仔细看一下,每个TCP连接是“双工”的,因此消费者的数量是已建立TCP连接数量的一半。
可以用命令:netstat -an | grep 61616 | grep -v LISTEN | wc -l,计算有多少个已建立TCP连接数量。

完整代码下载:git@github.com:maping/ActiveMQ.git 之 ActiveMqSample1。

参考文献:
1. http://blog.csdn.net/yjflinchong/article/details/8441937

没有评论: