提交 e6e22f25 编写于 作者: F Frankie Wu

replace analyzer factory with analyzer manager

上级 a9861de0
......@@ -9,8 +9,8 @@ import org.junit.runners.JUnit4;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.DefaultMessageQueue;
......@@ -27,7 +27,7 @@ public class DumpUploaderSample extends ComponentTestCase {
@Test
public void testUpload() throws Exception {
MessageAnalyzerFactory factory = lookup(MessageAnalyzerFactory.class);
MessageAnalyzerManager manager = lookup(MessageAnalyzerManager.class);
long now = System.currentTimeMillis();
DefaultMessageQueue queue = new DefaultMessageQueue();
int num = 10000;
......@@ -39,10 +39,9 @@ public class DumpUploaderSample extends ComponentTestCase {
queue.offer(newMessageTree(i, now + i * 10L));
}
MessageAnalyzer analyzer = factory.create("dump", now, 10 * 1000L, 10 * 1000L);
MessageAnalyzer analyzer = manager.getAnalyzer("dump", now);
analyzer.analyze(queue);
analyzer.doCheckpoint(true);
System.out.println("checkpoint");
......
package com.dianping.cat.consumer;
import org.unidal.lookup.ContainerHolder;
public class DefaultMessageAnalyzerFactory extends ContainerHolder implements MessageAnalyzerFactory {
@Override
public MessageAnalyzer create(String name, long startTime, long duration, long extraTime) {
MessageAnalyzer analyzer = lookup(MessageAnalyzer.class, name);
analyzer.setAnalyzerInfo(startTime, duration, extraTime);
return analyzer;
}
}
package com.dianping.cat.consumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.lookup.ContainerHolder;
public class DefaultMessageAnalyzerManager extends ContainerHolder implements MessageAnalyzerManager {
public class DefaultMessageAnalyzerManager extends ContainerHolder implements MessageAnalyzerManager, Initializable {
private static final long MINUTE = 60 * 1000L;
private long m_duration = 60 * MINUTE;
private long m_extraTime = 3 * MINUTE;
private List<String> m_analyzerNames;
private Map<Long, Map<String, MessageAnalyzer>> m_map = new HashMap<Long, Map<String, MessageAnalyzer>>();
@Override
public MessageAnalyzer getAnalyzer(String name, long startTime) {
return null;
Map<String, MessageAnalyzer> map = m_map.get(startTime);
if (map == null) {
synchronized (m_map) {
if (map == null) {
map = new HashMap<String, MessageAnalyzer>();
m_map.put(startTime, map);
}
}
}
MessageAnalyzer analyzer = map.get(name);
if (analyzer == null) {
synchronized (map) {
if (analyzer == null) {
analyzer = lookup(MessageAnalyzer.class, name);
analyzer.setAnalyzerInfo(startTime, m_duration, m_extraTime);
map.put(name, analyzer);
}
}
}
return analyzer;
}
@Override
public List<String> getAnalyzerNames() {
return m_analyzerNames;
}
@Override
public void initialize() throws InitializationException {
Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class);
m_analyzerNames = new ArrayList<String>(map.keySet());
}
}
package com.dianping.cat.consumer;
public interface MessageAnalyzerFactory {
public MessageAnalyzer create(String name, long start, long duration, long extraTime);
}
\ No newline at end of file
package com.dianping.cat.consumer;
import java.util.List;
public interface MessageAnalyzerManager {
public List<String> getAnalyzerNames();
public MessageAnalyzer getAnalyzer(String name, long startTime);
}
......@@ -16,7 +16,6 @@ import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Splitters;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.ContainerHolder;
......@@ -36,19 +35,18 @@ import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.status.ServerStateManager;
/**
* This is the real time consumer process framework.
* <p>
* This is the real time message consuming entry.
*/
public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled {
public static final String ID = "realtime";
private static final long MINUTE = 60 * 1000L;
@Inject
private ServerStateManager m_serverStateManager;
private MessageAnalyzerManager m_analyzerManager;
@Inject
private MessageAnalyzerFactory m_factory;
private ServerStateManager m_serverStateManager;
@Inject
private long m_duration = 60 * MINUTE;
......@@ -56,9 +54,6 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
@Inject
private long m_extraTime = 3 * MINUTE;
@Inject
private List<String> m_analyzerNames;
private Set<String> m_domains = new HashSet<String>();
private Map<String, Integer> m_errorTimeDomains = new HashMap<String, Integer>();
......@@ -67,14 +62,12 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private PeriodManager m_periodManager;
private CountDownLatch m_latch;
private long m_networkError;
@Override
public void consume(MessageTree tree) {
try {
m_latch.await();
m_periodManager.waitUntilStarted();
} catch (InterruptedException e) {
// ignore it
}
......@@ -92,6 +85,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
}
} else {
m_serverStateManager.addNetworkTimeError(1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
String domain = tree.getDomain();
Integer size = m_errorTimeDomains.get(domain);
......@@ -101,9 +95,10 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
} else {
size++;
}
m_errorTimeDomains.put(domain, size);
m_errorTimeDomains.put(domain, size);
m_networkError++;
if (m_networkError % (CatConstants.ERROR_COUNT * 10) == 0) {
m_logger.error("Error network time:" + m_errorTimeDomains);
m_logger.error("The timestamp of message is out of range, IGNORED! "
......@@ -161,20 +156,11 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
@Override
public void initialize() throws InitializationException {
m_latch = new CountDownLatch(1);
m_periodManager = new PeriodManager(m_latch);
m_periodManager = new PeriodManager();
Threads.forGroup("Cat").start(m_periodManager);
}
public void setAnalyzers(String analyzers) {
m_analyzerNames = Splitters.by(',').noEmptyItem().trim().split(analyzers);
}
public void setExtraTime(long time) {
m_extraTime = time;
}
class Period {
private long m_startTime;
......@@ -183,14 +169,16 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private List<PeriodTask> m_tasks;
public Period(long startTime, long endTime) {
List<String> names = m_analyzerManager.getAnalyzerNames();
m_startTime = startTime;
m_endTime = endTime;
m_tasks = new ArrayList<PeriodTask>(m_analyzerNames.size());
m_tasks = new ArrayList<PeriodTask>(names.size());
Map<String, MessageAnalyzer> analyzers = new HashMap<String, MessageAnalyzer>();
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = m_factory.create(name, startTime, m_duration, m_extraTime);
for (String name : names) {
MessageAnalyzer analyzer = m_analyzerManager.getAnalyzer(name, startTime);
MessageQueue queue = lookup(MessageQueue.class);
PeriodTask task = new PeriodTask(analyzer, queue, startTime);
......@@ -204,8 +192,10 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
MessageAnalyzer transaction = analyzers.get(TransactionAnalyzer.ID);
MessageAnalyzer problem = analyzers.get(ProblemAnalyzer.ID);
((TopAnalyzer) top).setTransactionAnalyzer((TransactionAnalyzer) transaction);
((TopAnalyzer) top).setProblemAnalyzer((ProblemAnalyzer) problem);
if (top != null) {
((TopAnalyzer) top).setTransactionAnalyzer((TransactionAnalyzer) transaction);
((TopAnalyzer) top).setProblemAnalyzer((ProblemAnalyzer) problem);
}
}
public void distribute(MessageTree tree) {
......@@ -249,7 +239,8 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
}
public MessageAnalyzer getAnalyzer(String name) {
int index = m_analyzerNames.indexOf(name);
List<String> names = m_analyzerManager.getAnalyzerNames();
int index = names.indexOf(name);
if (index >= 0) {
PeriodTask task = m_tasks.get(index);
......@@ -295,10 +286,14 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private CountDownLatch m_latch;
public PeriodManager(CountDownLatch latch) {
public PeriodManager() {
m_strategy = new PeriodStrategy(m_duration, m_extraTime, 3 * MINUTE);
m_active = true;
m_latch = latch;
m_latch = new CountDownLatch(1);
}
public void waitUntilStarted() throws InterruptedException {
m_latch.await();
}
private void endPeriod(long startTime) {
......@@ -335,10 +330,10 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
long startTime = m_strategy.next(System.currentTimeMillis());
// for current period
startPeriod(startTime);
m_latch.countDown();
try {
startPeriod(startTime);
m_latch.countDown();
while (m_active) {
long now = System.currentTimeMillis();
long value = m_strategy.next(now);
......@@ -357,6 +352,8 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
}
} catch (InterruptedException e) {
// ignore it
} catch (Exception e) {
e.printStackTrace();
}
}
......
......@@ -14,9 +14,9 @@ import com.dainping.cat.consumer.dal.report.ReportDao;
import com.dainping.cat.consumer.dal.report.TaskDao;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.consumer.CatConsumerModule;
import com.dianping.cat.consumer.DefaultMessageAnalyzerFactory;
import com.dianping.cat.consumer.DefaultMessageAnalyzerManager;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.dump.DumpAnalyzer;
import com.dianping.cat.consumer.event.EventAnalyzer;
......@@ -39,12 +39,10 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(MessageAnalyzerFactory.class, DefaultMessageAnalyzerFactory.class));
all.add(C(MessageAnalyzerManager.class, DefaultMessageAnalyzerManager.class));
all.add(C(MessageConsumer.class, RealtimeConsumer.ID, RealtimeConsumer.class) //
.req(MessageAnalyzerFactory.class, ServerStateManager.class) //
.config(E("extraTime").value(property("extraTime", "180000")) //
, E("analyzers").value("problem,transaction,event,heartbeat,dump,state,top,cross,database,ip,matrix,sql")));
.req(MessageAnalyzerManager.class, ServerStateManager.class));
all.add(C(Handler.class, DefaultProblemHandler.ID, DefaultProblemHandler.class)//
.config(E("failureType").value("URL,SQL,Call,PigeonCall,Cache"))//
......
<plexus>
<components>
<component>
<role>com.dianping.cat.consumer.MessageAnalyzerFactory</role>
<implementation>com.dianping.cat.consumer.DefaultMessageAnalyzerFactory</implementation>
<role>com.dianping.cat.consumer.MessageAnalyzerManager</role>
<implementation>com.dianping.cat.consumer.DefaultMessageAnalyzerManager</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>realtime</role-hint>
<implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
<configuration>
<extraTime>180000</extraTime>
<analyzers>problem,transaction,event,heartbeat,dump,state,top,cross,database,ip,matrix,sql</analyzers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.consumer.MessageAnalyzerFactory</role>
<role>com.dianping.cat.consumer.MessageAnalyzerManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.status.ServerStateManager</role>
......
......@@ -4,6 +4,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import com.dianping.cat.consumer.demo.ManyAnalyzerTest;
import com.dianping.cat.consumer.demo.OneAnalyzerTwoDurationTest;
import com.dianping.cat.consumer.problem.DefaultHandlerTest;
import com.dianping.cat.consumer.transaction.FormatTest;
import com.dianping.cat.consumer.transaction.GsonTest;
......@@ -18,6 +20,10 @@ import com.dianping.cat.consumer.transaction.TransactionReportTest;
PeriodStrategyTest.class,
ManyAnalyzerTest.class,
OneAnalyzerTwoDurationTest.class,
DefaultHandlerTest.class,
FormatTest.class,
......
package com.dianping.cat.consumer.demo;
import java.util.Arrays;
import java.util.List;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.consumer.demo.ManyAnalyzerTest.MockAnalyzer1;
import com.dianping.cat.consumer.demo.ManyAnalyzerTest.MockAnalyzer2;
import com.dianping.cat.consumer.demo.ManyAnalyzerTest.MockAnalyzer3;
public class ManyAnalyerMockFactory implements MessageAnalyzerFactory {
public class ManyAnalyzerMockManager implements MessageAnalyzerManager {
@Override
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
public List<String> getAnalyzerNames() {
return Arrays.asList("mock1", "mock2", "mock3");
}
@Override
public MessageAnalyzer getAnalyzer(String name, long startTime) {
if (name.equals("mock1")) {
MockAnalyzer1 analyzer = new MockAnalyzer1();
return analyzer;
......@@ -19,6 +27,7 @@ public class ManyAnalyerMockFactory implements MessageAnalyzerFactory {
MockAnalyzer3 analyzer = new MockAnalyzer3();
return analyzer;
}
return null;
}
}
......@@ -4,7 +4,6 @@ import java.util.Set;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
......@@ -24,10 +23,6 @@ public class ManyAnalyzerTest extends ComponentTestCase {
private static int s_count3;
@Before
public void before() {
}
@Test
public void test() throws Exception {
MessageConsumer consumer = lookup(MessageConsumer.class, "mockManyAnalyzers");
......@@ -48,10 +43,10 @@ public class ManyAnalyzerTest extends ComponentTestCase {
Assert.assertEquals(300, s_count3);
}
public static class MockAnalyzer1 extends AbstractMessageAnalyzer<AnalyzerResult> {
public static class MockAnalyzer1 extends AbstractMessageAnalyzer<Void> {
@Override
protected void process(MessageTree tree) {
++s_count1;
s_count1++;
}
@Override
......@@ -60,7 +55,7 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
@Override
public AnalyzerResult getReport(String domain) {
public Void getReport(String domain) {
return null;
}
......@@ -70,10 +65,10 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
}
public static class MockAnalyzer2 extends AbstractMessageAnalyzer<AnalyzerResult> {
public static class MockAnalyzer2 extends AbstractMessageAnalyzer<Void> {
@Override
protected void process(MessageTree tree) {
s_count2 = s_count2 + 2;
s_count2 += 2;
}
@Override
......@@ -82,7 +77,7 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
@Override
public AnalyzerResult getReport(String domain) {
public Void getReport(String domain) {
return null;
}
......@@ -92,10 +87,10 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
}
public static class MockAnalyzer3 extends AbstractMessageAnalyzer<AnalyzerResult> {
public static class MockAnalyzer3 extends AbstractMessageAnalyzer<Void> {
@Override
protected void process(MessageTree tree) {
s_count3 = s_count3 + 3;
s_count3 += 3;
}
@Override
......@@ -104,7 +99,7 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
@Override
public AnalyzerResult getReport(String domain) {
public Void getReport(String domain) {
return null;
}
......@@ -114,10 +109,6 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
}
public static class AnalyzerResult {
}
static class MockMessage extends AbstractMessage {
public MockMessage() {
super(null, null);
......
......@@ -7,7 +7,7 @@ import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.demo.ManyAnalyzerTest.MockAnalyzer1;
import com.dianping.cat.consumer.demo.ManyAnalyzerTest.MockAnalyzer2;
......@@ -24,9 +24,7 @@ public class ManyAnalyzerTestConfigurator extends AbstractResourceConfigurator {
List<Component> all = new ArrayList<Component>();
all.add(C(MessageConsumer.class, "mockManyAnalyzers", RealtimeConsumer.class) //
.req(MessageAnalyzerFactory.class)//
.config(E("analyzers").value("mock1,mock2,mock3") //
));
.req(MessageAnalyzerManager.class));
all.add(C(MessageAnalyzer.class, "mock1", MockAnalyzer1.class) //
.is(PER_LOOKUP));
......@@ -35,7 +33,8 @@ public class ManyAnalyzerTestConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageAnalyzer.class, "mock3", MockAnalyzer3.class) //
.is(PER_LOOKUP));
all.add(C(MessageAnalyzerFactory.class, ManyAnalyerMockFactory.class));
all.add(C(MessageAnalyzerManager.class, ManyAnalyzerMockManager.class));
return all;
}
......
package com.dianping.cat.consumer.demo;
import java.util.Arrays;
import java.util.List;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.consumer.demo.OneAnalyzerTwoDurationTest.MockAnalyzer;
public class OneAnalyzerMockFactory implements MessageAnalyzerFactory {
public class OneAnalyzerMockManager implements MessageAnalyzerManager {
@Override
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
public List<String> getAnalyzerNames() {
return Arrays.asList("mock");
}
@Override
public MessageAnalyzer getAnalyzer(String name, long startTime) {
if (name.equals("mock")) {
MockAnalyzer analyzer = new OneAnalyzerTwoDurationTest.MockAnalyzer();
return analyzer;
}
return null;
}
}
......@@ -8,13 +8,13 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.consumer.AbstractMessageAnalyzer;
import com.dianping.cat.message.internal.AbstractMessage;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import org.unidal.lookup.ComponentTestCase;
/**
* The tree message is in the latest two hours
......@@ -54,7 +54,7 @@ public class OneAnalyzerTwoDurationTest extends ComponentTestCase {
Assert.assertEquals(1, s_period);
}
public static class MockAnalyzer extends AbstractMessageAnalyzer<AnalyzerResult> {
public static class MockAnalyzer extends AbstractMessageAnalyzer<Void> {
public MockAnalyzer() {
s_period++;
}
......@@ -75,7 +75,7 @@ public class OneAnalyzerTwoDurationTest extends ComponentTestCase {
}
@Override
public AnalyzerResult getReport(String domain) {
public Void getReport(String domain) {
return null;
}
......@@ -85,10 +85,6 @@ public class OneAnalyzerTwoDurationTest extends ComponentTestCase {
}
}
public static class AnalyzerResult {
}
static class MockMessage extends AbstractMessage {
private int m_type;
......@@ -109,7 +105,6 @@ public class OneAnalyzerTwoDurationTest extends ComponentTestCase {
@Override
public void complete() {
}
}
}
......@@ -7,7 +7,7 @@ import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.demo.OneAnalyzerTwoDurationTest.MockAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
......@@ -22,14 +22,12 @@ public class OneAnalyzerTwoDurationTestConfigurator extends AbstractResourceConf
List<Component> all = new ArrayList<Component>();
all.add(C(MessageConsumer.class, "mock", RealtimeConsumer.class) //
.config(E("analyzers").value("mock") //
).req(MessageAnalyzerFactory.class)//
);
.req(MessageAnalyzerManager.class));
all.add(C(MessageAnalyzer.class, "mock", MockAnalyzer.class) //
.is(PER_LOOKUP));
all.add(C(MessageAnalyzerFactory.class, OneAnalyzerMockFactory.class));
all.add(C(MessageAnalyzerManager.class, OneAnalyzerMockManager.class));
return all;
}
......
......@@ -3,20 +3,20 @@ package com.dianping.cat.consumer.dump;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzer;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import org.unidal.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class DumpAnalyzerTest extends ComponentTestCase {
@Test
public void test() throws Exception {
MessageAnalyzerFactory factory = lookup(MessageAnalyzerFactory.class);
MessageAnalyzerManager manager = lookup(MessageAnalyzerManager.class);
long now = 1334041324150L;
DefaultMessageQueue queue = new DefaultMessageQueue();
int num = 1000000;
......@@ -28,10 +28,9 @@ public class DumpAnalyzerTest extends ComponentTestCase {
queue.offer(newMessageTree(i, now + i * 10L));
}
MessageAnalyzer analyzer = factory.create("dump", now, 10 * 1000L, 10 * 1000L);
MessageAnalyzer analyzer = manager.getAnalyzer("dump", now);
analyzer.analyze(queue);
analyzer.doCheckpoint(true);
}
......
/**
*
*/
package com.dianping.cat.consumer.transaction;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.consumer.MessageAnalyzerFactory;
import com.dianping.cat.consumer.MessageAnalyzerManager;
import com.dianping.cat.consumer.transaction.model.entity.TransactionName;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.entity.TransactionType;
......@@ -18,29 +13,14 @@ import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
/**
* @author sean.wang
* @since Jan 5, 2012
*/
@RunWith(JUnit4.class)
public class TransactionReportMessageAnalyzerTest extends ComponentTestCase {
/**
* Test method for
* {@link com.dianping.cat.consumer.transaction.TransactionReportAnalyzer#process(com.dianping.cat.message.spi.MessageTree)}
* .
*
* @throws InterruptedException
*/
@Test
public void testCommonGenerate() throws Exception {
long current = System.currentTimeMillis();
long duration = 60 * 60 * 1000;
long extraTime = 5 * 60 * 1000;
long start = current - current % (60 * 60 * 1000) - 1000L * 60 * 60;
MessageAnalyzerFactory factory = lookup(MessageAnalyzerFactory.class);
TransactionAnalyzer analyzer = (TransactionAnalyzer) factory.create("transaction", start, duration, extraTime);
MessageAnalyzerManager manager = lookup(MessageAnalyzerManager.class);
TransactionAnalyzer analyzer = (TransactionAnalyzer) manager.getAnalyzer("transaction", start);
for (int i = 1; i <= 1000; i++) {
MessageTree tree = new DefaultMessageTree();
......@@ -78,21 +58,21 @@ public class TransactionReportMessageAnalyzerTest extends ComponentTestCase {
Assert.assertEquals(1000, n1.getTotalCount());
Assert.assertEquals(500, n1.getFailCount());
double e = 0.001;
Assert.assertEquals(50.0, n1.getFailPercent(),e);
Assert.assertEquals(2.0, n1.getMin(),e);
Assert.assertEquals(2000.0, n1.getMax(),e);
Assert.assertEquals(1001.0, n1.getAvg(),e);
Assert.assertEquals(1001000.0, n1.getSum(),e);
Assert.assertEquals(50.0, n1.getFailPercent(), e);
Assert.assertEquals(2.0, n1.getMin(), e);
Assert.assertEquals(2000.0, n1.getMax(), e);
Assert.assertEquals(1001.0, n1.getAvg(), e);
Assert.assertEquals(1001000.0, n1.getSum(), e);
TransactionType typeA1 = report.getMachines().get("192.168.1.1").getTypes().get("A-1");
TransactionName n2 = typeA1.getNames().get("n2");
Assert.assertEquals(1000, n2.getTotalCount());
Assert.assertEquals(500, n2.getFailCount());
Assert.assertEquals(50.0, n2.getFailPercent(),e);
Assert.assertEquals(1.0, n2.getMin(),e);
Assert.assertEquals(1000.0, n2.getMax(),e);
Assert.assertEquals(500.5, n2.getAvg(),e);
Assert.assertEquals(500500.0, n2.getSum(),e);
Assert.assertEquals(50.0, n2.getFailPercent(), e);
Assert.assertEquals(1.0, n2.getMin(), e);
Assert.assertEquals(1000.0, n2.getMax(), e);
Assert.assertEquals(500.5, n2.getAvg(), e);
Assert.assertEquals(500500.0, n2.getSum(), e);
}
}
......@@ -4,12 +4,9 @@
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>mockManyAnalyzers</role-hint>
<implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
<configuration>
<analyzers>mock1,mock2,mock3</analyzers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.consumer.MessageAnalyzerFactory</role>
<role>com.dianping.cat.consumer.MessageAnalyzerManager</role>
</requirement>
</requirements>
</component>
......@@ -32,8 +29,8 @@
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.consumer.MessageAnalyzerFactory</role>
<implementation>com.dianping.cat.consumer.demo.ManyAnalyerMockFactory</implementation>
<role>com.dianping.cat.consumer.MessageAnalyzerManager</role>
<implementation>com.dianping.cat.consumer.demo.ManyAnalyzerMockManager</implementation>
</component>
</components>
</plexus>
......@@ -4,12 +4,9 @@
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>mock</role-hint>
<implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
<configuration>
<analyzers>mock</analyzers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.consumer.MessageAnalyzerFactory</role>
<role>com.dianping.cat.consumer.MessageAnalyzerManager</role>
</requirement>
</requirements>
</component>
......@@ -20,8 +17,8 @@
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.consumer.MessageAnalyzerFactory</role>
<implementation>com.dianping.cat.consumer.demo.OneAnalyzerMockFactory</implementation>
<role>com.dianping.cat.consumer.MessageAnalyzerManager</role>
<implementation>com.dianping.cat.consumer.demo.OneAnalyzerMockManager</implementation>
</component>
</components>
</plexus>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册