环境: MAC OS X 10.9.2 + JDK 1.7.0_55 + ActiveMQ 5.9.1
1. SteamMessageProducer.java
import java.io.FileInputStream;
import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SteamMessageProducer {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "Steam.Message.Queue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String filePath) throws JMSException, Exception {
initialize();
InputStream in = new FileInputStream(filePath);
byte[] buffer = new byte[2048];
int c = -1;
while ((c = in.read(buffer)) > 0) {
StreamMessage smsg = session.createStreamMessage();
smsg.writeBytes(buffer, 0, c);
producer.send(smsg);
System.out.println("send: " + c);
}
in.close();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
import java.io.FileInputStream;
import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SteamMessageProducer {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "Steam.Message.Queue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String filePath) throws JMSException, Exception {
initialize();
InputStream in = new FileInputStream(filePath);
byte[] buffer = new byte[2048];
int c = -1;
while ((c = in.read(buffer)) > 0) {
StreamMessage smsg = session.createStreamMessage();
smsg.writeBytes(buffer, 0, c);
producer.send(smsg);
System.out.println("send: " + c);
}
in.close();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
2. SteamMessageSynConsumer.java
import java.io.FileOutputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SteamMessageSynConsumer {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "Steam.Message.Queue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin receiving...");
OutputStream out = new FileOutputStream("/Users/maping/Test/MyThread2.java");
byte[] buffer = new byte[2048];
while (true) {
Message msg = consumer.receive(5000);
if (msg == null) {
break;
}
if (msg instanceof StreamMessage) {
StreamMessage smsg = (StreamMessage) msg;
int c = smsg.readBytes(buffer);
out.write(buffer, 0, c);
System.out.println("Receive: " + c);
}
}
out.close();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
3. SteamMessageAsynConsumer.java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SteamMessageAsynConsumer implements MessageListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "Steam.Message.Queue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
// 开始监听
consumer.setMessageListener(this);
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
// 消息处理函数
public void onMessage(Message msg) {
try {
OutputStream out = new FileOutputStream("/Users/maping/Test/MyThread2.java");
byte[] buffer = new byte[2048];
while (true) {
if (msg == null) {
break;
}
if (msg instanceof StreamMessage) {
StreamMessage smsg = (StreamMessage) msg;
int c = smsg.readBytes(buffer);
out.write(buffer, 0, c);
System.out.println("Receive: " + c);
}
}
out.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException ex) {
Logger.getLogger(SteamMessageAsynConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
完整代码下载:git@github.com:maping/ActiveMQ.git 之 ActiveMqSample1。
参考文献:
1. http://topmanopensource.iteye.com/blog/1065630
参考文献:
1. http://topmanopensource.iteye.com/blog/1065630
没有评论:
发表评论