diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java index 72873406df1294ffd53f2825e22ddf2c5d509d45..22c6a50e313784c3536cce1d99c9a23b9e5de793 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/RealtimeConsumer.java @@ -19,6 +19,7 @@ import org.codehaus.plexus.logging.Logger; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; +import com.dianping.cat.Cat; import com.dianping.cat.message.spi.MessageAnalyzer; import com.dianping.cat.message.spi.MessageConsumer; import com.dianping.cat.message.spi.MessageQueue; @@ -208,6 +209,8 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer m_lastAnalyzers.putAll(m_currentAnalyzers); m_currentAnalyzers.clear(); + Cat.setup("realtime-consumer"); + for (String name : m_analyzerNames) { MessageAnalyzer analyzer = m_factory.create(name, start, m_duration, m_extraTime); MessageQueue queue = lookup(MessageQueue.class); @@ -305,10 +308,16 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer } public void run() { - m_analyzer.analyze(m_queue); - m_factory.release(m_analyzer); - m_factory.release(m_queue); - m_latch.countDown(); + Cat.setup("realtime-consumer-task"); + + try { + m_analyzer.analyze(m_queue); + m_factory.release(m_analyzer); + m_factory.release(m_queue); + m_latch.countDown(); + } finally { + Cat.reset(); + } } } } \ No newline at end of file diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java index 2259d8ad100b42b51017d46bdf14eb500d4e3614..52939f3ff23231e17f70525f832859510ab976bd 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java @@ -39,7 +39,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .config(E("failureType").value(failureTypes))); all.add(C(Handler.class, LONG_URL.getName(), LongUrlHandler.class) // - .config(E("threshold").value("10"))); + .config(E("threshold").value("1000"))); all.add(C(ProblemAnalyzer.class).is(PER_LOOKUP) // .req(Handler.class, new String[] { FAILURE.getName(), LONG_URL.getName() }, "m_handlers") // diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java index 6c21a2f29d306b3847002d4ad7cbf404615b17a4..498f00c006564a407fe1aab3a5520bcf97580ae1 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java @@ -10,6 +10,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.codehaus.plexus.logging.LogEnabled; import org.codehaus.plexus.logging.Logger; @@ -46,9 +47,34 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl private long m_duration; + void closeMessageBuckets(Set set) { + Date timestamp = new Date(m_startTime); + + for (String domain : m_reports.keySet()) { + Bucket localBucket = null; + Bucket remoteBucket = null; + + try { + localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local"); + remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote"); + } catch (Exception e) { + m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e); + } finally { + if (localBucket != null) { + m_bucketManager.closeBucket(localBucket); + } + + if (remoteBucket != null) { + m_bucketManager.closeBucket(remoteBucket); + } + } + } + } + @Override public void doCheckpoint() throws IOException { storeReports(m_reports.values()); + closeMessageBuckets(m_reports.keySet()); } @Override @@ -56,6 +82,18 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl m_logger = logger; } + private Segment findOrCreateSegment(ProblemReport report, MessageTree tree) { + Machine machine = report.findOrCreateMachine(tree.getIpAddress()); + JavaThread thread = machine.findOrCreateThread(tree.getThreadId()); + Calendar cal = Calendar.getInstance(); + + cal.setTimeInMillis(tree.getMessage().getTimestamp()); + + int minute = cal.get(Calendar.MINUTE); + Segment segment = thread.findOrCreateSegment(minute); + return segment; + } + @Override protected List generate() { List reports = new ArrayList(m_reports.size()); @@ -120,7 +158,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl Bucket bucket = null; try { - bucket = m_bucketManager.getReportBucket(timestamp, "problem"); + bucket = m_bucketManager.getReportBucket(timestamp, "problem", "local"); for (String id : bucket.getIdsByPrefix("")) { String xml = bucket.findById(id); @@ -161,27 +199,18 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl String messageId = tree.getMessageId(); try { - Bucket bucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain); + Bucket localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local"); + Bucket remoteBucket = m_bucketManager + .getMessageBucket(new Date(m_startTime), domain, "remote"); - bucket.storeById(messageId, tree); + localBucket.storeById(messageId, tree); + remoteBucket.storeById(messageId, tree); } catch (IOException e) { m_logger.error("Error when storing message for problem analyzer!", e); } } } - private Segment findOrCreateSegment(ProblemReport report, MessageTree tree) { - Machine machine = report.findOrCreateMachine(tree.getIpAddress()); - JavaThread thread = machine.findOrCreateThread(tree.getThreadId()); - Calendar cal = Calendar.getInstance(); - - cal.setTimeInMillis(tree.getMessage().getTimestamp()); - - int minute = cal.get(Calendar.MINUTE); - Segment segment = thread.findOrCreateSegment(minute); - return segment; - } - public void setAnalyzerInfo(long startTime, long duration, long extraTime) { m_extraTime = extraTime; m_startTime = startTime; @@ -197,30 +226,38 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer impl } storeReports(reports); + closeMessageBuckets(m_reports.keySet()); } void storeReports(Collection reports) { Date timestamp = new Date(m_startTime); DefaultXmlBuilder builder = new DefaultXmlBuilder(true); - Bucket bucket = null; + Bucket localBucket = null; + Bucket remoteBucket = null; try { - bucket = m_bucketManager.getReportBucket(timestamp, "problem"); + localBucket = m_bucketManager.getReportBucket(timestamp, "problem", "local"); + remoteBucket = m_bucketManager.getReportBucket(timestamp, "problem", "remote"); // delete old one, not append mode - bucket.deleteAndCreate(); + localBucket.deleteAndCreate(); for (ProblemReport report : reports) { String xml = builder.buildXml(report); String domain = report.getDomain(); - bucket.storeById(domain, xml); + localBucket.storeById(domain, xml); + remoteBucket.storeById(domain, xml); } } catch (Exception e) { - m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e); + m_logger.error(String.format("Error when storing problem reports to %s!", timestamp), e); } finally { - if (bucket != null) { - m_bucketManager.closeBucket(bucket); + if (localBucket != null) { + m_bucketManager.closeBucket(localBucket); + } + + if (remoteBucket != null) { + m_bucketManager.closeBucket(remoteBucket); } } } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java index 0237aca8d7e3a2057a9c8951b782d611251d81fc..b8b72a29e7d9c99edfbf0e750b63194be1568f51 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java @@ -10,6 +10,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.codehaus.plexus.logging.LogEnabled; import org.codehaus.plexus.logging.Logger; @@ -49,9 +50,34 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer set) { + Date timestamp = new Date(m_startTime); + + for (String domain : m_reports.keySet()) { + Bucket localBucket = null; + Bucket remoteBucket = null; + + try { + localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local"); + remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote"); + } catch (Exception e) { + m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e); + } finally { + if (localBucket != null) { + m_bucketManager.closeBucket(localBucket); + } + + if (remoteBucket != null) { + m_bucketManager.closeBucket(remoteBucket); + } + } + } + } + @Override public void doCheckpoint() throws IOException { storeReports(m_reports.values()); + closeMessageBuckets(m_reports.keySet()); } @Override @@ -125,7 +151,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer bucket = null; try { - bucket = m_bucketManager.getReportBucket(timestamp, "transaction"); + bucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local"); for (String id : bucket.getIdsByPrefix("")) { String xml = bucket.findById(id); @@ -134,7 +160,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer bucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain); + Bucket localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, + "local"); + Bucket remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, + "remote"); - bucket.storeById(messageId, tree); + localBucket.storeById(messageId, tree); + remoteBucket.storeById(messageId, tree); } catch (IOException e) { m_logger.error("Error when storing message for transaction analyzer!", e); } @@ -275,30 +305,38 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer reports) { Date timestamp = new Date(m_startTime); DefaultXmlBuilder builder = new DefaultXmlBuilder(true); - Bucket bucket = null; + Bucket localBucket = null; + Bucket remoteBucket = null; try { - bucket = m_bucketManager.getReportBucket(timestamp, "transaction"); + localBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local"); + remoteBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "remote"); // delete old one, not append mode - bucket.deleteAndCreate(); + localBucket.deleteAndCreate(); for (TransactionReport report : reports) { String xml = builder.buildXml(report); String domain = report.getDomain(); - bucket.storeById(domain, xml); + localBucket.storeById(domain, xml); + remoteBucket.storeById(domain, xml); } } catch (Exception e) { - m_logger.error(String.format("Error when storing transaction reports to %s!", timestamp), e); + m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e); } finally { - if (bucket != null) { - m_bucketManager.closeBucket(bucket); + if (localBucket != null) { + m_bucketManager.closeBucket(localBucket); + } + + if (remoteBucket != null) { + m_bucketManager.closeBucket(remoteBucket); } } } diff --git a/cat-consumer/src/main/resources/META-INF/plexus/components.xml b/cat-consumer/src/main/resources/META-INF/plexus/components.xml index 7539de167fe8d4c5ebcb01db7cd4603b558b6d50..db9a005d8be9af8a873f163f12a4fd0299027bfa 100644 --- a/cat-consumer/src/main/resources/META-INF/plexus/components.xml +++ b/cat-consumer/src/main/resources/META-INF/plexus/components.xml @@ -32,7 +32,7 @@ long-url com.dianping.cat.consumer.problem.handler.LongUrlHandler - 10 + 1000 diff --git a/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java b/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java index 558aa11d74ea43c6626871c2fd8f3459153c1576..a795bb2136665ab1e89699d8a383f43e01685b16 100644 --- a/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java @@ -19,14 +19,15 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator { public List defineComponents() { List all = new ArrayList(); - all.add(C(Bucket.class, String.class.getName(), LocalStringBucket.class) // + all.add(C(Bucket.class, String.class.getName() + "-local", LocalStringBucket.class) // .is(PER_LOOKUP) // .req(MessagePathBuilder.class)); - all.add(C(Bucket.class, MessageTree.class.getName(), LocalMessageBucket.class) // + all.add(C(Bucket.class, MessageTree.class.getName() + "-local", LocalMessageBucket.class) // .is(PER_LOOKUP) // .req(MessagePathBuilder.class) // .req(MessageCodec.class, "plain-text")); - all.add(C(BucketManager.class, DefaultBucketManager.class)); + all.add(C(BucketManager.class, DefaultBucketManager.class) // + .req(MessagePathBuilder.class)); return all; } diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java index e1d46ca26e064fd8ea6539f415048b5026023996..d3dc530ecfab27ebaeac8077558ffc7f188b31b4 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageManager.java @@ -50,7 +50,14 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan @Override public void add(Message message) { if (Cat.isInitialized()) { - getContext().add(this, message); + Context ctx = m_context.get(); + + if (ctx != null) { + ctx.add(this, message); + } else if (m_clientConfig.isDevMode()) { + throw new RuntimeException("Cat has not been initialized successfully, " + + "please call Cal.setup(...) first for each thread."); + } } } @@ -62,7 +69,14 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan @Override public void end(Transaction transaction) { if (Cat.isInitialized()) { - getContext().end(this, transaction); + Context ctx = m_context.get(); + + if (ctx != null) { + ctx.end(this, transaction); + } else if (m_clientConfig.isDevMode()) { + throw new RuntimeException("Cat has not been initialized successfully, " + + "please call Cal.setup(...) first for each thread."); + } } } @@ -81,17 +95,6 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan return m_clientConfig; } - Context getContext() { - Context ctx = m_context.get(); - - if (ctx == null) { - throw new RuntimeException("Cat has not been initialized successfully, " - + "please call Cal.setup(...) first for each thread."); - } else { - return ctx; - } - } - @Override public Config getServerConfig() { return m_serverConfig; @@ -151,7 +154,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan @Override public boolean isCatEnabled() { - return m_domain != null && m_domain.isEnabled(); + return m_domain != null && m_domain.isEnabled() && m_context.get() != null; } String nextMessageId() { @@ -166,7 +169,13 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan @Override public void setup() { - Context ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp()); + Context ctx; + + if (m_domain != null) { + ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp()); + } else { + ctx = new Context("Unknown", m_hostName, ""); + } m_context.set(ctx); } @@ -174,7 +183,14 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan @Override public void start(Transaction transaction) { if (Cat.isInitialized()) { - getContext().start(this, transaction); + Context ctx = m_context.get(); + + if (ctx != null) { + ctx.start(this, transaction); + } else if (m_clientConfig.isDevMode()) { + throw new RuntimeException("Cat has not been initialized successfully, " + + "please call Cal.setup(...) first for each thread."); + } } else if (m_firstMessage) { m_firstMessage = false; m_logger.warn("CAT client is not enabled because it's not initialized yet"); diff --git a/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketReceiver.java b/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketReceiver.java index 9bb17a11da6e8740428036e2ea89b51c2ba2b43d..8a44f24bc312307b4761b86132a0e1a7d09f34af 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketReceiver.java +++ b/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketReceiver.java @@ -26,6 +26,7 @@ import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.frame.FrameDecoder; +import com.dianping.cat.Cat; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessageHandler; import com.dianping.cat.message.spi.MessageTree; @@ -95,6 +96,8 @@ public class TcpSocketReceiver implements MessageReceiver, LogEnabled { @Override public void onMessage(MessageHandler handler) { + Cat.setup("tcp-socket-receiver"); + try { while (true) { ChannelBuffer buf = m_queue.poll(1, TimeUnit.MILLISECONDS); @@ -110,6 +113,8 @@ public class TcpSocketReceiver implements MessageReceiver, LogEnabled { } } catch (InterruptedException e) { // ignore it + } finally { + Cat.reset(); } ChannelGroupFuture future = m_channelGroup.close(); diff --git a/cat-core/src/main/java/com/dianping/cat/storage/Bucket.java b/cat-core/src/main/java/com/dianping/cat/storage/Bucket.java index 68232ba14a372083050314d5b7a1d10e77eaa763..54288ca1526f99bd44a11bf9ae052e0b32f0d1ca 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/Bucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/Bucket.java @@ -21,5 +21,13 @@ public interface Bucket { public void initialize(Class type, String name, Date timestamp) throws IOException;; + /** + * store the data by id into the bucket. + * + * @param id + * @param data + * @return true means the data was stored in the bucket, otherwise false. + * @throws IOException + */ public boolean storeById(String id, T data) throws IOException;; } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java index eee55c709030c3b297533446b90d69fe00da52a5..238a3c45a9df69199c1664be5bc4c95071a90ed5 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java @@ -8,7 +8,7 @@ import com.dianping.cat.message.spi.MessageTree; public interface BucketManager { public void closeBucket(Bucket bucket); - public Bucket getMessageBucket(Date timestamp, String domain) throws IOException; + public Bucket getMessageBucket(Date timestamp, String domain, String namespace) throws IOException; - public Bucket getReportBucket(Date timestamp, String name) throws IOException; + public Bucket getReportBucket(Date timestamp, String name, String namespace) throws IOException; } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java index 3cb1c9b1e932bfd5abda3b7d35effac712a3d5cb..aea8aeff784ec156220f8923372be1d69a491246 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java @@ -7,12 +7,17 @@ import java.util.Map; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable; +import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.storage.Bucket; import com.dianping.cat.storage.BucketManager; import com.site.lookup.ContainerHolder; +import com.site.lookup.annotation.Inject; public class DefaultBucketManager extends ContainerHolder implements BucketManager, Disposable { + @Inject + private MessagePathBuilder m_pathBuilder; + private Map> m_map = new HashMap>(); @Override @@ -23,34 +28,59 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag // ignore it } + synchronized (m_map) { + for (Map.Entry> e : m_map.entrySet()) { + if (e.getValue() == bucket) { + m_map.remove(e.getKey()); + break; + } + } + } + release(bucket); } - protected Bucket createBucket(Class type, Date timestamp, String name) throws IOException { - Bucket bucket = lookup(Bucket.class, type.getName()); + protected Bucket createBucket(Class type, Date timestamp, String name, String namespace) throws IOException { + try { + Bucket bucket = lookup(Bucket.class, type.getName() + "-" + namespace); - bucket.initialize(type, name, timestamp); - return bucket; + bucket.initialize(type, name, timestamp); + return bucket; + } catch (RuntimeException e) { + e.printStackTrace(); + + throw e; + } } @Override public void dispose() { - for (Bucket bucket : m_map.values()) { - release(bucket); + synchronized (m_map) { + for (Bucket bucket : m_map.values()) { + release(bucket); + } } } @SuppressWarnings("unchecked") - protected Bucket getBucket(Class type, Date timestamp, String name) throws IOException { - Entry entry = new Entry(type, timestamp, name); + protected Bucket getBucket(Class type, Date timestamp, String name, String namespace) throws IOException { + String path; + + if (type == MessageTree.class) { + path = m_pathBuilder.getMessagePath(name, timestamp); + } else { + path = m_pathBuilder.getReportPath(name, timestamp); + } + + Entry entry = new Entry(type, path, namespace); Bucket bucket = m_map.get(entry); if (bucket == null) { - synchronized (this) { + synchronized (m_map) { bucket = m_map.get(entry); if (bucket == null) { - bucket = createBucket(type, timestamp, name); + bucket = createBucket(type, timestamp, name, namespace); m_map.put(entry, bucket); } } @@ -60,26 +90,26 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag } @Override - public Bucket getMessageBucket(Date timestamp, String domain) throws IOException { - return getBucket(MessageTree.class, timestamp, domain); + public Bucket getMessageBucket(Date timestamp, String domain, String namespace) throws IOException { + return getBucket(MessageTree.class, timestamp, domain, namespace); } @Override - public Bucket getReportBucket(Date timestamp, String name) throws IOException { - return getBucket(String.class, timestamp, name); + public Bucket getReportBucket(Date timestamp, String name, String namespace) throws IOException { + return getBucket(String.class, timestamp, name, namespace); } static class Entry { private Class m_type; - private Date m_timestamp; + private String m_path; - private String m_name; + private String m_namespace; - public Entry(Class type, Date timestamp, String name) { + public Entry(Class type, String path, String namespace) { m_type = type; - m_timestamp = timestamp; - m_name = name; + m_path = path; + m_namespace = namespace; } @Override @@ -87,31 +117,23 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag if (obj instanceof Entry) { Entry e = (Entry) obj; - return e.getType() == m_type && e.getTimestamp().getTime() == m_timestamp.getTime() - && e.getName().equals(m_name); + return e.m_type == m_type && e.m_path.equals(m_path) && e.m_namespace.equals(m_namespace); } return false; } - public String getName() { - return m_name; - } - - public Date getTimestamp() { - return m_timestamp; - } - - public Class getType() { - return m_type; - } - @Override public int hashCode() { int hashcode = m_type.hashCode(); - hashcode = hashcode * 31 + m_name.hashCode(); + hashcode = hashcode * 31 + m_path.hashCode(); return hashcode; } + + @Override + public String toString() { + return String.format("Entry[type=%s,path=%s]", m_type, m_path); + } } } diff --git a/cat-core/src/main/resources/META-INF/dal/model/config-codegen.xml b/cat-core/src/main/resources/META-INF/dal/model/config-codegen.xml index d43525d06ad09bdad623cf8c1e2ef2da50837c61..965a74d8ae405a8c5d03305ae6eca841d293e49b 100644 --- a/cat-core/src/main/resources/META-INF/dal/model/config-codegen.xml +++ b/cat-core/src/main/resources/META-INF/dal/model/config-codegen.xml @@ -3,6 +3,7 @@ + diff --git a/cat-core/src/main/resources/META-INF/plexus/components.xml b/cat-core/src/main/resources/META-INF/plexus/components.xml index ab28b3c878074dfbf8291024516bf2c5052c14fd..5470c53b08bba5c2602b3973d74ff27497b1f1d4 100644 --- a/cat-core/src/main/resources/META-INF/plexus/components.xml +++ b/cat-core/src/main/resources/META-INF/plexus/components.xml @@ -194,7 +194,7 @@ com.dianping.cat.storage.Bucket - java.lang.String + java.lang.String-local com.dianping.cat.storage.internal.LocalStringBucket per-lookup @@ -205,7 +205,7 @@ com.dianping.cat.storage.Bucket - com.dianping.cat.message.spi.MessageTree + com.dianping.cat.message.spi.MessageTree-local com.dianping.cat.storage.internal.LocalMessageBucket per-lookup @@ -221,6 +221,11 @@ com.dianping.cat.storage.BucketManager com.dianping.cat.storage.internal.DefaultBucketManager + + + com.dianping.cat.message.spi.MessagePathBuilder + + diff --git a/cat-core/src/test/java/com/dianping/cat/storage/BucketConcurrentTest.java b/cat-core/src/test/java/com/dianping/cat/storage/BucketConcurrentTest.java index e4634976eb28e3c9b229cc710b19d6f7e61d90b9..fdb4861e40c8165fa15419ec7453490777fbcd9e 100644 --- a/cat-core/src/test/java/com/dianping/cat/storage/BucketConcurrentTest.java +++ b/cat-core/src/test/java/com/dianping/cat/storage/BucketConcurrentTest.java @@ -47,7 +47,7 @@ public class BucketConcurrentTest extends ComponentTestCase { public void testMessageBucket() throws Exception { Date timestamp = new Date(); BucketManager manager = lookup(BucketManager.class); - final Bucket bucket = manager.getMessageBucket(timestamp, "concurrent/message"); + final Bucket bucket = manager.getMessageBucket(timestamp, "concurrent/message", "local"); ExecutorService pool = Executors.newFixedThreadPool(10); for (int p = 0; p < 10; p++) { @@ -112,7 +112,7 @@ public class BucketConcurrentTest extends ComponentTestCase { public void testStringBucket() throws Exception { Date timestamp = new Date(); BucketManager manager = lookup(BucketManager.class); - final Bucket bucket = manager.getReportBucket(timestamp, "concurrent/data"); + final Bucket bucket = manager.getReportBucket(timestamp, "concurrent/data", "local"); ExecutorService pool = Executors.newFixedThreadPool(10); for (int p = 0; p < 10; p++) { @@ -140,7 +140,7 @@ public class BucketConcurrentTest extends ComponentTestCase { pool.awaitTermination(5000, TimeUnit.MILLISECONDS); - final Bucket bucket2 = manager.getReportBucket(timestamp, "concurrent/data"); + final Bucket bucket2 = manager.getReportBucket(timestamp, "concurrent/data", "local"); for (int p = 0; p < 10; p++) { final int num = p; diff --git a/cat-core/src/test/java/com/dianping/cat/storage/BucketManagerTest.java b/cat-core/src/test/java/com/dianping/cat/storage/BucketManagerTest.java index 4021e47ace1784d5330e6c5306c6b76154677efa..e241c04c20bc23aeb178b3a2401a6590b1939f8a 100644 --- a/cat-core/src/test/java/com/dianping/cat/storage/BucketManagerTest.java +++ b/cat-core/src/test/java/com/dianping/cat/storage/BucketManagerTest.java @@ -17,10 +17,10 @@ public class BucketManagerTest extends ComponentTestCase { public void test() throws Exception { Date timestamp = new Date(); BucketManager manager = lookup(BucketManager.class); - Bucket bucket1 = manager.getMessageBucket(timestamp, "test/path1"); - Bucket bucket2 = manager.getMessageBucket(timestamp, "test/path2"); - Bucket bucket3 = manager.getMessageBucket(timestamp, "test/path1"); - Bucket bucket4 = manager.getMessageBucket(timestamp, "test/path2"); + Bucket bucket1 = manager.getMessageBucket(timestamp, "test/path1", "local"); + Bucket bucket2 = manager.getMessageBucket(timestamp, "test/path2", "local"); + Bucket bucket3 = manager.getMessageBucket(timestamp, "test/path1", "local"); + Bucket bucket4 = manager.getMessageBucket(timestamp, "test/path2", "local"); Assert.assertEquals(bucket1, bucket3); Assert.assertEquals(bucket2, bucket4); diff --git a/cat-core/src/test/resources/com/dianping/cat/message/configuration/config.xml b/cat-core/src/test/resources/com/dianping/cat/message/configuration/config.xml index 78db15a32dbd0f91dcd9b10f0a4aa33314e4e89f..9e4aaaf7636332ff1d4eebad756a048572b9636d 100644 --- a/cat-core/src/test/resources/com/dianping/cat/message/configuration/config.xml +++ b/cat-core/src/test/resources/com/dianping/cat/message/configuration/config.xml @@ -1,4 +1,4 @@ - + diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HdfsLogViewService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HdfsLogViewService.java index b7d56d1a58336da53ea7292b027e97cea8bd9901..998656e5570b7009e0851d65b83d3a865e2bae3b 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HdfsLogViewService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HdfsLogViewService.java @@ -32,7 +32,7 @@ public class HdfsLogViewService implements ModelService { ModelResponse response = new ModelResponse(); try { - Bucket bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain()); + Bucket bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain(), "remote"); MessageTree tree = null; if (tag != null && direction != null) { diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java index 11cb011dcb8442afc1b649868c6fc48cf074c63e..6f5d4e1e2d5795ad5d7b2a9d65e33450d4e9da97 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java @@ -32,7 +32,7 @@ public class LocalLogViewService implements ModelService { ModelResponse response = new ModelResponse(); try { - Bucket bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain()); + Bucket bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain(), "local"); MessageTree tree = null; if (tag != null && direction != null) { @@ -58,6 +58,7 @@ public class LocalLogViewService implements ModelService { response.setModel(buf.toString(Charset.forName("utf-8"))); } } catch (Exception e) { + e.printStackTrace(); response.setException(e); } diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/problem/HdfsProblemService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/problem/HdfsProblemService.java index 45b20bd6d942855dc516b17692900c5a8d0c319d..40d9a57e187c71cf58860cdc499930f287de479f 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/problem/HdfsProblemService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/problem/HdfsProblemService.java @@ -23,7 +23,7 @@ public class HdfsProblemService implements ModelService { Bucket bucket = null; try { - bucket = m_bucketManager.getReportBucket(new Date(date), domain); + bucket = m_bucketManager.getReportBucket(new Date(date), domain, "remote"); String xml = bucket.findById("problem-" + domain); diff --git a/cat-home/src/main/java/com/dianping/cat/report/page/model/transaction/HdfsTransactionService.java b/cat-home/src/main/java/com/dianping/cat/report/page/model/transaction/HdfsTransactionService.java index 2c78723ea66bd5caf19d7acc74a18c8ac443915c..a8d521066e23bed7103611c9471733819bba5c92 100644 --- a/cat-home/src/main/java/com/dianping/cat/report/page/model/transaction/HdfsTransactionService.java +++ b/cat-home/src/main/java/com/dianping/cat/report/page/model/transaction/HdfsTransactionService.java @@ -23,7 +23,7 @@ public class HdfsTransactionService implements ModelService { Bucket bucket = null; try { - bucket = m_bucketManager.getReportBucket(new Date(date), domain); + bucket = m_bucketManager.getReportBucket(new Date(date), domain, "remote"); String xml = bucket.findById(domain); diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java index 1d2bc91b8456b014226d05c3b4cf3c6ce4c3961c..aa9c8d705baaa983180b460a598a788d59f7defa 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java +++ b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java @@ -4,9 +4,12 @@ import java.util.ArrayList; import java.util.List; import com.dianping.cat.job.DumpToHdfsConsumer; +import com.dianping.cat.job.hdfs.DefaultInputChannel; +import com.dianping.cat.job.hdfs.DefaultInputChannelManager; import com.dianping.cat.job.hdfs.DefaultOutputChannel; import com.dianping.cat.job.hdfs.DefaultOutputChannelManager; import com.dianping.cat.job.hdfs.HdfsMessageStorage; +import com.dianping.cat.job.hdfs.InputChannel; import com.dianping.cat.job.hdfs.InputChannelManager; import com.dianping.cat.job.hdfs.OutputChannel; import com.dianping.cat.job.hdfs.OutputChannelManager; @@ -20,8 +23,6 @@ import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageStorage; import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.storage.Bucket; -import com.dianping.cat.storage.internal.LocalMessageBucket; -import com.dianping.cat.storage.internal.LocalStringBucket; import com.site.lookup.configuration.AbstractResourceConfigurator; import com.site.lookup.configuration.Component; @@ -36,6 +37,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .config(E("maxSize").value(String.valueOf(2 * 1024 * 1024L)))); all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) // .req(MessagePathBuilder.class)); + all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) // + .req(MessageCodec.class, "plain-text")); + all.add(C(InputChannelManager.class, DefaultInputChannelManager.class)); } else { all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) // .req(MessageCodec.class, "plain-text") // @@ -44,6 +48,10 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .req(MessagePathBuilder.class) // .config(E("baseDir").value("data"), // E("serverUri").value("hdfs://192.168.7.43:9000/user/cat/"))); + all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) // + .req(MessageCodec.class, "plain-text")); + all.add(C(InputChannelManager.class, DefaultInputChannelManager.class) // + .config(E("serverUri").value("hdfs://192.168.7.43:9000/user/cat/"))); } all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) // @@ -51,22 +59,13 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) // .req(MessageStorage.class, "hdfs")); - if (isEnv("dev") || property("env", null) == null) { - all.add(C(Bucket.class, String.class.getName(), LocalStringBucket.class) // - .is(PER_LOOKUP)); - all.add(C(Bucket.class, MessageTree.class.getName(), LocalMessageBucket.class) // - .is(PER_LOOKUP) // - .req(MessageCodec.class, "plain-text")); - } else { - all.add(C(Bucket.class, String.class.getName(), RemoteStringBucket.class) // - .is(PER_LOOKUP) // - .req(ReportDao.class)); - all.add(C(Bucket.class, MessageTree.class.getName(), RemoteMessageBucket.class) // - .is(PER_LOOKUP) // - .req(LogviewDao.class, MessagePathBuilder.class) // - .req(OutputChannelManager.class, InputChannelManager.class) // - .req(MessageCodec.class, "plain-text")); - } + all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) // + .is(PER_LOOKUP) // + .req(ReportDao.class)); + all.add(C(Bucket.class, MessageTree.class.getName() + "-remote", RemoteMessageBucket.class) // + .is(PER_LOOKUP) // + .req(OutputChannelManager.class, InputChannelManager.class) // + .req(LogviewDao.class, MessagePathBuilder.class)); all.addAll(new DatabaseConfigurator().defineComponents()); diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java index d688110323d6c9e3264bc73afe0a1299289bd90a..f101885e69b5c0801074ddb9a431b2b8ddec7284 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java @@ -24,6 +24,9 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input @Inject private URI m_serverUri; + @Inject + private String m_baseDir = "target/hdfs"; + private FileSystem m_fs; private Map m_channels = new HashMap(); @@ -94,7 +97,7 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input DefaultInputChannel channel = m_channels.get(path); if (channel == null) { - Path file = new Path(path); + Path file = new Path(m_baseDir, path); FSDataInputStream in = m_fs.open(file); channel = (DefaultInputChannel) lookup(InputChannel.class); @@ -106,6 +109,10 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input return channel; } + public void setBaseDir(String baseDir) { + m_baseDir = baseDir; + } + public void setServerUri(String serverUri) { m_serverUri = URI.create(serverUri); } diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java index 046aaf1ca1aae72a51b647cdb569235189b1442e..7acb713d456956a592cf186e690c679fbcf79172 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java @@ -76,6 +76,7 @@ public class DefaultOutputChannel implements OutputChannel { // a blank line used to separate two message trees m_out.write('\n'); + m_out.flush(); m_count += length + 1; return length+1; diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java index 208fdde8d5163dba485b189bb741986cf7461dd1..2345c634948c8f9f760f721f3798eaa6fac56434 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java @@ -1,7 +1,6 @@ package com.dianping.cat.job.hdfs; import java.io.IOException; -import java.io.OutputStream; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; @@ -9,6 +8,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.codehaus.plexus.logging.LogEnabled; @@ -21,7 +21,8 @@ import com.dianping.cat.message.spi.MessageTree; import com.site.lookup.ContainerHolder; import com.site.lookup.annotation.Inject; -public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled { +public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, + LogEnabled { @Inject private MessagePathBuilder m_builder; @@ -104,16 +105,16 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp @Override public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException { String path = m_builder.getHdfsPath(tree.getMessageId()); - + return openChannel(path, forceNew); - } - + } + public OutputChannel openChannel(String path, boolean forceNew) throws IOException { OutputChannel channel = m_channels.get(path); if (channel == null) { - Path file = new Path(m_basePath, path + "-0"); - OutputStream out = m_fs.create(file); + Path file = new Path(m_basePath, path); + FSDataOutputStream out = m_fs.create(file); channel = lookup(OutputChannel.class); channel.initialize(out); @@ -127,7 +128,7 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp m_indexes.put(path, ++index); Path file = new Path(m_basePath, path + "-" + index); - OutputStream out = m_fs.create(file); + FSDataOutputStream out = m_fs.create(file); channel = lookup(OutputChannel.class); channel.initialize(out); diff --git a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java index 69b844558f7f0d141d97975b8d93b3370bb4d45f..da5ec10c7ec1c85977196b54a0a6453c8ebc53f9 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java +++ b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java @@ -4,6 +4,9 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Collection; import java.util.Date; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; import org.codehaus.plexus.logging.LogEnabled; import org.codehaus.plexus.logging.Logger; @@ -38,6 +41,15 @@ public class RemoteMessageBucket implements Bucket, LogEnabled { private String m_path; + private Map m_lruCache = new LinkedHashMap(100, 0.75f, true) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Entry eldest) { + return size() > 100; + } + }; + private Logger m_logger; @Override @@ -147,34 +159,32 @@ public class RemoteMessageBucket implements Bucket, LogEnabled { @Override public boolean storeById(String id, MessageTree tree) throws IOException { - // check if it's already stored - try { - m_logviewDao.findByPK(id, LogviewEntity.READSET_FULL); + String messageId = tree.getMessageId(); + if (m_lruCache.containsKey(messageId)) { return false; - } catch (DalException e) { - // not exist } + + m_lruCache.put(messageId, messageId); int offset = m_outputChannel.getSize(); int length = m_outputChannel.write(tree); Logview logview = m_logviewDao.createLocal(); - logview.setMessageId(tree.getMessageId()); + logview.setMessageId(messageId); logview.setDataPath(m_path); logview.setDataOffset(offset); logview.setDataLength(length); logview.setTagThread("t:" + tree.getThreadId()); logview.setTagSession("s:" + tree.getSessionToken()); - logview.setTagRequest("r:" + tree.getMessageId()); + logview.setTagRequest("r:" + messageId); try { m_logviewDao.insert(logview); - return true; } catch (DalException e) { - throw new IOException("Error when inserting into logiew table!", e); + throw new IOException("Error when inserting into logview table!", e); } } } diff --git a/cat-job/src/main/resources/META-INF/plexus/components.xml b/cat-job/src/main/resources/META-INF/plexus/components.xml index c70bee08c808d21aaacea86178185d73514e87be..1cf2003f0a710eb57dc040264891c3949813dd0e 100644 --- a/cat-job/src/main/resources/META-INF/plexus/components.xml +++ b/cat-job/src/main/resources/META-INF/plexus/components.xml @@ -23,6 +23,21 @@ + + com.dianping.cat.job.hdfs.InputChannel + com.dianping.cat.job.hdfs.DefaultInputChannel + per-lookup + + + com.dianping.cat.message.spi.MessageCodec + plain-text + + + + + com.dianping.cat.job.hdfs.InputChannelManager + com.dianping.cat.job.hdfs.DefaultInputChannelManager + com.dianping.cat.message.spi.MessageStorage hdfs @@ -46,19 +61,32 @@ com.dianping.cat.storage.Bucket - java.lang.String - com.dianping.cat.storage.internal.LocalStringBucket + java.lang.String-remote + com.dianping.cat.job.storage.RemoteStringBucket per-lookup + + + com.dianping.cat.job.sql.dal.ReportDao + + com.dianping.cat.storage.Bucket - com.dianping.cat.message.spi.MessageTree - com.dianping.cat.storage.internal.LocalMessageBucket + com.dianping.cat.message.spi.MessageTree-remote + com.dianping.cat.job.storage.RemoteMessageBucket per-lookup - com.dianping.cat.message.spi.MessageCodec - plain-text + com.dianping.cat.job.hdfs.OutputChannelManager + + + com.dianping.cat.job.hdfs.InputChannelManager + + + com.dianping.cat.job.sql.dal.LogviewDao + + + com.dianping.cat.message.spi.MessagePathBuilder diff --git a/pom.xml b/pom.xml index bbdefb5cafdbbf65656b4310976085f10452ff7b..4a55a3f3607faf463a639638284837126b9b17f6 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ com.site.common web-framework - 1.0.4 + 1.0.5 org.unidal.webres