提交 5b6298d8 编写于 作者: Y youyong

consumer,anayler

上级 b3e8096e
......@@ -7,10 +7,10 @@ import com.dianping.cat.message.consumer.impl.AnalyzerFactory;
import com.dianping.cat.message.consumer.impl.DefaultAnalyzerFactoryImpl;
import com.dianping.cat.message.consumer.impl.DefaultMessageQueue;
import com.dianping.cat.message.consumer.impl.RealtimeConsumer;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer.FailureHandler;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer.Handler;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer.LongUrlHandler;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzerConfig;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageQueue;
import com.site.lookup.configuration.AbstractResourceConfigurator;
......@@ -22,29 +22,28 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
List<Component> all = new ArrayList<Component>();
all.add(C(MessageQueue.class, DefaultMessageQueue.class).is(PER_LOOKUP));
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactoryImpl.class));
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class)
.config(E("consumerId").value("realtime") //
, E("domain").value("Review") //
, E("extraTime").value("300000")//
, E("analyzerNames").value("failure-report") //
));
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactoryImpl.class));
, E("analyzerNames").value("failure-report")
));
String handlers = "failure,long-url";
String failureTypes = "Error,RuntimeException,Exception";
all.add(C(Handler.class, "failure", FailureHandler.class)//
.config(E("failureType").value(failureTypes)).is(PER_LOOKUP));
.config(E("failureType").value(failureTypes)));
all.add(C(Handler.class, "long-url", LongUrlHandler.class) //
.config(E("threshold").value("2000")).is(PER_LOOKUP));
all.add(C(FailureReportAnalyzerConfig.class, "failure-analyzer-config",
FailureReportAnalyzerConfig.class)//
.config(E("handlers").value(handlers)//
, E("machines").value("All")));
.config(E("threshold").value("2000")));
all.add(C(FailureReportAnalyzer.class) //
.is(PER_LOOKUP)//
.req(Handler.class, new String[] { "failure", "long-url" },
"m_handlers"));
return all;
}
......
......@@ -4,7 +4,9 @@ import com.dianping.cat.message.spi.MessageAnalyzer;
public interface AnalyzerFactory {
public abstract MessageAnalyzer create(String name, long start,
long duration, String domain, long extraTime);
public MessageAnalyzer create(String name, long start, long duration,
String domain, long extraTime);
public void release(Object component);
}
\ No newline at end of file
package com.dianping.cat.message.consumer.impl;
import java.util.List;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer.Handler;
import com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzerConfig;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.site.helper.Splitters;
import com.site.lookup.ContainerHolder;
public class DefaultAnalyzerFactoryImpl extends ContainerHolder implements
......@@ -16,23 +11,11 @@ public class DefaultAnalyzerFactoryImpl extends ContainerHolder implements
public MessageAnalyzer create(String name, long start, long duration,
String domain, long extraTime) {
if (name.equals("failure")) {
FailureReportAnalyzer analyzer = new FailureReportAnalyzer(start,
duration, domain, extraTime);
FailureReportAnalyzerConfig config =lookup(FailureReportAnalyzerConfig.class,"failure-analyzer-config");
String handlers = config.getHandlers();
List<String> handlerList = Splitters.by(",").noEmptyItem().split(handlers);
String machines = config.getMachines();
for (String str : handlerList) {
if (str.equals("failure")) {
analyzer.addHandlers(lookup(Handler.class, "failure"));
} else if (str.equals("long-url")) {
analyzer.addHandlers(lookup(Handler.class, "long-url"));
}
}
analyzer.setMachines(machines);
FailureReportAnalyzer analyzer = lookup(FailureReportAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, domain, extraTime);
// TODO
// start,
// duration, domain, extraTime
return analyzer;
} else if (name.equals("transaction")) {
......@@ -40,4 +23,9 @@ public class DefaultAnalyzerFactoryImpl extends ContainerHolder implements
return null;
}
@Override
public void release(Object component) {
release(component);
}
}
package com.dianping.cat.message.consumer.impl;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageQueue implements MessageQueue {
private Queue<MessageTree> queue = new LinkedBlockingQueue<MessageTree>();
private BlockingQueue<MessageTree> queue = new LinkedBlockingQueue<MessageTree>();
@Override
public MessageTree poll() {
return queue.poll();
try {
return queue.poll(1, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
}
@Override
......
......@@ -20,7 +20,7 @@ import com.site.lookup.annotation.Inject;
/**
* This is the real time consumer process framework. The config of the consumer
* contains name,domain,analyers.
* contains name,domain,analyzers.
*
*
*/
......@@ -36,7 +36,7 @@ public class RealtimeConsumer extends ContainerHolder implements
private static final int PROCESS_PERIOD = 3;
private static final long DEFAULT_EXTRA = 5*60*1000;
private static final long FIVE_MINUTES = 5 * 60 * 1000L;
@Inject
private String m_consumerId;
......@@ -45,29 +45,22 @@ public class RealtimeConsumer extends ContainerHolder implements
@Inject
private long m_duration = 1 * HOUR;
@Inject
private long m_extraTime = DEFAULT_EXTRA;
private long m_extraTime = FIVE_MINUTES;
@Inject
private List<String> m_analyzerNames;
@Inject
private AnalyzerFactory m_factory;
private int m_threads = 10;
private ExecutorService m_executor;
private List<Period> m_periods = new ArrayList<Period>(PROCESS_PERIOD);
/*private void cleanupQueue(List<MessageQueue> queues) {
for (int i = queues.size() - 1; i >= 0; i--) {
MessageQueue queue = queues.get(i);
if (!queue.isActive()) {
queues.remove(i);
}
}
}*/
private boolean isInDomain(MessageTree tree) {
if (m_domain == null || m_domain.length() == 0
|| m_domain.equalsIgnoreCase(DOMAIN_ALL)) {
......@@ -96,18 +89,13 @@ public class RealtimeConsumer extends ContainerHolder implements
if (current != null) {
List<MessageQueue> queues = current.getQueues();
distributeMessage(tree, queues);
//TODO
/*boolean dirty = distributeMessage(tree, queues);
// do clean up
if (dirty) {
cleanupQueue(queues);
}*/
} else {
// if not we will add many tasks
long systemTime = System.currentTimeMillis();
long nextStart = systemTime - systemTime % m_duration - 3
* m_duration;
if (timestamp < systemTime + MINUTE * 3 && timestamp >= nextStart) {
startTasks(tree);
} else {
......@@ -116,22 +104,14 @@ public class RealtimeConsumer extends ContainerHolder implements
}
}
//TODO
private void distributeMessage(MessageTree tree,
List<MessageQueue> queues) {
private void distributeMessage(MessageTree tree, List<MessageQueue> queues) {
int size = queues.size();
//boolean dirty = false;
// distribute to all queues
for (int i = 0; i < size; i++) {
MessageQueue queue = queues.get(i);
queue.offer(tree);
/*if (queue.isActive()) {
queue.offer(tree);
} else {
dirty = true;
}*/
}
//return dirty;
}
@Override
......@@ -170,23 +150,26 @@ public class RealtimeConsumer extends ContainerHolder implements
m_threads = threads;
}
public void setExtraTime(long time){
m_extraTime =time;
public void setExtraTime(long time) {
m_extraTime = time;
}
public void setFactory(AnalyzerFactory factory) {
this.m_factory = factory;
}
private void startTasks(MessageTree tree) {
long time = tree.getMessage().getTimestamp();
long start = time - time % m_duration;
LOG.info("Start Tasks At " + new Date(start));
List<MessageQueue> queues = new ArrayList<MessageQueue>();
Period current = new Period(start, start + m_duration, queues);
for (String name : m_analyzerNames) {
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
MessageAnalyzer analyzer = factory.create(name, start, m_duration,
m_domain,m_extraTime);
MessageAnalyzer analyzer = m_factory.create(name, start, m_duration,
m_domain, m_extraTime);
MessageQueue queue = lookup(MessageQueue.class);
Task task = new Task(analyzer, queue);
Task task = new Task(m_factory, analyzer, queue);
queue.offer(tree);
queues.add(queue);
......@@ -221,11 +204,15 @@ public class RealtimeConsumer extends ContainerHolder implements
}
static class Task implements Runnable {
private AnalyzerFactory m_factory;
private MessageAnalyzer m_analyzer;
private MessageQueue m_queue;
public Task(MessageAnalyzer analyzer, MessageQueue queue) {
public Task(AnalyzerFactory factory, MessageAnalyzer analyzer,
MessageQueue queue) {
m_factory = factory;
m_analyzer = analyzer;
m_queue = queue;
}
......@@ -236,6 +223,8 @@ public class RealtimeConsumer extends ContainerHolder implements
public void run() {
m_analyzer.analyze(m_queue);
m_factory.release(m_analyzer);
m_factory.release(m_queue);
}
}
}
\ No newline at end of file
......@@ -33,50 +33,49 @@ public class FailureReportAnalyzer extends
private List<Handler> m_handlers;
private FailureReport m_report;
private long m_extraTime;
private static final long MINUTE = 60 * 1000;
private static final SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm");
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm");
private static String getDateFormat(long time){
String result = "2012-01-01 00:00";
try {
Date date=new Date(time);
result = sdf.format(date);
} catch (Exception e) {
}
return result;
public FailureReportAnalyzer() {
}
public FailureReportAnalyzer(long startTime,long duration,String domain,long extraTime){
public void setAnalyzerInfo(long startTime, long duration, String domain,
long extraTime) {
m_report = new FailureReport();
m_report.setStartTime(new Date(startTime));
m_report.setEndTime(new Date(startTime+duration-MINUTE));
m_report.setEndTime(new Date(startTime + duration - MINUTE));
m_report.setDomain(domain);
m_extraTime=extraTime;
m_extraTime = extraTime;
m_report.setMachines(new Machines());
}
public void setMachines(String machines){
List<String> str =Splitters.by(',').noEmptyItem().split(machines);
for(String machine:str){
/*public void setMachines(String machines) {
List<String> str = Splitters.by(',').noEmptyItem().split(machines);
for (String machine : str) {
Machines temp = m_report.getMachines();
if(temp==null){
if (temp == null) {
temp = new Machines();
m_report.setMachines(temp);
}
temp.addMachine(machine);
}
}
}*/
public void addHandlers(Handler handler) {
if (m_handlers == null) {
m_handlers = new ArrayList<FailureReportAnalyzer.Handler>();
}
m_handlers.add(handler);
}
@Override
public FailureReport generate() {
return m_report;
......@@ -84,9 +83,12 @@ public class FailureReportAnalyzer extends
@Override
protected void process(MessageTree tree) {
if(m_handlers ==null){
if (m_handlers == null) {
throw new RuntimeException();
}
m_report.getMachines().addMachine(tree.getIpAddress());
for (Handler handler : m_handlers) {
handler.handle(m_report, tree);
}
......@@ -112,21 +114,22 @@ public class FailureReportAnalyzer extends
}
@Override
protected boolean isTimeEnd() {
protected boolean isTimeout() {
long endTime = m_report.getEndTime().getTime();
long currentTime = System.currentTimeMillis();
if(currentTime>endTime+m_extraTime){
if (currentTime > endTime + m_extraTime) {
return true;
}
return false;
}
public static abstract class Handler {
public abstract void handle(FailureReport report, MessageTree tree);
private Segment findOrCreateSegment(Message message,
public static interface Handler {
public void handle(FailureReport report, MessageTree tree);
}
public static abstract class AbstractHandler implements Handler {
protected Segment findOrCreateSegment(Message message,
FailureReport report) {
long time = message.getTimestamp();
long segmentId = time - time % MINUTE;
......@@ -134,21 +137,33 @@ public class FailureReportAnalyzer extends
Map<String, Segment> segments = report.getSegments();
Segment segment = segments.get(segmentStr);
if(segment ==null){
if (segment == null) {
segment = new Segment(segmentStr);
segments.put(segmentStr, segment);
}
return segment;
}
private String getDateFormat(long time) {
String result = "2012-01-01 00:00";
try {
Date date = new Date(time);
result = sdf.format(date);
} catch (Exception e) {
}
return result;
}
}
public static class FailureHandler extends Handler {
public static class FailureHandler extends AbstractHandler {
@Inject
private Set<String> m_failureTypes;
private void addEntry(FailureReport report, Message message,
MessageTree tree) {
String messageId = tree.getMessageId();
String threadId = tree.getThreadId();
Entry entry = new Entry();
......@@ -157,8 +172,8 @@ public class FailureReportAnalyzer extends
entry.setThreadId(threadId);
entry.setText(message.getName());
entry.setType(message.getType());
Segment segment =super.findOrCreateSegment(message,report);
Segment segment = super.findOrCreateSegment(message, report);
segment.addEntry(entry);
}
......@@ -200,14 +215,14 @@ public class FailureReportAnalyzer extends
}
}
}
public void setFailureType(String type) {
m_failureTypes = new HashSet<String>(Splitters.by(',')
.noEmptyItem().split(type));
}
}
public static class LongUrlHandler extends Handler {
public static class LongUrlHandler extends AbstractHandler {
@Inject
private long m_threshold;
......@@ -219,16 +234,17 @@ public class FailureReportAnalyzer extends
String messageId = ((DefaultMessageTree) tree).getMessageId();
String threadId = ((DefaultMessageTree) tree).getThreadId();
Transaction t = (Transaction) message;
if (t.getDuration() > m_threshold) {
Entry entry = new Entry();
entry.setMessageId(messageId);
entry.setThreadId(threadId);
entry.setText(message.getName());
entry.setType(message.getType());
Segment segment =super.findOrCreateSegment(message,report);
Segment segment = super
.findOrCreateSegment(message, report);
segment.addEntry(entry);
}
}
......
package com.dianping.cat.message.consumer.model.failure;
public class FailureReportAnalyzerConfig {
private String m_machines;
private String m_handlers;
public String getMachines() {
return m_machines;
}
public void setMachines(String machines) {
m_machines = machines;
}
public String getHandlers() {
return m_handlers;
}
public void setHandlers(String handlers) {
m_handlers = handlers;
}
}
......@@ -121,4 +121,10 @@ public class TransactionReportMessageAnalyzer extends AbstractMessageAnalyzer<Tr
public void setReportFile(File reportFile) {
this.reportFile = reportFile;
}
@Override
protected boolean isTimeout() {
// TODO Auto-generated method stub
return false;
}
}
......@@ -5,6 +5,10 @@
<implementation>com.dianping.cat.message.consumer.impl.DefaultMessageQueue</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.message.consumer.impl.AnalyzerFactory</role>
<implementation>com.dianping.cat.message.consumer.impl.DefaultAnalyzerFactoryImpl</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>realtime</role-hint>
......@@ -15,16 +19,16 @@
<extraTime>300000</extraTime>
<analyzerNames>failure-report</analyzerNames>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.consumer.impl.AnalyzerFactory</role>
<implementation>com.dianping.cat.message.consumer.impl.DefaultAnalyzerFactoryImpl</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.consumer.impl.AnalyzerFactory</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer$Handler</role>
<role-hint>failure</role-hint>
<implementation>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer$FailureHandler</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<failureType>Error,RuntimeException,Exception</failureType>
</configuration>
......@@ -33,19 +37,24 @@
<role>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer$Handler</role>
<role-hint>long-url</role-hint>
<implementation>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer$LongUrlHandler</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<threshold>2000</threshold>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzerConfig</role>
<role-hint>failure-analyzer-config</role-hint>
<implementation>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzerConfig</implementation>
<configuration>
<handlers>failure,long-url</handlers>
<machines>All</machines>
</configuration>
<role>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer</role>
<implementation>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzer$Handler</role>
<role-hints>
<role-hint>failure</role-hint>
<role-hint>long-url</role-hint>
</role-hints>
<field-name>m_handlers</field-name>
</requirement>
</requirements>
</component>
</components>
</plexus>
......@@ -21,4 +21,9 @@ public class ManyAnalyerMockFactory implements AnalyzerFactory{
}
return null;
}
@Override
public void release(Object component) {
}
}
......@@ -57,7 +57,7 @@ public class ManyAnalyzerTest extends ComponentTestCase {
@Override
protected boolean isTimeEnd() {
protected boolean isTimeout() {
// TODO Auto-generated method stub
return false;
}
......@@ -82,7 +82,7 @@ public class ManyAnalyzerTest extends ComponentTestCase {
@Override
protected boolean isTimeEnd() {
protected boolean isTimeout() {
// TODO Auto-generated method stub
return false;
}
......@@ -106,7 +106,7 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
@Override
protected boolean isTimeEnd() {
protected boolean isTimeout() {
// TODO Auto-generated method stub
return false;
}
......
......@@ -12,4 +12,9 @@ public class OneAnalyzerMockFactory implements AnalyzerFactory{
}
return null;
}
@Override
public void release(Object component) {
}
}
......@@ -75,7 +75,7 @@ public class OneAnalyzerTwoDurationTest extends ComponentTestCase {
}
@Override
protected boolean isTimeEnd() {
protected boolean isTimeout() {
// TODO Auto-generated method stub
return false;
}
......
......@@ -2,12 +2,14 @@ package com.dianping.cat.message.consumer.model.failure;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.consumer.model.failure.entity.Entry;
import com.dianping.cat.consumer.model.failure.entity.FailureReport;
import com.dianping.cat.consumer.model.failure.entity.Segment;
import com.dianping.cat.message.Message;
......@@ -20,7 +22,7 @@ import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class FailureAnalyzerTest extends ComponentTestCase {
@Test
public void testFailureHandler() throws Exception {
long current = System.currentTimeMillis();
......@@ -30,81 +32,99 @@ public class FailureAnalyzerTest extends ComponentTestCase {
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
FailureReportAnalyzer analyzer = (FailureReportAnalyzer) factory
.create("failure", start, duration, "all-domain", extraTime);
.create("failure", start, duration, "domain1", extraTime);
int number = 30000*10;
DefaultEvent e11 = new DefaultEvent("Error", "testError");
DefaultEvent e21 = new DefaultEvent("Exception", "testException1");
DefaultEvent e31 = new DefaultEvent("RuntimeException",
"testRuntimeException1");
DefaultEvent e22 = new DefaultEvent("Exception", "testException2");
DefaultEvent e32 = new DefaultEvent("RuntimeException",
"testRuntimeException2");
MessageTree tree = new DefaultMessageTree();
tree.setMessageId("xx0001");
tree.setDomain("group");
tree.setHostName("group001");
DefaultTransaction t1 = new DefaultTransaction("T1", "N1", null);
DefaultTransaction t2 = new DefaultTransaction("T2", "N2", null);
DefaultTransaction t3 = new DefaultTransaction("T3", "N3", null);
t2.addChild(t3);
t2.addChild(e21);
t2.addChild(e22);
t3.addChild(e31);
t3.addChild(e32);
t2.setStatus("ERROR");
t2.complete();
t1.setStatus(Message.SUCCESS);
t1.complete();
t1.addChild(t2);
t1.addChild(e11);
tree.setMessage(t1);
System.out.println(tree.toString());
long ct = System.nanoTime();
for (int i = 0; i < number; i++) {
if (i == 1) {
ct = System.nanoTime();
}
long addTime = 6* 1000* i;
long timestamp = start + addTime;
tree.setIpAddress("192.168.1."+i);
t1.setTimestamp(timestamp);
t2.setTimestamp(timestamp);
t3.setTimestamp(timestamp);
e11.setTimestamp(timestamp);
e21.setTimestamp(timestamp);
e22.setTimestamp(timestamp);
e31.setTimestamp(timestamp);
e32.setTimestamp(timestamp);
for (int i = 0; i < 60; i++) {
DefaultEvent e11 = new DefaultEvent("Error", "testError");
DefaultEvent e21 = new DefaultEvent("Exception", "testException");
DefaultEvent e31 = new DefaultEvent("RuntimeException",
"testRuntimeException");
DefaultEvent e22 = new DefaultEvent("Exception", "testException");
DefaultEvent e32 = new DefaultEvent("RuntimeException",
"testRuntimeException");
MessageTree tree = new DefaultMessageTree();
tree.setMessageId("xx0001");
tree.setDomain("group");
tree.setHostName("group001");
tree.setIpAddress("192.168.1.1");
DefaultTransaction t1 = new DefaultTransaction("Error",
"OutOfMemory", null);
DefaultTransaction t2 = new DefaultTransaction("Exception",
"NullPointException", null);
DefaultTransaction t3 = new DefaultTransaction("RuntimeException",
"RuntimeException", null);
t2.addChild(t3);
t2.addChild(e21);
t2.addChild(e22);
t3.addChild(e31);
t3.addChild(e32);
t2.setStatus("ERROR");
t2.complete();
t2.setDuration(i);
t1.addChild(t2);
t1.setStatus(Message.SUCCESS);
t1.complete();
t1.addChild(e11);
t1.setDuration(i * 2);
tree.setMessage(t1);
t1.setTimestamp(start + 60 * 1000 * i);
t2.setTimestamp(start + 60 * 1000 * i);
t3.setTimestamp(start + 60 * 1000 * i);
e11.setTimestamp(start + 60 * 1000 * i);
e21.setTimestamp(start + 60 * 1000 * i);
e22.setTimestamp(start + 60 * 1000 * i);
e31.setTimestamp(start + 60 * 1000 * i);
e32.setTimestamp(start + 60 * 1000 * i);
analyzer.process(tree);
analyzer.process(tree);
analyzer.process(tree);
}
long time = System.nanoTime() - ct;
System.out.println("time: " + time / 1e6 + " ms,"
+ (time / 1e6 / number) + " ms each");
FailureReport report = analyzer.generate();
assertEquals("Check the Machines",report.getMachines().getMachines().size(), 3);
assertEquals("Check the domain", report.getDomain(), "all-domain");
//System.out.println(report.toString());
assertEquals("Check the Machines",number, report.getMachines().getMachines()
.size());
assertEquals("Check the domain", report.getDomain(), "domain1");
Date startDate = report.getStartTime();
Date endDate = report.getEndTime();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startStr = sdf.format(startDate);
String endStr = sdf.format(endDate);
Date realStartDate = new Date(start);
Date realEndDate = new Date(start + duration - 60 * 1000);
assertEquals("Check the report start time",sdf.format(realStartDate), startStr);
assertEquals("Check the report end time",sdf.format(realEndDate), endStr);
assertEquals("Check the report start time", sdf.format(realStartDate),
startStr);
assertEquals("Check the report end time", sdf.format(realEndDate),
endStr);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd hh:mm");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
Map<String, Segment> segments = report.getSegments();
for (int i = 0; i < 60; i++) {
for (int i = 0; i < number/10; i++) {
String minuteStr = sdf2.format(startDate);
Segment temp = segments.get(minuteStr);
assertEquals("Check the segment size ",temp.getEntries().size(), 8*3);
startDate.setTime(startDate.getTime()+ 1000 * 60);
List<Entry> entries = temp.getEntries();
if(entries==null){
System.out.println(minuteStr);
}
assertEquals("Check the segment size ", 50, entries.size());
startDate.setTime(startDate.getTime() + 1000 * 60);
}
System.out.println(report.toString());
// System.out.println(report.toString());
}
@Test
......@@ -116,47 +136,50 @@ public class FailureAnalyzerTest extends ComponentTestCase {
long start = current - current % (60 * 60 * 1000);
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
FailureReportAnalyzer analyzer = (FailureReportAnalyzer) factory
.create("failure", start, duration, "all-domain", extraTime);
for (int i = 0; i < 60; i++) {
.create("failure", start, duration, "domain1", extraTime);
int number = 60;
for (int i = 0; i < number; i++) {
DefaultTransaction t = new DefaultTransaction("A1", "B1", null);
MessageTree tree = new DefaultMessageTree();
tree.setMessageId("thread0001");
tree.setDomain("middleware");
tree.setHostName("middleware");
tree.setIpAddress("127.0.0.1");
tree.setIpAddress("127.0.0."+i);
tree.setMessage(t);
t.setDuration(3 *1000);
t.setDuration(3 * 1000);
t.setTimestamp(start + 1000 * 60 * i);
analyzer.process(tree);
analyzer.process(tree);
// analyzer.process(tree);
}
FailureReport report = analyzer.generate();
assertEquals("Check the Machines",report.getMachines().getMachines().size(), 3);
assertEquals("Check the domain", report.getDomain(), "all-domain");
assertEquals("Check the Machines", number,report.getMachines().getMachines()
.size());
assertEquals("Check the domain", report.getDomain(), "domain1");
Date startDate = report.getStartTime();
Date endDate = report.getEndTime();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startStr = sdf.format(startDate);
String endStr = sdf.format(endDate);
Date realStartDate = new Date(start);
Date realEndDate = new Date(start + duration - 60 * 1000);
assertEquals("Check the report start time",sdf.format(realStartDate), startStr);
assertEquals("Check the report end time",sdf.format(realEndDate), endStr);
assertEquals("Check the report start time", sdf.format(realStartDate),
startStr);
assertEquals("Check the report end time", sdf.format(realEndDate),
endStr);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd hh:mm");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
System.out.println(report.toString());
//System.out.println(report.toString());
Map<String, Segment> segments = report.getSegments();
for (int i = 0; i < 60; i++) {
String minuteStr = sdf2.format(startDate);
Segment temp = segments.get(minuteStr);
assertEquals("Check the segment size ",temp.getEntries().size(), 2);
startDate.setTime(startDate.getTime()+ 1000 * 60);
assertEquals("Check the segment size ", temp.getEntries().size(), 1);
startDate.setTime(startDate.getTime() + 1000 * 60);
}
}
}
......@@ -16,13 +16,13 @@ public class FailureAnalyzerTestConfigurator extends
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
String handlers = "failure,long-url";
/*String handlers = "failure,long-url";
all.add(C(FailureReportAnalyzerConfig.class, "failure-analyzer-config",//
FailureReportAnalyzerConfig.class)//
.config(E("handlers").value(handlers)//
, E("machines").value("192.168.1.1,192.168.1.2,192.168.1.3")));
*/
return all;
}
......
<plexus>
<components>
<component>
<role>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzerConfig</role>
<role-hint>failure-analyzer-config</role-hint>
<implementation>com.dianping.cat.message.consumer.model.failure.FailureReportAnalyzerConfig</implementation>
<configuration>
<handlers>failure,long-url</handlers>
<machines>192.168.1.1,192.168.1.2,192.168.1.3</machines>
</configuration>
</component>
</components>
<components/>
</plexus>
......@@ -3,25 +3,22 @@ package com.dianping.cat.message.spi;
public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
@Override
public void analyze(MessageQueue queue) {
while (!isTimeEnd()) {
while (!isTimeout()) {
MessageTree tree = queue.poll();
if (tree != null) {
process(tree);
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//已经过期了,但是任务队列还有任务处理。
while(queue.size()>0){
// 已经过期了,但是任务队列还有任务处理。
while (true) {
MessageTree tree = queue.poll();
if (tree != null) {
process(tree);
} else {
break;
}
}
......@@ -35,6 +32,6 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
public abstract R generate();
protected abstract void process(MessageTree tree);
protected abstract boolean isTimeEnd();
protected abstract boolean isTimeout();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册