2014年5月7日星期三

ActiveMQ_003:使用Queue发送和接收SteamMessage

环境: MAC OS X 10.9.2 + JDK 1.7.0_55 + ActiveMQ 5.9.1

1.  SteamMessageProducer.java

import java.io.FileInputStream;
import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SteamMessageProducer {

    private String user = ActiveMQConnection.DEFAULT_USER;

    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private String subject = "Steam.Message.Queue";

    private Destination destination = null;

    private Connection connection = null;

    private Session session = null;

    private MessageProducer producer = null;

    // 初始化
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    // 发送消息
    public void produceMessage(String filePath) throws JMSException, Exception {
        initialize();

        InputStream in = new FileInputStream(filePath);
        byte[] buffer = new byte[2048];
        int c = -1;
        while ((c = in.read(buffer)) > 0) {
            StreamMessage smsg = session.createStreamMessage();
            smsg.writeBytes(buffer, 0, c);
            producer.send(smsg);
            System.out.println("send: " + c);
        }
        in.close();
    }

    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Producer:->Closing connection");
        if (producer != null) {
            producer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

}

2. SteamMessageSynConsumer.java

import java.io.FileOutputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SteamMessageSynConsumer {

    private String user = ActiveMQConnection.DEFAULT_USER;

    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private String subject = "Steam.Message.Queue";

    private Destination destination = null;

    private Connection connection = null;

    private Session session = null;

    private MessageConsumer consumer = null;

    // 初始化  
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);

    }

    // 消费消息   
    public void consumeMessage() throws JMSException, Exception {
        initialize();
        connection.start();

        System.out.println("Consumer:->Begin receiving...");

        OutputStream out = new FileOutputStream("/Users/maping/Test/MyThread2.java");
        byte[] buffer = new byte[2048];
        while (true) {
            Message msg = consumer.receive(5000);
            if (msg == null) {
                break;
            }

            if (msg instanceof StreamMessage) {
                StreamMessage smsg = (StreamMessage) msg;
                int c = smsg.readBytes(buffer);
                out.write(buffer, 0, c);
                System.out.println("Receive: " + c);
            }
        }
        out.close();
    }

    // 关闭连接  
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null) {
            consumer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

3. SteamMessageAsynConsumer.java

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SteamMessageAsynConsumer implements MessageListener {

    private String user = ActiveMQConnection.DEFAULT_USER;

    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private String subject = "Steam.Message.Queue";

    private Destination destination = null;

    private Connection connection = null;

    private Session session = null;

    private MessageConsumer consumer = null;

    // 初始化  
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);

    }

    // 消费消息   
    public void consumeMessage() throws JMSException, Exception {
        initialize();
        connection.start();

        System.out.println("Consumer:->Begin listening...");
        // 开始监听  
        consumer.setMessageListener(this);
    }

    // 关闭连接  
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null) {
            consumer.close();
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    // 消息处理函数   
    public void onMessage(Message msg) {
        try {
            OutputStream out = new FileOutputStream("/Users/maping/Test/MyThread2.java");
            byte[] buffer = new byte[2048];
            while (true) {
                if (msg == null) {
                    break;
                }

                if (msg instanceof StreamMessage) {
                    StreamMessage smsg = (StreamMessage) msg;
                    int c = smsg.readBytes(buffer);
                    out.write(buffer, 0, c);
                    System.out.println("Receive: " + c);
                }
            }
            out.close();
        } catch (JMSException e) {
            // TODO Auto-generated catch block  
            e.printStackTrace();
        } catch (IOException ex) {
            Logger.getLogger(SteamMessageAsynConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

完整代码下载:git@github.com:maping/ActiveMQ.git 之 ActiveMqSample1。

参考文献:
1. http://topmanopensource.iteye.com/blog/1065630


没有评论: