提交 36d93aad 编写于 作者: F Frankie Wu

adjust cat type and name for model services

上级 1d5c7ed3
......@@ -127,7 +127,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
public void doCheckpoint() throws IOException {
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint");
Transaction t = cat.newTransaction("Checkpoint", getClass().getSimpleName());
t.setStatus(Message.SUCCESS);
......
......@@ -24,7 +24,6 @@ import com.dianping.cat.consumer.event.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.event.model.transform.DefaultXmlParser;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
......@@ -78,20 +77,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
@Override
public void doCheckpoint() throws IOException {
MessageProducer cat = Cat.getProducer();
Event t = cat.newEvent(getClass().getSimpleName(), "checkpoint");
try {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
}
@Override
......@@ -308,6 +295,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> localBucket = null;
Bucket<String> remoteBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
localBucket = m_bucketManager.getReportBucket(timestamp, "event", "local");
......@@ -323,9 +311,15 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing event reports of %s!", timestamp), e);
} finally {
t.complete();
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
......
......@@ -25,7 +25,6 @@ import com.dianping.cat.consumer.problem.model.entity.Segment;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageTree;
......@@ -78,20 +77,8 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
@Override
public void doCheckpoint() throws IOException {
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint");
try {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
}
@Override
......@@ -248,6 +235,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> localBucket = null;
Bucket<String> remoteBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
localBucket = m_bucketManager.getReportBucket(timestamp, "problem", "local");
......@@ -263,9 +251,15 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing problem reports to %s!", timestamp), e);
} finally {
t.complete();
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
......
......@@ -22,7 +22,6 @@ import com.dianping.cat.consumer.transaction.model.entity.TransactionType;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
......@@ -76,20 +75,8 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
@Override
public void doCheckpoint() throws IOException {
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint");
try {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
}
@Override
......@@ -303,6 +290,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> localBucket = null;
Bucket<String> remoteBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
localBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local");
......@@ -318,9 +306,15 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e);
} finally {
t.complete();
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
......
......@@ -54,7 +54,7 @@ public abstract class BaseCompositeModelService<T> extends ModelServiceWithCalSu
int size = m_allServices.size();
final List<ModelResponse<T>> responses = new ArrayList<ModelResponse<T>>(size);
final Semaphore semaphore = new Semaphore(0);
final Transaction t = Cat.getProducer().newTransaction(getClass().getSimpleName(), m_name);
final Transaction t = Cat.getProducer().newTransaction("ModelService", getClass().getSimpleName());
int count = 0;
t.setStatus(Message.SUCCESS);
......
......@@ -22,7 +22,7 @@ public abstract class BaseHistoricalModelService<T> extends ModelServiceWithCalS
@Override
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
Transaction t = newTransaction(getClass().getSimpleName(), m_name);
Transaction t = newTransaction("ModelService", getClass().getSimpleName());
try {
T model = buildModel(request);
......
......@@ -55,7 +55,7 @@ public class BaseLocalModelService<T> extends ModelServiceWithCalSupport impleme
@Override
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
Transaction t = newTransaction(getClass().getSimpleName(), m_name);
Transaction t = newTransaction("ModelService", getClass().getSimpleName());
try {
T report = getReport(request, request.getPeriod(), request.getDomain());
......
......@@ -60,7 +60,7 @@ public abstract class BaseRemoteModelService<T> extends ModelServiceWithCalSuppo
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
MessageProducer cat = Cat.getProducer();
Transaction t = newTransaction(getClass().getSimpleName(), m_name);
Transaction t = newTransaction("ModelService", getClass().getSimpleName());
try {
URL url = buildUrl(request);
......
......@@ -10,9 +10,12 @@ import java.util.List;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.job.sql.dal.Report;
import com.dianping.cat.job.sql.dal.ReportDao;
import com.dianping.cat.job.sql.dal.ReportEntity;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.storage.Bucket;
import com.site.dal.jdbc.DalException;
import com.site.lookup.annotation.Inject;
......@@ -91,19 +94,26 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
@Override
public boolean storeById(String domain, String data) throws IOException {
Transaction t = Cat.getProducer().newTransaction(getClass().getSimpleName(), "store");
Report report = m_reportDao.createLocal();
report.setName(m_name);
report.setDomain(domain);
report.setType(1);
report.setContent(data);
report.setPeriod(m_period);
t.setStatus(Message.SUCCESS);
try {
m_reportDao.insert(report);
return true;
} catch (DalException e) {
t.setStatus(e);
throw new IOException(String.format("Unable to insert report(%s)!", domain), e);
} finally {
t.complete();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册