提交 9803a2db 编写于 作者: F Frankie Wu

make ip report hourly

上级 168333c8
......@@ -2,7 +2,7 @@ package com.dianping.cat.consumer;
import com.dianping.cat.consumer.dump.DumpAnalyzer;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.ip.TopIpAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
......@@ -32,8 +32,9 @@ public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerF
analyzer.setAnalyzerInfo(start, duration, extraTime);
return analyzer;
} else if (name.equals("ip")) {
IpAnalyzer analyzer = lookup(IpAnalyzer.class);
TopIpAnalyzer analyzer = lookup(TopIpAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, extraTime);
return analyzer;
}
......
......@@ -17,7 +17,7 @@ import com.dianping.cat.consumer.dump.DumpChannel;
import com.dianping.cat.consumer.dump.DumpChannelManager;
import com.dianping.cat.consumer.dump.DumpUploader;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.ip.TopIpAnalyzer;
import com.dianping.cat.consumer.logview.LogviewUploader;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.problem.handler.ErrorHandler;
......@@ -71,8 +71,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(EventAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, ReportDao.class));
all.add(C(IpAnalyzer.class));
all.add(C(TopIpAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, ReportDao.class));
all.add(C(DumpAnalyzer.class).is(PER_LOOKUP) //
.req(ServerConfigManager.class, MessagePathBuilder.class) //
......
package com.dianping.cat.consumer.ip;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.consumer.ip.model.entity.AllDomains;
import com.dianping.cat.consumer.ip.model.entity.Ip;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
import com.dianping.cat.consumer.ip.model.entity.Period;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageTree;
public class IpAnalyzer extends AbstractMessageAnalyzer<IpReport> implements LogEnabled {
private static final String TOKEN = "RemoteIP=";
private Map<String, IpReport> m_reports = new HashMap<String, IpReport>();
private int m_lastPhase;
private Logger m_logger;
private void clearLastPhase() {
Calendar cal = Calendar.getInstance();
int minute = cal.get(Calendar.MINUTE);
int currentPhase = minute / 20; // 0, 1, 2
if (m_lastPhase != currentPhase) {
int baseIndex = m_lastPhase * 20;
List<String> domains = new ArrayList<String>();
for (Map.Entry<String, IpReport> e : m_reports.entrySet()) {
IpReport report = e.getValue();
Map<Integer, Period> periods = report.getPeriods();
for (int i = 0; i < 20; i++) {
periods.remove(baseIndex + i);
}
if (periods.isEmpty()) {
domains.add(e.getKey());
}
}
for (String domain : domains) {
m_reports.remove(domain);
}
m_lastPhase = currentPhase;
}
}
private IpReport findOrCreateReport(String domain) {
IpReport report = m_reports.get(domain);
if (report == null) {
synchronized (m_reports) {
report = m_reports.get(domain);
if (report == null) {
report = new IpReport().setDomain(domain);
m_reports.put(domain, report);
}
}
}
return report;
}
@Override
public Set<String> getDomains() {
return m_reports.keySet();
}
private String getIpAddress(Transaction root) {
List<Message> children = ((Transaction) root).getChildren();
for (Message child : children) {
if (child instanceof Event && child.getType().equals("URL") && child.getName().equals("ClientInfo")) {
// URL:ClientInfo RemoteIP=<ip>&...
String data = child.getData().toString();
int off = data.indexOf(TOKEN);
if (off >= 0) {
int pos = data.indexOf('&', off + TOKEN.length());
if (pos > 0) {
return data.substring(off + TOKEN.length(), pos);
}
}
break;
} else if (child instanceof Heartbeat) {
// Heartbeat:<ip>
return child.getName();
}
}
return null;
}
public IpReport getReport(String domain) {
IpReport report = m_reports.get(domain);
if (report == null) {
report = new IpReport();
report.setDomain(domain);
}
AllDomains allDomains = new AllDomains();
allDomains.getDomains().addAll(m_reports.keySet());
report.setAllDomains(allDomains);
return report;
}
@Override
protected boolean isTimeout() {
return false;
}
@Override
protected void process(MessageTree tree) {
Message root = tree.getMessage();
if (root instanceof Transaction) {
String address = getIpAddress((Transaction) root);
if (address == null) {
address = "N/A";
m_logger.debug("Unable to find IP address from message: " + tree);
}
String domain = tree.getDomain();
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(root.getTimestamp());
int minute = cal.get(Calendar.MINUTE);
IpReport report = findOrCreateReport(domain);
Period period = report.findOrCreatePeriod(minute);
Ip ip = period.findOrCreateIp(address);
ip.incCount();
clearLastPhase();
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
}
package com.dianping.cat.consumer.ip;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.consumer.ip.model.entity.Ip;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
import com.dianping.cat.consumer.ip.model.entity.Period;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class TopIpAnalyzer extends AbstractMessageAnalyzer<IpReport> implements LogEnabled {
private static final String TOKEN = "RemoteIP=";
@Inject
private BucketManager m_bucketManager;
@Inject
private ReportDao m_reportDao;
private Map<String, IpReport> m_reports = new HashMap<String, IpReport>();
private int m_lastPhase;
private long m_extraTime;
private long m_startTime;
private long m_duration;
private Logger m_logger;
@Override
public void doCheckpoint(boolean atEnd) {
storeReports(atEnd);
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public Set<String> getDomains() {
return m_reports.keySet();
}
public IpReport getReport(String domain) {
IpReport report = m_reports.get(domain);
if (report == null) {
report = new IpReport(domain);
}
report.getDomainNames().clear();
report.getDomainNames().addAll(m_reports.keySet());
return report;
}
@Override
protected boolean isTimeout() {
long currentTime = System.currentTimeMillis();
long endTime = m_startTime + m_duration + m_extraTime;
return currentTime > endTime;
}
private void loadReports() {
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> reportBucket = null;
try {
reportBucket = m_bucketManager.getReportBucket(m_startTime, "ip");
for (String id : reportBucket.getIds()) {
String xml = reportBucket.findById(id);
IpReport report = parser.parse(xml);
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading ip reports of %s!", new Date(m_startTime)), e);
} finally {
if (reportBucket != null) {
m_bucketManager.closeBucket(reportBucket);
}
}
}
private void clearLastPhase() {
Calendar cal = Calendar.getInstance();
int minute = cal.get(Calendar.MINUTE);
int currentPhase = minute / 20; // 0, 1, 2
if (m_lastPhase != currentPhase) {
int baseIndex = m_lastPhase * 20;
List<String> domains = new ArrayList<String>();
for (Map.Entry<String, IpReport> e : m_reports.entrySet()) {
IpReport report = e.getValue();
Map<Integer, Period> periods = report.getPeriods();
for (int i = 0; i < 20; i++) {
periods.remove(baseIndex + i);
}
if (periods.isEmpty()) {
domains.add(e.getKey());
}
}
for (String domain : domains) {
m_reports.remove(domain);
}
m_lastPhase = currentPhase;
}
}
private String getIpAddress(Transaction root) {
List<Message> children = ((Transaction) root).getChildren();
for (Message child : children) {
if (child instanceof Event && child.getType().equals("URL") && child.getName().equals("ClientInfo")) {
// URL:ClientInfo RemoteIP=<ip>&...
String data = child.getData().toString();
int off = data.indexOf(TOKEN);
if (off >= 0) {
int pos = data.indexOf('&', off + TOKEN.length());
if (pos > 0) {
return data.substring(off + TOKEN.length(), pos);
}
}
break;
} else if (child instanceof Heartbeat) {
// Heartbeat:<ip>
return child.getName();
}
}
return null;
}
@Override
protected void process(MessageTree tree) {
Message root = tree.getMessage();
if (root instanceof Transaction) {
String address = getIpAddress((Transaction) root);
if (address == null) {
address = "N/A";
m_logger.debug("Unable to find IP address from message: " + tree);
}
String domain = tree.getDomain();
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(root.getTimestamp());
int minute = cal.get(Calendar.MINUTE);
IpReport report = findOrCreateReport(domain);
Period period = report.findOrCreatePeriod(minute);
Ip ip = period.findOrCreateIp(address);
ip.incCount();
clearLastPhase();
}
}
private IpReport findOrCreateReport(String domain) {
IpReport report = m_reports.get(domain);
if (report == null) {
synchronized (m_reports) {
report = m_reports.get(domain);
if (report == null) {
report = new IpReport(domain);
m_reports.put(domain, report);
}
}
}
return report;
}
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
m_duration = duration;
loadReports();
}
private void storeReports(boolean atEnd) {
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> reportBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
reportBucket = m_bucketManager.getReportBucket(m_startTime, "ip");
for (IpReport report : m_reports.values()) {
Set<String> domainNames = report.getDomainNames();
domainNames.clear();
domainNames.addAll(getDomains());
String xml = builder.buildXml(report);
String domain = report.getDomain();
reportBucket.storeById(domain, xml);
}
if (atEnd && !isLocalMode()) {
Date period = new Date(m_startTime);
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
for (IpReport report : m_reports.values()) {
Report r = m_reportDao.createLocal();
String xml = builder.buildXml(report);
String domain = report.getDomain();
r.setName("ip");
r.setDomain(domain);
r.setPeriod(period);
r.setIp(ip);
r.setType(1);
r.setContent(xml);
m_reportDao.insert(r);
}
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing ip reports of %s!", new Date(m_startTime)), e);
} finally {
t.complete();
if (reportBucket != null) {
m_bucketManager.closeBucket(reportBucket);
}
}
}
}
......@@ -4,7 +4,7 @@
<attribute name="domain" value-type="String" />
<attribute name="startTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<attribute name="endTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<element name="domain" value-type="String" />
<element name="domain" value-type="String" type="list" names="domains" />
<entity-ref name="type" type="list" names="types" />
</entity>
<entity name="type">
......
......@@ -3,8 +3,6 @@
enable-xml-parser="true" enable-base-visitor="true">
<entity name="event-report" root="true">
<attribute name="domain" key="true" />
<attribute name="startTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<attribute name="endTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<element name="domain" value-type="String" type="set" names="domain-names" />
<entity-ref name="type" type="map" method-find-or-create="true" />
</entity>
......
......@@ -4,11 +4,8 @@
<attribute name="domain" value-type="String" />
<attribute name="startTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<attribute name="endTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<entity-ref name="all-domains" />
<entity-ref name="period" type="list" names="periods" />
</entity>
<entity name="all-domains">
<element name="domain" value-type="String" type="list" names="domains" />
<entity-ref name="period" type="list" names="periods" />
</entity>
<entity name="period">
<attribute name="minute" value-type="int" />
......
......@@ -2,11 +2,10 @@
<model model-package="com.dianping.cat.consumer.ip.model" enable-merger="true" enable-json-builder="true"
enable-xml-parser="true" enable-base-visitor="true">
<entity name="ip-report" root="true">
<attribute name="domain" key="true" />
<element name="domain" value-type="String" type="set" names="domain-names" />
<entity-ref name="period" type="map" names="periods" method-find-or-create="true" />
</entity>
<entity name="all-domains">
<element name="domain" value-type="String" type="set" names="domains" />
</entity>
<entity name="period">
<attribute name="minute" value-type="int" key="true" />
<entity-ref name="ip" type="map" names="ips" method-find-or-create="true" />
......
......@@ -102,8 +102,17 @@
</requirements>
</component>
<component>
<role>com.dianping.cat.consumer.ip.IpAnalyzer</role>
<implementation>com.dianping.cat.consumer.ip.IpAnalyzer</implementation>
<role>com.dianping.cat.consumer.ip.TopIpAnalyzer</role>
<implementation>com.dianping.cat.consumer.ip.TopIpAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.hadoop.dal.ReportDao</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.consumer.dump.DumpAnalyzer</role>
......
......@@ -7,7 +7,6 @@ import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
......@@ -164,9 +163,7 @@ class StatusInfoCollector extends BaseVisitor {
thread.setPeekCount(bean.getPeakThreadCount());
thread.setTotalStartedCount(bean.getTotalStartedThreadCount());
// remove below
java.lang.management.ThreadInfo[] threads = bean.dumpAllThreads(true, true);
System.out.println(Arrays.asList(threads));
// TODO remove below
// System.out.println(Arrays.asList(bean.dumpAllThreads(true, true)));
}
}
\ No newline at end of file
......@@ -40,7 +40,7 @@ public class Model extends AbstractReportModel<Action, Context> {
if (m_report == null) {
return new ArrayList<String>();
} else {
return StringSortHelper.sortDomain(m_report.getAllDomains().getDomains());
return StringSortHelper.sortDomain(m_report.getDomainNames());
}
}
......
......@@ -17,7 +17,7 @@ public class IpReportMerger extends DefaultMerger {
@Override
public void visitIpReport(IpReport ipReport) {
super.visitIpReport(ipReport);
getIpReport().getAllDomains().getDomains().addAll(ipReport.getAllDomains().getDomains());
getIpReport().getDomainNames().addAll(ipReport.getDomainNames());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册