提交 42209d13 编写于 作者: F Frankie Wu

RealtimeConsumer bug fix and refactory

上级 ee800329
......@@ -12,7 +12,7 @@
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<artifactId>cat-hadoop</artifactId>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
......
package com.dianping.cat.consumer;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -39,22 +36,11 @@ import com.site.lookup.annotation.Inject;
* @since Jan 5, 2012
*/
public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled {
private static final long HOUR = 60 * 60 * 1000L;
private static final long MINUTE = 60 * 1000L;
private static final int PROCESS_PERIOD = 3;
private static final long FIVE_MINUTES = 5 * 60 * 1000L;
@Inject
private Logger m_logger;
@Inject
private String m_consumerId;
private static final long FIVE_MINUTES = 5 * MINUTE;
@Inject
private List<String> m_eligibleDomains; // domains == null means not limit
private static final long HOUR = 60 * MINUTE;
@Inject
private long m_duration = 1 * HOUR;
......@@ -63,7 +49,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private long m_extraTime = FIVE_MINUTES;
@Inject
private int m_threads = 10;
private int m_threads = 20;
@Inject
private List<String> m_analyzerNames;
......@@ -73,55 +59,27 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private ExecutorService m_executor;
private List<Period> m_periods = new ArrayList<Period>(PROCESS_PERIOD);
private Map<String, MessageAnalyzer> m_lastAnalyzers = new HashMap<String, MessageAnalyzer>();
private Set<String> m_domains = new HashSet<String>();
private Map<String, MessageAnalyzer> m_currentAnalyzers = new HashMap<String, MessageAnalyzer>();
private Logger m_logger;
private Set<String> m_domains = new HashSet<String>();
private PeriodManager m_periodManager;
@Override
public void consume(MessageTree tree) {
if (!isInDomain(tree)) {
return;
}
long timestamp = tree.getMessage().getTimestamp();
Period current = null;
for (Period period : m_periods) {
if (period.isIn(timestamp)) {
current = period;
break;
}
}
Period period = m_periodManager.findPeriod(timestamp);
if (current != null) {
List<MessageQueue> queues = current.getQueues();
if (period != null) {
period.distribute(tree);
distributeMessage(tree, queues);
} else {
long now = System.currentTimeMillis();
long nextStart = now - now % m_duration - 3 * m_duration;
String domain = tree.getDomain();
if (timestamp < now + MINUTE * 3 && timestamp >= nextStart) {
startTasks(tree);
} else {
m_logger.warn("The timestamp of message is out of range, IGNORED! \r\n" + tree);
if (!m_domains.contains(domain)) {
m_domains.add(domain);
}
}
m_domains.add(tree.getDomain());
}
private void distributeMessage(MessageTree tree, List<MessageQueue> queues) {
int size = queues.size();
for (int i = 0; i < size; i++) {
MessageQueue queue = queues.get(i);
queue.offer(tree);
} else {
m_logger.warn("The timestamp of message is out of range, IGNORED! \r\n" + tree);
}
}
......@@ -130,14 +88,14 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
Transaction t = cat.newTransaction("Checkpoint", getClass().getSimpleName());
try {
for (Map.Entry<String, MessageAnalyzer> e : m_currentAnalyzers.entrySet()) {
e.getValue().doCheckpoint();
long currentStartTime = m_periodManager.getCurrentStartTime();
Period period = m_periodManager.findPeriod(currentStartTime);
for (MessageAnalyzer analyzer : period.getAnalzyers()) {
analyzer.doCheckpoint(false);
}
t.setStatus(Message.SUCCESS);
} catch (IOException e) {
cat.logError(e);
t.setStatus(e);
} catch (RuntimeException e) {
cat.logError(e);
t.setStatus(e);
......@@ -153,147 +111,197 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
@Override
public String getConsumerId() {
return m_consumerId;
return "realtime";
}
public MessageAnalyzer getCurrentAnalyzer(String name) {
return m_currentAnalyzers.get(name);
long currentStartTime = m_periodManager.getCurrentStartTime();
Period period = m_periodManager.findPeriod(currentStartTime);
return period.getAnalyzer(name);
}
public MessageAnalyzer getLastAnalyzer(String name) {
return m_lastAnalyzers.get(name);
long lastStartTime = m_periodManager.getCurrentStartTime() - m_duration;
Period period = m_periodManager.findPeriod(lastStartTime);
return period.getAnalyzer(name);
}
@Override
public void initialize() throws InitializationException {
m_executor = Executors.newFixedThreadPool(m_threads);
}
m_periodManager = new PeriodManager(m_duration);
private boolean isInDomain(MessageTree tree) {
if (m_eligibleDomains == null || m_eligibleDomains.isEmpty()) {
return true;
} else {
return m_eligibleDomains.contains(tree.getDomain());
}
m_periodManager.setName("RealtimeConsumer-PeriodManager");
m_periodManager.start();
}
public void setAnalyzers(String analyzers) {
m_analyzerNames = Splitters.by(',').noEmptyItem().trim().split(analyzers);
}
public void setConsumerId(String consumerId) {
m_consumerId = consumerId;
public void setExtraTime(long time) {
m_extraTime = time;
}
public void setDomains(String domains) {
if (domains != null) {
m_eligibleDomains = Splitters.by(',').noEmptyItem().trim().split(domains);
class Period {
private long m_startTime;
private long m_endTime;
private List<Task> m_tasks;
public Period(long startTime, long endTime) {
m_startTime = startTime;
m_endTime = endTime;
m_tasks = new ArrayList<Task>(m_analyzerNames.size());
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = m_factory.create(name, startTime, m_duration, m_extraTime);
MessageQueue queue = lookup(MessageQueue.class);
Task task = new Task(m_factory, analyzer, queue);
m_tasks.add(task);
}
}
}
public void setDuration(long duration) {
m_duration = duration;
}
public void distribute(MessageTree tree) {
for (Task task : m_tasks) {
task.enqueue(tree);
}
}
public void setExtraTime(long time) {
m_extraTime = time;
}
public void finish() {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void setFactory(AnalyzerFactory factory) {
m_factory = factory;
}
m_logger.info(String.format("Finishing %s tasks for period [%s, %s]", m_tasks.size(),
df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));
public void setThreads(int threads) {
m_threads = threads;
}
Cat.setup(null);
for (Task task : m_tasks) {
task.finish();
}
private void startTasks(MessageTree tree) {
long time = tree.getMessage().getTimestamp();
long start = time - time % m_duration;
m_logger.info("Start Tasks At " + new Date(start));
List<MessageQueue> queues = new ArrayList<MessageQueue>();
Period current = new Period(start, start + m_duration, queues);
CountDownLatch latch = new CountDownLatch(m_analyzerNames.size());
m_lastAnalyzers.clear();
m_lastAnalyzers.putAll(m_currentAnalyzers);
m_currentAnalyzers.clear();
Cat.setup("realtime-consumer");
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = m_factory.create(name, start, m_duration, m_extraTime);
MessageQueue queue = lookup(MessageQueue.class);
Task task = new Task(m_factory, analyzer, queue, latch);
queue.offer(tree);
queues.add(queue);
m_executor.submit(task);
m_currentAnalyzers.put(name, analyzer);
Cat.reset();
}
int len = m_periods.size();
public MessageAnalyzer getAnalyzer(String name) {
int index = m_analyzerNames.indexOf(name);
if (index >= 0) {
Task task = m_tasks.get(index);
return task.getAnalyzer();
}
if (len >= PROCESS_PERIOD) {
m_periods.remove(0);
return null;
}
m_periods.add(current);
}
public List<MessageAnalyzer> getAnalzyers() {
List<MessageAnalyzer> analyzers = new ArrayList<MessageAnalyzer>(m_tasks.size());
static class FinalizerTask implements Runnable {
private AnalyzerFactory m_factory;
for (Task task : m_tasks) {
analyzers.add(task.getAnalyzer());
}
return analyzers;
}
private MessageAnalyzer m_handler;
public boolean isIn(long timestamp) {
return timestamp >= m_startTime && timestamp < m_endTime;
}
public void start() {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
m_logger.info(String.format("Starting %s tasks for period [%s, %s]", m_tasks.size(),
df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));
for (Task task : m_tasks) {
m_executor.submit(task);
}
}
}
class PeriodManager extends Thread {
private long m_duration;
private CountDownLatch m_latch;
private List<Period> m_periods = new ArrayList<RealtimeConsumer.Period>();
public FinalizerTask(AnalyzerFactory factory, MessageAnalyzer handler, long duration, CountDownLatch latch) {
m_factory = factory;
m_handler = handler;
public PeriodManager(long duration) {
m_duration = duration;
m_latch = latch;
}
@Override
public void run() {
try {
m_latch.await(m_duration * 2, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
System.out.println("Waiting time out in FinalizerTask, do logview upload next");
}
private void endPeriod(long startTime) {
int len = m_periods.size();
try {
m_handler.doCheckpoint();
} catch (IOException e) {
e.printStackTrace();
} finally {
m_factory.release(m_handler);
for (int i = 0; i < len; i++) {
Period period = m_periods.get(i);
if (period.isIn(startTime)) {
m_periods.remove(i);
period.finish();
break;
}
}
}
}
static class Period {
private long m_startTime;
public Period findPeriod(long timestamp) {
for (Period period : m_periods) {
if (period.isIn(timestamp)) {
return period;
}
}
private long m_endTime;
return null;
}
private List<MessageQueue> m_queues;
public long getCurrentStartTime() {
long now = System.currentTimeMillis();
long time = now - now % m_duration;
public Period(long startTime, long endTime, List<MessageQueue> queues) {
m_startTime = startTime;
m_endTime = endTime;
m_queues = queues;
return time;
}
public List<MessageQueue> getQueues() {
return m_queues;
@Override
public void run() {
long now = System.currentTimeMillis();
long startTime = now - now % m_duration;
long lastStartTime = startTime;
startPeriod(startTime);
try {
while (true) {
now = System.currentTimeMillis();
// prepare next period in ahead of 3 minutes
if (now - startTime >= m_duration - 3 * MINUTE) {
startTime = now - now % m_duration + m_duration;
startPeriod(startTime);
}
// last period is over
if (now - lastStartTime >= m_duration + m_extraTime) {
endPeriod(lastStartTime);
lastStartTime = startTime;
}
sleep(1000L);
}
} catch (InterruptedException e) {
// ignore it
}
}
public boolean isIn(long timestamp) {
return timestamp >= m_startTime && timestamp < m_endTime;
private void startPeriod(long startTime) {
long endTime = startTime + m_duration;
Period period = new Period(startTime, endTime);
m_periods.add(period);
period.start();
}
}
......@@ -304,29 +312,35 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private MessageQueue m_queue;
private CountDownLatch m_latch;
public Task(AnalyzerFactory factory, MessageAnalyzer analyzer, MessageQueue queue, CountDownLatch latch) {
public Task(AnalyzerFactory factory, MessageAnalyzer analyzer, MessageQueue queue) {
m_factory = factory;
m_analyzer = analyzer;
m_queue = queue;
m_latch = latch;
}
public MessageQueue getQueue() {
return m_queue;
public boolean enqueue(MessageTree tree) {
return m_queue.offer(tree);
}
public void run() {
Cat.setup("realtime-consumer-task");
public void finish() {
try {
m_analyzer.analyze(m_queue);
m_analyzer.doCheckpoint(true);
} finally {
m_factory.release(m_analyzer);
m_factory.release(m_queue);
m_latch.countDown();
} finally {
Cat.reset();
}
}
public MessageAnalyzer getAnalyzer() {
return m_analyzer;
}
@Override
public void run() {
try {
m_analyzer.analyze(m_queue);
} catch (RuntimeException e) {
e.printStackTrace();
}
}
}
......
......@@ -17,8 +17,8 @@ import com.dianping.cat.consumer.problem.handler.FailureHandler;
import com.dianping.cat.consumer.problem.handler.Handler;
import com.dianping.cat.consumer.problem.handler.LongUrlHandler;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -31,8 +31,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactory.class));
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class).config(E("consumerId").value("realtime") //
, E("extraTime").value(property("extraTime", "300000"))//
.req(AnalyzerFactory.class) //
.config(E("extraTime").value(property("extraTime", "300000"))//
, E("analyzers").value("problem,transaction,event,ip")));
String failureTypes = "Error,RuntimeException,Exception";
......@@ -41,17 +41,17 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.config(E("failureType").value(failureTypes)));
all.add(C(Handler.class, LONG_URL.getName(), LongUrlHandler.class) //
.req(ServerConfigManager.class));
.req(ServerConfigManager.class));
all.add(C(ProblemAnalyzer.class).is(PER_LOOKUP) //
.req(Handler.class, new String[] { FAILURE.getName(), LONG_URL.getName() }, "m_handlers") //
.req(BucketManager.class));
.req(BucketManager.class, ReportDao.class));
all.add(C(TransactionAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, MessagePathBuilder.class));
.req(BucketManager.class, ReportDao.class));
all.add(C(EventAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, MessagePathBuilder.class));
.req(BucketManager.class, ReportDao.class));
all.add(C(IpAnalyzer.class).is(PER_LOOKUP));
......
package com.dianping.cat.consumer.event;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
......@@ -13,17 +11,19 @@ import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.consumer.event.model.entity.EventName;
import com.dianping.cat.consumer.event.model.entity.EventReport;
import com.dianping.cat.consumer.event.model.entity.EventType;
import com.dianping.cat.consumer.event.model.entity.Range;
import com.dianping.cat.consumer.event.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.event.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
......@@ -36,7 +36,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
private BucketManager m_bucketManager;
@Inject
private MessagePathBuilder m_pathBuilder;
private ReportDao m_reportDao;
private Map<String, EventReport> m_reports = new HashMap<String, EventReport>();
......@@ -48,7 +48,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
private long m_duration;
void closeMessageBuckets() {
private void closeMessageBuckets() {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
......@@ -67,8 +67,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
public void doCheckpoint(boolean atEnd) {
storeReports(atEnd);
closeMessageBuckets();
}
......@@ -77,21 +77,6 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
m_logger = logger;
}
@Override
protected List<EventReport> generate() {
List<EventReport> reports = new ArrayList<EventReport>(m_reports.size());
StatisticsComputer computer = new StatisticsComputer();
for (String domain : m_reports.keySet()) {
EventReport report = getReport(domain);
report.accept(computer);
reports.add(report);
}
return reports;
}
public EventReport getReport(String domain) {
EventReport report = m_reports.get(domain);
......@@ -114,7 +99,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
return currentTime > endTime;
}
void loadReports() {
private void loadReports() {
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> reportBucket = null;
......@@ -165,10 +150,10 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
}
int processEvent(EventReport report, MessageTree tree, Event event) {
private int processEvent(EventReport report, MessageTree tree, Event event) {
EventType type = report.findOrCreateType(event.getType());
EventName name = type.findOrCreateName(event.getName());
String url = m_pathBuilder.getLogViewPath(tree.getMessageId());
String messageId = tree.getMessageId();
int count = 0;
synchronized (type) {
......@@ -177,12 +162,12 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
if (event.isSuccess()) {
if (type.getSuccessMessageUrl() == null) {
type.setSuccessMessageUrl(url);
type.setSuccessMessageUrl(messageId);
count++;
}
if (name.getSuccessMessageUrl() == null) {
name.setSuccessMessageUrl(url);
name.setSuccessMessageUrl(messageId);
count++;
}
} else {
......@@ -190,12 +175,12 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
name.incFailCount();
if (type.getFailMessageUrl() == null) {
type.setFailMessageUrl(url);
type.setFailMessageUrl(messageId);
count++;
}
if (name.getFailMessageUrl() == null) {
name.setFailMessageUrl(url);
name.setFailMessageUrl(messageId);
count++;
}
}
......@@ -206,7 +191,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
return count;
}
void processEventGrpah(EventName name, Event t) {
private void processEventGrpah(EventName name, Event t) {
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(t.getTimestamp());
int min = cal.get(Calendar.MINUTE);
......@@ -223,7 +208,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
}
int processTransaction(EventReport report, MessageTree tree, Transaction t) {
private int processTransaction(EventReport report, MessageTree tree, Transaction t) {
List<Message> children = t.getChildren();
int count = 0;
......@@ -246,17 +231,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
loadReports();
}
@Override
protected void store(List<EventReport> reports) {
if (reports == null || reports.size() == 0) {
return;
}
storeReports(reports);
closeMessageBuckets();
}
void storeMessage(MessageTree tree) {
private void storeMessage(MessageTree tree) {
String messageId = tree.getMessageId();
String domain = tree.getDomain();
......@@ -269,7 +244,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
}
void storeReports(Collection<EventReport> reports) {
private void storeReports(boolean atEnd) {
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> reportBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
......@@ -277,13 +252,33 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
try {
reportBucket = m_bucketManager.getReportBucket(m_startTime, "event");
for (EventReport report : reports) {
for (EventReport report : m_reports.values()) {
String xml = builder.buildXml(report);
String domain = report.getDomain();
reportBucket.storeById(domain, xml);
}
if (atEnd && !isLocalMode()) {
Date period = new Date(m_startTime);
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
for (EventReport report : m_reports.values()) {
Report r = m_reportDao.createLocal();
String xml = builder.buildXml(report);
String domain = report.getDomain();
r.setName("event");
r.setDomain(domain);
r.setPeriod(period);
r.setIp(ip);
r.setType(1);
r.setContent(xml);
m_reportDao.insert(r);
}
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
......@@ -296,5 +291,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
m_bucketManager.closeBucket(reportBucket);
}
}
// TODO
}
}
......@@ -70,19 +70,6 @@ public class IpAnalyzer extends AbstractMessageAnalyzer<IpReport> {
return report;
}
@Override
public List<IpReport> generate() {
List<IpReport> reports = new ArrayList<IpReport>(m_reports.size());
for (String domain : m_reports.keySet()) {
IpReport report = getReport(domain);
reports.add(report);
}
return reports;
}
public IpReport getReport(String domain) {
IpReport report = m_reports.get(domain);
......@@ -155,9 +142,4 @@ public class IpAnalyzer extends AbstractMessageAnalyzer<IpReport> {
clearLastPhase();
}
}
@Override
protected void store(List<IpReport> reports) {
}
}
package com.dianping.cat.consumer.problem;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
......@@ -13,6 +10,7 @@ import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.consumer.problem.handler.Handler;
import com.dianping.cat.consumer.problem.model.entity.AllDomains;
import com.dianping.cat.consumer.problem.model.entity.JavaThread;
......@@ -21,6 +19,8 @@ import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.entity.Segment;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
......@@ -35,6 +35,9 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
@Inject
private BucketManager m_bucketManager;
@Inject
private ReportDao m_reportDao;
@Inject
private List<Handler> m_handlers;
......@@ -48,7 +51,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
private long m_duration;
void closeMessageBuckets() {
private void closeMessageBuckets() {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
......@@ -67,8 +70,8 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
public void doCheckpoint(boolean atEnd) {
storeReports(atEnd);
closeMessageBuckets();
}
......@@ -90,19 +93,6 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
return segment;
}
@Override
protected List<ProblemReport> generate() {
List<ProblemReport> reports = new ArrayList<ProblemReport>(m_reports.size());
for (String domain : m_reports.keySet()) {
ProblemReport report = getReport(domain);
reports.add(report);
}
return reports;
}
public ProblemReport getReport(String domain) {
ProblemReport report = m_reports.get(domain);
......@@ -128,7 +118,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
return currentTime > endTime;
}
void loadReports() {
private void loadReports() {
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> bucket = null;
......@@ -183,17 +173,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
loadReports();
}
@Override
protected void store(List<ProblemReport> reports) {
if (reports == null || reports.size() == 0) {
return;
}
storeReports(reports);
closeMessageBuckets();
}
void storeMessage(MessageTree tree) {
private void storeMessage(MessageTree tree) {
String messageId = tree.getMessageId();
String domain = tree.getDomain();
......@@ -206,7 +186,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
}
void storeReports(Collection<ProblemReport> reports) {
private void storeReports(boolean atEnd) {
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> reportBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
......@@ -214,13 +194,33 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
try {
reportBucket = m_bucketManager.getReportBucket(m_startTime, "problem");
for (ProblemReport report : reports) {
for (ProblemReport report : m_reports.values()) {
String xml = builder.buildXml(report);
String domain = report.getDomain();
reportBucket.storeById(domain, xml);
}
if (atEnd && !isLocalMode()) {
Date period = new Date(m_startTime);
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
for (ProblemReport report : m_reports.values()) {
Report r = m_reportDao.createLocal();
String xml = builder.buildXml(report);
String domain = report.getDomain();
r.setName("event");
r.setDomain(domain);
r.setPeriod(period);
r.setIp(ip);
r.setType(1);
r.setContent(xml);
m_reportDao.insert(r);
}
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
......
package com.dianping.cat.consumer.transaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
......@@ -13,6 +11,7 @@ import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.consumer.transaction.model.entity.Duration;
import com.dianping.cat.consumer.transaction.model.entity.Range;
import com.dianping.cat.consumer.transaction.model.entity.TransactionName;
......@@ -20,10 +19,11 @@ import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.entity.TransactionType;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
......@@ -36,19 +36,19 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private BucketManager m_bucketManager;
@Inject
private MessagePathBuilder m_pathBuilder;
private ReportDao m_reportDao;
private Map<String, TransactionReport> m_reports = new HashMap<String, TransactionReport>();
private long m_extraTime;
private Logger m_logger;
private long m_startTime;
private long m_duration;
void closeMessageBuckets() {
private Logger m_logger;
private void closeMessageBuckets() {
for (String domain : m_reports.keySet()) {
Bucket<MessageTree> logviewBucket = null;
......@@ -65,8 +65,8 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
public void doCheckpoint(boolean atEnd) {
storeReports(atEnd);
closeMessageBuckets();
}
......@@ -75,21 +75,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
m_logger = logger;
}
@Override
protected List<TransactionReport> generate() {
List<TransactionReport> reports = new ArrayList<TransactionReport>(m_reports.size());
StatisticsComputer computer = new StatisticsComputer();
for (String domain : m_reports.keySet()) {
TransactionReport report = getReport(domain);
report.accept(computer);
reports.add(report);
}
return reports;
}
@Override
public TransactionReport getReport(String domain) {
TransactionReport report = m_reports.get(domain);
......@@ -113,7 +98,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
return currentTime > endTime;
}
void loadReports() {
private void loadReports() {
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> bucket = null;
......@@ -163,7 +148,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
int processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
TransactionType type = report.findOrCreateType(t.getType());
TransactionName name = type.findOrCreateName(t.getName());
String url = m_pathBuilder.getLogViewPath(tree.getMessageId());
String messageId = tree.getMessageId();
int count = 0;
synchronized (type) {
......@@ -172,12 +157,12 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
if (t.isSuccess()) {
if (type.getSuccessMessageUrl() == null) {
type.setSuccessMessageUrl(url);
type.setSuccessMessageUrl(messageId);
count++;
}
if (name.getSuccessMessageUrl() == null) {
name.setSuccessMessageUrl(url);
name.setSuccessMessageUrl(messageId);
count++;
}
} else {
......@@ -185,12 +170,12 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
name.incFailCount();
if (type.getFailMessageUrl() == null) {
type.setFailMessageUrl(url);
type.setFailMessageUrl(messageId);
count++;
}
if (name.getFailMessageUrl() == null) {
name.setFailMessageUrl(url);
name.setFailMessageUrl(messageId);
count++;
}
}
......@@ -222,7 +207,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
return count;
}
void processTransactionGrpah(TransactionName name, Transaction t) {
private void processTransactionGrpah(TransactionName name, Transaction t) {
long d = t.getDurationInMillis();
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(t.getTimestamp());
......@@ -257,17 +242,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
loadReports();
}
@Override
protected void store(List<TransactionReport> reports) {
if (reports == null || reports.size() == 0) {
return;
}
storeReports(reports);
closeMessageBuckets();
}
void storeMessage(MessageTree tree) {
private void storeMessage(MessageTree tree) {
String messageId = tree.getMessageId();
String domain = tree.getDomain();
......@@ -280,7 +255,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
}
void storeReports(Collection<TransactionReport> reports) {
private void storeReports(boolean atEnd) {
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
Bucket<String> reportBucket = null;
......@@ -288,13 +263,33 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
try {
reportBucket = m_bucketManager.getReportBucket(m_startTime, "transaction");
for (TransactionReport report : reports) {
for (TransactionReport report : m_reports.values()) {
String xml = builder.buildXml(report);
String domain = report.getDomain();
reportBucket.storeById(domain, xml);
}
if (atEnd && !isLocalMode()) {
Date period = new Date(m_startTime);
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
for (TransactionReport report : m_reports.values()) {
Report r = m_reportDao.createLocal();
String xml = builder.buildXml(report);
String domain = report.getDomain();
r.setName("transaction");
r.setDomain(domain);
r.setPeriod(period);
r.setIp(ip);
r.setType(1);
r.setContent(xml);
m_reportDao.insert(r);
}
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
......
......@@ -9,7 +9,6 @@
<role-hint>realtime</role-hint>
<implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
<configuration>
<consumerId>realtime</consumerId>
<extraTime>300000</extraTime>
<analyzers>problem,transaction,event,ip</analyzers>
</configuration>
......@@ -53,6 +52,9 @@
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.hadoop.dal.ReportDao</role>
</requirement>
</requirements>
</component>
<component>
......@@ -64,7 +66,7 @@
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
<role>com.dianping.cat.hadoop.dal.ReportDao</role>
</requirement>
</requirements>
</component>
......@@ -77,7 +79,7 @@
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
<role>com.dianping.cat.hadoop.dal.ReportDao</role>
</requirement>
</requirements>
</component>
......
package com.dianping.cat.consumer;
import java.util.List;
import junit.framework.Assert;
import org.junit.Test;
......@@ -52,15 +50,6 @@ public class ManyAnalyzerTest extends ComponentTestCase {
return false;
}
@Override
public List<AnalyzerResult> generate() {
return null;
}
@Override
protected void store(List<AnalyzerResult> result) {
}
@Override
public AnalyzerResult getReport(String domain) {
return null;
......@@ -79,15 +68,6 @@ public class ManyAnalyzerTest extends ComponentTestCase {
return false;
}
@Override
public List<AnalyzerResult> generate() {
return null;
}
@Override
protected void store(List<AnalyzerResult> result) {
}
@Override
public AnalyzerResult getReport(String domain) {
return null;
......@@ -107,18 +87,9 @@ public class ManyAnalyzerTest extends ComponentTestCase {
}
@Override
public List<AnalyzerResult> generate() {
public AnalyzerResult getReport(String domain) {
return null;
}
@Override
protected void store(List<AnalyzerResult> result) {
}
@Override
public AnalyzerResult getReport(String domain) {
return null;
}
}
public static class AnalyzerResult {
......
package com.dianping.cat.consumer;
import java.util.List;
import junit.framework.Assert;
import org.junit.Test;
......@@ -71,15 +69,6 @@ public class OneAnalyzerTwoDurationTest extends ComponentTestCase {
return false;
}
@Override
public List<AnalyzerResult> generate() {
return null;
}
@Override
protected void store(List<AnalyzerResult> result) {
}
@Override
public AnalyzerResult getReport(String domain) {
return null;
......
......@@ -22,7 +22,7 @@ public class DefaultMessageQueue implements MessageQueue, Initializable {
if (m_size > 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(m_size);
} else {
m_queue = new LinkedBlockingQueue<MessageTree>(1000);
m_queue = new LinkedBlockingQueue<MessageTree>(10000);
}
}
......
package com.dianping.cat.message.spi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.configuration.server.entity.ServerConfig;
import com.site.lookup.ContainerHolder;
public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder implements MessageAnalyzer {
@Override
public void analyze(MessageQueue queue) {
while (!isTimeout()) {
......@@ -27,18 +30,24 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
break;
}
}
}
List<R> result = generate();
protected boolean isLocalMode() {
ServerConfigManager manager = lookup(ServerConfigManager.class);
ServerConfig config = manager.getServerConfig();
store(result);
// local mode should be turned off explicitly
if (config != null && !config.isLocalMode()) {
return false;
} else {
return true;
}
}
public void doCheckpoint() throws IOException {
public void doCheckpoint(boolean atEnd) {
// override it
}
protected abstract List<R> generate();
protected List<String> getSortedDomains(Set<String> domains) {
List<String> sortedDomains = new ArrayList<String>(domains);
......@@ -47,6 +56,8 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
public int compare(String d1, String d2) {
if (d1.equals("Cat")) {
return 1;
} else if (d2.equals("Cat")) {
return -1;
}
return d1.compareTo(d2);
......@@ -61,6 +72,4 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
protected abstract boolean isTimeout();
protected abstract void process(MessageTree tree);
protected abstract void store(List<R> result);
}
package com.dianping.cat.message.spi;
import java.io.IOException;
public interface MessageAnalyzer {
public void analyze(MessageQueue queue);
public void doCheckpoint() throws IOException;
public void doCheckpoint(boolean atEnd);
}
<?xml version="1.0" encoding="UTF-8"?>
<entities do-package="com.dianping.cat.hadoop.dal" gen="true">
<entity name="logview">
<member name="creation-date" insert-expr="NOW()"/>
<var name="direction" value-type="boolean" />
<var name="message-ids" value-type="String[]" />
<query-defs>
......@@ -60,7 +61,7 @@
</query-defs>
</entity>
<entity name="report" table="report" alias="r">
<member name="creation-date" insert-expr="now()" />
<member name="creation-date" insert-expr="NOW()" />
<query-defs>
<query name="find-all-by-period-domain-type-name" type="SELECT" multiple="true">
<param name="period" />
......
......@@ -12,6 +12,7 @@ import com.dianping.cat.report.page.model.event.CompositeEventService;
import com.dianping.cat.report.page.model.event.HistoricalEventService;
import com.dianping.cat.report.page.model.event.LocalEventService;
import com.dianping.cat.report.page.model.ip.CompositeIpService;
import com.dianping.cat.report.page.model.ip.HistoricalIpService;
import com.dianping.cat.report.page.model.ip.LocalIpService;
import com.dianping.cat.report.page.model.logview.CompositeLogViewService;
import com.dianping.cat.report.page.model.logview.HistoricalLogViewService;
......@@ -55,8 +56,10 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
all.add(C(ModelService.class, "ip-local", LocalIpService.class) //
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "ip-historical", HistoricalIpService.class) //
.req(BucketManager.class, ReportDao.class));
all.add(C(ModelService.class, "ip", CompositeIpService.class) //
.req(ModelService.class, new String[] { "ip-local" }, "m_services"));
.req(ModelService.class, new String[] { "ip-local", "ip-historical" }, "m_services"));
all.add(C(ModelService.class, "logview-local", LocalLogViewService.class) //
.req(MessageConsumer.class, "realtime") //
......
package com.dianping.cat.report.page.model.ip;
import java.util.Date;
import java.util.List;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.dal.ReportEntity;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HistoricalIpService extends BaseHistoricalModelService<IpReport> {
@Inject
private ReportDao m_reportDao;
@Inject
private BucketManager m_bucketManager;
public HistoricalIpService() {
super("ip");
}
@Override
protected IpReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
IpReport report = getLocalReport(date, domain);
// try remote report
if (report == null && !isLocalMode()) {
report = getRemoteReport(date, domain);
}
return report;
}
private IpReport getLocalReport(long timestamp, String domain) throws Exception {
Bucket<String> bucket = m_bucketManager.getReportBucket(timestamp, "ip");
String xml = bucket.findById(domain);
DefaultXmlParser parser = new DefaultXmlParser();
return parser.parse(xml);
}
private IpReport getRemoteReport(long timestamp, String domain) throws Exception {
List<Report> reports = m_reportDao.findAllByPeriodDomainTypeName(new Date(timestamp), domain, 1, getName(),
ReportEntity.READSET_FULL);
DefaultXmlParser parser = new DefaultXmlParser();
IpReportMerger merger = null;
for (Report report : reports) {
String xml = report.getContent();
IpReport model = parser.parse(xml);
if (merger == null) {
merger = new IpReportMerger(model);
} else {
model.accept(merger);
}
}
return merger.getIpReport();
}
}
......@@ -154,6 +154,19 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>ip-historical</role-hint>
<implementation>com.dianping.cat.report.page.model.ip.HistoricalIpService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.hadoop.dal.ReportDao</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>ip</role-hint>
......@@ -163,6 +176,7 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>ip-local</role-hint>
<role-hint>ip-historical</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
......@@ -256,15 +270,6 @@
<role>com.dianping.cat.consumer.RealtimeConsumer</role>
<implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
<requirements>
<requirement>
<role>org.codehaus.plexus.logging.Logger</role>
</requirement>
<requirement>
<role>java.lang.String</role>
</requirement>
<requirement>
<role>java.util.List</role>
</requirement>
<requirement>
<role>long</role>
</requirement>
......
<plexus>
<components>
<component>
<role>org.codehaus.plexus.logging.LoggerManager</role>
<implementation>com.site.lookup.logger.TimedConsoleLoggerManager</implementation>
</component>
</components>
</plexus>
\ No newline at end of file
......@@ -2,6 +2,8 @@ package com.dianping.cat;
import java.io.File;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.codehaus.plexus.PlexusContainer;
import org.junit.AfterClass;
......@@ -82,6 +84,10 @@ public class TestServer extends SimpleServerSupport {
return 2281;
}
protected String getTimestamp() {
return new SimpleDateFormat("MM-dd HH:mm:ss.SSS").format(new Date());
}
@Override
protected File getWarRoot() {
return new File("src/main/webapp");
......@@ -107,7 +113,7 @@ public class TestServer extends SimpleServerSupport {
// open the page in the default browser
s_adaptor.display("/cat/r");
System.out.println(String.format("[%s] Press any key to stop server ... ", getTimestamp()));
System.out.println(String.format("[%s] [INFO] Press any key to stop server ... ", getTimestamp()));
System.in.read();
}
......
......@@ -39,7 +39,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册