提交 a8841783 编写于 作者: F Frankie Wu

dump handler

上级 cfd698a5
package com.dianping.cat.consumer;
import com.dianping.cat.consumer.dump.DumpAnalyzer;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
......@@ -7,10 +8,6 @@ import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.site.lookup.ContainerHolder;
/**
* @author yong.you
* @since Jan 5, 2012
*/
public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerFactory {
@Override
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
......@@ -27,6 +24,11 @@ public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerF
} else if (name.equals("event")) {
EventAnalyzer analyzer = lookup(EventAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, extraTime);
return analyzer;
} else if (name.equals("dump")) {
DumpAnalyzer analyzer = lookup(DumpAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, extraTime);
return analyzer;
} else if (name.equals("ip")) {
......
......@@ -179,6 +179,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
Date startDate = new Date(m_startTime);
Set<String> domains = new HashSet<String>();
Date endDate = new Date(m_endTime - 1);
m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
df.format(endDate)));
......
......@@ -10,6 +10,9 @@ import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.consumer.AnalyzerFactory;
import com.dianping.cat.consumer.DefaultAnalyzerFactory;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.dump.DumpAnalyzer;
import com.dianping.cat.consumer.dump.DumpChannel;
import com.dianping.cat.consumer.dump.DumpChannelManager;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
......@@ -18,7 +21,9 @@ import com.dianping.cat.consumer.problem.handler.Handler;
import com.dianping.cat.consumer.problem.handler.LongUrlHandler;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -33,7 +38,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class) //
.config(E("extraTime").value(property("extraTime", "300000"))//
, E("analyzers").value("problem,transaction,event,ip")));
, E("analyzers").value("problem,transaction,event,ip,dump")));
String failureTypes = "Error,RuntimeException,Exception";
......@@ -55,6 +60,14 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(IpAnalyzer.class));
all.add(C(DumpAnalyzer.class).is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(DumpChannelManager.class));
all.add(C(DumpChannel.class));
all.add(C(DumpChannelManager.class) //
.req(MessageCodec.class, "plain-text"));
return all;
}
......
......@@ -43,12 +43,12 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
private long m_extraTime;
private Logger m_logger;
private long m_startTime;
private long m_duration;
private Logger m_logger;
@Override
public void doCheckpoint(boolean atEnd) {
storeReports(atEnd);
......
......@@ -10,7 +10,7 @@
<implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
<configuration>
<extraTime>300000</extraTime>
<analyzers>problem,transaction,event,ip</analyzers>
<analyzers>problem,transaction,event,ip,dump</analyzers>
</configuration>
<requirements>
<requirement>
......@@ -86,7 +86,33 @@
<component>
<role>com.dianping.cat.consumer.ip.IpAnalyzer</role>
<implementation>com.dianping.cat.consumer.ip.IpAnalyzer</implementation>
</component>
<component>
<role>com.dianping.cat.consumer.dump.DumpAnalyzer</role>
<implementation>com.dianping.cat.consumer.dump.DumpAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.consumer.dump.DumpChannelManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.consumer.dump.DumpChannel</role>
<implementation>com.dianping.cat.consumer.dump.DumpChannel</implementation>
</component>
<component>
<role>com.dianping.cat.consumer.dump.DumpChannelManager</role>
<implementation>com.dianping.cat.consumer.dump.DumpChannelManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
......@@ -68,7 +68,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
all.add(C(MessageQueue.class, DefaultMessageQueue.class) //
.config(E("size").value("1000")) //
.config(E("size").value("10000")) //
.is(PER_LOOKUP));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
......
......@@ -62,6 +62,7 @@ public abstract class MesageTreeBucketTestCase extends ComponentTestCase {
} catch (Exception e1) {
e1.printStackTrace();
}
try {
pool = Executors.newFixedThreadPool(threadNum);
bucket = createBucket();
......
......@@ -5,11 +5,14 @@ import java.io.IOException;
import java.util.Date;
import org.junit.After;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.MesageTreeBucketTestCase;
@RunWith(JUnit4.class)
public class LocalLogviewBucketTest extends MesageTreeBucketTestCase {
@SuppressWarnings("unchecked")
......
......@@ -3,7 +3,7 @@ package com.dianping.cat.report.build;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.job.DumpToHdfsConsumer;
import com.dianping.cat.hadoop.DumpToHdfsConsumer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
......
......@@ -20,13 +20,15 @@ public class DisplayTransactionNameReport {
return m_results;
}
public DisplayTransactionNameReport display(String sorted,String type,TransactionReport report) {
public DisplayTransactionNameReport display(String sorted, String type, TransactionReport report) {
Map<String, TransactionType> types = report.getTypes();
if (types != null) {
TransactionType names = types.get(type);
for (Entry<String, TransactionName> entry : names.getNames().entrySet()) {
m_results.add(new TransactionModel(entry.getKey(), entry.getValue()));
if (names != null) {
for (Entry<String, TransactionName> entry : names.getNames().entrySet()) {
m_results.add(new TransactionModel(entry.getKey(), entry.getValue()));
}
}
}
if (!StringUtils.isEmpty(sorted)) {
......
package com.dianping.cat.job;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class DumpToHdfsConsumer implements MessageConsumer {
public static final String ID = "dump-to-hdfs";
@Inject
private MessageStorage m_storage;
@Inject
private String m_domain;
@Override
public void consume(MessageTree tree) {
if (m_domain == null || m_domain.equals(tree.getDomain())) {
m_storage.store(tree);
}
}
@Override
public String getConsumerId() {
return ID;
}
public void setDomain(String domain) {
m_domain = domain;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册