From bba52f5140bfc3f99c99d373c6eb18244344c67c Mon Sep 17 00:00:00 2001 From: youyong205 Date: Mon, 20 Apr 2015 15:52:01 +0800 Subject: [PATCH] modify the analyzer --- .../message/internal/DefaultMessageManager.java | 2 +- .../dianping/cat/message/io/TcpSocketSender.java | 6 ++++-- .../cat/consumer/event/EventAnalyzer.java | 15 ++++++++------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java b/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java index f1823c89a..0f446a484 100644 --- a/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java +++ b/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java @@ -556,7 +556,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan String childId = nextMessageId(); DefaultTransaction source = (DefaultTransaction) message; DefaultTransaction target = new DefaultTransaction(source.getType(), source.getName(), - DefaultMessageManager.this); + DefaultMessageManager.this); target.setTimestamp(source.getTimestamp()); target.setDurationInMicros(source.getDurationInMicros()); diff --git a/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java b/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java index 5fb3f37bc..a8318cc37 100644 --- a/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java +++ b/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java @@ -47,6 +47,8 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { private MessageQueue m_queue = new DefaultMessageQueue(SIZE); + private BlockingQueue m_atomicTrees = new LinkedBlockingQueue(SIZE); + private List m_serverAddresses; private ChannelManager m_manager; @@ -59,8 +61,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { private AtomicInteger m_attempts = new AtomicInteger(); - private BlockingQueue m_atomicTrees = new LinkedBlockingQueue(SIZE * 10); - private static final int MAX_CHILD_NUMBER = 200; private boolean checkWritable(ChannelFuture future) { @@ -127,6 +127,8 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { if (count % 1000 == 0 || count == 1) { m_logger.error("Message queue is full in tcp socket sender! Count: " + count); } + + tree = null; } private MessageTree mergeTree(BlockingQueue trees) { diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java index 054ad71b0..28ecd10ad 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java @@ -70,18 +70,18 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen if (m_serverConfigManager.validateDomain(domain)) { EventReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true); Message message = tree.getMessage(); + String ip = tree.getIpAddress(); if (message instanceof Transaction) { - processTransaction(report, tree, (Transaction) message); + processTransaction(report, tree, (Transaction) message, ip); } else if (message instanceof Event) { - processEvent(report, tree, (Event) message); + processEvent(report, tree, (Event) message, ip); } } } - private void processEvent(EventReport report, MessageTree tree, Event event) { + private void processEvent(EventReport report, MessageTree tree, Event event, String ip) { int count = 1; - String ip = tree.getIpAddress(); EventType type = report.findOrCreateMachine(ip).findOrCreateType(event.getType()); EventName name = type.findOrCreateName(event.getName()); String messageId = tree.getMessageId(); @@ -127,14 +127,14 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen } } - private void processTransaction(EventReport report, MessageTree tree, Transaction t) { + private void processTransaction(EventReport report, MessageTree tree, Transaction t, String ip) { List children = t.getChildren(); for (Message child : children) { if (child instanceof Transaction) { - processTransaction(report, tree, (Transaction) child); + processTransaction(report, tree, (Transaction) child, ip); } else if (child instanceof Event) { - processEvent(report, tree, (Event) child); + processEvent(report, tree, (Event) child, ip); } } } @@ -142,4 +142,5 @@ public class EventAnalyzer extends AbstractMessageAnalyzer implemen public void setReportManager(ReportManager reportManager) { m_reportManager = reportManager; } + } -- GitLab