提交 19ae73aa 编写于 作者: W william.liangf

DUBBO-360 迁移benckmark代码到开源trunk

git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@1638 1a56cb94-b969-4eaa-88fa-be21384802f2
上级 ca6b18ce
......@@ -36,4 +36,36 @@
<version>${project.parent.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>${basedir}/src/assembly/release.xml</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
<assembly>
<id>dist</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<outputDirectory>dubbo.benchmark</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>dubbo.benchmark/lib</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>
\ No newline at end of file
<assembly>
<id>dist</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<outputDirectory>dubbo.benchmark</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>dubbo.benchmark/lib</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>
\ No newline at end of file
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc
* Apache License
*
* http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.lang.reflect.InvocationTargetException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import com.alibaba.dubbo.common.utils.ConfigUtils;
/**
* Abstract benchmark client,test for difference scenes Usage: -Dwrite.statistics=false BenchmarkClient serverIP
* serverPort concurrents timeout codectype requestSize runtime(seconds) clientNums
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public abstract class AbstractBenchmarkClient {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static long maxTPS = 0;
private static long minTPS = 0;
private static long allRequestSum;
private static long allResponseTimeSum;
private static long allErrorRequestSum;
private static long allErrorResponseTimeSum;
private static int runtime;
// < 0
private static long below0sum;
// (0,1]
private static long above0sum;
// (1,5]
private static long above1sum;
// (5,10]
private static long above5sum;
// (10,50]
private static long above10sum;
// (50,100]
private static long above50sum;
// (100,500]
private static long above100sum;
// (500,1000]
private static long above500sum;
// > 1000
private static long above1000sum;
Properties properties = ConfigUtils.getProperties();
public void run(String[] args) throws Exception {
final String serverIP = properties.getProperty("serverip");
final int serverPort = Integer.parseInt(properties.getProperty("serverport"));
final int concurrents = Integer.parseInt(properties.getProperty("concurrents"));
final int timeout = Integer.parseInt(properties.getProperty("timeout"));
runtime = Integer.parseInt(properties.getProperty("runtime"));
final long endtime = System.nanoTime() / 1000L + runtime * 1000 * 1000L;
final int clientNums = Integer.parseInt(properties.getProperty("connectionnums"));
// Print start info
Date currentDate = new Date();
Calendar calendar = Calendar.getInstance();
calendar.setTime(currentDate);
calendar.add(Calendar.SECOND, runtime);
StringBuilder startInfo = new StringBuilder(dateFormat.format(currentDate));
startInfo.append(" ready to start client benchmark,server is ");
startInfo.append(serverIP).append(":").append(serverPort);
startInfo.append(",concurrents is: ").append(concurrents);
startInfo.append(",clientNums is: ").append(clientNums);
startInfo.append(",timeout is:").append(timeout);
startInfo.append(" s,the benchmark will end at:").append(dateFormat.format(calendar.getTime()));
System.out.println(startInfo.toString());
CyclicBarrier barrier = new CyclicBarrier(concurrents);
CountDownLatch latch = new CountDownLatch(concurrents);
List<ClientRunnable> runnables = new ArrayList<ClientRunnable>();
// benchmark start after thirty seconds,let java app warm up
long beginTime = System.nanoTime() / 1000L + 30 * 1000 * 1000L;
for (int i = 0; i < concurrents; i++) {
ClientRunnable runnable = getClientRunnable(serverIP, serverPort, clientNums, timeout, barrier, latch,
beginTime, endtime);
runnables.add(runnable);
}
startRunnables(runnables);
latch.await();
// read results & add all
// key: runtime second range value: Long[2] array Long[0]: execute count Long[1]: response time sum
Map<String, Long[]> times = new HashMap<String, Long[]>();
Map<String, Long[]> errorTimes = new HashMap<String, Long[]>();
for (ClientRunnable runnable : runnables) {
List<long[]> results = runnable.getResults();
long[] responseSpreads = results.get(0);
below0sum += responseSpreads[0];
above0sum += responseSpreads[1];
above1sum += responseSpreads[2];
above5sum += responseSpreads[3];
above10sum += responseSpreads[4];
above50sum += responseSpreads[5];
above100sum += responseSpreads[6];
above500sum += responseSpreads[7];
above1000sum += responseSpreads[8];
long[] tps = results.get(1);
long[] responseTimes = results.get(2);
long[] errorTPS = results.get(3);
long[] errorResponseTimes = results.get(4);
for (int i = 0; i < tps.length; i++) {
String key = String.valueOf(i);
if (times.containsKey(key)) {
Long[] successInfos = times.get(key);
Long[] errorInfos = errorTimes.get(key);
successInfos[0] += tps[i];
successInfos[1] += responseTimes[i];
errorInfos[0] += errorTPS[i];
errorInfos[1] += errorResponseTimes[i];
times.put(key, successInfos);
errorTimes.put(key, errorInfos);
} else {
Long[] successInfos = new Long[2];
successInfos[0] = tps[i];
successInfos[1] = responseTimes[i];
Long[] errorInfos = new Long[2];
errorInfos[0] = errorTPS[i];
errorInfos[1] = errorResponseTimes[i];
times.put(key, successInfos);
errorTimes.put(key, errorInfos);
}
}
}
long ignoreRequest = 0;
long ignoreErrorRequest = 0;
int maxTimeRange = runtime - 30;
// ignore the last 10 second requests,so tps can count more accurate
for (int i = 0; i < 10; i++) {
Long[] values = times.remove(String.valueOf(maxTimeRange - i));
if (values != null) {
ignoreRequest += values[0];
}
Long[] errorValues = errorTimes.remove(String.valueOf(maxTimeRange - i));
if (errorValues != null) {
ignoreErrorRequest += errorValues[0];
}
}
for (Map.Entry<String, Long[]> entry : times.entrySet()) {
long successRequest = entry.getValue()[0];
long errorRequest = 0;
if (errorTimes.containsKey(entry.getKey())) {
errorRequest = errorTimes.get(entry.getKey())[0];
}
allRequestSum += successRequest;
allResponseTimeSum += entry.getValue()[1];
allErrorRequestSum += errorRequest;
if (errorTimes.containsKey(entry.getKey())) {
allErrorResponseTimeSum += errorTimes.get(entry.getKey())[1];
}
long currentRequest = successRequest + errorRequest;
if (currentRequest > maxTPS) {
maxTPS = currentRequest;
}
if (minTPS == 0 || currentRequest < minTPS) {
minTPS = currentRequest;
}
}
boolean isWriteResult = Boolean.parseBoolean(System.getProperty("write.statistics", "false"));
if (isWriteResult) {
BufferedWriter writer = new BufferedWriter(new FileWriter("benchmark.all.results"));
for (Map.Entry<String, Long[]> entry : times.entrySet()) {
writer.write(entry.getKey() + "," + entry.getValue()[0] + "," + entry.getValue()[1] + "\r\n");
}
writer.close();
}
System.out.println("----------Benchmark Statistics--------------");
System.out.println(" Concurrents: " + concurrents);
System.out.println(" ClientNums: " + clientNums);
System.out.println(" Runtime: " + runtime + " seconds");
System.out.println(" Benchmark Time: " + times.keySet().size());
long benchmarkRequest = allRequestSum + allErrorRequestSum;
long allRequest = benchmarkRequest + ignoreRequest + ignoreErrorRequest;
System.out.println(" Requests: " + allRequest + " Success: " + (allRequestSum + ignoreRequest) * 100
/ allRequest + "% (" + (allRequestSum + ignoreRequest) + ") Error: "
+ (allErrorRequestSum + ignoreErrorRequest) * 100 / allRequest + "% ("
+ (allErrorRequestSum + ignoreErrorRequest) + ")");
System.out.println(" Avg TPS: " + benchmarkRequest / times.keySet().size() + " Max TPS: " + maxTPS
+ " Min TPS: " + minTPS);
System.out.println(" Avg RT: " + (allErrorResponseTimeSum + allResponseTimeSum) / benchmarkRequest / 1000f
+ "ms");
System.out.println(" RT <= 0: " + (below0sum * 100 / allRequest) + "% " + below0sum + "/" + allRequest);
System.out.println(" RT (0,1]: " + (above0sum * 100 / allRequest) + "% " + above0sum + "/" + allRequest);
System.out.println(" RT (1,5]: " + (above1sum * 100 / allRequest) + "% " + above1sum + "/" + allRequest);
System.out.println(" RT (5,10]: " + (above5sum * 100 / allRequest) + "% " + above5sum + "/" + allRequest);
System.out.println(" RT (10,50]: " + (above10sum * 100 / allRequest) + "% " + above10sum + "/" + allRequest);
System.out.println(" RT (50,100]: " + (above50sum * 100 / allRequest) + "% " + above50sum + "/" + allRequest);
System.out.println(" RT (100,500]: " + (above100sum * 100 / allRequest) + "% " + above100sum + "/" + allRequest);
System.out.println(" RT (500,1000]: " + (above500sum * 100 / allRequest) + "% " + above500sum + "/"
+ allRequest);
System.out.println(" RT > 1000: " + (above1000sum * 100 / allRequest) + "% " + above1000sum + "/" + allRequest);
System.exit(0);
}
public abstract ClientRunnable getClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
CyclicBarrier barrier, CountDownLatch latch, long startTime,
long endTime) throws IllegalArgumentException, SecurityException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException;
protected void startRunnables(List<ClientRunnable> runnables) {
for (int i = 0; i < runnables.size(); i++) {
final ClientRunnable runnable = runnables.get(i);
Thread thread = new Thread(runnable, "benchmarkclient-" + i);
thread.start();
}
}
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc Apache License http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.text.SimpleDateFormat;
import java.util.Date;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.Exchangers;
import com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
/**
* Abstract benchmark server Usage: BenchmarkServer listenPort maxThreads responseSize
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public abstract class AbstractBenchmarkServer {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void run(String[] args) throws Exception {
if (args == null || args.length != 5) {
throw new IllegalArgumentException(
"must give three args: listenPort | maxThreads | responseSize | transporter | serialization");
}
int listenPort = Integer.parseInt(args[0]);
int maxThreads = Integer.parseInt(args[1]);
final int responseSize = Integer.parseInt(args[2]);
String transporter = args[3];
String serialization = args[4];
System.out.println(dateFormat.format(new Date()) + " ready to start server,listenPort is: " + listenPort
+ ",maxThreads is:" + maxThreads + ",responseSize is:" + responseSize
+ " bytes,transporter is:" + transporter + ",serialization is:" + serialization);
StringBuilder url = new StringBuilder();
url.append("exchange://0.0.0.0:");
url.append(listenPort);
url.append("?transporter=");
url.append(transporter);
url.append("&serialization=");
url.append(serialization);
url.append("&threads=");
url.append(maxThreads);
Exchangers.bind(url.toString(), new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
return new ResponseObject(responseSize); // 发送响应
}
});
}
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc Apache License http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Simple Processor RPC Benchmark Client Thread
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public abstract class AbstractClientRunnable implements ClientRunnable {
private static final Log LOGGER = LogFactory.getLog(AbstractClientRunnable.class);
private CyclicBarrier barrier;
private CountDownLatch latch;
private long endTime;
private boolean running = true;
// response time spread
private long[] responseSpreads = new long[9];
// error request per second
private long[] errorTPS = null;
// error response times per second
private long[] errorResponseTimes = null;
// tps per second
private long[] tps = null;
// response times per second
private long[] responseTimes = null;
// benchmark startTime
private long startTime;
// benchmark maxRange
private int maxRange;
private ServiceFactory serviceFactory = new ServiceFactory();
public AbstractClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime){
this.barrier = barrier;
this.latch = latch;
this.startTime = startTime;
this.endTime = endTime;
serviceFactory.setTargetIP(targetIP);
serviceFactory.setClientNums(clientNums);
serviceFactory.setTargetPort(targetPort);
serviceFactory.setConnectTimeout(rpcTimeout);
maxRange = (Integer.parseInt(String.valueOf((endTime - startTime))) / 1000000) + 1;
errorTPS = new long[maxRange];
errorResponseTimes = new long[maxRange];
tps = new long[maxRange];
responseTimes = new long[maxRange];
// init
for (int i = 0; i < maxRange; i++) {
errorTPS[i] = 0;
errorResponseTimes[i] = 0;
tps[i] = 0;
responseTimes[i] = 0;
}
}
public void run() {
try {
barrier.await();
} catch (Exception e) {
// IGNORE
}
runJavaAndHessian();
latch.countDown();
}
private void runJavaAndHessian() {
while (running) {
long beginTime = System.nanoTime() / 1000L;
if (beginTime >= endTime) {
running = false;
break;
}
try {
Object result = invoke(serviceFactory);
long currentTime = System.nanoTime() / 1000L;
if (beginTime <= startTime) {
continue;
}
long consumeTime = currentTime - beginTime;
sumResponseTimeSpread(consumeTime);
int range = Integer.parseInt(String.valueOf(beginTime - startTime)) / 1000000;
if (range >= maxRange) {
System.err.println("benchmark range exceeds maxRange,range is: " + range + ",maxRange is: "
+ maxRange);
continue;
}
if (result != null) {
tps[range] = tps[range] + 1;
responseTimes[range] = responseTimes[range] + consumeTime;
} else {
LOGGER.error("server return result is null");
errorTPS[range] = errorTPS[range] + 1;
errorResponseTimes[range] = errorResponseTimes[range] + consumeTime;
}
} catch (Exception e) {
LOGGER.error("client.invokeSync error", e);
long currentTime = System.nanoTime() / 1000L;
if (beginTime <= startTime) {
continue;
}
long consumeTime = currentTime - beginTime;
sumResponseTimeSpread(consumeTime);
int range = Integer.parseInt(String.valueOf(beginTime - startTime)) / 1000000;
if (range >= maxRange) {
System.err.println("benchmark range exceeds maxRange,range is: " + range + ",maxRange is: "
+ maxRange);
continue;
}
errorTPS[range] = errorTPS[range] + 1;
errorResponseTimes[range] = errorResponseTimes[range] + consumeTime;
}
}
}
public abstract Object invoke(ServiceFactory<?> serviceFactory);
public List<long[]> getResults() {
List<long[]> results = new ArrayList<long[]>();
results.add(responseSpreads);
results.add(tps);
results.add(responseTimes);
results.add(errorTPS);
results.add(errorResponseTimes);
return results;
}
private void sumResponseTimeSpread(long responseTime) {
responseTime = responseTime / 1000L;
if (responseTime <= 0) {
responseSpreads[0] = responseSpreads[0] + 1;
} else if (responseTime > 0 && responseTime <= 1) {
responseSpreads[1] = responseSpreads[1] + 1;
} else if (responseTime > 1 && responseTime <= 5) {
responseSpreads[2] = responseSpreads[2] + 1;
} else if (responseTime > 5 && responseTime <= 10) {
responseSpreads[3] = responseSpreads[3] + 1;
} else if (responseTime > 10 && responseTime <= 50) {
responseSpreads[4] = responseSpreads[4] + 1;
} else if (responseTime > 50 && responseTime <= 100) {
responseSpreads[5] = responseSpreads[5] + 1;
} else if (responseTime > 100 && responseTime <= 500) {
responseSpreads[6] = responseSpreads[6] + 1;
} else if (responseTime > 500 && responseTime <= 1000) {
responseSpreads[7] = responseSpreads[7] + 1;
} else if (responseTime > 1000) {
responseSpreads[8] = responseSpreads[8] + 1;
}
}
}
package com.alibaba.dubbo.rpc.benchmark;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class BenchmarkClient extends AbstractBenchmarkClient {
@Override
public ClientRunnable getClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
CyclicBarrier barrier,
CountDownLatch latch, long endTime, long startTime) {
return new SimpleProcessorBenchmarkClientRunnable(targetIP, targetPort, clientNums, rpcTimeout,
barrier, latch, startTime, endTime);
}
public static void main(String[] args) throws Exception {
new BenchmarkClient().run(args);
}
}
package com.alibaba.dubbo.rpc.benchmark;
public class BenchmarkServer extends AbstractBenchmarkServer {
public static void main(String[] args) throws Exception {
new BenchmarkServer().run(args);
synchronized (BenchmarkServer.class) {
BenchmarkServer.class.wait();
}
}
}
/**
* nfs-rpc
* Apache License
*
* http://code.google.com/p/nfs-rpc (c) 2011
*/
package com.alibaba.dubbo.rpc.benchmark;
import java.util.List;
/**
* client runnable,so we can collect results
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public interface ClientRunnable extends Runnable {
public List<long[]> getResults();
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* TODO Comment of HelloService
*
* @author tony.chenl
*/
public interface DemoService {
public Object sendRequest(Object requestObject);
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* TODO Comment of HelloService
*
* @author tony.chenl
*/
public class DemoServiceImpl implements DemoService{
ResponseObject responseObject = new ResponseObject(100);
public Object sendRequest(Object request) {
return responseObject;
}
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc Apache License http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
import com.alibaba.dubbo.remoting.exchange.ExchangeClient;
import com.alibaba.dubbo.remoting.exchange.Exchangers;
/**
* Abstract ExchangeClient Factory,create custom nums ExchangeClient
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public class ExchangeClientFactory {
// Cache ExchangeClient
private static ConcurrentHashMap<String, FutureTask<List<ExchangeClient>>> clients = new ConcurrentHashMap<String, FutureTask<List<ExchangeClient>>>();
public ExchangeClient get(final String targetIP, final int targetPort, final int connectTimeout) throws Exception {
return get(targetIP, targetPort, connectTimeout, 1);
}
public ExchangeClient get(final String targetIP, final int targetPort, final int connectTimeout,
final int clientNums) throws Exception {
String key = targetIP + ":" + targetPort;
if (clients.containsKey(key)) {
if (clientNums == 1) {
return clients.get(key).get().get(0);
} else {
Random random = new Random();
return clients.get(key).get().get(random.nextInt(clientNums));
}
} else {
FutureTask<List<ExchangeClient>> task = new FutureTask<List<ExchangeClient>>(
new Callable<List<ExchangeClient>>() {
public List<ExchangeClient> call()
throws Exception {
List<ExchangeClient> clients = new ArrayList<ExchangeClient>(
clientNums);
for (int i = 0; i < clientNums; i++) {
clients.add(createClient(targetIP,
targetPort,
connectTimeout));
}
return clients;
}
});
FutureTask<List<ExchangeClient>> currentTask = clients.putIfAbsent(key, task);
if (currentTask == null) {
task.run();
} else {
task = currentTask;
}
if (clientNums == 1) return task.get().get(0);
else {
Random random = new Random();
return task.get().get(random.nextInt(clientNums));
}
}
}
public void removeClient(String key, ExchangeClient ExchangeClient) {
try {
// TODO: Fix It
clients.remove(key);
// clients.get(key).get().remove(ExchangeClient);
// clients.get(key)
// .get()
// .add(createClient(ExchangeClient.getServerIP(),
// ExchangeClient.getServerPort(), ExchangeClient.getConnectTimeout(),
// key));
} catch (Exception e) {
// IGNORE
}
}
public static ExchangeClientFactory getInstance() {
throw new UnsupportedOperationException("should be implemented by true class");
}
protected ExchangeClient createClient(String targetIP, int targetPort, int connectTimeout) throws Exception {
StringBuilder url = new StringBuilder();
url.append("exchange://");
url.append(targetIP);
url.append(":");
url.append(targetPort);
url.append("?");
url.append("timeout=");
url.append(connectTimeout);
return Exchangers.connect(url.toString());
}
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc
* Apache License
*
* http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.io.Serializable;
/**
* Just for RPC Benchmark Test,request object
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public class RequestObject implements Serializable {
private static final long serialVersionUID = 1L;
public RequestObject(){
}
private byte[] bytes = null;
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
public RequestObject(int size){
bytes = new byte[size];
}
public byte[] getBytes() {
return bytes;
}
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc
* Apache License
*
* http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.io.Serializable;
/**
* Just for RPC Benchmark Test,response object
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public class ResponseObject implements Serializable {
private static final long serialVersionUID = 1L;
private byte[] bytes = null;
public ResponseObject(int size){
bytes = new byte[size];
}
public byte[] getBytes() {
return bytes;
}
}
package com.alibaba.dubbo.rpc.benchmark;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class RpcBenchmarkClient extends AbstractBenchmarkClient {
@SuppressWarnings("rawtypes")
@Override
public ClientRunnable getClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime) throws IllegalArgumentException, SecurityException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException {
String runnable = properties.getProperty("classname");
Class[] parameterTypes = new Class[] { String.class, int.class, int.class, int.class, CyclicBarrier.class,
CountDownLatch.class, long.class, long.class };
Object[] parameters = new Object[] { targetIP, targetPort, clientNums, rpcTimeout, barrier, latch, startTime,
endTime };
return (ClientRunnable) Class.forName(runnable).getConstructor(parameterTypes).newInstance(parameters);
}
public static void main(String[] args) throws Exception {
new RpcBenchmarkClient().run(args);
}
}
package com.alibaba.dubbo.rpc.benchmark;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class RpcBenchmarkServer extends AbstractBenchmarkServer {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("ProviderSample.xml");
ctx.start();
synchronized (RpcBenchmarkServer.class) {
try {
RpcBenchmarkServer.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.alibaba.dubbo.rpc.benchmark;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
/**
* Abstract Service Factory,create custom nums Service
*
* @author tony.chenl
*/
public class ServiceFactory<T> {
String targetIP = null;
int targetPort = 0;
int connectTimeout = 0;
int clientNums = 0;
public String getTargetIP() {
return targetIP;
}
public void setTargetIP(String targetIP) {
this.targetIP = targetIP;
}
public int getTargetPort() {
return targetPort;
}
public void setTargetPort(int targetPort) {
this.targetPort = targetPort;
}
public int getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public int getClientNums() {
return clientNums;
}
public void setClientNums(int clientNums) {
this.clientNums = clientNums;
}
// Cache ExchangeClient
private static ConcurrentHashMap<String, Object> services = new ConcurrentHashMap<String, Object>();
@SuppressWarnings("unchecked")
public T get(final Class<T> cls){
String key = cls.getName();
if (services.containsKey(key)) {
return (T) services.get(key);
} else {
T service = createClient(cls, targetIP, targetPort, connectTimeout,clientNums);
services.put(key, service);
return (T) services.get(key);
}
}
protected T createClient(Class<T> cls, String targetIP, int targetPort, int connectTimeout,int clientNums){
ReferenceConfig<T> referenceConfig = new ReferenceConfig<T>();
referenceConfig.setInterface(cls);
StringBuilder url = new StringBuilder();
url.append("dubbo://");
url.append(targetIP);
url.append(":");
url.append(targetPort);
url.append("/");
url.append(cls.getName());
referenceConfig.setUrl(url.toString());
// hardcode
referenceConfig.setConnections(clientNums);
ApplicationConfig application = new ApplicationConfig();
application.setName("dubbo_consumer");
referenceConfig.setApplication(application);
referenceConfig.setTimeout(connectTimeout);
return referenceConfig.get();
}
}
package com.alibaba.dubbo.rpc.benchmark;
/**
* nfs-rpc Apache License http://code.google.com/p/nfs-rpc (c) 2011
*/
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Simple Processor RPC Benchmark Client Thread
*
* @author <a href="mailto:bluedavy@gmail.com">bluedavy</a>
*/
public class SimpleProcessorBenchmarkClientRunnable implements ClientRunnable {
private static final Log LOGGER = LogFactory.getLog(SimpleProcessorBenchmarkClientRunnable.class);
private int requestSize;
private CyclicBarrier barrier;
private CountDownLatch latch;
private long endTime;
private boolean running = true;
private ExchangeClientFactory clientFactory = new ExchangeClientFactory();
private String targetIP;
private int targetPort;
private int clientNums;
private int rpcTimeout;
// response time spread
private long[] responseSpreads = new long[9];
// error request per second
private long[] errorTPS = null;
// error response times per second
private long[] errorResponseTimes = null;
// tps per second
private long[] tps = null;
// response times per second
private long[] responseTimes = null;
// benchmark startTime
private long startTime;
// benchmark maxRange
private int maxRange;
public SimpleProcessorBenchmarkClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
CyclicBarrier barrier, CountDownLatch latch, long startTime,
long endTime){
this.targetIP = targetIP;
this.targetPort = targetPort;
this.clientNums = clientNums;
this.rpcTimeout = rpcTimeout;
this.barrier = barrier;
this.latch = latch;
this.startTime = startTime;
this.endTime = endTime;
maxRange = (Integer.parseInt(String.valueOf((endTime - startTime))) / 1000) + 1;
errorTPS = new long[maxRange];
errorResponseTimes = new long[maxRange];
tps = new long[maxRange];
responseTimes = new long[maxRange];
// init
for (int i = 0; i < maxRange; i++) {
errorTPS[i] = 0;
errorResponseTimes[i] = 0;
tps[i] = 0;
responseTimes[i] = 0;
}
}
public void run() {
try {
barrier.await();
} catch (Exception e) {
// IGNORE
}
runJavaAndHessian();
latch.countDown();
}
private void runJavaAndHessian() {
while (running) {
Object requestObject = new RequestObject(requestSize);
long beginTime = System.nanoTime();
if (beginTime >= endTime) {
running = false;
break;
}
try {
Object response = null;
response = clientFactory.get(targetIP, targetPort, rpcTimeout, clientNums).request(requestObject).get();
long currentTime = System.nanoTime();
if (beginTime <= startTime) {
continue;
}
long consumeTime = currentTime - beginTime;
sumResponseTimeSpread(consumeTime);
int range = Integer.parseInt(String.valueOf(beginTime - startTime)) / 1000;
if (range >= maxRange) {
System.err.println("benchmark range exceeds maxRange,range is: " + range + ",maxRange is: "
+ maxRange);
continue;
}
if (((ResponseObject) response).getBytes() != null) {
tps[range] = tps[range] + 1;
responseTimes[range] = responseTimes[range] + consumeTime;
} else {
LOGGER.error("server return response is null");
errorTPS[range] = errorTPS[range] + 1;
errorResponseTimes[range] = errorResponseTimes[range] + consumeTime;
}
} catch (Exception e) {
LOGGER.error("client.invokeSync error", e);
long currentTime = System.nanoTime();
if (beginTime <= startTime) {
continue;
}
long consumeTime = currentTime - beginTime;
sumResponseTimeSpread(consumeTime);
int range = Integer.parseInt(String.valueOf(beginTime - startTime)) / 1000;
if (range >= maxRange) {
System.err.println("benchmark range exceeds maxRange,range is: " + range + ",maxRange is: "
+ maxRange);
continue;
}
errorTPS[range] = errorTPS[range] + 1;
errorResponseTimes[range] = errorResponseTimes[range] + consumeTime;
}
}
}
public List<long[]> getResults() {
List<long[]> results = new ArrayList<long[]>();
results.add(responseSpreads);
results.add(tps);
results.add(responseTimes);
results.add(errorTPS);
results.add(errorResponseTimes);
return results;
}
private void sumResponseTimeSpread(long responseTime) {
responseTime = responseTime / 1000;
if (responseTime <= 0) {
responseSpreads[0] = responseSpreads[0] + 1;
} else if (responseTime > 0 && responseTime <= 1) {
responseSpreads[1] = responseSpreads[1] + 1;
} else if (responseTime > 1 && responseTime <= 5) {
responseSpreads[2] = responseSpreads[2] + 1;
} else if (responseTime > 5 && responseTime <= 10) {
responseSpreads[3] = responseSpreads[3] + 1;
} else if (responseTime > 10 && responseTime <= 50) {
responseSpreads[4] = responseSpreads[4] + 1;
} else if (responseTime > 50 && responseTime <= 100) {
responseSpreads[5] = responseSpreads[5] + 1;
} else if (responseTime > 100 && responseTime <= 500) {
responseSpreads[6] = responseSpreads[6] + 1;
} else if (responseTime > 500 && responseTime <= 1000) {
responseSpreads[7] = responseSpreads[7] + 1;
} else if (responseTime > 1000) {
responseSpreads[8] = responseSpreads[8] + 1;
}
}
}
一、新建一个benchmark工程,如demo.benchmark
二、导入自己服务的接口api包和dubbo.benchmark.jar(解压dubbo.benchmark.tar.gz,在lib目录下)
三、新建一个类,实现AbstractClientRunnable
a、实现父类的构造函数
b、实现invoke方法,通过serviceFactory创建本地接口代理,并实现自己的业务逻辑,如下
public Object invoke(ServiceFactory serviceFactory) {
DemoService demoService = (DemoService) serviceFactory.get(DemoService.class);
return demoService.sendRequest("hello");
}
四、将自己的benchmark工程打成jar包,如demo.benchmark.jar
五、将demo.benchmark.jar放到dubbo.benchmark/lib目录下
六、配置duubo.properties
七、运行run.bat(windows)或run.sh(linux)
\ No newline at end of file
#\u5b9e\u73b0\u7684benchmark runnable\u7c7b\u540d
classname=
#\u63d0\u4f9b\u8005ip
serverip=
#\u63d0\u4f9b\u8005\u7aef\u53e3
serverport=
#\u5ba2\u6237\u7aef\u5e76\u53d1\u6570
concurrents=
#\u5ba2\u6237\u7aef\u8d85\u65f6\u65f6\u95f4\uff0c\u5355\u4f4d\u6beb\u79d2
timeout=
#benchmark\u8fd0\u884c\u65f6\u95f4,\u5355\u4f4d\u79d2
runtime=
#\u8fde\u63a5\u6570
connectionnums=
\ No newline at end of file
java -Xms512m -Xmx512m -Xmn128m -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:gc.log -Dwrite.statistics=true -Djava.ext.dirs="./lib" "com.alibaba.dubbo.rpc.benchmark.RpcBenchmarkClient" > "benchmark.log"
\ No newline at end of file
java -Xms512m -Xmx512m -Xmn128m -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:gc.log -Dwrite.statistics=true -Djava.ext.dirs="./lib" "com.alibaba.dubbo.rpc.benchmark.RpcBenchmarkClient" > "benchmark.log" 2>&1 &
\ No newline at end of file
/*
* Copyright 1999-2101 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jingdong.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import com.alibaba.dubbo.rpc.benchmark.AbstractClientRunnable;
import com.alibaba.dubbo.rpc.benchmark.DemoService;
import com.alibaba.dubbo.rpc.benchmark.ServiceFactory;
/**
* DemoBenchmarkClient.java
* @author tony.chenl
*/
public class DemoBenchmarkClientRunnable extends AbstractClientRunnable{
public DemoBenchmarkClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
CyclicBarrier barrier, CountDownLatch latch, long startTime,
long endTime){
super(targetIP, targetPort, clientNums, rpcTimeout, barrier, latch, startTime, endTime);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Object invoke(ServiceFactory serviceFactory) {
DemoService demoService = (DemoService) serviceFactory.get(DemoService.class);
return demoService.sendRequest("hello");
}
}
/*
* Copyright 2011 Alibaba.com All right reserved. This software is the
* confidential and proprietary information of Alibaba.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Alibaba.com.
*/
package com.dubbo.serialize.benchmark;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
import com.alibaba.dubbo.common.io.UnsafeByteArrayOutputStream;
import com.alibaba.dubbo.common.serialize.support.dubbo.GenericObjectInput;
import com.alibaba.dubbo.common.serialize.support.dubbo.GenericObjectOutput;
import com.caucho.hessian.io.Hessian2StreamingInput;
import com.caucho.hessian.io.Hessian2StreamingOutput;
import data.media.MediaContent;
/**
* 类Dubbo.java的实现描述:Dubbo Seriazition Benchmark
*
* @author tony.chenl 2011-9-30 上午10:17:21
*/
public class Dubbo {
public static void register(TestGroups groups) {
groups.media.add(JavaBuiltIn.MediaTransformer, Dubbo.<MediaContent>GenericSerializer());
}
public static <T> Serializer<T> GenericSerializer()
{
@SuppressWarnings("unchecked")
Serializer<T> s = (Serializer<T>) GenericSerializer;
return s;
}
// ------------------------------------------------------------
// Serializer (just one)
public static Serializer<Object> GenericSerializer = new Serializer<Object>()
{
public Object deserialize(byte[] array) throws Exception
{
GenericObjectInput objectInput = new GenericObjectInput(new ByteArrayInputStream(array));
return objectInput.readObject();
}
public byte[] serialize(Object data) throws java.io.IOException
{
UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(10240);
GenericObjectOutput objectOutput = new GenericObjectOutput(os);
objectOutput.writeObject(data);
objectOutput.flushBuffer();
return os.toByteArray();
}
public String getName()
{
return "dubbo";
}
};
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd
">
<bean id="demo.local" class="com.alibaba.dubbo.rpc.benchmark.DemoServiceImpl" />
<bean id="persistPropertyPlaceholderConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
<property name="ignoreResourceNotFound" value="true" />
<property name="ignoreUnresolvablePlaceholders" value="true" />
<property name="locations">
<list>
<value>classpath:dubbo-default.properties</value>
<value>classpath:dubbo.properties</value>
</list>
</property>
</bean>
<dubbo:registry address="N/A"/>
<!-- 服务应用配置 -->
<dubbo:application name="dubbo_provider" />
<!-- 服务提供者全局配置 -->
<dubbo:protocol name="dubbo" port="20885"/>
<!-- 服务提供者暴露服务配置 -->
<dubbo:service id="helloService" interface="com.alibaba.dubbo.rpc.benchmark.DemoService"
ref="demo.local"/>
</beans>
\ No newline at end of file
dubbo.registry.address=10.20.153.28:9090
dubbo.provider.port=20880
dubbo.provider.protocol=dubbo
dubbo.provider.codec=
dubbo.provider.client=
dubbo.provider.server=
dubbo.provider.serialization=
\ No newline at end of file
log4j.rootLogger=WARN,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} [%t] %5p %c{1}\:%L - %m%n
log4j.logger.com.alibaba.dubbo=WARN
\ No newline at end of file
sh servercommon.sh "../lib" "com.alibaba.dubbo.rpc.benchmark.RpcBenchmarkServer" "rpcserver.log.dubbo"
sh servercommon.sh "../lib" "com.alibaba.dubbo.rpc.benchmark.BenchmarkServer" "server.log.dubbo"
java -Xms2g -Xmx2g -Xmn500m -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:gc.log -Djava.ext.dirs=$1 $2 [listenport] [maxthreads] [responsesize] [transporter] [serialization]> $3 2>&1 &
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册