diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/cross/CrossAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/cross/CrossAnalyzer.java index b95013178cb8509358c52afffd2438c9f14c0bd4..380822e4ce4d9d9335e184073950124a942de36c 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/cross/CrossAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/cross/CrossAnalyzer.java @@ -169,8 +169,8 @@ public class CrossAnalyzer extends AbstractMessageAnalyzer implemen private CrossInfo parsePigeonServerTransaction(Transaction t, MessageTree tree) { CrossInfo crossInfo = new CrossInfo(); String localIp = tree.getIpAddress(); - List messages = t.getChildren(); + for (Message message : messages) { if (message instanceof Event) { if (message.getType().equals("PigeonService.client")) { diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java index 05102a460ae75d8246d2d289b5e32440001c8bd8..a8c3d4ad5034d80db5c0961d7d70779e714c199b 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java @@ -131,34 +131,31 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen int count = 0; report.addIp(tree.getIpAddress()); + type.incTotalCount(); + name.incTotalCount(); - synchronized (type) { - type.incTotalCount(); - name.incTotalCount(); - - if (event.isSuccess()) { - if (type.getSuccessMessageUrl() == null) { - type.setSuccessMessageUrl(messageId); - count++; - } + if (event.isSuccess()) { + if (type.getSuccessMessageUrl() == null) { + type.setSuccessMessageUrl(messageId); + count++; + } - if (name.getSuccessMessageUrl() == null) { - name.setSuccessMessageUrl(messageId); - count++; - } - } else { - type.incFailCount(); - name.incFailCount(); + if (name.getSuccessMessageUrl() == null) { + name.setSuccessMessageUrl(messageId); + count++; + } + } else { + type.incFailCount(); + name.incFailCount(); - if (type.getFailMessageUrl() == null) { - type.setFailMessageUrl(messageId); - count++; - } + if (type.getFailMessageUrl() == null) { + type.setFailMessageUrl(messageId); + count++; + } - if (name.getFailMessageUrl() == null) { - name.setFailMessageUrl(messageId); - count++; - } + if (name.getFailMessageUrl() == null) { + name.setFailMessageUrl(messageId); + count++; } } @@ -184,8 +181,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen } private int processTransaction(EventReport report, MessageTree tree, Transaction t) { - List children = t.getChildren(); int count = 0; + List children = t.getChildren(); for (Message child : children) { if (child instanceof Transaction) { @@ -210,7 +207,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen DefaultXmlBuilder builder = new DefaultXmlBuilder(true); Bucket reportBucket = null; Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName()); - + t.setStatus(Message.SUCCESS); try { reportBucket = m_bucketManager.getReportBucket(m_startTime, "event"); diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/top/TopAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/top/TopAnalyzer.java index 6d34f1718cfebc7b1c3ae61a1394ec11193bea77..edf6f36ee1fc19e3c486d5e2018495574fd103c9 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/top/TopAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/top/TopAnalyzer.java @@ -62,16 +62,18 @@ public class TopAnalyzer extends AbstractMessageAnalyzer implements L } public synchronized TopReport getReport(String domain) { + Set domains = m_transactionAnalyzer.getDomains(); TopReport topReport = new TopReport("Cat"); + topReport.setStartTime(new Date(m_startTime)); topReport.setEndTime(new Date(m_startTime + 60 * MINUTE - 1)); - Set domains = m_transactionAnalyzer.getDomains(); for (String temp : domains) { TransactionReport report = m_transactionAnalyzer.getReport(temp); new TransactionReportVisitor(topReport).visitTransactionReport(report); } + for (String temp : domains) { ProblemReport report = m_problemAnalyzer.getReport(temp); @@ -156,7 +158,7 @@ public class TopAnalyzer extends AbstractMessageAnalyzer implements L m_problemAnalyzer = problemAnalyzer; } - class TransactionReportVisitor extends com.dianping.cat.consumer.transaction.model.transform.BaseVisitor { + static class TransactionReportVisitor extends com.dianping.cat.consumer.transaction.model.transform.BaseVisitor { private String m_domain; @@ -283,7 +285,7 @@ public class TopAnalyzer extends AbstractMessageAnalyzer implements L public abstract void apply(Range2 range2, com.dianping.cat.consumer.top.model.entity.Segment detail); } - class ProblemReportVisitor extends com.dianping.cat.consumer.problem.model.transform.BaseVisitor { + static class ProblemReportVisitor extends com.dianping.cat.consumer.problem.model.transform.BaseVisitor { private String m_domain; private String m_type; diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java index 1d996d7127b496575c7b9470d010e57a67d1f09c..65a240379c9119f0c19f2fdebdd63ada7a65bc97 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java @@ -7,6 +7,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -54,32 +55,55 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer machines = report.getMachines().values(); + private double compute95LineDetail(Map durations) { + int totalCount = 0; + + for (AllDuration duration : durations.values()) { + totalCount += duration.getCount(); + } + int index = totalCount * 5 / 100; + Map result = getSortDuration(durations); + + for (Entry entry : result.entrySet()) { + index = index - entry.getValue().getCount(); + if (index <= 0) { + return entry.getKey(); + } + } + return 0; + } + private void compute95Line(TransactionReport report) { + Collection machines = report.getMachines().values(); for (Machine machine : machines) { for (TransactionType type : machine.getTypes().values()) { - type.getAllDurations().clear(); + double typeValue = compute95LineDetail(type.getAllDurations()); + type.setLine95Value(typeValue); + type.setLine95Count(1); + type.setLine95Sum(typeValue); for (TransactionName name : type.getNames().values()) { - name.getAllDurations().clear(); + double nameValue = compute95LineDetail(name.getAllDurations()); + + name.setLine95Value(nameValue); + name.setLine95Count(1); + name.setLine95Sum(nameValue); } } } @@ -95,25 +119,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer durations) { - int totalCount = 0; - - for (AllDuration duration : durations.values()) { - totalCount += duration.getCount(); - } - - int index = totalCount * 5 / 100; - Map result = getSortDuration(durations); - - for (Entry entry : result.entrySet()) { - index = index - entry.getValue().getCount(); - if (index <= 0) { - return entry.getKey(); - } - } - return 0; - } - @Override public Set getDomains() { Set keySet = m_reports.keySet(); @@ -132,7 +137,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer children = t.getChildren(); @@ -272,11 +280,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer machines = report.getMachines().values(); - for (Machine machine : machines) { - for (TransactionType type : machine.getTypes().values()) { - double typeValuevalue = get95Line(type.getAllDurations()); - type.setLine95Value(typeValuevalue); - type.setLine95Count(1); - type.setLine95Sum(typeValuevalue); - for (TransactionName name : type.getNames().values()) { - double nameValue = get95Line(name.getAllDurations()); - name.setLine95Value(nameValue); - name.setLine95Count(1); - name.setLine95Sum(nameValue); - } - } - } - } - public void setAnalyzerInfo(long startTime, long duration, long extraTime) { m_extraTime = extraTime; m_startTime = startTime; @@ -355,8 +332,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer ALL_TYPES = new HashSet(); + + static { + ALL_TYPES.add("URL"); + ALL_TYPES.add("Call"); + ALL_TYPES.add("PigeonCall"); + ALL_TYPES.add("Service"); + ALL_TYPES.add("PigeonService"); + ALL_TYPES.add("SQL"); + ALL_TYPES.add("MsgProduceTried"); + ALL_TYPES.add("MsgProduced"); + } + public TransactionReportVisitor(TransactionReport report) { m_report = report; } @@ -468,13 +457,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer { diff --git a/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java b/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java index bfc5bb283d0ed3312fbea768dd0d2b34cde48d29..3f1328610b53711915bb2d27a998dea14574c041 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java +++ b/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java @@ -22,7 +22,7 @@ public class DefaultMessageQueue implements MessageQueue, Initializable { if (m_size > 0) { m_queue = new LinkedBlockingQueue(m_size); } else { - m_queue = new LinkedBlockingQueue(10000); + m_queue = new LinkedBlockingQueue(50000); } } diff --git a/cat-core/src/main/java/com/dianping/cat/message/spi/AbstractMessageAnalyzer.java b/cat-core/src/main/java/com/dianping/cat/message/spi/AbstractMessageAnalyzer.java index a5e0480705725cf7af27aafb8ce45ced04f52e0a..c5a8ad8a80ba0283b80f0bc7f7c44300135086ad 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/spi/AbstractMessageAnalyzer.java +++ b/cat-core/src/main/java/com/dianping/cat/message/spi/AbstractMessageAnalyzer.java @@ -1,11 +1,14 @@ package com.dianping.cat.message.spi; +import java.util.HashSet; +import java.util.Set; + import org.codehaus.plexus.logging.Logger; +import org.unidal.lookup.ContainerHolder; import com.dianping.cat.Cat; import com.dianping.cat.configuration.ServerConfigManager; import com.dianping.cat.message.Transaction; -import org.unidal.lookup.ContainerHolder; public abstract class AbstractMessageAnalyzer extends ContainerHolder implements MessageAnalyzer { @@ -25,6 +28,18 @@ public abstract class AbstractMessageAnalyzer extends ContainerHolder impleme protected static final String ALL = "All"; + protected static Set UNUSED_TYPES = new HashSet(); + + protected static Set UNUSED_NAMES = new HashSet(); + + static { + UNUSED_TYPES.add("Service"); + UNUSED_TYPES.add("PigeonService"); + UNUSED_NAMES.add("piegonService:heartTaskService:heartBeat"); + UNUSED_NAMES.add("piegonService:heartTaskService:heartBeat()"); + UNUSED_NAMES.add("pigeon:HeartBeatService:null"); + } + @Override public void analyze(MessageQueue queue) { while (!isTimeout() && isActive()) { @@ -89,10 +104,7 @@ public abstract class AbstractMessageAnalyzer extends ContainerHolder impleme String type = t.getType(); String name = t.getName(); - if ((("Service").equals(type) || ("PigeonService").equals(type)) - && (("piegonService:heartTaskService:heartBeat").equals(name) - || ("piegonService:heartTaskService:heartBeat()").equals(name) || ("pigeon:HeartBeatService:null") - .equals(name))) { + if (UNUSED_TYPES.contains(type) && UNUSED_NAMES.contains(name)) { return true; } return false; diff --git a/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java index 062d35049a59742070261306d12ec2b52a897ddb..67df6e3a5d67af050bc0dbcd0ac5e0fdcbb7fc1f 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java @@ -70,7 +70,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag private Logger m_logger; - private int m_gzipThreads = 5; + private int m_gzipThreads = 10; private BlockingQueue m_messageBlocks = new LinkedBlockingQueue(10000); @@ -136,7 +136,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag for (int i = 0; i < m_gzipThreads; i++) { LinkedBlockingQueue messageQueue = new LinkedBlockingQueue( - 10000); + 50000); m_messageQueues.put(i, messageQueue); Threads.forGroup("Cat").start(new MessageGzip(messageQueue)); @@ -330,26 +330,19 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag abs = -abs; } int bucketIndex = abs % m_gzipThreads; - if (bucketIndex > m_gzipThreads || bucketIndex < 0) { - m_logger.error("Error when compute the message bucket index!" + bucketIndex); - } else { - m_processMessages[bucketIndex]++; - } + m_processMessages[bucketIndex]++; LinkedBlockingQueue items = m_messageQueues.get(bucketIndex); - boolean result = items.offer(new MessageItem(tree, id)); - if (result == false) { - m_serverStateManager.addMessageDumpLoss(1); - m_error++; - if (m_error % CatConstants.ERROR_COUNT == 0) { + if (!result) { + if ((++m_error) % CatConstants.ERROR_COUNT == 0) { m_logger.error("Error when offer message tree to gzip queue! overflow :" + m_error); } + m_serverStateManager.addMessageDumpLoss(1); } - m_total++; - if (m_total % (CatConstants.SUCCESS_COUNT) == 0) { + if ((++m_total) % (CatConstants.SUCCESS_COUNT) == 0) { logState(tree); } } @@ -400,12 +393,9 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag if (item != null) { try { - MessageTree tree = item.getTree(); MessageId id = item.getMessageId(); - String name = id.getDomain() + '-' + id.getIpAddress() + '-' + m_localIp; String dataFile = m_pathBuilder.getPath(new Date(id.getTimestamp()), name); - LocalMessageBucket bucket = m_buckets.get(dataFile); if (bucket == null) { @@ -415,12 +405,8 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag m_buckets.put(dataFile, bucket); } - DefaultMessageTree defaultTree = (DefaultMessageTree) tree; - ChannelBuffer buf = defaultTree.getBuf(); - - int size = buf.readableBytes(); - m_totalSize += size; - + DefaultMessageTree tree = (DefaultMessageTree) item.getTree(); + ChannelBuffer buf = tree.getBuf(); MessageBlock bolck = bucket.storeMessage(buf, id); if (bolck != null) { @@ -429,6 +415,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag m_logger.error("Error when offer the block to the dump!"); } } + m_totalSize += buf.readableBytes(); } catch (Exception e) { Cat.logError(e); }