From 1476219bd9915ee43daca4ac63373015ab389345 Mon Sep 17 00:00:00 2001 From: jialinsun Date: Wed, 11 Feb 2015 11:05:44 +0800 Subject: [PATCH] anaylzer milestone --- cat-consumer/pom.xml | 4 +- .../build/ComponentsConfigurator.java | 18 ++ .../cat/consumer/storage/StorageAnalyzer.java | 240 ++++++++++++++++++ .../cat/consumer/storage/StorageDelegate.java | 99 ++++++++ .../consumer/storage/StorageReportMerger.java | 31 +++ .../dal/model/storage-report-codegen.xml | 3 +- .../dal/model/storage-report-manifest.xml | 0 .../dal/model/storage-report-model.xml | 15 +- .../resources/META-INF/plexus/components.xml | 55 ++++ .../META-INF/wizard/model/wizard.xml | 3 + .../cat/consumer}/storage/storage.xml | 21 +- .../configuration/ServerConfigManager.java | 10 +- .../ReportServiceComponentConfigurator.java | 4 + .../build/ServiceComponentConfigurator.java | 12 + .../cat/report/page/model/Handler.java | 15 ++ .../storage/CompositeStorageService.java | 40 +++ .../storage/HistoricalStorageService.java | 60 +++++ .../model/storage/LocalStorageService.java | 59 +++++ .../model/storage/RemoteStorageService.java | 21 ++ .../service/DefaultReportServiceManager.java | 9 + .../report/service/ReportServiceManager.java | 3 + .../service/impl/StorageReportService.java | 46 ++++ .../report/task/cross/CrossReportBuilder.java | 2 +- .../resources/META-INF/plexus/components.xml | 100 ++++++++ .../META-INF/wizard/model/wizard.xml | 3 - .../dianping/cat/demo/TestStorageMessage.java | 39 +++ 26 files changed, 895 insertions(+), 17 deletions(-) create mode 100644 cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageAnalyzer.java create mode 100644 cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageDelegate.java create mode 100644 cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageReportMerger.java rename {cat-home => cat-consumer}/src/main/resources/META-INF/dal/model/storage-report-codegen.xml (90%) rename {cat-home => cat-consumer}/src/main/resources/META-INF/dal/model/storage-report-manifest.xml (100%) rename {cat-home => cat-consumer}/src/main/resources/META-INF/dal/model/storage-report-model.xml (69%) rename {cat-home/src/test/resources/com/dianping/cat/report/page => cat-consumer/src/test/resources/com/dianping/cat/consumer}/storage/storage.xml (55%) create mode 100755 cat-home/src/main/java/com/dianping/cat/report/page/model/storage/CompositeStorageService.java create mode 100755 cat-home/src/main/java/com/dianping/cat/report/page/model/storage/HistoricalStorageService.java create mode 100755 cat-home/src/main/java/com/dianping/cat/report/page/model/storage/LocalStorageService.java create mode 100755 cat-home/src/main/java/com/dianping/cat/report/page/model/storage/RemoteStorageService.java create mode 100644 cat-home/src/main/java/com/dianping/cat/report/service/impl/StorageReportService.java create mode 100644 cat-home/src/test/java/com/dianping/cat/demo/TestStorageMessage.java diff --git a/cat-consumer/pom.xml b/cat-consumer/pom.xml index fecd21e3c..d10e1ae3b 100644 --- a/cat-consumer/pom.xml +++ b/cat-consumer/pom.xml @@ -61,7 +61,9 @@ ${basedir}/src/main/resources/META-INF/dal/model/matrix-report-manifest.xml, ${basedir}/src/main/resources/META-INF/dal/model/metric-report-manifest.xml, ${basedir}/src/main/resources/META-INF/dal/model/dependency-report-manifest.xml, - ${basedir}/src/main/resources/META-INF/dal/model/metric-config-manifest.xml, + ${basedir}/src/main/resources/META-INF/dal/model/metric-config-manifest.xml, + ${basedir}/src/main/resources/META-INF/dal/model/storage-report-manifest.xml, + diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java index 19599fdef..cd280661a 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java @@ -40,6 +40,8 @@ import com.dianping.cat.consumer.problem.ProblemHandler; import com.dianping.cat.consumer.productline.ProductLineConfigManager; import com.dianping.cat.consumer.state.StateAnalyzer; import com.dianping.cat.consumer.state.StateDelegate; +import com.dianping.cat.consumer.storage.StorageAnalyzer; +import com.dianping.cat.consumer.storage.StorageDelegate; import com.dianping.cat.consumer.top.TopAnalyzer; import com.dianping.cat.consumer.top.TopDelegate; import com.dianping.cat.consumer.transaction.TransactionAnalyzer; @@ -83,6 +85,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.addAll(defineMatrixComponents()); all.addAll(defineDependencyComponents()); all.addAll(defineMetricComponents()); + all.addAll(defineStorageComponents()); all.add(C(Module.class, CatConsumerModule.ID, CatConsumerModule.class)); all.addAll(new CatDatabaseConfigurator().defineComponents()); @@ -263,4 +266,19 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { return all; } + + private Collection defineStorageComponents() { + final List all = new ArrayList(); + final String ID = StorageAnalyzer.ID; + + all.add(C(MessageAnalyzer.class, ID, StorageAnalyzer.class).is(PER_LOOKUP) // + .req(ReportManager.class, ID).req(ReportDelegate.class, ID).req(ServerConfigManager.class)); + all.add(C(ReportManager.class, ID, DefaultReportManager.class) // + .req(ReportDelegate.class, ID) // + .req(ReportBucketManager.class, HourlyReportDao.class, HourlyReportContentDao.class) // + .config(E("name").value(ID))); + all.add(C(ReportDelegate.class, ID, StorageDelegate.class).req(TaskManager.class, ServerConfigManager.class)); + + return all; + } } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageAnalyzer.java new file mode 100644 index 000000000..575cbdd88 --- /dev/null +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageAnalyzer.java @@ -0,0 +1,240 @@ +package com.dianping.cat.consumer.storage; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.codehaus.plexus.logging.LogEnabled; +import org.codehaus.plexus.logging.Logger; +import org.unidal.lookup.annotation.Inject; +import org.unidal.lookup.util.StringUtils; +import org.unidal.tuple.Pair; + +import com.dianping.cat.Cat; +import com.dianping.cat.analysis.AbstractMessageAnalyzer; +import com.dianping.cat.consumer.storage.model.entity.Domain; +import com.dianping.cat.consumer.storage.model.entity.Operation; +import com.dianping.cat.consumer.storage.model.entity.Segment; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.message.Event; +import com.dianping.cat.message.Message; +import com.dianping.cat.message.Transaction; +import com.dianping.cat.message.spi.MessageTree; +import com.dianping.cat.service.DefaultReportManager.StoragePolicy; +import com.dianping.cat.service.ReportManager; + +public class StorageAnalyzer extends AbstractMessageAnalyzer implements LogEnabled { + + @Inject + private StorageDelegate m_storageDelegate; + + @Inject(ID) + private ReportManager m_reportManager; + + public static final String ID = "storage"; + + private static final long LONG_THRESHOLD = 1000; + + private Map> m_connections = new LinkedHashMap>() { + + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Entry> eldest) { + return size() > 5000; + } + + }; + + @Override + public void doCheckpoint(boolean atEnd) { + if (atEnd && !isLocalMode()) { + m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB); + } else { + m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE); + } + } + + @Override + public void enableLogging(Logger logger) { + m_logger = logger; + } + + @Override + public StorageReport getReport(String id) { + long period = getStartTime(); + StorageReport report = m_reportManager.getHourlyReport(period, id, false); + String type = id.substring(id.lastIndexOf("-") + 1); + + for (String myId : m_reportManager.getDomains(period)) { + if (myId.endsWith(type)) { + String prefix = myId.substring(0, myId.lastIndexOf("-")); + + report.getIds().add(prefix); + } + } + return report; + } + + @Override + protected void process(MessageTree tree) { + Message message = tree.getMessage(); + + if (message instanceof Transaction) { + Transaction root = (Transaction) message; + + processTransaction(tree, root); + } + + } + + private void processCacheTransaction(MessageTree tree, Transaction t) { + String ip = ""; + String domain = tree.getDomain(); + String cacheName = t.getType().substring(6); + String value = t.getName(); + String method = value.substring(value.lastIndexOf(":") + 1); + List messages = t.getChildren(); + + for (Message message : messages) { + if (message instanceof Event) { + String type = message.getType(); + + if (type.equals("Cache.memcached.server")) { + ip = message.getName(); + int index = ip.indexOf(":"); + + if (index > -1) { + ip = ip.substring(0, index); + } + } + } + } + if (StringUtils.isNotEmpty(method) && StringUtils.isNotEmpty(ip)) { + String id = queryCacheId(cacheName); + + updateStorageReport(id, method, ip, domain, t); + } + } + + private void processSQLTransaction(MessageTree tree, Transaction t) { + String databaseName = ""; + String method = ""; + String ip = ""; + String domain = tree.getDomain(); + List messages = t.getChildren(); + + for (Message message : messages) { + if (message instanceof Event) { + String type = message.getType(); + + if (type.equals("SQL.Method")) { + method = message.getName().toLowerCase(); + } + if (type.equals("SQL.Database")) { + Pair pair = queryDatabaseName(message.getName()); + + if (pair != null) { + ip = pair.getKey(); + databaseName = pair.getValue(); + } + } + } + } + if (StringUtils.isNotEmpty(databaseName) && StringUtils.isNotEmpty(method) && StringUtils.isNotEmpty(ip)) { + String id = querySQLId(databaseName); + + updateStorageReport(id, method, ip, domain, t); + } + } + + private void processTransaction(MessageTree tree, Transaction t) { + if (m_serverConfigManager.discardTransaction(t)) { + return; + } else { + String type = t.getType(); + + if (m_serverConfigManager.isSQLTransaction(type)) { + processSQLTransaction(tree, t); + } else if (m_serverConfigManager.isCacheTransaction(type)) { + processCacheTransaction(tree, t); + } + } + List children = t.getChildren(); + + for (Message child : children) { + if (child instanceof Transaction) { + processTransaction(tree, (Transaction) child); + } + } + } + + private String queryCacheId(String name) { + return name + "-Cache"; + } + + private Pair queryDatabaseName(String name) { + Pair pair = m_connections.get(name); + + if (pair == null && StringUtils.isNotEmpty(name)) { + try { + if (name.contains("jdbc:mysql://")) { + String con = name.split("jdbc:mysql://")[1]; + con = con.split("\\?")[0]; + String ip = con.substring(0, con.indexOf(":")); + String database = con.substring(con.indexOf("/") + 1); + pair = new Pair(ip, database); + + m_connections.put(name, pair); + } else if (name.contains("jdbc:sqlserver://")) { + String con = name.split("jdbc:sqlserver://")[1]; + String ip = con.substring(0, con.indexOf(":")); + String field = name.split("DatabaseName=")[1]; + String database = field.substring(0, field.indexOf(";")); + pair = new Pair(ip, database); + + m_connections.put(name, pair); + } else { + Cat.logError(new RuntimeException("Unrecognized jdbc connection string: " + name)); + } + } catch (Exception e) { + Cat.logError(e); + } + } + return pair; + } + + private String querySQLId(String name) { + return name + "-SQL"; + } + + private void updateStorageReport(String id, String method, String ip, String domain, Transaction t) { + StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true); + Domain d = report.findOrCreateMachine(ip).findOrCreateDomain(domain); + long current = t.getTimestamp() / 1000 / 60; + int min = (int) (current % (60)); + Operation operation = d.findOrCreateOperation(method); + Segment segment = operation.findOrCreateSegment(min); + long duration = t.getDurationInMillis(); + + report.addIp(ip); + + operation.incCount(); + operation.incSum(duration); + operation.setAvg(operation.getSum() / operation.getCount()); + + segment.incCount(); + segment.incSum(duration); + segment.setAvg(segment.getSum() / segment.getCount()); + + if (!t.isSuccess()) { + operation.incError(); + segment.incError(); + } + if (duration > LONG_THRESHOLD) { + operation.incLongCount(); + segment.incLongCount(); + } + } +} diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageDelegate.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageDelegate.java new file mode 100644 index 000000000..b5126997f --- /dev/null +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageDelegate.java @@ -0,0 +1,99 @@ +package com.dianping.cat.consumer.storage; + +import java.util.Date; +import java.util.Map; +import java.util.Set; + +import org.unidal.lookup.annotation.Inject; + +import com.dianping.cat.configuration.ServerConfigManager; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.consumer.storage.model.transform.DefaultNativeBuilder; +import com.dianping.cat.consumer.storage.model.transform.DefaultNativeParser; +import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser; +import com.dianping.cat.service.ReportDelegate; +import com.dianping.cat.task.TaskManager; +import com.dianping.cat.task.TaskManager.TaskProlicy; + +public class StorageDelegate implements ReportDelegate { + + @Inject + private TaskManager m_taskManager; + + @Inject + private ServerConfigManager m_manager; + + @Override + public void afterLoad(Map reports) { + // TODO Auto-generated method stub + } + + @Override + public void beforeSave(Map reports) { + for (StorageReport report : reports.values()) { + Set domainNames = report.getIds(); + + domainNames.clear(); + domainNames.addAll(reports.keySet()); + } + } + + @Override + public byte[] buildBinary(StorageReport report) { + return DefaultNativeBuilder.build(report); + } + + @Override + public StorageReport parseBinary(byte[] bytes) { + return DefaultNativeParser.parse(bytes); + } + + @Override + public String buildXml(StorageReport report) { + return report.toString(); + } + + @Override + public String getDomain(StorageReport report) { + return report.getId(); + } + + @Override + public StorageReport makeReport(String id, long startTime, long duration) { + StorageReport report = new StorageReport(id); + int index = id.lastIndexOf("-"); + String name = id.substring(0, index); + String type = id.substring(index + 1); + + report.setName(name).setType(type); + report.setStartTime(new Date(startTime)).setEndTime(new Date(startTime + duration - 1)); + + return report; + } + + @Override + public StorageReport mergeReport(StorageReport old, StorageReport other) { + StorageReportMerger merger = new StorageReportMerger(old); + + other.accept(merger); + return old; + } + + @Override + public StorageReport parseXml(String xml) throws Exception { + StorageReport report = DefaultSaxParser.parse(xml); + return report; + } + + @Override + public boolean createHourlyTask(StorageReport report) { + String id = report.getId(); + + if (m_manager.validateDomain(id)) { + return m_taskManager.createTask(report.getStartTime(), id, StorageAnalyzer.ID, TaskProlicy.ALL_EXCLUED_HOURLY); + } else { + return true; + } + } + +} diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageReportMerger.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageReportMerger.java new file mode 100644 index 000000000..a784396c7 --- /dev/null +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/storage/StorageReportMerger.java @@ -0,0 +1,31 @@ +package com.dianping.cat.consumer.storage; + +import com.dianping.cat.consumer.storage.model.entity.Operation; +import com.dianping.cat.consumer.storage.model.entity.Segment; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.consumer.storage.model.transform.DefaultMerger; + +public class StorageReportMerger extends DefaultMerger { + + public StorageReportMerger(StorageReport storageReport) { + super(storageReport); + } + + @Override + protected void mergeOperation(Operation to, Operation from) { + to.setCount(to.getCount() + from.getCount()); + to.setLongCount(to.getLongCount() + from.getLongCount()); + to.setError(to.getError() + from.getError()); + to.setSum(to.getSum() + from.getSum()); + to.setAvg(to.getSum() / to.getCount()); + } + + @Override + protected void mergeSegment(Segment to, Segment from) { + to.setCount(to.getCount() + from.getCount()); + to.setLongCount(to.getLongCount() + from.getLongCount()); + to.setError(to.getError() + from.getError()); + to.setSum(to.getSum() + from.getSum()); + to.setAvg(to.getSum() / to.getCount()); + } +} diff --git a/cat-home/src/main/resources/META-INF/dal/model/storage-report-codegen.xml b/cat-consumer/src/main/resources/META-INF/dal/model/storage-report-codegen.xml similarity index 90% rename from cat-home/src/main/resources/META-INF/dal/model/storage-report-codegen.xml rename to cat-consumer/src/main/resources/META-INF/dal/model/storage-report-codegen.xml index 21f9b586c..6aff1b5b2 100644 --- a/cat-home/src/main/resources/META-INF/dal/model/storage-report-codegen.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/storage-report-codegen.xml @@ -2,10 +2,11 @@ - + + diff --git a/cat-home/src/main/resources/META-INF/dal/model/storage-report-manifest.xml b/cat-consumer/src/main/resources/META-INF/dal/model/storage-report-manifest.xml similarity index 100% rename from cat-home/src/main/resources/META-INF/dal/model/storage-report-manifest.xml rename to cat-consumer/src/main/resources/META-INF/dal/model/storage-report-manifest.xml diff --git a/cat-home/src/main/resources/META-INF/dal/model/storage-report-model.xml b/cat-consumer/src/main/resources/META-INF/dal/model/storage-report-model.xml similarity index 69% rename from cat-home/src/main/resources/META-INF/dal/model/storage-report-model.xml rename to cat-consumer/src/main/resources/META-INF/dal/model/storage-report-model.xml index 6cb0d4e74..0ef9a9087 100644 --- a/cat-home/src/main/resources/META-INF/dal/model/storage-report-model.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/storage-report-model.xml @@ -1,21 +1,24 @@ - + - + - + + + - + - + @@ -24,7 +27,7 @@ - + diff --git a/cat-consumer/src/main/resources/META-INF/plexus/components.xml b/cat-consumer/src/main/resources/META-INF/plexus/components.xml index 248a8ef8f..1ca32d721 100644 --- a/cat-consumer/src/main/resources/META-INF/plexus/components.xml +++ b/cat-consumer/src/main/resources/META-INF/plexus/components.xml @@ -619,6 +619,61 @@ + + com.dianping.cat.analysis.MessageAnalyzer + storage + com.dianping.cat.consumer.storage.StorageAnalyzer + per-lookup + + + com.dianping.cat.service.ReportManager + storage + + + com.dianping.cat.service.ReportDelegate + storage + + + com.dianping.cat.configuration.ServerConfigManager + + + + + com.dianping.cat.service.ReportManager + storage + com.dianping.cat.service.DefaultReportManager + + storage + + + + com.dianping.cat.service.ReportDelegate + storage + + + com.dianping.cat.storage.report.ReportBucketManager + + + com.dianping.cat.core.dal.HourlyReportDao + + + com.dianping.cat.core.dal.HourlyReportContentDao + + + + + com.dianping.cat.service.ReportDelegate + storage + com.dianping.cat.consumer.storage.StorageDelegate + + + com.dianping.cat.task.TaskManager + + + com.dianping.cat.configuration.ServerConfigManager + + + org.unidal.initialization.Module cat-consumer diff --git a/cat-consumer/src/main/resources/META-INF/wizard/model/wizard.xml b/cat-consumer/src/main/resources/META-INF/wizard/model/wizard.xml index e49431095..264c5dd12 100644 --- a/cat-consumer/src/main/resources/META-INF/wizard/model/wizard.xml +++ b/cat-consumer/src/main/resources/META-INF/wizard/model/wizard.xml @@ -24,4 +24,7 @@ src/test/resources/com/dianping/cat/consumer/model/browser-meta.xml + + src/test/resources/com/dianping/cat/consumer/storage/storage.xml + diff --git a/cat-home/src/test/resources/com/dianping/cat/report/page/storage/storage.xml b/cat-consumer/src/test/resources/com/dianping/cat/consumer/storage/storage.xml similarity index 55% rename from cat-home/src/test/resources/com/dianping/cat/report/page/storage/storage.xml rename to cat-consumer/src/test/resources/com/dianping/cat/consumer/storage/storage.xml index b11ed3d98..eee5bd2a6 100644 --- a/cat-home/src/test/resources/com/dianping/cat/report/page/storage/storage.xml +++ b/cat-consumer/src/test/resources/com/dianping/cat/consumer/storage/storage.xml @@ -1,14 +1,27 @@ - + + cat + shop-web + 10.10.10.1 + 10.10.10.2 - - - + + + + + + + + + + + + diff --git a/cat-core/src/main/java/com/dianping/cat/configuration/ServerConfigManager.java b/cat-core/src/main/java/com/dianping/cat/configuration/ServerConfigManager.java index 9803396ec..eaf1a2c40 100644 --- a/cat-core/src/main/java/com/dianping/cat/configuration/ServerConfigManager.java +++ b/cat-core/src/main/java/com/dianping/cat/configuration/ServerConfigManager.java @@ -376,6 +376,14 @@ public class ServerConfigManager implements Initializable, LogEnabled { return "PigeonService".equals(type) || "Service".equals(type); } + public boolean isSQLTransaction(String type) { + return "SQL".equals(type); + } + + public boolean isCacheTransaction(String type) { + return StringUtils.isNotEmpty(type) && type.startsWith("Cache.memcached"); + } + private long toLong(String str, long defaultValue) { long value = 0; int len = str == null ? 0 : str.length(); @@ -402,7 +410,7 @@ public class ServerConfigManager implements Initializable, LogEnabled { public boolean validateDomain(String domain) { return !m_invalidateDomains.contains(domain) && StringUtils.isNotEmpty(domain); } - + public boolean validateIp(String str) { Pattern pattern = Pattern .compile("^((\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])\\.){3}(\\d|[1-9]\\d|1\\d\\d|2[0-4]\\d|25[0-5]|[*])$"); diff --git a/cat-home/src/main/java/com/dianping/cat/build/ReportServiceComponentConfigurator.java b/cat-home/src/main/java/com/dianping/cat/build/ReportServiceComponentConfigurator.java index ff36fdfb7..2d9f2cdbe 100644 --- a/cat-home/src/main/java/com/dianping/cat/build/ReportServiceComponentConfigurator.java +++ b/cat-home/src/main/java/com/dianping/cat/build/ReportServiceComponentConfigurator.java @@ -16,6 +16,7 @@ import com.dianping.cat.consumer.matrix.MatrixAnalyzer; import com.dianping.cat.consumer.metric.MetricAnalyzer; import com.dianping.cat.consumer.problem.ProblemAnalyzer; import com.dianping.cat.consumer.state.StateAnalyzer; +import com.dianping.cat.consumer.storage.StorageAnalyzer; import com.dianping.cat.consumer.top.TopAnalyzer; import com.dianping.cat.consumer.transaction.TransactionAnalyzer; import com.dianping.cat.core.dal.DailyReportDao; @@ -73,6 +74,9 @@ public class ReportServiceComponentConfigurator extends AbstractResourceConfigur all.add(C(ReportService.class, StateAnalyzer.ID, StateReportService.class).req(HourlyReportDao.class, DailyReportDao.class, WeeklyReportDao.class, MonthlyReportDao.class, HourlyReportContentDao.class, DailyReportContentDao.class, WeeklyReportContentDao.class, MonthlyReportContentDao.class)); + all.add(C(ReportService.class, StorageAnalyzer.ID, CrossReportService.class).req(HourlyReportDao.class, + DailyReportDao.class, WeeklyReportDao.class, MonthlyReportDao.class, HourlyReportContentDao.class, + DailyReportContentDao.class, WeeklyReportContentDao.class, MonthlyReportContentDao.class)); all.add(C(ReportService.class, Constants.REPORT_BUG, BugReportService.class).req(HourlyReportDao.class, DailyReportDao.class, WeeklyReportDao.class, MonthlyReportDao.class, HourlyReportContentDao.class, diff --git a/cat-home/src/main/java/com/dianping/cat/build/ServiceComponentConfigurator.java b/cat-home/src/main/java/com/dianping/cat/build/ServiceComponentConfigurator.java index 66eea4347..379940c99 100755 --- a/cat-home/src/main/java/com/dianping/cat/build/ServiceComponentConfigurator.java +++ b/cat-home/src/main/java/com/dianping/cat/build/ServiceComponentConfigurator.java @@ -16,6 +16,7 @@ import com.dianping.cat.consumer.matrix.MatrixAnalyzer; import com.dianping.cat.consumer.metric.MetricAnalyzer; import com.dianping.cat.consumer.problem.ProblemAnalyzer; import com.dianping.cat.consumer.state.StateAnalyzer; +import com.dianping.cat.consumer.storage.StorageAnalyzer; import com.dianping.cat.consumer.top.TopAnalyzer; import com.dianping.cat.consumer.transaction.TransactionAnalyzer; import com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager; @@ -51,6 +52,9 @@ import com.dianping.cat.report.page.model.spi.ModelService; import com.dianping.cat.report.page.model.state.CompositeStateService; import com.dianping.cat.report.page.model.state.HistoricalStateService; import com.dianping.cat.report.page.model.state.LocalStateService; +import com.dianping.cat.report.page.model.storage.CompositeStorageService; +import com.dianping.cat.report.page.model.storage.HistoricalStorageService; +import com.dianping.cat.report.page.model.storage.LocalStorageService; import com.dianping.cat.report.page.model.top.CompositeTopService; import com.dianping.cat.report.page.model.top.HistoricalTopService; import com.dianping.cat.report.page.model.top.LocalTopService; @@ -138,6 +142,14 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator { .req(ServerConfigManager.class) // .req(ModelService.class, new String[] { "dependency-historical" }, "m_services")); + all.add(C(ModelService.class, "storage-local", LocalStorageService.class) // + .req(ReportBucketManager.class, MessageConsumer.class, ServerConfigManager.class)); + all.add(C(ModelService.class, "storage-historical", HistoricalStorageService.class) // + .req(ReportBucketManager.class, ReportServiceManager.class, ServerConfigManager.class)); + all.add(C(ModelService.class, StorageAnalyzer.ID, CompositeStorageService.class) // + .req(ServerConfigManager.class) // + .req(ModelService.class, new String[] { "storage-historical" }, "m_services")); + all.add(C(ModelService.class, "metric-local", LocalMetricService.class) // .req(ReportBucketManager.class, MessageConsumer.class, ServerConfigManager.class)); all.add(C(ModelService.class, "metric-historical", HistoricalMetricService.class) // diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/Handler.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/Handler.java index e8a59461a..f38ea7a0f 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/Handler.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/Handler.java @@ -37,6 +37,7 @@ import com.dianping.cat.consumer.problem.model.entity.JavaThread; import com.dianping.cat.consumer.problem.model.entity.Machine; import com.dianping.cat.consumer.problem.model.entity.Segment; import com.dianping.cat.consumer.state.StateAnalyzer; +import com.dianping.cat.consumer.storage.StorageAnalyzer; import com.dianping.cat.consumer.top.TopAnalyzer; import com.dianping.cat.consumer.transaction.TransactionAnalyzer; import com.dianping.cat.consumer.transaction.model.IEntity; @@ -60,6 +61,7 @@ import com.dianping.cat.report.page.model.metric.LocalMetricService; import com.dianping.cat.report.page.model.problem.LocalProblemService; import com.dianping.cat.report.page.model.spi.ModelService; import com.dianping.cat.report.page.model.state.LocalStateService; +import com.dianping.cat.report.page.model.storage.LocalStorageService; import com.dianping.cat.report.page.model.top.LocalTopService; import com.dianping.cat.report.page.model.transaction.LocalTransactionService; import com.dianping.cat.report.page.system.graph.SystemReportConvertor; @@ -104,6 +106,9 @@ public class Handler extends ContainerHolder implements PageHandler { @Inject(type = ModelService.class, value = "transaction-local") private LocalTransactionService m_transactionService; + @Inject(type = ModelService.class, value = "storage-local") + private LocalStorageService m_storageService; + @Inject private IpService m_ipService; @@ -165,6 +170,8 @@ public class Handler extends ContainerHolder implements PageHandler { } else if (DependencyAnalyzer.ID.equals(report)) { return new DependencyReportFilter() .buildXml((com.dianping.cat.consumer.dependency.model.IEntity) dataModel); + } else if (StorageAnalyzer.ID.equals(report)) { + return new StorageReportFilter().buildXml((com.dianping.cat.consumer.storage.model.IEntity) dataModel); } else { return String.valueOf(dataModel); } @@ -230,6 +237,8 @@ public class Handler extends ContainerHolder implements PageHandler { response = processMetricRequest(payload, request); } else if (DependencyAnalyzer.ID.equals(report)) { response = m_dependencyService.invoke(request); + } else if (StorageAnalyzer.ID.equals(report)) { + response = m_storageService.invoke(request); } else { throw new RuntimeException("Unsupported report: " + report + "!"); } @@ -462,6 +471,12 @@ public class Handler extends ContainerHolder implements PageHandler { } } + public static class StorageReportFilter extends com.dianping.cat.consumer.storage.model.transform.DefaultXmlBuilder { + public StorageReportFilter() { + super(true, new StringBuilder(DEFAULT_SIZE)); + } + } + public static class TransactionReportFilter extends com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder { private String m_ipAddress; diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/CompositeStorageService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/CompositeStorageService.java new file mode 100755 index 000000000..92e40893b --- /dev/null +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/CompositeStorageService.java @@ -0,0 +1,40 @@ +package com.dianping.cat.report.page.model.storage; + +import java.util.List; + +import com.dianping.cat.consumer.storage.StorageAnalyzer; +import com.dianping.cat.consumer.storage.StorageReportMerger; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.report.page.model.spi.internal.BaseCompositeModelService; +import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService; +import com.dianping.cat.service.ModelRequest; +import com.dianping.cat.service.ModelResponse; + +public class CompositeStorageService extends BaseCompositeModelService { + public CompositeStorageService() { + super(StorageAnalyzer.ID); + } + + @Override + protected BaseRemoteModelService createRemoteService() { + return new RemoteStorageService(); + } + + @Override + protected StorageReport merge(ModelRequest request, List> responses) { + if (responses.size() == 0) { + return null; + } + StorageReportMerger merger = new StorageReportMerger(new StorageReport(request.getDomain())); + + for (ModelResponse response : responses) { + if (response != null) { + StorageReport model = response.getModel(); + if (model != null) { + model.accept(merger); + } + } + } + return merger.getStorageReport(); + } +} diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/HistoricalStorageService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/HistoricalStorageService.java new file mode 100755 index 000000000..2ee16b5e3 --- /dev/null +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/HistoricalStorageService.java @@ -0,0 +1,60 @@ +package com.dianping.cat.report.page.model.storage; + +import java.util.Date; + +import org.unidal.lookup.annotation.Inject; + +import com.dianping.cat.consumer.storage.StorageAnalyzer; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser; +import com.dianping.cat.helper.TimeHelper; +import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService; +import com.dianping.cat.report.service.ReportServiceManager; +import com.dianping.cat.service.ModelRequest; +import com.dianping.cat.storage.report.ReportBucket; +import com.dianping.cat.storage.report.ReportBucketManager; + +public class HistoricalStorageService extends BaseHistoricalModelService { + @Inject + private ReportBucketManager m_bucketManager; + + @Inject + private ReportServiceManager m_reportService; + + public HistoricalStorageService() { + super(StorageAnalyzer.ID); + } + + @Override + protected StorageReport buildModel(ModelRequest request) throws Exception { + String domain = request.getDomain(); + long date = request.getStartTime(); + StorageReport report; + + if (isLocalMode()) { + report = getReportFromLocalDisk(date, domain); + } else { + report = getReportFromDatabase(date, domain); + } + + return report; + } + + private StorageReport getReportFromDatabase(long timestamp, String id) throws Exception { + return m_reportService.queryStorageReport(id, new Date(timestamp), new Date(timestamp + TimeHelper.ONE_HOUR)); + } + + private StorageReport getReportFromLocalDisk(long timestamp, String id) throws Exception { + ReportBucket bucket = null; + try { + bucket = m_bucketManager.getReportBucket(timestamp, StorageAnalyzer.ID); + String xml = bucket.findById(id); + + return xml == null ? null : DefaultSaxParser.parse(xml); + } finally { + if (bucket != null) { + m_bucketManager.closeBucket(bucket); + } + } + } +} diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/LocalStorageService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/LocalStorageService.java new file mode 100755 index 000000000..63975c25e --- /dev/null +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/LocalStorageService.java @@ -0,0 +1,59 @@ +package com.dianping.cat.report.page.model.storage; + +import java.util.Date; + +import org.unidal.lookup.annotation.Inject; + +import com.dianping.cat.consumer.storage.StorageAnalyzer; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser; +import com.dianping.cat.helper.TimeHelper; +import com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService; +import com.dianping.cat.service.ModelPeriod; +import com.dianping.cat.service.ModelRequest; +import com.dianping.cat.storage.report.ReportBucket; +import com.dianping.cat.storage.report.ReportBucketManager; + +public class LocalStorageService extends BaseLocalModelService { + @Inject + private ReportBucketManager m_bucketManager; + + public LocalStorageService() { + super(StorageAnalyzer.ID); + } + + @Override + protected StorageReport getReport(ModelRequest request, ModelPeriod period, String id) throws Exception { + StorageReport report = super.getReport(request, period, id); + + if ((report == null || report.getIps().isEmpty()) && period.isLast()) { + long startTime = request.getStartTime(); + report = getReportFromLocalDisk(startTime, id); + } + return report; + } + + private StorageReport getReportFromLocalDisk(long timestamp, String id) throws Exception { + ReportBucket bucket = null; + try { + bucket = m_bucketManager.getReportBucket(timestamp, StorageAnalyzer.ID); + String xml = bucket.findById(id); + StorageReport report = null; + + if (xml != null) { + report = DefaultSaxParser.parse(xml); + } else { + report = new StorageReport(id); + report.setStartTime(new Date(timestamp)); + report.setEndTime(new Date(timestamp + TimeHelper.ONE_HOUR - 1)); + report.getIds().addAll(bucket.getIds()); + } + return report; + + } finally { + if (bucket != null) { + m_bucketManager.closeBucket(bucket); + } + } + } +} diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/RemoteStorageService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/RemoteStorageService.java new file mode 100755 index 000000000..2730aa78d --- /dev/null +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/storage/RemoteStorageService.java @@ -0,0 +1,21 @@ +package com.dianping.cat.report.page.model.storage; + +import java.io.IOException; + +import org.xml.sax.SAXException; + +import com.dianping.cat.consumer.storage.StorageAnalyzer; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.consumer.storage.model.transform.DefaultSaxParser; +import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService; + +public class RemoteStorageService extends BaseRemoteModelService { + public RemoteStorageService() { + super(StorageAnalyzer.ID); + } + + @Override + protected StorageReport buildModel(String xml) throws SAXException, IOException { + return DefaultSaxParser.parse(xml); + } +} diff --git a/cat-home/src/main/java/com/dianping/cat/report/service/DefaultReportServiceManager.java b/cat-home/src/main/java/com/dianping/cat/report/service/DefaultReportServiceManager.java index dbf88f5e7..bdf805c64 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/service/DefaultReportServiceManager.java +++ b/cat-home/src/main/java/com/dianping/cat/report/service/DefaultReportServiceManager.java @@ -29,6 +29,8 @@ import com.dianping.cat.consumer.problem.ProblemAnalyzer; import com.dianping.cat.consumer.problem.model.entity.ProblemReport; import com.dianping.cat.consumer.state.StateAnalyzer; import com.dianping.cat.consumer.state.model.entity.StateReport; +import com.dianping.cat.consumer.storage.StorageAnalyzer; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; import com.dianping.cat.consumer.top.TopAnalyzer; import com.dianping.cat.consumer.top.model.entity.TopReport; import com.dianping.cat.consumer.transaction.TransactionAnalyzer; @@ -338,4 +340,11 @@ public class DefaultReportServiceManager extends ContainerHolder implements Repo return reportService.queryReport(domain, start, end); } + @Override + public StorageReport queryStorageReport(String domain, Date start, Date end) { + ReportService reportService = m_reportServices.get(StorageAnalyzer.ID); + + return reportService.queryReport(domain, start, end); + } + } diff --git a/cat-home/src/main/java/com/dianping/cat/report/service/ReportServiceManager.java b/cat-home/src/main/java/com/dianping/cat/report/service/ReportServiceManager.java index bd0773813..8f71e7bdb 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/service/ReportServiceManager.java +++ b/cat-home/src/main/java/com/dianping/cat/report/service/ReportServiceManager.java @@ -11,6 +11,7 @@ import com.dianping.cat.consumer.matrix.model.entity.MatrixReport; import com.dianping.cat.consumer.metric.model.entity.MetricReport; import com.dianping.cat.consumer.problem.model.entity.ProblemReport; import com.dianping.cat.consumer.state.model.entity.StateReport; +import com.dianping.cat.consumer.storage.model.entity.StorageReport; import com.dianping.cat.consumer.top.model.entity.TopReport; import com.dianping.cat.consumer.transaction.model.entity.TransactionReport; import com.dianping.cat.core.dal.DailyReport; @@ -79,4 +80,6 @@ public interface ReportServiceManager { public JarReport queryJarReport(String domain, Date start, Date end); public SystemReport querySystemReport(String domain, Date start, Date end); + + public StorageReport queryStorageReport(String domain, Date start, Date end); } diff --git a/cat-home/src/main/java/com/dianping/cat/report/service/impl/StorageReportService.java b/cat-home/src/main/java/com/dianping/cat/report/service/impl/StorageReportService.java new file mode 100644 index 000000000..abd032031 --- /dev/null +++ b/cat-home/src/main/java/com/dianping/cat/report/service/impl/StorageReportService.java @@ -0,0 +1,46 @@ +package com.dianping.cat.report.service.impl; + +import java.util.Date; + +import com.dianping.cat.consumer.storage.model.entity.StorageReport; +import com.dianping.cat.report.service.AbstractReportService; + +public class StorageReportService extends AbstractReportService { + + @Override + public StorageReport makeReport(String id, Date start, Date end) { + StorageReport report = new StorageReport(id); + int index = id.lastIndexOf("-"); + String name = id.substring(0, index); + String type = id.substring(index + 1); + + report.setName(name).setType(type); + report.setStartTime(start).setEndTime(end); + return report; + } + + @Override + public StorageReport queryDailyReport(String id, Date start, Date end) { + // TODO Auto-generated method stub + return null; + } + + @Override + public StorageReport queryHourlyReport(String id, Date start, Date end) { + // TODO Auto-generated method stub + return null; + } + + @Override + public StorageReport queryMonthlyReport(String id, Date start) { + // TODO Auto-generated method stub + return null; + } + + @Override + public StorageReport queryWeeklyReport(String id, Date start) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/cat-home/src/main/java/com/dianping/cat/report/task/cross/CrossReportBuilder.java b/cat-home/src/main/java/com/dianping/cat/report/task/cross/CrossReportBuilder.java index 4b30d0c2d..ddb5430e2 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/task/cross/CrossReportBuilder.java +++ b/cat-home/src/main/java/com/dianping/cat/report/task/cross/CrossReportBuilder.java @@ -44,7 +44,7 @@ public class CrossReportBuilder implements TaskBuilder { @Override public boolean buildHourlyTask(String name, String domain, Date period) { - throw new RuntimeException("Cross report don't support HourReport!"); + throw new RuntimeException("Cross report don't support HourlyReport!"); } @Override diff --git a/cat-home/src/main/resources/META-INF/plexus/components.xml b/cat-home/src/main/resources/META-INF/plexus/components.xml index d64799960..b2e6b893d 100755 --- a/cat-home/src/main/resources/META-INF/plexus/components.xml +++ b/cat-home/src/main/resources/META-INF/plexus/components.xml @@ -605,6 +605,37 @@ + + com.dianping.cat.report.service.ReportService + storage + com.dianping.cat.report.service.impl.CrossReportService + + + com.dianping.cat.core.dal.HourlyReportDao + + + com.dianping.cat.core.dal.DailyReportDao + + + com.dianping.cat.core.dal.WeeklyReportDao + + + com.dianping.cat.core.dal.MonthlyReportDao + + + com.dianping.cat.core.dal.HourlyReportContentDao + + + com.dianping.cat.home.dal.report.DailyReportContentDao + + + com.dianping.cat.home.dal.report.WeeklyReportContentDao + + + com.dianping.cat.home.dal.report.MonthlyReportContentDao + + + com.dianping.cat.report.service.ReportService bug @@ -2087,6 +2118,55 @@ + + com.dianping.cat.report.page.model.spi.ModelService + storage-local + com.dianping.cat.report.page.model.storage.LocalStorageService + + + com.dianping.cat.storage.report.ReportBucketManager + + + com.dianping.cat.message.spi.core.MessageConsumer + + + com.dianping.cat.configuration.ServerConfigManager + + + + + com.dianping.cat.report.page.model.spi.ModelService + storage-historical + com.dianping.cat.report.page.model.storage.HistoricalStorageService + + + com.dianping.cat.storage.report.ReportBucketManager + + + com.dianping.cat.report.service.ReportServiceManager + + + com.dianping.cat.configuration.ServerConfigManager + + + + + com.dianping.cat.report.page.model.spi.ModelService + storage + com.dianping.cat.report.page.model.storage.CompositeStorageService + + + com.dianping.cat.configuration.ServerConfigManager + + + com.dianping.cat.report.page.model.spi.ModelService + + storage-historical + + m_services + + + com.dianping.cat.report.page.model.spi.ModelService metric-local @@ -3630,6 +3710,11 @@ transaction-local m_transactionService + + com.dianping.cat.report.page.model.spi.ModelService + storage-local + m_storageService + com.dianping.cat.service.IpService @@ -3809,6 +3894,21 @@ + + com.dianping.cat.report.page.model.storage.LocalStorageService + com.dianping.cat.report.page.model.storage.LocalStorageService + + + com.dianping.cat.storage.report.ReportBucketManager + + + com.dianping.cat.message.spi.core.MessageConsumer + + + com.dianping.cat.configuration.ServerConfigManager + + + com.dianping.cat.service.IpService com.dianping.cat.service.IpService diff --git a/cat-home/src/main/resources/META-INF/wizard/model/wizard.xml b/cat-home/src/main/resources/META-INF/wizard/model/wizard.xml index c8480dcb2..d48cab8fa 100644 --- a/cat-home/src/main/resources/META-INF/wizard/model/wizard.xml +++ b/cat-home/src/main/resources/META-INF/wizard/model/wizard.xml @@ -84,7 +84,4 @@ src/main/resources/config/exceptionRuleConfig.xml - - src/test/resources/com/dianping/cat/report/page/storage/storage.xml - diff --git a/cat-home/src/test/java/com/dianping/cat/demo/TestStorageMessage.java b/cat-home/src/test/java/com/dianping/cat/demo/TestStorageMessage.java new file mode 100644 index 000000000..57a2e7a3e --- /dev/null +++ b/cat-home/src/test/java/com/dianping/cat/demo/TestStorageMessage.java @@ -0,0 +1,39 @@ +package com.dianping.cat.demo; + +import org.junit.Test; + +import com.dianping.cat.Cat; +import com.dianping.cat.message.Transaction; +import com.dianping.cat.message.spi.MessageTree; +import com.dianping.cat.message.spi.internal.DefaultMessageTree; + +public class TestStorageMessage { + + @Test + public void testCross() throws Exception { + String serverIp = "10.10.10."; + + for (int i = 0; i < 10; i++) { + sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer1", serverIp + ":8080"); + sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer2", serverIp + ":8081"); + sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer1", serverIp + ":8080"); + sendCacheMsg("Cat-Call", "cat", serverIp + i, "1000", "catServer2", serverIp + ":8081"); + } + Thread.sleep(10000); + } + + private void sendCacheMsg(String method, String client, String clientIp, String port, String server, String serverIp) { + Transaction t = Cat.newTransaction("Cache.memcached", "oUserAuthLevel:" + method); + + Cat.logEvent("PigeonCall.server", serverIp); + Cat.logEvent("PigeonCall.app", server); + Cat.logEvent("PigeonCall.port", port); + + MessageTree tree = Cat.getManager().getThreadLocalMessageTree(); + + ((DefaultMessageTree) tree).setDomain(client); + ((DefaultMessageTree) tree).setIpAddress(clientIp); + t.setStatus(Transaction.SUCCESS); + t.complete(); + } +} -- GitLab