diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java index 40210018c3cb0b9d6b2be05bfbc04374d81f5c92..d083e42a4bdf8f37bcdb6847552175d7efbd6d02 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java @@ -127,7 +127,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer public void doCheckpoint() throws IOException { MessageProducer cat = Cat.getProducer(); - Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint"); + Transaction t = cat.newTransaction("Checkpoint", getClass().getSimpleName()); t.setStatus(Message.SUCCESS); diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java index a4f6eb175fbd1cfacda0a47ef753096635f74c30..d5b4110ac4cfa86fe8e9a3c240e47232e127b0c2 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java @@ -24,7 +24,6 @@ import com.dianping.cat.consumer.event.model.transform.DefaultXmlBuilder; import com.dianping.cat.consumer.event.model.transform.DefaultXmlParser; import com.dianping.cat.message.Event; import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.AbstractMessageAnalyzer; import com.dianping.cat.message.spi.MessagePathBuilder; @@ -78,20 +77,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen @Override public void doCheckpoint() throws IOException { - MessageProducer cat = Cat.getProducer(); - Event t = cat.newEvent(getClass().getSimpleName(), "checkpoint"); - - try { - storeReports(m_reports.values()); - closeMessageBuckets(m_reports.keySet()); - - t.setStatus(Message.SUCCESS); - } catch (Exception e) { - cat.logError(e); - t.setStatus(e); - } finally { - t.complete(); - } + storeReports(m_reports.values()); + closeMessageBuckets(m_reports.keySet()); } @Override @@ -308,6 +295,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen DefaultXmlBuilder builder = new DefaultXmlBuilder(true); Bucket localBucket = null; Bucket remoteBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); try { localBucket = m_bucketManager.getReportBucket(timestamp, "event", "local"); @@ -323,9 +311,15 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen localBucket.storeById(domain, xml); remoteBucket.storeById(domain, xml); } + + t.setStatus(Message.SUCCESS); } catch (Exception e) { + Cat.getProducer().logError(e); + t.setStatus(e); m_logger.error(String.format("Error when storing event reports of %s!", timestamp), e); } finally { + t.complete(); + if (localBucket != null) { m_bucketManager.closeBucket(localBucket); } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java index 8074de648e5ee3ed2ac49e3ed9f18f0c9e545235..806001d8419f041b28719c014d4ac328f8f8168f 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java @@ -25,7 +25,6 @@ import com.dianping.cat.consumer.problem.model.entity.Segment; import com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder; import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser; import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.AbstractMessageAnalyzer; import com.dianping.cat.message.spi.MessageTree; @@ -78,20 +77,8 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl @Override public void doCheckpoint() throws IOException { - MessageProducer cat = Cat.getProducer(); - Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint"); - - try { - storeReports(m_reports.values()); - closeMessageBuckets(m_reports.keySet()); - - t.setStatus(Message.SUCCESS); - } catch (Exception e) { - cat.logError(e); - t.setStatus(e); - } finally { - t.complete(); - } + storeReports(m_reports.values()); + closeMessageBuckets(m_reports.keySet()); } @Override @@ -248,6 +235,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl DefaultXmlBuilder builder = new DefaultXmlBuilder(true); Bucket localBucket = null; Bucket remoteBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); try { localBucket = m_bucketManager.getReportBucket(timestamp, "problem", "local"); @@ -263,9 +251,15 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl localBucket.storeById(domain, xml); remoteBucket.storeById(domain, xml); } + + t.setStatus(Message.SUCCESS); } catch (Exception e) { + Cat.getProducer().logError(e); + t.setStatus(e); m_logger.error(String.format("Error when storing problem reports to %s!", timestamp), e); } finally { + t.complete(); + if (localBucket != null) { m_bucketManager.closeBucket(localBucket); } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java index 5ff9e600d487fad23aae69fcd2f53d32e7503e4f..45e6d50427578f161150a28c54091b484f7643de 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java @@ -22,7 +22,6 @@ import com.dianping.cat.consumer.transaction.model.entity.TransactionType; import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder; import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser; import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.AbstractMessageAnalyzer; import com.dianping.cat.message.spi.MessagePathBuilder; @@ -76,20 +75,8 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer localBucket = null; Bucket remoteBucket = null; + Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); try { localBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local"); @@ -318,9 +306,15 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer extends ModelServiceWithCalSu int size = m_allServices.size(); final List> responses = new ArrayList>(size); final Semaphore semaphore = new Semaphore(0); - final Transaction t = Cat.getProducer().newTransaction(getClass().getSimpleName(), m_name); + final Transaction t = Cat.getProducer().newTransaction("ModelService", getClass().getSimpleName()); int count = 0; t.setStatus(Message.SUCCESS); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java index 258594d63bd19b37e71aa2844be88d2084647e1e..b41268a8f7b09714221be601e987c5c45744a84e 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseHistoricalModelService.java @@ -22,7 +22,7 @@ public abstract class BaseHistoricalModelService extends ModelServiceWithCalS @Override public ModelResponse invoke(ModelRequest request) { ModelResponse response = new ModelResponse(); - Transaction t = newTransaction(getClass().getSimpleName(), m_name); + Transaction t = newTransaction("ModelService", getClass().getSimpleName()); try { T model = buildModel(request); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java index 981462b22e3dee5f0ed860935b7b4c00f7c3b326..60050e56bc45ac1f3eda9a40fcb9ffe845591088 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseLocalModelService.java @@ -55,7 +55,7 @@ public class BaseLocalModelService extends ModelServiceWithCalSupport impleme @Override public ModelResponse invoke(ModelRequest request) { ModelResponse response = new ModelResponse(); - Transaction t = newTransaction(getClass().getSimpleName(), m_name); + Transaction t = newTransaction("ModelService", getClass().getSimpleName()); try { T report = getReport(request, request.getPeriod(), request.getDomain()); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java index a713b1b7e71ef5182d40edacd9a675d32529f35e..b19d7ed78c2a6d254de3a5bde1cff564ad8990d8 100755 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/spi/internal/BaseRemoteModelService.java @@ -60,7 +60,7 @@ public abstract class BaseRemoteModelService extends ModelServiceWithCalSuppo public ModelResponse invoke(ModelRequest request) { ModelResponse response = new ModelResponse(); MessageProducer cat = Cat.getProducer(); - Transaction t = newTransaction(getClass().getSimpleName(), m_name); + Transaction t = newTransaction("ModelService", getClass().getSimpleName()); try { URL url = buildUrl(request); diff --git a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java index 1bb33da6eff0d41c076a8d13f3754a4034751a65..4abfa43ca2525a0c74d91992361d489da266d973 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java +++ b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteStringBucket.java @@ -10,9 +10,12 @@ import java.util.List; import org.codehaus.plexus.logging.LogEnabled; import org.codehaus.plexus.logging.Logger; +import com.dianping.cat.Cat; import com.dianping.cat.job.sql.dal.Report; import com.dianping.cat.job.sql.dal.ReportDao; import com.dianping.cat.job.sql.dal.ReportEntity; +import com.dianping.cat.message.Message; +import com.dianping.cat.message.Transaction; import com.dianping.cat.storage.Bucket; import com.site.dal.jdbc.DalException; import com.site.lookup.annotation.Inject; @@ -91,19 +94,26 @@ public class RemoteStringBucket implements Bucket, LogEnabled { @Override public boolean storeById(String domain, String data) throws IOException { + Transaction t = Cat.getProducer().newTransaction(getClass().getSimpleName(), "store"); Report report = m_reportDao.createLocal(); + report.setName(m_name); report.setDomain(domain); report.setType(1); report.setContent(data); report.setPeriod(m_period); + t.setStatus(Message.SUCCESS); + try { m_reportDao.insert(report); return true; } catch (DalException e) { + t.setStatus(e); throw new IOException(String.format("Unable to insert report(%s)!", domain), e); + } finally { + t.complete(); } } }