提交 3ba117f4 编写于 作者: Y youyong

refactor analyzer the gzip thread

上级 2ff1ee52
......@@ -169,8 +169,8 @@ public class CrossAnalyzer extends AbstractMessageAnalyzer<CrossReport> implemen
private CrossInfo parsePigeonServerTransaction(Transaction t, MessageTree tree) {
CrossInfo crossInfo = new CrossInfo();
String localIp = tree.getIpAddress();
List<Message> messages = t.getChildren();
for (Message message : messages) {
if (message instanceof Event) {
if (message.getType().equals("PigeonService.client")) {
......
......@@ -131,34 +131,31 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
int count = 0;
report.addIp(tree.getIpAddress());
type.incTotalCount();
name.incTotalCount();
synchronized (type) {
type.incTotalCount();
name.incTotalCount();
if (event.isSuccess()) {
if (type.getSuccessMessageUrl() == null) {
type.setSuccessMessageUrl(messageId);
count++;
}
if (event.isSuccess()) {
if (type.getSuccessMessageUrl() == null) {
type.setSuccessMessageUrl(messageId);
count++;
}
if (name.getSuccessMessageUrl() == null) {
name.setSuccessMessageUrl(messageId);
count++;
}
} else {
type.incFailCount();
name.incFailCount();
if (name.getSuccessMessageUrl() == null) {
name.setSuccessMessageUrl(messageId);
count++;
}
} else {
type.incFailCount();
name.incFailCount();
if (type.getFailMessageUrl() == null) {
type.setFailMessageUrl(messageId);
count++;
}
if (type.getFailMessageUrl() == null) {
type.setFailMessageUrl(messageId);
count++;
}
if (name.getFailMessageUrl() == null) {
name.setFailMessageUrl(messageId);
count++;
}
if (name.getFailMessageUrl() == null) {
name.setFailMessageUrl(messageId);
count++;
}
}
......@@ -184,8 +181,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
private int processTransaction(EventReport report, MessageTree tree, Transaction t) {
List<Message> children = t.getChildren();
int count = 0;
List<Message> children = t.getChildren();
for (Message child : children) {
if (child instanceof Transaction) {
......@@ -210,7 +207,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> reportBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
t.setStatus(Message.SUCCESS);
try {
reportBucket = m_bucketManager.getReportBucket(m_startTime, "event");
......
......@@ -62,16 +62,18 @@ public class TopAnalyzer extends AbstractMessageAnalyzer<TopReport> implements L
}
public synchronized TopReport getReport(String domain) {
Set<String> domains = m_transactionAnalyzer.getDomains();
TopReport topReport = new TopReport("Cat");
topReport.setStartTime(new Date(m_startTime));
topReport.setEndTime(new Date(m_startTime + 60 * MINUTE - 1));
Set<String> domains = m_transactionAnalyzer.getDomains();
for (String temp : domains) {
TransactionReport report = m_transactionAnalyzer.getReport(temp);
new TransactionReportVisitor(topReport).visitTransactionReport(report);
}
for (String temp : domains) {
ProblemReport report = m_problemAnalyzer.getReport(temp);
......@@ -156,7 +158,7 @@ public class TopAnalyzer extends AbstractMessageAnalyzer<TopReport> implements L
m_problemAnalyzer = problemAnalyzer;
}
class TransactionReportVisitor extends com.dianping.cat.consumer.transaction.model.transform.BaseVisitor {
static class TransactionReportVisitor extends com.dianping.cat.consumer.transaction.model.transform.BaseVisitor {
private String m_domain;
......@@ -283,7 +285,7 @@ public class TopAnalyzer extends AbstractMessageAnalyzer<TopReport> implements L
public abstract void apply(Range2 range2, com.dianping.cat.consumer.top.model.entity.Segment detail);
}
class ProblemReportVisitor extends com.dianping.cat.consumer.problem.model.transform.BaseVisitor {
static class ProblemReportVisitor extends com.dianping.cat.consumer.problem.model.transform.BaseVisitor {
private String m_domain;
private String m_type;
......
......@@ -7,6 +7,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
......@@ -54,32 +55,55 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private TransactionReport buildTotalTransactionReport() {
TransactionReport all = new TransactionReport(ALL);
all.setStartTime(new Date(m_startTime));
all.setEndTime(new Date(m_startTime + MINUTE * 60 - 1));
TransactionReportVisitor visitor = new TransactionReportVisitor(all);
all.setStartTime(new Date(m_startTime));
all.setEndTime(new Date(m_startTime + MINUTE * 60 - 1));
try {
for (TransactionReport temp : m_reports.values()) {
all.getIps().add(temp.getDomain());
all.getDomainNames().add(temp.getDomain());
visitor.visitTransactionReport(temp);
}
} catch (Exception e) {
Cat.logError(e);
}
for (TransactionReport temp : m_reports.values()) {
all.getIps().add(temp.getDomain());
all.getDomainNames().add(temp.getDomain());
visitor.visitTransactionReport(temp);
}
} catch (Exception e) {
Cat.logError(e);
}
return all;
}
private void clearAllDuration(TransactionReport report) {
Collection<Machine> machines = report.getMachines().values();
private double compute95LineDetail(Map<Integer, AllDuration> durations) {
int totalCount = 0;
for (AllDuration duration : durations.values()) {
totalCount += duration.getCount();
}
int index = totalCount * 5 / 100;
Map<Integer, AllDuration> result = getSortDuration(durations);
for (Entry<Integer, AllDuration> entry : result.entrySet()) {
index = index - entry.getValue().getCount();
if (index <= 0) {
return entry.getKey();
}
}
return 0;
}
private void compute95Line(TransactionReport report) {
Collection<Machine> machines = report.getMachines().values();
for (Machine machine : machines) {
for (TransactionType type : machine.getTypes().values()) {
type.getAllDurations().clear();
double typeValue = compute95LineDetail(type.getAllDurations());
type.setLine95Value(typeValue);
type.setLine95Count(1);
type.setLine95Sum(typeValue);
for (TransactionName name : type.getNames().values()) {
name.getAllDurations().clear();
double nameValue = compute95LineDetail(name.getAllDurations());
name.setLine95Value(nameValue);
name.setLine95Count(1);
name.setLine95Sum(nameValue);
}
}
}
......@@ -95,25 +119,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
m_logger = logger;
}
private double get95Line(Map<Integer, AllDuration> durations) {
int totalCount = 0;
for (AllDuration duration : durations.values()) {
totalCount += duration.getCount();
}
int index = totalCount * 5 / 100;
Map<Integer, AllDuration> result = getSortDuration(durations);
for (Entry<Integer, AllDuration> entry : result.entrySet()) {
index = index - entry.getValue().getCount();
if (index <= 0) {
return entry.getKey();
}
}
return 0;
}
@Override
public Set<String> getDomains() {
Set<String> keySet = m_reports.keySet();
......@@ -132,7 +137,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
report.getDomainNames().addAll(m_reports.keySet());
report.accept(new StatisticsComputer());
set95Line(report);
compute95Line(report);
return report;
}
......@@ -189,7 +194,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
if (report == null) {
report = new TransactionReport(domain);
report.setStartTime(new Date(m_startTime));
report.setEndTime(new Date(m_startTime + MINUTE * 60 - 1));
m_reports.put(domain, report);
......@@ -212,54 +217,57 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String messageId = tree.getMessageId();
int count = 0;
synchronized (type) {
type.incTotalCount();
name.incTotalCount();
type.incTotalCount();
name.incTotalCount();
if (t.isSuccess()) {
if (type.getSuccessMessageUrl() == null) {
type.setSuccessMessageUrl(messageId);
count++;
}
if (name.getSuccessMessageUrl() == null) {
name.setSuccessMessageUrl(messageId);
count++;
}
} else {
type.incFailCount();
name.incFailCount();
if (t.isSuccess()) {
if (type.getSuccessMessageUrl() == null) {
type.setSuccessMessageUrl(messageId);
count++;
}
if (type.getFailMessageUrl() == null) {
type.setFailMessageUrl(messageId);
count++;
}
if (name.getSuccessMessageUrl() == null) {
name.setSuccessMessageUrl(messageId);
count++;
}
} else {
type.incFailCount();
name.incFailCount();
if (name.getFailMessageUrl() == null) {
name.setFailMessageUrl(messageId);
count++;
}
if (type.getFailMessageUrl() == null) {
type.setFailMessageUrl(messageId);
count++;
}
// update statistics
double duration = t.getDurationInMicros() / 1000d;
Integer allDuration = new Integer((int) duration);
name.setMax(Math.max(name.getMax(), duration));
name.setMin(Math.min(name.getMin(), duration));
name.setSum(name.getSum() + duration);
name.setSum2(name.getSum2() + duration * duration);
name.findOrCreateAllDuration(allDuration).incCount();
type.setMax(Math.max(type.getMax(), duration));
type.setMin(Math.min(type.getMin(), duration));
type.setSum(type.getSum() + duration);
type.setSum2(type.getSum2() + duration * duration);
type.findOrCreateAllDuration(allDuration).incCount();
if (name.getFailMessageUrl() == null) {
name.setFailMessageUrl(messageId);
count++;
}
}
processTransactionGraph(name, t);
processTransactionRange(t, type);
// update statistics
double duration = t.getDurationInMicros() / 1000d;
Integer allDuration = new Integer((int) duration);
name.setMax(Math.max(name.getMax(), duration));
name.setMin(Math.min(name.getMin(), duration));
name.setSum(name.getSum() + duration);
name.setSum2(name.getSum2() + duration * duration);
name.findOrCreateAllDuration(allDuration).incCount();
type.setMax(Math.max(type.getMax(), duration));
type.setMin(Math.min(type.getMin(), duration));
type.setSum(type.getSum() + duration);
type.setSum2(type.getSum2() + duration * duration);
type.findOrCreateAllDuration(allDuration).incCount();
double d = t.getDurationInMicros() / 1000d;
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(t.getTimestamp());
int min = cal.get(Calendar.MINUTE);
processNameGraph(name, t, min, d);
processTypeRange(t, type, min, d);
List<Message> children = t.getChildren();
......@@ -272,11 +280,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
return count;
}
private void processTransactionGraph(TransactionName name, Transaction t) {
double d = t.getDurationInMicros() / 1000d;
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(t.getTimestamp());
int min = cal.get(Calendar.MINUTE);
private void processNameGraph(TransactionName name, Transaction t, int min, double d) {
int dk = 1;
int tk = min - min % 5;
......@@ -287,25 +291,16 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
Duration duration = name.findOrCreateDuration(dk);
Range range = name.findOrCreateRange(tk);
synchronized (name) {
duration.incCount();
range.incCount();
if (!t.isSuccess()) {
range.incFails();
}
duration.incCount();
range.incCount();
range.setSum(range.getSum() + d);
if (!t.isSuccess()) {
range.incFails();
}
range.setSum(range.getSum() + d);
}
private void processTransactionRange(Transaction t, TransactionType type) {
double d = t.getDurationInMicros() / 1000d;
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(t.getTimestamp());
int min = cal.get(Calendar.MINUTE);
private void processTypeRange(Transaction t, TransactionType type, int min, double d) {
Range2 range = type.findOrCreateRange2(min);
if (!t.isSuccess()) {
......@@ -315,24 +310,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
range.setSum(range.getSum() + d);
}
private void set95Line(TransactionReport report) {
Collection<Machine> machines = report.getMachines().values();
for (Machine machine : machines) {
for (TransactionType type : machine.getTypes().values()) {
double typeValuevalue = get95Line(type.getAllDurations());
type.setLine95Value(typeValuevalue);
type.setLine95Count(1);
type.setLine95Sum(typeValuevalue);
for (TransactionName name : type.getNames().values()) {
double nameValue = get95Line(name.getAllDurations());
name.setLine95Value(nameValue);
name.setLine95Count(1);
name.setLine95Sum(nameValue);
}
}
}
}
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
......@@ -355,8 +332,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
domainNames.clear();
domainNames.addAll(getDomains());
set95Line(report);
clearAllDuration(report);
compute95Line(report);
String xml = new TransactionReportUrlFilter().buildXml(report);
String domain = report.getDomain();
......@@ -416,12 +392,25 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
}
static class TransactionReportVisitor extends BaseVisitor {
public static class TransactionReportVisitor extends BaseVisitor {
private TransactionReport m_report;
public String m_currentDomain;
public static Set<String> ALL_TYPES = new HashSet<String>();
static {
ALL_TYPES.add("URL");
ALL_TYPES.add("Call");
ALL_TYPES.add("PigeonCall");
ALL_TYPES.add("Service");
ALL_TYPES.add("PigeonService");
ALL_TYPES.add("SQL");
ALL_TYPES.add("MsgProduceTried");
ALL_TYPES.add("MsgProduced");
}
public TransactionReportVisitor(TransactionReport report) {
m_report = report;
}
......@@ -468,13 +457,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
@Override
public void visitType(TransactionType type) {
Machine machine = m_report.findOrCreateMachine(m_currentDomain);
String typeName = type.getId();
if (typeName.equals("URL") || typeName.equals("Call") || typeName.equals("PigeonCall")
|| typeName.equals("PigeonService") || typeName.equals("Service") || typeName.equals("SQL")
|| typeName.startsWith("Cache.") || typeName.equals("MsgProduceTried") || typeName.equals("MsgProduced")) {
if (typeName.startsWith("Cache.") || ALL_TYPES.contains(typeName)) {
TransactionType result = machine.findOrCreateType(typeName);
mergeType(result, type);
}
}
......
......@@ -71,7 +71,6 @@ public class TransactionReportUrlFilter extends com.dianping.cat.consumer.transa
}
for (String name : invalidates) {
transactionNames.remove(name);
}
......@@ -112,7 +111,6 @@ public class TransactionReportUrlFilter extends com.dianping.cat.consumer.transa
}
}
super.visitType(type);
}
public static class TransactionNameCompator implements Comparator<TransactionName> {
......
......@@ -22,7 +22,7 @@ public class DefaultMessageQueue implements MessageQueue, Initializable {
if (m_size > 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(m_size);
} else {
m_queue = new LinkedBlockingQueue<MessageTree>(10000);
m_queue = new LinkedBlockingQueue<MessageTree>(50000);
}
}
......
package com.dianping.cat.message.spi;
import java.util.HashSet;
import java.util.Set;
import org.codehaus.plexus.logging.Logger;
import org.unidal.lookup.ContainerHolder;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.message.Transaction;
import org.unidal.lookup.ContainerHolder;
public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder implements MessageAnalyzer {
......@@ -25,6 +28,18 @@ public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder impleme
protected static final String ALL = "All";
protected static Set<String> UNUSED_TYPES = new HashSet<String>();
protected static Set<String> UNUSED_NAMES = new HashSet<String>();
static {
UNUSED_TYPES.add("Service");
UNUSED_TYPES.add("PigeonService");
UNUSED_NAMES.add("piegonService:heartTaskService:heartBeat");
UNUSED_NAMES.add("piegonService:heartTaskService:heartBeat()");
UNUSED_NAMES.add("pigeon:HeartBeatService:null");
}
@Override
public void analyze(MessageQueue queue) {
while (!isTimeout() && isActive()) {
......@@ -89,10 +104,7 @@ public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder impleme
String type = t.getType();
String name = t.getName();
if ((("Service").equals(type) || ("PigeonService").equals(type))
&& (("piegonService:heartTaskService:heartBeat").equals(name)
|| ("piegonService:heartTaskService:heartBeat()").equals(name) || ("pigeon:HeartBeatService:null")
.equals(name))) {
if (UNUSED_TYPES.contains(type) && UNUSED_NAMES.contains(name)) {
return true;
}
return false;
......
......@@ -70,7 +70,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
private Logger m_logger;
private int m_gzipThreads = 5;
private int m_gzipThreads = 10;
private BlockingQueue<MessageBlock> m_messageBlocks = new LinkedBlockingQueue<MessageBlock>(10000);
......@@ -136,7 +136,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
for (int i = 0; i < m_gzipThreads; i++) {
LinkedBlockingQueue<MessageItem> messageQueue = new LinkedBlockingQueue<LocalMessageBucketManager.MessageItem>(
10000);
50000);
m_messageQueues.put(i, messageQueue);
Threads.forGroup("Cat").start(new MessageGzip(messageQueue));
......@@ -330,26 +330,19 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
abs = -abs;
}
int bucketIndex = abs % m_gzipThreads;
if (bucketIndex > m_gzipThreads || bucketIndex < 0) {
m_logger.error("Error when compute the message bucket index!" + bucketIndex);
} else {
m_processMessages[bucketIndex]++;
}
m_processMessages[bucketIndex]++;
LinkedBlockingQueue<MessageItem> items = m_messageQueues.get(bucketIndex);
boolean result = items.offer(new MessageItem(tree, id));
if (result == false) {
m_serverStateManager.addMessageDumpLoss(1);
m_error++;
if (m_error % CatConstants.ERROR_COUNT == 0) {
if (!result) {
if ((++m_error) % CatConstants.ERROR_COUNT == 0) {
m_logger.error("Error when offer message tree to gzip queue! overflow :" + m_error);
}
m_serverStateManager.addMessageDumpLoss(1);
}
m_total++;
if (m_total % (CatConstants.SUCCESS_COUNT) == 0) {
if ((++m_total) % (CatConstants.SUCCESS_COUNT) == 0) {
logState(tree);
}
}
......@@ -400,12 +393,9 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
if (item != null) {
try {
MessageTree tree = item.getTree();
MessageId id = item.getMessageId();
String name = id.getDomain() + '-' + id.getIpAddress() + '-' + m_localIp;
String dataFile = m_pathBuilder.getPath(new Date(id.getTimestamp()), name);
LocalMessageBucket bucket = m_buckets.get(dataFile);
if (bucket == null) {
......@@ -415,12 +405,8 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
m_buckets.put(dataFile, bucket);
}
DefaultMessageTree defaultTree = (DefaultMessageTree) tree;
ChannelBuffer buf = defaultTree.getBuf();
int size = buf.readableBytes();
m_totalSize += size;
DefaultMessageTree tree = (DefaultMessageTree) item.getTree();
ChannelBuffer buf = tree.getBuf();
MessageBlock bolck = bucket.storeMessage(buf, id);
if (bolck != null) {
......@@ -429,6 +415,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
m_logger.error("Error when offer the block to the dump!");
}
}
m_totalSize += buf.readableBytes();
} catch (Exception e) {
Cat.logError(e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册