From 36d93aad8cb977d1e71907cc0e816e0339feab37 Mon Sep 17 00:00:00 2001 From: Frankie Wu Date: Fri, 23 Mar 2012 13:36:52 +0800 Subject: [PATCH] adjust cat type and name for model services --- .../cat/consumer/RealtimeConsumer.java | 2 +- .../cat/consumer/event/EventAnalyzer.java | 24 +++++++------------ .../cat/consumer/problem/ProblemAnalyzer.java | 24 +++++++------------ .../transaction/TransactionAnalyzer.java | 24 +++++++------------ .../internal/BaseCompositeModelService.java | 2 +- .../internal/BaseHistoricalModelService.java | 2 +- .../spi/internal/BaseLocalModelService.java | 2 +- .../spi/internal/BaseRemoteModelService.java | 2 +- .../cat/job/storage/RemoteStringBucket.java | 10 ++++++++ 9 files changed, 42 insertions(+), 50 deletions(-) diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java index 40210018c..d083e42a4 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java @@ -127,7 +127,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer public void doCheckpoint() throws IOException { MessageProducer cat = Cat.getProducer(); - Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint"); + Transaction t = cat.newTransaction("Checkpoint", getClass().getSimpleName()); t.setStatus(Message.SUCCESS); diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java index a4f6eb175..d5b4110ac 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java @@ -24,7 +24,6 @@ import com.dianping.cat.consumer.event.model.transform.DefaultXmlBuilder; import com.dianping.cat.consumer.event.model.transform.DefaultXmlParser; import com.dianping.cat.message.Event; import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.AbstractMessageAnalyzer; import com.dianping.cat.message.spi.MessagePathBuilder; @@ -78,20 +77,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen @Override public void doCheckpoint() throws IOException { - MessageProducer cat = Cat.getProducer(); - Event t = cat.newEvent(getClass().getSimpleName(), "checkpoint"); - - try { - storeReports(m_reports.values()); - closeMessageBuckets(m_reports.keySet()); - - t.setStatus(Message.SUCCESS); - } catch (Exception e) { - cat.logError(e); - t.setStatus(e); - } finally { - t.complete(); - } + storeReports(m_reports.values()); + closeMessageBuckets(m_reports.keySet()); } @Override @@ -308,6 +295,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen DefaultXmlBuilder builder = new DefaultXmlBuilder(true); Bucket localBucket = null; Bucket remoteBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); try { localBucket = m_bucketManager.getReportBucket(timestamp, "event", "local"); @@ -323,9 +311,15 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen localBucket.storeById(domain, xml); remoteBucket.storeById(domain, xml); } + + t.setStatus(Message.SUCCESS); } catch (Exception e) { + Cat.getProducer().logError(e); + t.setStatus(e); m_logger.error(String.format("Error when storing event reports of %s!", timestamp), e); } finally { + t.complete(); + if (localBucket != null) { m_bucketManager.closeBucket(localBucket); } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java index 8074de648..806001d84 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java @@ -25,7 +25,6 @@ import com.dianping.cat.consumer.problem.model.entity.Segment; import com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder; import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser; import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.AbstractMessageAnalyzer; import com.dianping.cat.message.spi.MessageTree; @@ -78,20 +77,8 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl @Override public void doCheckpoint() throws IOException { - MessageProducer cat = Cat.getProducer(); - Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint"); - - try { - storeReports(m_reports.values()); - closeMessageBuckets(m_reports.keySet()); - - t.setStatus(Message.SUCCESS); - } catch (Exception e) { - cat.logError(e); - t.setStatus(e); - } finally { - t.complete(); - } + storeReports(m_reports.values()); + closeMessageBuckets(m_reports.keySet()); } @Override @@ -248,6 +235,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl DefaultXmlBuilder builder = new DefaultXmlBuilder(true); Bucket localBucket = null; Bucket remoteBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); try { localBucket = m_bucketManager.getReportBucket(timestamp, "problem", "local"); @@ -263,9 +251,15 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl localBucket.storeById(domain, xml); remoteBucket.storeById(domain, xml); } + + t.setStatus(Message.SUCCESS); } catch (Exception e) { + Cat.getProducer().logError(e); + t.setStatus(e); m_logger.error(String.format("Error when storing problem reports to %s!", timestamp), e); } finally { + t.complete(); + if (localBucket != null) { m_bucketManager.closeBucket(localBucket); } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java index 5ff9e600d..45e6d5042 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java @@ -22,7 +22,6 @@ import com.dianping.cat.consumer.transaction.model.entity.TransactionType; import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder; import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser; import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.AbstractMessageAnalyzer; import com.dianping.cat.message.spi.MessagePathBuilder; @@ -76,20 +75,8 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer localBucket = null; Bucket remoteBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); try { localBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local"); @@ -318,9 +306,15 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer extends ModelServiceWithCalSu int size = m_allServices.size(); final List> responses = new ArrayList>(size); final Semaphore semaphore = new Semaphore(0); - final Transaction t = Cat.getProducer().newTransaction(getClass().getSimpleName(), m_name); + final Transaction t = Cat.getProducer().newTransaction("ModelService", getClass().getSimpleName()); int count = 0; t.setStatus(Message.SUCCESS); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java index 258594d63..b41268a8f 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java @@ -22,7 +22,7 @@ public abstract class BaseHistoricalModelService extends ModelServiceWithCalS @Override public ModelResponse invoke(ModelRequest request) { ModelResponse response = new ModelResponse(); - Transaction t = newTransaction(getClass().getSimpleName(), m_name); + Transaction t = newTransaction("ModelService", getClass().getSimpleName()); try { T model = buildModel(request); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java index 981462b22..60050e56b 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java @@ -55,7 +55,7 @@ public class BaseLocalModelService extends ModelServiceWithCalSupport impleme @Override public ModelResponse invoke(ModelRequest request) { ModelResponse response = new ModelResponse(); - Transaction t = newTransaction(getClass().getSimpleName(), m_name); + Transaction t = newTransaction("ModelService", getClass().getSimpleName()); try { T report = getReport(request, request.getPeriod(), request.getDomain()); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java index a713b1b7e..b19d7ed78 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java @@ -60,7 +60,7 @@ public abstract class BaseRemoteModelService extends ModelServiceWithCalSuppo public ModelResponse invoke(ModelRequest request) { ModelResponse response = new ModelResponse(); MessageProducer cat = Cat.getProducer(); - Transaction t = newTransaction(getClass().getSimpleName(), m_name); + Transaction t = newTransaction("ModelService", getClass().getSimpleName()); try { URL url = buildUrl(request); diff --git a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java index 1bb33da6e..4abfa43ca 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java +++ b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java @@ -10,9 +10,12 @@ import java.util.List; import org.codehaus.plexus.logging.LogEnabled; import org.codehaus.plexus.logging.Logger; +import com.dianping.cat.Cat; import com.dianping.cat.job.sql.dal.Report; import com.dianping.cat.job.sql.dal.ReportDao; import com.dianping.cat.job.sql.dal.ReportEntity; +import com.dianping.cat.message.Message; +import com.dianping.cat.message.Transaction; import com.dianping.cat.storage.Bucket; import com.site.dal.jdbc.DalException; import com.site.lookup.annotation.Inject; @@ -91,19 +94,26 @@ public class RemoteStringBucket implements Bucket, LogEnabled { @Override public boolean storeById(String domain, String data) throws IOException { + Transaction t = Cat.getProducer().newTransaction(getClass().getSimpleName(), "store"); Report report = m_reportDao.createLocal(); + report.setName(m_name); report.setDomain(domain); report.setType(1); report.setContent(data); report.setPeriod(m_period); + t.setStatus(Message.SUCCESS); + try { m_reportDao.insert(report); return true; } catch (DalException e) { + t.setStatus(e); throw new IOException(String.format("Unable to insert report(%s)!", domain), e); + } finally { + t.complete(); } } } -- GitLab