环境:Mac OS X El Capitan 10.11.3 + JMeter 2.13 + JBoss AMQ 6.2.1
1. AmqReceiverTest.java
package com.travelsky.activemq;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * JMeter Java-sampler client that consumes messages from the ActiveMQ queue
 * {@code FirstQueue}.
 *
 * <p>Sampler parameters:
 * <ul>
 *   <li>{@code count} - number of messages one {@link #runTest} call must consume
 *       (default {@value #DEFAULT_COUNT});</li>
 *   <li>{@code url} - broker URL (default {@value #DEFAULT_URL}).</li>
 * </ul>
 *
 * <p>The connection, session and consumer are opened in {@link #setupTest} and
 * released in {@link #teardownTest}.
 *
 * <p>NOTE(review): ActiveMQ consumers prefetch messages by default. A consumer that
 * has already reached its {@code count} but is still open may be holding buffered
 * messages that other consumers are waiting for - consider lowering the prefetch
 * size if messages are left on the queue. Confirm against the broker configuration.
 *
 * @author maping
 */
public class AmqReceiverTest extends AbstractJavaSamplerClient {
    private static final String LABEL = "AmqReceiverTest";
    private static final int DEFAULT_COUNT = 1;
    private static final String DEFAULT_URL = "failover:(tcp://127.0.0.1:61616)";
    private static final String DEFAULT_ADMIN_USER = "admin";
    private static final String DEFAULT_ADMIN_PASSWORD = "admin";
    private static final String QUEUE_NAME = "FirstQueue";
    /** How long a single receive call waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 1000L;
    /** Consecutive timed-out receives tolerated before the sample is failed. */
    private static final int MAX_EMPTY_POLLS = 10;

    private int count;
    private String url;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageConsumer consumer;

    /**
     * Reads the sampler parameters and opens the connection, session and consumer.
     *
     * <p>If the JMS setup fails, the error is printed, any partially opened connection
     * is closed, and {@link #runTest} will report failed samples.
     *
     * @param arg0 sampler context supplying the {@code count} and {@code url} parameters
     */
    @Override
    public void setupTest(JavaSamplerContext arg0) {
        System.out.println("### setupTest");
        count = parseCount(arg0.getParameter("count"));
        String urlStr = arg0.getParameter("url");
        url = (urlStr != null ? urlStr : DEFAULT_URL);
        connectionFactory = new ActiveMQConnectionFactory(
                DEFAULT_ADMIN_USER,
                DEFAULT_ADMIN_PASSWORD,
                url);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(QUEUE_NAME);
            consumer = session.createConsumer(destination);
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leave a half-initialised connection open behind a failed setup.
            closeConnection();
        }
    }

    /**
     * Consumes {@code count} messages and reports the elapsed time as one sample.
     *
     * @param arg0 sampler context (not used)
     * @return a sample that is successful only if all {@code count} messages were received;
     *         on failure the response message carries the cause
     */
    @Override
    public SampleResult runTest(JavaSamplerContext arg0) {
        SampleResult sr = new SampleResult();
        sr.setSampleLabel(LABEL);
        sr.sampleStart();
        try {
            receiveMessage(session, consumer);
            sr.setSuccessful(true);
            System.out.println("### receive over!");
        } catch (Exception e) {
            e.printStackTrace();
            sr.setSuccessful(false);
            // Make the failure reason visible in the result file, not only on stdout.
            sr.setResponseMessage(e.toString());
        } finally {
            sr.sampleEnd();
        }
        return sr;
    }

    /**
     * Receives messages until {@code count} real messages have been consumed.
     *
     * <p>A receive that times out returns {@code null}; such a poll is NOT counted as a
     * received message. After {@value #MAX_EMPTY_POLLS} consecutive timeouts the method
     * gives up so that the sample fails instead of silently under-consuming.
     *
     * @param session  session the consumer belongs to (kept for interface compatibility;
     *                 not used)
     * @param consumer consumer to read from
     * @throws Exception if the consumer is missing, the provider reports an error, or the
     *                   queue stays empty for too long
     */
    public void receiveMessage(Session session, MessageConsumer consumer)
            throws Exception {
        if (consumer == null) {
            throw new IllegalStateException(
                    "No message consumer available - did setupTest fail?");
        }
        int received = 0;
        int emptyPolls = 0;
        while (received < count) {
            if (consumer.receive(RECEIVE_TIMEOUT_MS) != null) {
                received++;
                emptyPolls = 0;
            } else if (++emptyPolls >= MAX_EMPTY_POLLS) {
                throw new IllegalStateException("Received only " + received + " of "
                        + count + " messages; no message arrived within "
                        + (MAX_EMPTY_POLLS * RECEIVE_TIMEOUT_MS) + " ms");
            }
        }
    }

    /**
     * Closes the connection opened by {@link #setupTest}.
     *
     * @param arg0 sampler context (not used)
     */
    @Override
    public void teardownTest(JavaSamplerContext arg0) {
        System.out.println("### teardownTest");
        closeConnection();
    }

    /**
     * Declares the parameters shown in the JMeter GUI together with their defaults.
     *
     * @return the {@code count} and {@code url} arguments
     */
    @Override
    public Arguments getDefaultParameters() {
        Arguments params = new Arguments();
        params.addArgument("count", DEFAULT_COUNT + "");
        params.addArgument("url", DEFAULT_URL);
        return params;
    }

    /** Parses the {@code count} parameter, falling back to the default when absent or invalid. */
    private static int parseCount(String countStr) {
        if (countStr == null) {
            return DEFAULT_COUNT;
        }
        try {
            return Integer.parseInt(countStr.trim());
        } catch (NumberFormatException e) {
            System.out.println("### invalid count '" + countStr + "', using " + DEFAULT_COUNT);
            return DEFAULT_COUNT;
        }
    }

    /** Best-effort close of the connection; errors are reported but never propagated. */
    private void closeConnection() {
        try {
            if (null != connection) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer = null;
            session = null;
            connection = null;
        }
    }
}
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * JMeter Java-sampler client that consumes messages from the ActiveMQ queue
 * {@code FirstQueue}.
 *
 * <p>Sampler parameters:
 * <ul>
 *   <li>{@code count} - number of messages one {@link #runTest} call must consume
 *       (default {@value #DEFAULT_COUNT});</li>
 *   <li>{@code url} - broker URL (default {@value #DEFAULT_URL}).</li>
 * </ul>
 *
 * <p>The connection, session and consumer are opened in {@link #setupTest} and
 * released in {@link #teardownTest}.
 *
 * <p>NOTE(review): ActiveMQ consumers prefetch messages by default. A consumer that
 * has already reached its {@code count} but is still open may be holding buffered
 * messages that other consumers are waiting for - consider lowering the prefetch
 * size if messages are left on the queue. Confirm against the broker configuration.
 *
 * @author maping
 */
public class AmqReceiverTest extends AbstractJavaSamplerClient {
    private static final String LABEL = "AmqReceiverTest";
    private static final int DEFAULT_COUNT = 1;
    private static final String DEFAULT_URL = "failover:(tcp://127.0.0.1:61616)";
    private static final String DEFAULT_ADMIN_USER = "admin";
    private static final String DEFAULT_ADMIN_PASSWORD = "admin";
    private static final String QUEUE_NAME = "FirstQueue";
    /** How long a single receive call waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 1000L;
    /** Consecutive timed-out receives tolerated before the sample is failed. */
    private static final int MAX_EMPTY_POLLS = 10;

    private int count;
    private String url;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageConsumer consumer;

    /**
     * Reads the sampler parameters and opens the connection, session and consumer.
     *
     * <p>If the JMS setup fails, the error is printed, any partially opened connection
     * is closed, and {@link #runTest} will report failed samples.
     *
     * @param arg0 sampler context supplying the {@code count} and {@code url} parameters
     */
    @Override
    public void setupTest(JavaSamplerContext arg0) {
        System.out.println("### setupTest");
        count = parseCount(arg0.getParameter("count"));
        String urlStr = arg0.getParameter("url");
        url = (urlStr != null ? urlStr : DEFAULT_URL);
        connectionFactory = new ActiveMQConnectionFactory(
                DEFAULT_ADMIN_USER,
                DEFAULT_ADMIN_PASSWORD,
                url);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(QUEUE_NAME);
            consumer = session.createConsumer(destination);
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leave a half-initialised connection open behind a failed setup.
            closeConnection();
        }
    }

    /**
     * Consumes {@code count} messages and reports the elapsed time as one sample.
     *
     * @param arg0 sampler context (not used)
     * @return a sample that is successful only if all {@code count} messages were received;
     *         on failure the response message carries the cause
     */
    @Override
    public SampleResult runTest(JavaSamplerContext arg0) {
        SampleResult sr = new SampleResult();
        sr.setSampleLabel(LABEL);
        sr.sampleStart();
        try {
            receiveMessage(session, consumer);
            sr.setSuccessful(true);
            System.out.println("### receive over!");
        } catch (Exception e) {
            e.printStackTrace();
            sr.setSuccessful(false);
            // Make the failure reason visible in the result file, not only on stdout.
            sr.setResponseMessage(e.toString());
        } finally {
            sr.sampleEnd();
        }
        return sr;
    }

    /**
     * Receives messages until {@code count} real messages have been consumed.
     *
     * <p>A receive that times out returns {@code null}; such a poll is NOT counted as a
     * received message. After {@value #MAX_EMPTY_POLLS} consecutive timeouts the method
     * gives up so that the sample fails instead of silently under-consuming.
     *
     * @param session  session the consumer belongs to (kept for interface compatibility;
     *                 not used)
     * @param consumer consumer to read from
     * @throws Exception if the consumer is missing, the provider reports an error, or the
     *                   queue stays empty for too long
     */
    public void receiveMessage(Session session, MessageConsumer consumer)
            throws Exception {
        if (consumer == null) {
            throw new IllegalStateException(
                    "No message consumer available - did setupTest fail?");
        }
        int received = 0;
        int emptyPolls = 0;
        while (received < count) {
            if (consumer.receive(RECEIVE_TIMEOUT_MS) != null) {
                received++;
                emptyPolls = 0;
            } else if (++emptyPolls >= MAX_EMPTY_POLLS) {
                throw new IllegalStateException("Received only " + received + " of "
                        + count + " messages; no message arrived within "
                        + (MAX_EMPTY_POLLS * RECEIVE_TIMEOUT_MS) + " ms");
            }
        }
    }

    /**
     * Closes the connection opened by {@link #setupTest}.
     *
     * @param arg0 sampler context (not used)
     */
    @Override
    public void teardownTest(JavaSamplerContext arg0) {
        System.out.println("### teardownTest");
        closeConnection();
    }

    /**
     * Declares the parameters shown in the JMeter GUI together with their defaults.
     *
     * @return the {@code count} and {@code url} arguments
     */
    @Override
    public Arguments getDefaultParameters() {
        Arguments params = new Arguments();
        params.addArgument("count", DEFAULT_COUNT + "");
        params.addArgument("url", DEFAULT_URL);
        return params;
    }

    /** Parses the {@code count} parameter, falling back to the default when absent or invalid. */
    private static int parseCount(String countStr) {
        if (countStr == null) {
            return DEFAULT_COUNT;
        }
        try {
            return Integer.parseInt(countStr.trim());
        } catch (NumberFormatException e) {
            System.out.println("### invalid count '" + countStr + "', using " + DEFAULT_COUNT);
            return DEFAULT_COUNT;
        }
    }

    /** Best-effort close of the connection; errors are reported but never propagated. */
    private void closeConnection() {
        try {
            if (null != connection) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer = null;
            session = null;
            connection = null;
        }
    }
}
2. 编译+部署
(1)CLASSPATH需要加入ApacheJMeter_core.jar和ApacheJMeter_java.jar,以及ActiveMQ客户端的jar包(代码引用了org.apache.activemq和javax.jms)
(2)把编译后的jar包,复制到 jmeter/lib/ext目录下
3. 创建测试计划
50个线程,每个线程接收2000条消息,一共10万条消息。
4. 运行
/Users/maping/Apache/jmeter/bin/jmeter.sh -n -t /Users/maping/Apache/jmeter/test/AMQ-Queue-Receiver.jmx -Jloop=1 -l /Users/maping/Apache/jmeter/result/AMQ-Queue-Receiver_`date +'%y%m%d%H%M%S'`.csv
5. 问题
运行完毕后,不知道为什么,总剩余些消息没有完全消费掉。
我的发送程序和接收程序的消息数量是一样的:都是50个线程,每个线程发送(或接收)2000条消息,一共10万条消息。
没有评论:
发表评论