提交 56c3ae41 编写于 作者: Y yong.you

add the test

上级 16391281
......@@ -31,8 +31,6 @@ import com.dianping.cat.consumer.transaction.TransactionReportTypeAggergatorTest
@RunWith(Suite.class)
@SuiteClasses({
PeriodStrategyTest.class,
ProblemHandlerTest.class,
FormatTest.class,
......
......@@ -18,15 +18,12 @@ import com.dianping.cat.core.dal.ProjectDao;
import com.dianping.cat.core.dal.TaskDao;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.core.DefaultMessageHandler;
import com.dianping.cat.message.spi.core.DefaultMessagePathBuilder;
import com.dianping.cat.message.spi.core.MessageHandler;
import com.dianping.cat.message.spi.core.MessagePathBuilder;
import com.dianping.cat.message.spi.core.TcpSocketReceiver;
import com.dianping.cat.message.spi.core.TcpSocketReceiver.DecodeMessageTask;
import com.dianping.cat.service.RemoteModelService;
import com.dianping.cat.statistic.ServerStatisticManager;
import com.dianping.cat.storage.dump.ChannelBufferManager;
import com.dianping.cat.storage.dump.LocalMessageBucket;
import com.dianping.cat.storage.dump.LocalMessageBucketManager;
import com.dianping.cat.storage.dump.MessageBucket;
......@@ -50,21 +47,16 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageAnalyzerManager.class, DefaultMessageAnalyzerManager.class));
all.add(C(RemoteModelService.class));
all.add(C(TcpSocketReceiver.class).req(ServerConfigManager.class).req(ServerStatisticManager.class)
.req(MessageCodec.class, PlainTextMessageCodec.ID).req(MessageHandler.class));
all.add(C(DecodeMessageTask.class));
all.add(C(MessageHandler.class, DefaultMessageHandler.class));
all.add(C(MessageBucket.class, LocalMessageBucket.ID, LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, PlainTextMessageCodec.ID));
all.add(C(MessageBucketManager.class, LocalMessageBucketManager.ID, LocalMessageBucketManager.class) //
.req(ServerConfigManager.class, MessagePathBuilder.class, ServerStatisticManager.class));
all.add(C(ChannelBufferManager.class));
all.add(C(Module.class, CatCoreModule.ID, CatCoreModule.class));
......
package com.dianping.cat.message.spi.core;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageHandler extends ContainerHolder implements MessageHandler, LogEnabled {
@Inject
private MessageConsumer m_consumer;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void handle(MessageTree tree) {
if (m_consumer == null) {
m_consumer = lookup(MessageConsumer.class);
}
try {
m_consumer.consume(tree);
} catch (Throwable e) {
m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
}
}
}
......@@ -5,7 +5,6 @@ import java.util.HashMap;
import java.util.Map;
public class ModelRequest {
private String m_reportName;
private String m_domain;
......@@ -25,10 +24,6 @@ public class ModelRequest {
return m_domain;
}
public String getReportName() {
return m_reportName;
}
public ModelPeriod getPeriod() {
return m_period;
}
......@@ -63,18 +58,6 @@ public class ModelRequest {
}
}
public boolean hasProperty(String name) {
if (m_properties != null) {
return m_properties.containsKey(name);
} else {
return false;
}
}
public void setReportName(String reportName) {
m_reportName = reportName;
}
public ModelRequest setProperty(String name, String value) {
if (m_properties == null) {
m_properties = new HashMap<String, String>();
......
package com.dianping.cat.service;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Files;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.tuple.Pair;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultMessageProducer;
public class RemoteModelService implements Initializable {
private ExecutorService m_threadPool;
private int m_maxThreads = 50;
private String m_prefixUri = "/cat/r/model";
private URL buildUrl(Pair<String, Integer> endpoint, String name, ModelRequest request) throws MalformedURLException {
StringBuilder sb = new StringBuilder(128);
for (Entry<String, String> e : request.getProperties().entrySet()) {
if (e.getValue() != null) {
sb.append('&');
sb.append(e.getKey()).append('=').append(e.getValue());
}
}
String url = String.format("http://%s:%s%s/%s/%s/%s?op=xml%s", endpoint.getKey(), endpoint.getValue(), m_prefixUri, name,
request.getDomain(), request.getPeriod(), sb.toString());
return new URL(url);
}
@Override
public void initialize() throws InitializationException {
m_threadPool = Threads.forPool().getFixedThreadPool("Cat-HttpService", m_maxThreads);
}
public void invoke(Pair<String, Integer> endpoint, Transaction parent, String name, ModelRequest request,
HttpServiceCallback callback) throws IOException {
URL url = buildUrl(endpoint, name, request);
m_threadPool.submit(new HttpServiceInvoker(parent, name, url, callback));
}
public void setMaxThreads(int maxThreads) {
m_maxThreads = maxThreads;
}
public static interface HttpServiceCallback {
public void onComplete(String xml);
public void onException(Exception e, boolean timeout);
}
class HttpServiceInvoker extends TrackableTask {
private String m_name;
private URL m_url;
private HttpServiceCallback m_callback;
public HttpServiceInvoker(Transaction parent, String name, URL url, HttpServiceCallback callback) {
super(name, parent);
m_name = name;
m_url = url;
m_callback = callback;
}
@Override
public void run() {
Transaction t = newTransaction("ModelService", m_name);
try {
t.addData(m_url.toString());
String xml = Files.forIO().readFrom(m_url.openStream(), "utf-8");
int len = xml == null ? 0 : xml.length();
t.addData("length", len);
if (len > 0) {
m_callback.onComplete(xml);
t.setStatus(Message.SUCCESS);
} else {
t.setStatus("NoReport");
}
} catch (Exception e) {
logError(e);
t.setStatus(e);
} finally {
t.complete();
Cat.reset();
}
}
@Override
public void shutdown() {
}
}
static abstract class TrackableTask implements Task {
private String m_name;
private Transaction m_parent;
public TrackableTask(String name, Transaction parent) {
m_name = name;
m_parent = parent;
}
@Override
public String getName() {
return m_name;
}
protected void logError(Throwable cause) {
StringWriter writer = new StringWriter(2048);
cause.printStackTrace(new PrintWriter(writer));
if (cause instanceof Error) {
logEvent("Error", cause.getClass().getName(), "ERROR", writer.toString());
} else if (cause instanceof RuntimeException) {
logEvent("RuntimeException", cause.getClass().getName(), "ERROR", writer.toString());
} else {
logEvent("Exception", cause.getClass().getName(), "ERROR", writer.toString());
}
}
protected void logEvent(String type, String name, String status, String nameValuePairs) {
DefaultMessageProducer cat = (DefaultMessageProducer) Cat.getProducer();
Event event = cat.newEvent(m_parent, type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
event.addData(nameValuePairs);
}
event.setStatus(status);
event.complete();
}
protected Transaction newTransaction(String type, String name) {
DefaultMessageProducer cat = (DefaultMessageProducer) Cat.getProducer();
Transaction transaction = cat.newTransaction(m_parent, type, name);
return transaction;
}
protected void setParentTransaction(Transaction parentTransaction) {
m_parent = parentTransaction;
}
}
}
package com.dianping.cat.storage.dump;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
public class ChannelBufferManager {
private BlockingQueue<ChannelBuffer> m_pool = new LinkedBlockingQueue<ChannelBuffer>(100);
public ChannelBuffer allocate() {
ChannelBuffer buffer = m_pool.poll();
if (buffer != null) {
return buffer;
} else {
return ChannelBuffers.dynamicBuffer(16384);
}
}
public void revoke(ChannelBuffer buffer) {
if (buffer.capacity() <= 16384) { // get rid of big buffer
buffer.clear();
m_pool.offer(buffer);
}
}
}
......@@ -4,10 +4,12 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import com.dianping.cat.analysis.PeriodStrategyTest;
import com.dianping.cat.message.spi.core.HtmlMessageCodecTest;
import com.dianping.cat.message.spi.core.MessagePathBuilderTest;
import com.dianping.cat.message.spi.core.TcpSocketReceiverTest;
import com.dianping.cat.message.spi.core.WaterfallMessageCodecTest;
import com.dianping.cat.service.ModelRequestTest;
import com.dianping.cat.statistic.ServerStatisticManagerTest;
import com.dianping.cat.storage.dump.LocalMessageBucketManagerTest;
import com.dianping.cat.storage.dump.LocalMessageBucketTest;
......@@ -36,7 +38,13 @@ TcpSocketReceiverTest.class,
MessagePathBuilderTest.class,
ServerStatisticManagerTest.class
ServerStatisticManagerTest.class,
PeriodStrategyTest.class,
ModelRequestTest.class,
ServerConfigManagerTest.class
})
public class AllTests {
......
package com.dianping.cat.consumer;
package com.dianping.cat.analysis;
import junit.framework.Assert;
......@@ -6,7 +6,6 @@ import org.junit.Test;
import com.dianping.cat.analysis.PeriodStrategy;
public class PeriodStrategyTest {
@Test
public void test1() {
......
package com.dianping.cat.service;
import org.junit.Test;
import junit.framework.Assert;
public class ModelPeriodTest {
@Test
public void test(){
long time = System.currentTimeMillis();
long start = time - time % (3600 * 1000L);
ModelPeriod period = ModelPeriod.getByTime(start);
Assert.assertEquals(start,period.getStartTime());
Assert.assertEquals(true,period.isCurrent());
Assert.assertEquals(false,period.isFuture());
Assert.assertEquals(false,period.isHistorical());
Assert.assertEquals(false,period.isLast());
Assert.assertEquals(period, ModelPeriod.getByName(period.name(), period));
}
}
package com.dianping.cat.service;
import junit.framework.Assert;
import org.junit.Test;
public class ModelRequestTest {
@Test
public void test() {
long time = System.currentTimeMillis();
long start = time - time % (3600 * 1000L);
ModelRequest request = new ModelRequest("Cat", start);
String str = "test";
request.setProperty(str, str);
Assert.assertEquals(ModelPeriod.CURRENT, request.getPeriod());
Assert.assertEquals(str, request.getProperty(str));
Assert.assertEquals(start, request.getStartTime());
Assert.assertEquals("ModelRequest[domain=Cat, period=CURRENT, properties={test=test}]", request.toString());
}
}
......@@ -24,6 +24,9 @@ public class ServerStatisticManagerTest {
manager.addMessageSize(6);
manager.addMessageTotal(7);
manager.addMessageTotalLoss(8);
manager.addPigeonTimeError(9);
manager.addNetworkTimeError(10);
manager.addProcessDelay(11);
manager.addMessageSize(domain, 1);
manager.addMessageTotal(domain, 2);
manager.addMessageTotalLoss(domain, 3);
......@@ -36,9 +39,17 @@ public class ServerStatisticManagerTest {
Assert.assertEquals(6.0, findState(manager, time).getMessageSize());
Assert.assertEquals(7, findState(manager, time).getMessageTotal());
Assert.assertEquals(8, findState(manager, time).getMessageTotalLoss());
Assert.assertEquals(9, findState(manager, time).getPigeonTimeError());
Assert.assertEquals(10, findState(manager, time).getNetworkTimeError());
Assert.assertEquals(11.0, findState(manager, time).getProcessDelaySum());
Assert.assertEquals(1, findState(manager, time).getProcessDelayCount());
Assert.assertEquals(1.0, findState(manager, time).getMessageSizes().get(domain));
Assert.assertEquals(2, findState(manager, time).getMessageTotals().get(domain).get());
Assert.assertEquals(3, findState(manager, time).getMessageTotalLosses().get(domain).get());
manager.removeState(time);
Assert.assertEquals(true, null != manager.findState(time));
}
private Statistic findState(ServerStatisticManager manager, long time) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册