From 9803a2db3dae0fea5c0afec9e9b206b4926be1c8 Mon Sep 17 00:00:00 2001 From: Frankie Wu Date: Mon, 16 Apr 2012 12:12:51 +0800 Subject: [PATCH] make ip report hourly --- .../cat/consumer/DefaultAnalyzerFactory.java | 5 +- .../build/ComponentsConfigurator.java | 7 +- .../{IpAnalyzer.java => TopIpAnalyzer.java} | 436 +++++++++++------- .../dal/model/event-report-codegen.xml | 2 +- .../META-INF/dal/model/event-report.xml | 2 - .../META-INF/dal/model/ip-codegen.xml | 5 +- .../resources/META-INF/dal/model/ip-model.xml | 5 +- .../resources/META-INF/plexus/components.xml | 13 +- .../cat/status/StatusInfoCollector.java | 7 +- .../dianping/cat/report/page/ip/Model.java | 2 +- .../report/page/model/ip/IpReportMerger.java | 2 +- 11 files changed, 299 insertions(+), 187 deletions(-) rename cat-consumer/src/main/java/com/dianping/cat/consumer/ip/{IpAnalyzer.java => TopIpAnalyzer.java} (51%) diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/DefaultAnalyzerFactory.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/DefaultAnalyzerFactory.java index 3549c7ee8..9e36db2a3 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/DefaultAnalyzerFactory.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/DefaultAnalyzerFactory.java @@ -2,7 +2,7 @@ package com.dianping.cat.consumer; import com.dianping.cat.consumer.dump.DumpAnalyzer; import com.dianping.cat.consumer.event.EventAnalyzer; -import com.dianping.cat.consumer.ip.IpAnalyzer; +import com.dianping.cat.consumer.ip.TopIpAnalyzer; import com.dianping.cat.consumer.problem.ProblemAnalyzer; import com.dianping.cat.consumer.transaction.TransactionAnalyzer; import com.dianping.cat.message.spi.MessageAnalyzer; @@ -32,8 +32,9 @@ public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerF analyzer.setAnalyzerInfo(start, duration, extraTime); return analyzer; } else if (name.equals("ip")) { - IpAnalyzer analyzer = lookup(IpAnalyzer.class); + TopIpAnalyzer analyzer = lookup(TopIpAnalyzer.class); + analyzer.setAnalyzerInfo(start, duration, extraTime); return analyzer; } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java index 41d041eab..011676e9a 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java @@ -17,7 +17,7 @@ import com.dianping.cat.consumer.dump.DumpChannel; import com.dianping.cat.consumer.dump.DumpChannelManager; import com.dianping.cat.consumer.dump.DumpUploader; import com.dianping.cat.consumer.event.EventAnalyzer; -import com.dianping.cat.consumer.ip.IpAnalyzer; +import com.dianping.cat.consumer.ip.TopIpAnalyzer; import com.dianping.cat.consumer.logview.LogviewUploader; import com.dianping.cat.consumer.problem.ProblemAnalyzer; import com.dianping.cat.consumer.problem.handler.ErrorHandler; @@ -71,8 +71,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(EventAnalyzer.class).is(PER_LOOKUP) // .req(BucketManager.class, ReportDao.class)); - - all.add(C(IpAnalyzer.class)); + + all.add(C(TopIpAnalyzer.class).is(PER_LOOKUP) // + .req(BucketManager.class, ReportDao.class)); all.add(C(DumpAnalyzer.class).is(PER_LOOKUP) // .req(ServerConfigManager.class, MessagePathBuilder.class) // diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/ip/IpAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/ip/TopIpAnalyzer.java similarity index 51% rename from cat-consumer/src/main/java/com/dianping/cat/consumer/ip/IpAnalyzer.java rename to cat-consumer/src/main/java/com/dianping/cat/consumer/ip/TopIpAnalyzer.java index 4a967a8c9..908f2d3b2 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/ip/IpAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/ip/TopIpAnalyzer.java @@ -1,163 +1,273 @@ -package com.dianping.cat.consumer.ip; - -import java.util.ArrayList; -import java.util.Calendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.codehaus.plexus.logging.LogEnabled; -import org.codehaus.plexus.logging.Logger; - -import com.dianping.cat.consumer.ip.model.entity.AllDomains; -import com.dianping.cat.consumer.ip.model.entity.Ip; -import com.dianping.cat.consumer.ip.model.entity.IpReport; -import com.dianping.cat.consumer.ip.model.entity.Period; -import com.dianping.cat.message.Event; -import com.dianping.cat.message.Heartbeat; -import com.dianping.cat.message.Message; -import com.dianping.cat.message.Transaction; -import com.dianping.cat.message.spi.AbstractMessageAnalyzer; -import com.dianping.cat.message.spi.MessageTree; - -public class IpAnalyzer extends AbstractMessageAnalyzer implements LogEnabled { - private static final String TOKEN = "RemoteIP="; - - private Map m_reports = new HashMap(); - - private int m_lastPhase; - - private Logger m_logger; - - private void clearLastPhase() { - Calendar cal = Calendar.getInstance(); - int minute = cal.get(Calendar.MINUTE); - int currentPhase = minute / 20; // 0, 1, 2 - - if (m_lastPhase != currentPhase) { - int baseIndex = m_lastPhase * 20; - List domains = new ArrayList(); - - for (Map.Entry e : m_reports.entrySet()) { - IpReport report = e.getValue(); - Map periods = report.getPeriods(); - - for (int i = 0; i < 20; i++) { - periods.remove(baseIndex + i); - } - - if (periods.isEmpty()) { - domains.add(e.getKey()); - } - } - - for (String domain : domains) { - m_reports.remove(domain); - } - - m_lastPhase = currentPhase; - } - } - - private IpReport findOrCreateReport(String domain) { - IpReport report = m_reports.get(domain); - - if (report == null) { - synchronized (m_reports) { - report = m_reports.get(domain); - - if (report == null) { - report = new IpReport().setDomain(domain); - m_reports.put(domain, report); - } - } - } - - return report; - } - - @Override - public Set getDomains() { - return m_reports.keySet(); - } - - private String getIpAddress(Transaction root) { - List children = ((Transaction) root).getChildren(); - - for (Message child : children) { - if (child instanceof Event && child.getType().equals("URL") && child.getName().equals("ClientInfo")) { - // URL:ClientInfo RemoteIP=&... - String data = child.getData().toString(); - int off = data.indexOf(TOKEN); - - if (off >= 0) { - int pos = data.indexOf('&', off + TOKEN.length()); - - if (pos > 0) { - return data.substring(off + TOKEN.length(), pos); - } - } - break; - } else if (child instanceof Heartbeat) { - // Heartbeat: - return child.getName(); - } - } - - return null; - } - - public IpReport getReport(String domain) { - IpReport report = m_reports.get(domain); - - if (report == null) { - report = new IpReport(); - report.setDomain(domain); - } - AllDomains allDomains = new AllDomains(); - allDomains.getDomains().addAll(m_reports.keySet()); - - report.setAllDomains(allDomains); - return report; - } - - @Override - protected boolean isTimeout() { - return false; - } - - @Override - protected void process(MessageTree tree) { - Message root = tree.getMessage(); - - if (root instanceof Transaction) { - String address = getIpAddress((Transaction) root); - - if (address == null) { - address = "N/A"; - - m_logger.debug("Unable to find IP address from message: " + tree); - } - - String domain = tree.getDomain(); - Calendar cal = Calendar.getInstance(); - - cal.setTimeInMillis(root.getTimestamp()); - - int minute = cal.get(Calendar.MINUTE); - IpReport report = findOrCreateReport(domain); - Period period = report.findOrCreatePeriod(minute); - Ip ip = period.findOrCreateIp(address); - - ip.incCount(); - - clearLastPhase(); - } - } - - @Override - public void enableLogging(Logger logger) { - m_logger = logger; - } -} +package com.dianping.cat.consumer.ip; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.codehaus.plexus.logging.LogEnabled; +import org.codehaus.plexus.logging.Logger; + +import com.dianping.cat.Cat; +import com.dianping.cat.configuration.NetworkInterfaceManager; +import com.dianping.cat.consumer.ip.model.entity.Ip; +import com.dianping.cat.consumer.ip.model.entity.IpReport; +import com.dianping.cat.consumer.ip.model.entity.Period; +import com.dianping.cat.consumer.ip.model.transform.DefaultXmlBuilder; +import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser; +import com.dianping.cat.hadoop.dal.Report; +import com.dianping.cat.hadoop.dal.ReportDao; +import com.dianping.cat.message.Event; +import com.dianping.cat.message.Heartbeat; +import com.dianping.cat.message.Message; +import com.dianping.cat.message.Transaction; +import com.dianping.cat.message.spi.AbstractMessageAnalyzer; +import com.dianping.cat.message.spi.MessageTree; +import com.dianping.cat.storage.Bucket; +import com.dianping.cat.storage.BucketManager; +import com.site.lookup.annotation.Inject; + +public class TopIpAnalyzer extends AbstractMessageAnalyzer implements LogEnabled { + private static final String TOKEN = "RemoteIP="; + + @Inject + private BucketManager m_bucketManager; + + @Inject + private ReportDao m_reportDao; + + private Map m_reports = new HashMap(); + + private int m_lastPhase; + + private long m_extraTime; + + private long m_startTime; + + private long m_duration; + + private Logger m_logger; + + @Override + public void doCheckpoint(boolean atEnd) { + storeReports(atEnd); + } + + @Override + public void enableLogging(Logger logger) { + m_logger = logger; + } + + @Override + public Set getDomains() { + return m_reports.keySet(); + } + + public IpReport getReport(String domain) { + IpReport report = m_reports.get(domain); + + if (report == null) { + report = new IpReport(domain); + } + report.getDomainNames().clear(); + report.getDomainNames().addAll(m_reports.keySet()); + + return report; + } + + @Override + protected boolean isTimeout() { + long currentTime = System.currentTimeMillis(); + long endTime = m_startTime + m_duration + m_extraTime; + + return currentTime > endTime; + } + + private void loadReports() { + DefaultXmlParser parser = new DefaultXmlParser(); + Bucket reportBucket = null; + + try { + reportBucket = m_bucketManager.getReportBucket(m_startTime, "ip"); + + for (String id : reportBucket.getIds()) { + String xml = reportBucket.findById(id); + IpReport report = parser.parse(xml); + + m_reports.put(report.getDomain(), report); + } + } catch (Exception e) { + m_logger.error(String.format("Error when loading ip reports of %s!", new Date(m_startTime)), e); + } finally { + if (reportBucket != null) { + m_bucketManager.closeBucket(reportBucket); + } + } + } + + private void clearLastPhase() { + Calendar cal = Calendar.getInstance(); + int minute = cal.get(Calendar.MINUTE); + int currentPhase = minute / 20; // 0, 1, 2 + + if (m_lastPhase != currentPhase) { + int baseIndex = m_lastPhase * 20; + List domains = new ArrayList(); + + for (Map.Entry e : m_reports.entrySet()) { + IpReport report = e.getValue(); + Map periods = report.getPeriods(); + + for (int i = 0; i < 20; i++) { + periods.remove(baseIndex + i); + } + + if (periods.isEmpty()) { + domains.add(e.getKey()); + } + } + + for (String domain : domains) { + m_reports.remove(domain); + } + + m_lastPhase = currentPhase; + } + } + + private String getIpAddress(Transaction root) { + List children = ((Transaction) root).getChildren(); + + for (Message child : children) { + if (child instanceof Event && child.getType().equals("URL") && child.getName().equals("ClientInfo")) { + // URL:ClientInfo RemoteIP=&... + String data = child.getData().toString(); + int off = data.indexOf(TOKEN); + + if (off >= 0) { + int pos = data.indexOf('&', off + TOKEN.length()); + + if (pos > 0) { + return data.substring(off + TOKEN.length(), pos); + } + } + break; + } else if (child instanceof Heartbeat) { + // Heartbeat: + return child.getName(); + } + } + + return null; + } + + @Override + protected void process(MessageTree tree) { + Message root = tree.getMessage(); + + if (root instanceof Transaction) { + String address = getIpAddress((Transaction) root); + + if (address == null) { + address = "N/A"; + + m_logger.debug("Unable to find IP address from message: " + tree); + } + + String domain = tree.getDomain(); + Calendar cal = Calendar.getInstance(); + + cal.setTimeInMillis(root.getTimestamp()); + + int minute = cal.get(Calendar.MINUTE); + IpReport report = findOrCreateReport(domain); + Period period = report.findOrCreatePeriod(minute); + Ip ip = period.findOrCreateIp(address); + + ip.incCount(); + + clearLastPhase(); + } + } + + private IpReport findOrCreateReport(String domain) { + IpReport report = m_reports.get(domain); + + if (report == null) { + synchronized (m_reports) { + report = m_reports.get(domain); + + if (report == null) { + report = new IpReport(domain); + m_reports.put(domain, report); + } + } + } + + return report; + } + + public void setAnalyzerInfo(long startTime, long duration, long extraTime) { + m_extraTime = extraTime; + m_startTime = startTime; + m_duration = duration; + + loadReports(); + } + + private void storeReports(boolean atEnd) { + DefaultXmlBuilder builder = new DefaultXmlBuilder(true); + Bucket reportBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); + + try { + reportBucket = m_bucketManager.getReportBucket(m_startTime, "ip"); + + for (IpReport report : m_reports.values()) { + Set domainNames = report.getDomainNames(); + domainNames.clear(); + domainNames.addAll(getDomains()); + + String xml = builder.buildXml(report); + String domain = report.getDomain(); + + reportBucket.storeById(domain, xml); + } + + if (atEnd && !isLocalMode()) { + Date period = new Date(m_startTime); + String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); + + for (IpReport report : m_reports.values()) { + Report r = m_reportDao.createLocal(); + String xml = builder.buildXml(report); + String domain = report.getDomain(); + + r.setName("ip"); + r.setDomain(domain); + r.setPeriod(period); + r.setIp(ip); + r.setType(1); + r.setContent(xml); + + m_reportDao.insert(r); + } + } + + t.setStatus(Message.SUCCESS); + } catch (Exception e) { + Cat.getProducer().logError(e); + t.setStatus(e); + m_logger.error(String.format("Error when storing ip reports of %s!", new Date(m_startTime)), e); + } finally { + t.complete(); + + if (reportBucket != null) { + m_bucketManager.closeBucket(reportBucket); + } + } + } +} diff --git a/cat-consumer/src/main/resources/META-INF/dal/model/event-report-codegen.xml b/cat-consumer/src/main/resources/META-INF/dal/model/event-report-codegen.xml index 6d332eb72..fd8462dd0 100644 --- a/cat-consumer/src/main/resources/META-INF/dal/model/event-report-codegen.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/event-report-codegen.xml @@ -4,7 +4,7 @@ - + diff --git a/cat-consumer/src/main/resources/META-INF/dal/model/event-report.xml b/cat-consumer/src/main/resources/META-INF/dal/model/event-report.xml index 6f155d50d..f588c2113 100644 --- a/cat-consumer/src/main/resources/META-INF/dal/model/event-report.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/event-report.xml @@ -3,8 +3,6 @@ enable-xml-parser="true" enable-base-visitor="true"> - - diff --git a/cat-consumer/src/main/resources/META-INF/dal/model/ip-codegen.xml b/cat-consumer/src/main/resources/META-INF/dal/model/ip-codegen.xml index 3886ae0a9..1165f1b99 100644 --- a/cat-consumer/src/main/resources/META-INF/dal/model/ip-codegen.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/ip-codegen.xml @@ -4,11 +4,8 @@ - - - - + diff --git a/cat-consumer/src/main/resources/META-INF/dal/model/ip-model.xml b/cat-consumer/src/main/resources/META-INF/dal/model/ip-model.xml index fab74d005..5c5dec1af 100644 --- a/cat-consumer/src/main/resources/META-INF/dal/model/ip-model.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/ip-model.xml @@ -2,11 +2,10 @@ + + - - - diff --git a/cat-consumer/src/main/resources/META-INF/plexus/components.xml b/cat-consumer/src/main/resources/META-INF/plexus/components.xml index 6ddc8233d..fd142d068 100644 --- a/cat-consumer/src/main/resources/META-INF/plexus/components.xml +++ b/cat-consumer/src/main/resources/META-INF/plexus/components.xml @@ -102,8 +102,17 @@ - com.dianping.cat.consumer.ip.IpAnalyzer - com.dianping.cat.consumer.ip.IpAnalyzer + com.dianping.cat.consumer.ip.TopIpAnalyzer + com.dianping.cat.consumer.ip.TopIpAnalyzer + per-lookup + + + com.dianping.cat.storage.BucketManager + + + com.dianping.cat.hadoop.dal.ReportDao + + com.dianping.cat.consumer.dump.DumpAnalyzer diff --git a/cat-core/src/main/java/com/dianping/cat/status/StatusInfoCollector.java b/cat-core/src/main/java/com/dianping/cat/status/StatusInfoCollector.java index 95cc5ae26..035afa02c 100644 --- a/cat-core/src/main/java/com/dianping/cat/status/StatusInfoCollector.java +++ b/cat-core/src/main/java/com/dianping/cat/status/StatusInfoCollector.java @@ -7,7 +7,6 @@ import java.lang.management.MemoryMXBean; import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; -import java.util.Arrays; import java.util.Date; import java.util.List; @@ -164,9 +163,7 @@ class StatusInfoCollector extends BaseVisitor { thread.setPeekCount(bean.getPeakThreadCount()); thread.setTotalStartedCount(bean.getTotalStartedThreadCount()); - // remove below - java.lang.management.ThreadInfo[] threads = bean.dumpAllThreads(true, true); - - System.out.println(Arrays.asList(threads)); + // TODO remove below + // System.out.println(Arrays.asList(bean.dumpAllThreads(true, true))); } } \ No newline at end of file diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/ip/Model.java b/cat-home/src/main/java/com/dianping/cat/report/page/ip/Model.java index 936333b48..35d7f11de 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/ip/Model.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/ip/Model.java @@ -40,7 +40,7 @@ public class Model extends AbstractReportModel { if (m_report == null) { return new ArrayList(); } else { - return StringSortHelper.sortDomain(m_report.getAllDomains().getDomains()); + return StringSortHelper.sortDomain(m_report.getDomainNames()); } } diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/ip/IpReportMerger.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/ip/IpReportMerger.java index 0989d83c1..bae0e18fc 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/ip/IpReportMerger.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/ip/IpReportMerger.java @@ -17,7 +17,7 @@ public class IpReportMerger extends DefaultMerger { @Override public void visitIpReport(IpReport ipReport) { super.visitIpReport(ipReport); - getIpReport().getAllDomains().getDomains().addAll(ipReport.getAllDomains().getDomains()); + getIpReport().getDomainNames().addAll(ipReport.getDomainNames()); } } -- GitLab