提交 1476219b 编写于 作者: J jialinsun

anaylzer milestone

上级 d3fbf410
......@@ -61,7 +61,9 @@
${basedir}/src/main/resources/META-INF/dal/model/matrix-report-manifest.xml,
${basedir}/src/main/resources/META-INF/dal/model/metric-report-manifest.xml,
${basedir}/src/main/resources/META-INF/dal/model/dependency-report-manifest.xml,
${basedir}/src/main/resources/META-INF/dal/model/metric-config-manifest.xml,</manifest>
${basedir}/src/main/resources/META-INF/dal/model/metric-config-manifest.xml,
${basedir}/src/main/resources/META-INF/dal/model/storage-report-manifest.xml,
</manifest>
</configuration>
</execution>
<execution>
......
......@@ -40,6 +40,8 @@ import com.dianping.cat.consumer.problem.ProblemHandler;
import com.dianping.cat.consumer.productline.ProductLineConfigManager;
import com.dianping.cat.consumer.state.StateAnalyzer;
import com.dianping.cat.consumer.state.StateDelegate;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.storage.StorageDelegate;
import com.dianping.cat.consumer.top.TopAnalyzer;
import com.dianping.cat.consumer.top.TopDelegate;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
......@@ -83,6 +85,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.addAll(defineMatrixComponents());
all.addAll(defineDependencyComponents());
all.addAll(defineMetricComponents());
all.addAll(defineStorageComponents());
all.add(C(Module.class, CatConsumerModule.ID, CatConsumerModule.class));
all.addAll(new CatDatabaseConfigurator().defineComponents());
......@@ -263,4 +266,19 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
return all;
}
private Collection<Component> defineStorageComponents() {
final List<Component> all = new ArrayList<Component>();
final String ID = StorageAnalyzer.ID;
all.add(C(MessageAnalyzer.class, ID, StorageAnalyzer.class).is(PER_LOOKUP) //
.req(ReportManager.class, ID).req(ReportDelegate.class, ID).req(ServerConfigManager.class));
all.add(C(ReportManager.class, ID, DefaultReportManager.class) //
.req(ReportDelegate.class, ID) //
.req(ReportBucketManager.class, HourlyReportDao.class, HourlyReportContentDao.class) //
.config(E("name").value(ID)));
all.add(C(ReportDelegate.class, ID, StorageDelegate.class).req(TaskManager.class, ServerConfigManager.class));
return all;
}
}
package com.dianping.cat.consumer.storage;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.util.StringUtils;
import org.unidal.tuple.Pair;
import com.dianping.cat.Cat;
import com.dianping.cat.analysis.AbstractMessageAnalyzer;
import com.dianping.cat.consumer.storage.model.entity.Domain;
import com.dianping.cat.consumer.storage.model.entity.Operation;
import com.dianping.cat.consumer.storage.model.entity.Segment;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.service.DefaultReportManager.StoragePolicy;
import com.dianping.cat.service.ReportManager;
public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {
@Inject
private StorageDelegate m_storageDelegate;
@Inject(ID)
private ReportManager<StorageReport> m_reportManager;
public static final String ID = "storage";
private static final long LONG_THRESHOLD = 1000;
private Map<String, Pair<String, String>> m_connections = new LinkedHashMap<String, Pair<String, String>>() {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Entry<String, Pair<String, String>> eldest) {
return size() > 5000;
}
};
@Override
public void doCheckpoint(boolean atEnd) {
if (atEnd && !isLocalMode()) {
m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB);
} else {
m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE);
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public StorageReport getReport(String id) {
long period = getStartTime();
StorageReport report = m_reportManager.getHourlyReport(period, id, false);
String type = id.substring(id.lastIndexOf("-") + 1);
for (String myId : m_reportManager.getDomains(period)) {
if (myId.endsWith(type)) {
String prefix = myId.substring(0, myId.lastIndexOf("-"));
report.getIds().add(prefix);
}
}
return report;
}
@Override
protected void process(MessageTree tree) {
Message message = tree.getMessage();
if (message instanceof Transaction) {
Transaction root = (Transaction) message;
processTransaction(tree, root);
}
}
private void processCacheTransaction(MessageTree tree, Transaction t) {
String ip = "";
String domain = tree.getDomain();
String cacheName = t.getType().substring(6);
String value = t.getName();
String method = value.substring(value.lastIndexOf(":") + 1);
List<Message> messages = t.getChildren();
for (Message message : messages) {
if (message instanceof Event) {
String type = message.getType();
if (type.equals("Cache.memcached.server")) {
ip = message.getName();
int index = ip.indexOf(":");
if (index > -1) {
ip = ip.substring(0, index);
}
}
}
}
if (StringUtils.isNotEmpty(method) && StringUtils.isNotEmpty(ip)) {
String id = queryCacheId(cacheName);
updateStorageReport(id, method, ip, domain, t);
}
}
private void processSQLTransaction(MessageTree tree, Transaction t) {
String databaseName = "";
String method = "";
String ip = "";
String domain = tree.getDomain();
List<Message> messages = t.getChildren();
for (Message message : messages) {
if (message instanceof Event) {
String type = message.getType();
if (type.equals("SQL.Method")) {
method = message.getName().toLowerCase();
}
if (type.equals("SQL.Database")) {
Pair<String, String> pair = queryDatabaseName(message.getName());
if (pair != null) {
ip = pair.getKey();
databaseName = pair.getValue();
}
}
}
}
if (StringUtils.isNotEmpty(databaseName) && StringUtils.isNotEmpty(method) && StringUtils.isNotEmpty(ip)) {
String id = querySQLId(databaseName);
updateStorageReport(id, method, ip, domain, t);
}
}
private void processTransaction(MessageTree tree, Transaction t) {
if (m_serverConfigManager.discardTransaction(t)) {
return;
} else {
String type = t.getType();
if (m_serverConfigManager.isSQLTransaction(type)) {
processSQLTransaction(tree, t);
} else if (m_serverConfigManager.isCacheTransaction(type)) {
processCacheTransaction(tree, t);
}
}
List<Message> children = t.getChildren();
for (Message child : children) {
if (child instanceof Transaction) {
processTransaction(tree, (Transaction) child);
}
}
}
private String queryCacheId(String name) {
return name + "-Cache";
}
private Pair<String, String> queryDatabaseName(String name) {
Pair<String, String> pair = m_connections.get(name);
if (pair == null && StringUtils.isNotEmpty(name)) {
try {
if (name.contains("jdbc:mysql://")) {
String con = name.split("jdbc:mysql://")[1];
con = con.split("\\?")[0];
String ip = con.substring(0, con.indexOf(":"));
String database = con.substring(con.indexOf("/") + 1);
pair = new Pair<String, String>(ip, database);
m_connections.put(name, pair);
} else if (name.contains("jdbc:sqlserver://")) {
String con = name.split("jdbc:sqlserver://")[1];
String ip = con.substring(0, con.indexOf(":"));
String field = name.split("DatabaseName=")[1];
String database = field.substring(0, field.indexOf(";"));
pair = new Pair<String, String>(ip, database);
m_connections.put(name, pair);
} else {
Cat.logError(new RuntimeException("Unrecognized jdbc connection string: " + name));
}
} catch (Exception e) {
Cat.logError(e);
}
}
return pair;
}
private String querySQLId(String name) {
return name + "-SQL";
}
private void updateStorageReport(String id, String method, String ip, String domain, Transaction t) {
StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
Domain d = report.findOrCreateMachine(ip).findOrCreateDomain(domain);
long current = t.getTimestamp() / 1000 / 60;
int min = (int) (current % (60));
Operation operation = d.findOrCreateOperation(method);
Segment segment = operation.findOrCreateSegment(min);
long duration = t.getDurationInMillis();
report.addIp(ip);
operation.incCount();
operation.incSum(duration);
operation.setAvg(operation.getSum() / operation.getCount());
segment.incCount();
segment.incSum(duration);
segment.setAvg(segment.getSum() / segment.getCount());
if (!t.isSuccess()) {
operation.incError();
segment.incError();
}
if (duration > LONG_THRESHOLD) {
operation.incLongCount();
segment.incLongCount();
}
}
}
package com.dianping.cat.consumer.storage;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.storage.model.transform.DefaultNativeBuilder;
import com.dianping.cat.consumer.storage.model.transform.DefaultNativeParser;
import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser;
import com.dianping.cat.service.ReportDelegate;
import com.dianping.cat.task.TaskManager;
import com.dianping.cat.task.TaskManager.TaskProlicy;
public class StorageDelegate implements ReportDelegate<StorageReport> {
@Inject
private TaskManager m_taskManager;
@Inject
private ServerConfigManager m_manager;
@Override
public void afterLoad(Map<String, StorageReport> reports) {
// TODO Auto-generated method stub
}
@Override
public void beforeSave(Map<String, StorageReport> reports) {
for (StorageReport report : reports.values()) {
Set<String> domainNames = report.getIds();
domainNames.clear();
domainNames.addAll(reports.keySet());
}
}
@Override
public byte[] buildBinary(StorageReport report) {
return DefaultNativeBuilder.build(report);
}
@Override
public StorageReport parseBinary(byte[] bytes) {
return DefaultNativeParser.parse(bytes);
}
@Override
public String buildXml(StorageReport report) {
return report.toString();
}
@Override
public String getDomain(StorageReport report) {
return report.getId();
}
@Override
public StorageReport makeReport(String id, long startTime, long duration) {
StorageReport report = new StorageReport(id);
int index = id.lastIndexOf("-");
String name = id.substring(0, index);
String type = id.substring(index + 1);
report.setName(name).setType(type);
report.setStartTime(new Date(startTime)).setEndTime(new Date(startTime + duration - 1));
return report;
}
@Override
public StorageReport mergeReport(StorageReport old, StorageReport other) {
StorageReportMerger merger = new StorageReportMerger(old);
other.accept(merger);
return old;
}
@Override
public StorageReport parseXml(String xml) throws Exception {
StorageReport report = DefaultSaxParser.parse(xml);
return report;
}
@Override
public boolean createHourlyTask(StorageReport report) {
String id = report.getId();
if (m_manager.validateDomain(id)) {
return m_taskManager.createTask(report.getStartTime(), id, StorageAnalyzer.ID, TaskProlicy.ALL_EXCLUED_HOURLY);
} else {
return true;
}
}
}
package com.dianping.cat.consumer.storage;
import com.dianping.cat.consumer.storage.model.entity.Operation;
import com.dianping.cat.consumer.storage.model.entity.Segment;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.storage.model.transform.DefaultMerger;
public class StorageReportMerger extends DefaultMerger {
public StorageReportMerger(StorageReport storageReport) {
super(storageReport);
}
@Override
protected void mergeOperation(Operation to, Operation from) {
to.setCount(to.getCount() + from.getCount());
to.setLongCount(to.getLongCount() + from.getLongCount());
to.setError(to.getError() + from.getError());
to.setSum(to.getSum() + from.getSum());
to.setAvg(to.getSum() / to.getCount());
}
@Override
protected void mergeSegment(Segment to, Segment from) {
to.setCount(to.getCount() + from.getCount());
to.setLongCount(to.getLongCount() + from.getLongCount());
to.setError(to.getError() + from.getError());
to.setSum(to.getSum() + from.getSum());
to.setAvg(to.getSum() / to.getCount());
}
}
......@@ -2,10 +2,11 @@
<model>
<entity name="storage-report" root="true">
<attribute name="id" value-type="String" />
<attribute name="domain" value-type="String" />
<attribute name="name" value-type="String" />
<attribute name="type" value-type="String" />
<attribute name="startTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<attribute name="endTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<element name="id" value-type="String" type="list" names="ids" />
<entity-ref name="machine" type="list" names="machines" />
</entity>
<entity name="machine">
......
<?xml version="1.0" encoding="UTF-8"?>
<model model-package="com.dianping.cat.home.storage" enable-merger="true" enable-sax-parser="true"
<model model-package="com.dianping.cat.consumer.storage.model" enable-merger="true" enable-sax-parser="true"
enable-base-visitor="true" enable-native-parser="true" enable-native-builder="true">
<entity name="storage-report" root="true">
<attribute name="id" key="true" />
<attribute name="id" value-type="String" />
<attribute name="domain" value-type="String" />
<attribute name="name" value-type="String" />
<attribute name="type" value-type="String" />
<attribute name="startTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<attribute name="endTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<entity-ref name="machine" type="map" names="machines" />
<element name="id" value-type="String" type="set" names="ids" />
<element name="ip" value-type="String" type="set" names="ips" />
<entity-ref name="machine" type="map" names="machines" method-find-or-create="true"/>
</entity>
<entity name="machine">
<attribute name="id" value-type="String" key="true"/>
<entity-ref name="domain" type="map" names="domains" />
<entity-ref name="domain" type="map" names="domains" method-find-or-create="true"/>
</entity>
<entity name="domain">
<attribute name="id" value-type="String" key="true"/>
<entity-ref name="operation" type="map" names="operations" />
<entity-ref name="operation" type="map" names="operations" method-find-or-create="true"/>
</entity>
<entity name="operation">
<attribute name="id" value-type="String" key="true"/>
......@@ -24,7 +27,7 @@
<attribute name="sum" value-type="double" primitive="true" format="0.0" method-inc="true" />
<attribute name="error" value-type="long" primitive="true" method-inc="true" />
<attribute name="long-count" value-type="long" primitive="true" method-inc="true" />
<entity-ref name="segment" type="map" names="segments" />
<entity-ref name="segment" type="map" names="segments" method-find-or-create="true"/>
</entity>
<entity name="segment">
<attribute name="id" value-type="int" primitive="true" key="true"/>
......
......@@ -619,6 +619,61 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.analysis.MessageAnalyzer</role>
<role-hint>storage</role-hint>
<implementation>com.dianping.cat.consumer.storage.StorageAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.service.ReportManager</role>
<role-hint>storage</role-hint>
</requirement>
<requirement>
<role>com.dianping.cat.service.ReportDelegate</role>
<role-hint>storage</role-hint>
</requirement>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.service.ReportManager</role>
<role-hint>storage</role-hint>
<implementation>com.dianping.cat.service.DefaultReportManager</implementation>
<configuration>
<name>storage</name>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.service.ReportDelegate</role>
<role-hint>storage</role-hint>
</requirement>
<requirement>
<role>com.dianping.cat.storage.report.ReportBucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.core.dal.HourlyReportDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.core.dal.HourlyReportContentDao</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.service.ReportDelegate</role>
<role-hint>storage</role-hint>
<implementation>com.dianping.cat.consumer.storage.StorageDelegate</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.task.TaskManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
<role>org.unidal.initialization.Module</role>
<role-hint>cat-consumer</role-hint>
......
......@@ -24,4 +24,7 @@
<model package="com.dianping.cat.advanced.browser-meta-report" name="browser-meta-report">
<sample-model>src/test/resources/com/dianping/cat/consumer/model/browser-meta.xml</sample-model>
</model>
<model package="com.dianping.cat.consumer.model" name="storage-report">
<sample-model>src/test/resources/com/dianping/cat/consumer/storage/storage.xml</sample-model>
</model>
</wizard>
<?xml version="1.0" encoding="utf-8"?>
<storage-report id="cat" domain="${id}+${type}" type="sql" startTime="2012-09-06 20:00:00" endTime="2012-09-06 20:59:59">
<storage-report id="cat-sql" name="cat" type="sql" startTime="2012-09-06 20:00:00" endTime="2012-09-06 20:59:59">
<id>cat</id>
<id>shop-web</id>
<ip>10.10.10.1</ip>
<ip>10.10.10.2</ip>
<machine id="10.1.6.102">
<domain id="All">
<operation id="get" count="100" avg="100" sum="10000" error="100" long-count="100">
<segment id="1" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
<segment id="2" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
</operation>
<operation id="mget" count="100" avg="100" sum="10000" error="100" long-count="100"></operation>
<operation id="add" count="100" avg="100" sum="10000" error="100" long-count="100"></operation>
<operation id="delete" count="100" avg="100" sum="10000" error="100" long-count="100"></operation>
<operation id="mget" count="100" avg="100" sum="10000" error="100" long-count="100">
<segment id="1" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
<segment id="2" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
</operation>
<operation id="add" count="100" avg="100" sum="10000" error="100" long-count="100">
<segment id="1" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
<segment id="2" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
</operation>
<operation id="delete" count="100" avg="100" sum="10000" error="100" long-count="100">
<segment id="1" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
<segment id="2" count="50" avg="10" sum="500" error="100" long-count="100"></segment>
</operation>
</domain>
<domain id="shop-web">
<operation id="get" count="100" avg="100" sum="10000" error="100" long-count="100"></operation>
......
......@@ -376,6 +376,14 @@ public class ServerConfigManager implements Initializable, LogEnabled {
return "PigeonService".equals(type) || "Service".equals(type);
}
public boolean isSQLTransaction(String type) {
return "SQL".equals(type);
}
public boolean isCacheTransaction(String type) {
return StringUtils.isNotEmpty(type) && type.startsWith("Cache.memcached");
}
private long toLong(String str, long defaultValue) {
long value = 0;
int len = str == null ? 0 : str.length();
......@@ -402,7 +410,7 @@ public class ServerConfigManager implements Initializable, LogEnabled {
public boolean validateDomain(String domain) {
return !m_invalidateDomains.contains(domain) && StringUtils.isNotEmpty(domain);
}
public boolean validateIp(String str) {
Pattern pattern = Pattern
.compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}(\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$");
......
......@@ -16,6 +16,7 @@ import com.dianping.cat.consumer.matrix.MatrixAnalyzer;
import com.dianping.cat.consumer.metric.MetricAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.state.StateAnalyzer;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.top.TopAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.core.dal.DailyReportDao;
......@@ -73,6 +74,9 @@ public class ReportServiceComponentConfigurator extends AbstractResourceConfigur
all.add(C(ReportService.class, StateAnalyzer.ID, StateReportService.class).req(HourlyReportDao.class,
DailyReportDao.class, WeeklyReportDao.class, MonthlyReportDao.class, HourlyReportContentDao.class,
DailyReportContentDao.class, WeeklyReportContentDao.class, MonthlyReportContentDao.class));
all.add(C(ReportService.class, StorageAnalyzer.ID, CrossReportService.class).req(HourlyReportDao.class,
DailyReportDao.class, WeeklyReportDao.class, MonthlyReportDao.class, HourlyReportContentDao.class,
DailyReportContentDao.class, WeeklyReportContentDao.class, MonthlyReportContentDao.class));
all.add(C(ReportService.class, Constants.REPORT_BUG, BugReportService.class).req(HourlyReportDao.class,
DailyReportDao.class, WeeklyReportDao.class, MonthlyReportDao.class, HourlyReportContentDao.class,
......
......@@ -16,6 +16,7 @@ import com.dianping.cat.consumer.matrix.MatrixAnalyzer;
import com.dianping.cat.consumer.metric.MetricAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.state.StateAnalyzer;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.top.TopAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager;
......@@ -51,6 +52,9 @@ import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.state.CompositeStateService;
import com.dianping.cat.report.page.model.state.HistoricalStateService;
import com.dianping.cat.report.page.model.state.LocalStateService;
import com.dianping.cat.report.page.model.storage.CompositeStorageService;
import com.dianping.cat.report.page.model.storage.HistoricalStorageService;
import com.dianping.cat.report.page.model.storage.LocalStorageService;
import com.dianping.cat.report.page.model.top.CompositeTopService;
import com.dianping.cat.report.page.model.top.HistoricalTopService;
import com.dianping.cat.report.page.model.top.LocalTopService;
......@@ -138,6 +142,14 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
.req(ServerConfigManager.class) //
.req(ModelService.class, new String[] { "dependency-historical" }, "m_services"));
all.add(C(ModelService.class, "storage-local", LocalStorageService.class) //
.req(ReportBucketManager.class, MessageConsumer.class, ServerConfigManager.class));
all.add(C(ModelService.class, "storage-historical", HistoricalStorageService.class) //
.req(ReportBucketManager.class, ReportServiceManager.class, ServerConfigManager.class));
all.add(C(ModelService.class, StorageAnalyzer.ID, CompositeStorageService.class) //
.req(ServerConfigManager.class) //
.req(ModelService.class, new String[] { "storage-historical" }, "m_services"));
all.add(C(ModelService.class, "metric-local", LocalMetricService.class) //
.req(ReportBucketManager.class, MessageConsumer.class, ServerConfigManager.class));
all.add(C(ModelService.class, "metric-historical", HistoricalMetricService.class) //
......
......@@ -37,6 +37,7 @@ import com.dianping.cat.consumer.problem.model.entity.JavaThread;
import com.dianping.cat.consumer.problem.model.entity.Machine;
import com.dianping.cat.consumer.problem.model.entity.Segment;
import com.dianping.cat.consumer.state.StateAnalyzer;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.top.TopAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.consumer.transaction.model.IEntity;
......@@ -60,6 +61,7 @@ import com.dianping.cat.report.page.model.metric.LocalMetricService;
import com.dianping.cat.report.page.model.problem.LocalProblemService;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.state.LocalStateService;
import com.dianping.cat.report.page.model.storage.LocalStorageService;
import com.dianping.cat.report.page.model.top.LocalTopService;
import com.dianping.cat.report.page.model.transaction.LocalTransactionService;
import com.dianping.cat.report.page.system.graph.SystemReportConvertor;
......@@ -104,6 +106,9 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
@Inject(type = ModelService.class, value = "transaction-local")
private LocalTransactionService m_transactionService;
@Inject(type = ModelService.class, value = "storage-local")
private LocalStorageService m_storageService;
@Inject
private IpService m_ipService;
......@@ -165,6 +170,8 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
} else if (DependencyAnalyzer.ID.equals(report)) {
return new DependencyReportFilter()
.buildXml((com.dianping.cat.consumer.dependency.model.IEntity<?>) dataModel);
} else if (StorageAnalyzer.ID.equals(report)) {
return new StorageReportFilter().buildXml((com.dianping.cat.consumer.storage.model.IEntity<?>) dataModel);
} else {
return String.valueOf(dataModel);
}
......@@ -230,6 +237,8 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
response = processMetricRequest(payload, request);
} else if (DependencyAnalyzer.ID.equals(report)) {
response = m_dependencyService.invoke(request);
} else if (StorageAnalyzer.ID.equals(report)) {
response = m_storageService.invoke(request);
} else {
throw new RuntimeException("Unsupported report: " + report + "!");
}
......@@ -462,6 +471,12 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
}
}
public static class StorageReportFilter extends com.dianping.cat.consumer.storage.model.transform.DefaultXmlBuilder {
public StorageReportFilter() {
super(true, new StringBuilder(DEFAULT_SIZE));
}
}
public static class TransactionReportFilter extends
com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder {
private String m_ipAddress;
......
package com.dianping.cat.report.page.model.storage;
import java.util.List;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.storage.StorageReportMerger;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.report.page.model.spi.internal.BaseCompositeModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
import com.dianping.cat.service.ModelRequest;
import com.dianping.cat.service.ModelResponse;
public class CompositeStorageService extends BaseCompositeModelService<StorageReport> {
public CompositeStorageService() {
super(StorageAnalyzer.ID);
}
@Override
protected BaseRemoteModelService<StorageReport> createRemoteService() {
return new RemoteStorageService();
}
@Override
protected StorageReport merge(ModelRequest request, List<ModelResponse<StorageReport>> responses) {
if (responses.size() == 0) {
return null;
}
StorageReportMerger merger = new StorageReportMerger(new StorageReport(request.getDomain()));
for (ModelResponse<StorageReport> response : responses) {
if (response != null) {
StorageReport model = response.getModel();
if (model != null) {
model.accept(merger);
}
}
}
return merger.getStorageReport();
}
}
package com.dianping.cat.report.page.model.storage;
import java.util.Date;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser;
import com.dianping.cat.helper.TimeHelper;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.report.service.ReportServiceManager;
import com.dianping.cat.service.ModelRequest;
import com.dianping.cat.storage.report.ReportBucket;
import com.dianping.cat.storage.report.ReportBucketManager;
public class HistoricalStorageService extends BaseHistoricalModelService<StorageReport> {
@Inject
private ReportBucketManager m_bucketManager;
@Inject
private ReportServiceManager m_reportService;
public HistoricalStorageService() {
super(StorageAnalyzer.ID);
}
@Override
protected StorageReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = request.getStartTime();
StorageReport report;
if (isLocalMode()) {
report = getReportFromLocalDisk(date, domain);
} else {
report = getReportFromDatabase(date, domain);
}
return report;
}
private StorageReport getReportFromDatabase(long timestamp, String id) throws Exception {
return m_reportService.queryStorageReport(id, new Date(timestamp), new Date(timestamp + TimeHelper.ONE_HOUR));
}
private StorageReport getReportFromLocalDisk(long timestamp, String id) throws Exception {
ReportBucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, StorageAnalyzer.ID);
String xml = bucket.findById(id);
return xml == null ? null : DefaultSaxParser.parse(xml);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
}
}
}
}
package com.dianping.cat.report.page.model.storage;
import java.util.Date;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser;
import com.dianping.cat.helper.TimeHelper;
import com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService;
import com.dianping.cat.service.ModelPeriod;
import com.dianping.cat.service.ModelRequest;
import com.dianping.cat.storage.report.ReportBucket;
import com.dianping.cat.storage.report.ReportBucketManager;
public class LocalStorageService extends BaseLocalModelService<StorageReport> {
@Inject
private ReportBucketManager m_bucketManager;
public LocalStorageService() {
super(StorageAnalyzer.ID);
}
@Override
protected StorageReport getReport(ModelRequest request, ModelPeriod period, String id) throws Exception {
StorageReport report = super.getReport(request, period, id);
if ((report == null || report.getIps().isEmpty()) && period.isLast()) {
long startTime = request.getStartTime();
report = getReportFromLocalDisk(startTime, id);
}
return report;
}
private StorageReport getReportFromLocalDisk(long timestamp, String id) throws Exception {
ReportBucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, StorageAnalyzer.ID);
String xml = bucket.findById(id);
StorageReport report = null;
if (xml != null) {
report = DefaultSaxParser.parse(xml);
} else {
report = new StorageReport(id);
report.setStartTime(new Date(timestamp));
report.setEndTime(new Date(timestamp + TimeHelper.ONE_HOUR - 1));
report.getIds().addAll(bucket.getIds());
}
return report;
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
}
}
}
}
package com.dianping.cat.report.page.model.storage;
import java.io.IOException;
import org.xml.sax.SAXException;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
public class RemoteStorageService extends BaseRemoteModelService<StorageReport> {
public RemoteStorageService() {
super(StorageAnalyzer.ID);
}
@Override
protected StorageReport buildModel(String xml) throws SAXException, IOException {
return DefaultSaxParser.parse(xml);
}
}
......@@ -29,6 +29,8 @@ import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.state.StateAnalyzer;
import com.dianping.cat.consumer.state.model.entity.StateReport;
import com.dianping.cat.consumer.storage.StorageAnalyzer;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.top.TopAnalyzer;
import com.dianping.cat.consumer.top.model.entity.TopReport;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
......@@ -338,4 +340,11 @@ public class DefaultReportServiceManager extends ContainerHolder implements Repo
return reportService.queryReport(domain, start, end);
}
@Override
public StorageReport queryStorageReport(String domain, Date start, Date end) {
ReportService<StorageReport> reportService = m_reportServices.get(StorageAnalyzer.ID);
return reportService.queryReport(domain, start, end);
}
}
......@@ -11,6 +11,7 @@ import com.dianping.cat.consumer.matrix.model.entity.MatrixReport;
import com.dianping.cat.consumer.metric.model.entity.MetricReport;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.state.model.entity.StateReport;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.consumer.top.model.entity.TopReport;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.core.dal.DailyReport;
......@@ -79,4 +80,6 @@ public interface ReportServiceManager {
public JarReport queryJarReport(String domain, Date start, Date end);
public SystemReport querySystemReport(String domain, Date start, Date end);
public StorageReport queryStorageReport(String domain, Date start, Date end);
}
package com.dianping.cat.report.service.impl;
import java.util.Date;
import com.dianping.cat.consumer.storage.model.entity.StorageReport;
import com.dianping.cat.report.service.AbstractReportService;
public class StorageReportService extends AbstractReportService<StorageReport> {
@Override
public StorageReport makeReport(String id, Date start, Date end) {
StorageReport report = new StorageReport(id);
int index = id.lastIndexOf("-");
String name = id.substring(0, index);
String type = id.substring(index + 1);
report.setName(name).setType(type);
report.setStartTime(start).setEndTime(end);
return report;
}
@Override
public StorageReport queryDailyReport(String id, Date start, Date end) {
// TODO Auto-generated method stub
return null;
}
@Override
public StorageReport queryHourlyReport(String id, Date start, Date end) {
// TODO Auto-generated method stub
return null;
}
@Override
public StorageReport queryMonthlyReport(String id, Date start) {
// TODO Auto-generated method stub
return null;
}
@Override
public StorageReport queryWeeklyReport(String id, Date start) {
// TODO Auto-generated method stub
return null;
}
}
......@@ -44,7 +44,7 @@ public class CrossReportBuilder implements TaskBuilder {
@Override
public boolean buildHourlyTask(String name, String domain, Date period) {
throw new RuntimeException("Cross report don't support HourReport!");
throw new RuntimeException("Cross report don't support HourlyReport!");
}
@Override
......
......@@ -605,6 +605,37 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.service.ReportService</role>
<role-hint>storage</role-hint>
<implementation>com.dianping.cat.report.service.impl.CrossReportService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.core.dal.HourlyReportDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.core.dal.DailyReportDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.core.dal.WeeklyReportDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.core.dal.MonthlyReportDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.core.dal.HourlyReportContentDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.home.dal.report.DailyReportContentDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.home.dal.report.WeeklyReportContentDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.home.dal.report.MonthlyReportContentDao</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.service.ReportService</role>
<role-hint>bug</role-hint>
......@@ -2087,6 +2118,55 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>storage-local</role-hint>
<implementation>com.dianping.cat.report.page.model.storage.LocalStorageService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.report.ReportBucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.core.MessageConsumer</role>
</requirement>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>storage-historical</role-hint>
<implementation>com.dianping.cat.report.page.model.storage.HistoricalStorageService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.report.ReportBucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.report.service.ReportServiceManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>storage</role-hint>
<implementation>com.dianping.cat.report.page.model.storage.CompositeStorageService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>storage-historical</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>metric-local</role-hint>
......@@ -3630,6 +3710,11 @@
<role-hint>transaction-local</role-hint>
<field-name>m_transactionService</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>storage-local</role-hint>
<field-name>m_storageService</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.service.IpService</role>
</requirement>
......@@ -3809,6 +3894,21 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.storage.LocalStorageService</role>
<implementation>com.dianping.cat.report.page.model.storage.LocalStorageService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.report.ReportBucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.core.MessageConsumer</role>
</requirement>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.service.IpService</role>
<implementation>com.dianping.cat.service.IpService</implementation>
......
......@@ -84,7 +84,4 @@
<model package="com.dianping.cat.home.exception" name="exception-rule-config">
<sample-model>src/main/resources/config/exceptionRuleConfig.xml</sample-model>
</model>
<model package="com.dianping.cat.home.storage" name="storage-report">
<sample-model>src/test/resources/com/dianping/cat/report/page/storage/storage.xml</sample-model>
</model>
</wizard>
package com.dianping.cat.demo;
import org.junit.Test;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class TestStorageMessage {
@Test
public void testCross() throws Exception {
String serverIp = "10.10.10.";
for (int i = 0; i < 10; i++) {
sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer1", serverIp + ":8080");
sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer2", serverIp + ":8081");
sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer1", serverIp + ":8080");
sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer2", serverIp + ":8081");
}
Thread.sleep(10000);
}
private void sendCacheMsg(String method, String client, String clientIp, String port, String server, String serverIp) {
Transaction t = Cat.newTransaction("Cache.memcached", "oUserAuthLevel:" + method);
Cat.logEvent("PigeonCall.server", serverIp);
Cat.logEvent("PigeonCall.app", server);
Cat.logEvent("PigeonCall.port", port);
MessageTree tree = Cat.getManager().getThreadLocalMessageTree();
((DefaultMessageTree) tree).setDomain(client);
((DefaultMessageTree) tree).setIpAddress(clientIp);
t.setStatus(Transaction.SUCCESS);
t.complete();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册