diff --git a/cat-core/src/main/java/com/dianping/cat/Cat.java b/cat-core/src/main/java/com/dianping/cat/Cat.java index 3e9f09a60d9991c6f52be40cd2b1c6ff642e80f8..9bdefe5ce85c9ca20afa6d82a7fe2eb8a2c7aae2 100644 --- a/cat-core/src/main/java/com/dianping/cat/Cat.java +++ b/cat-core/src/main/java/com/dianping/cat/Cat.java @@ -1,7 +1,11 @@ package com.dianping.cat; -import com.dianping.cat.message.MessageProducer; +import org.codehaus.plexus.DefaultPlexusContainer; +import org.codehaus.plexus.PlexusContainer; +import org.codehaus.plexus.PlexusContainerException; +import org.codehaus.plexus.component.repository.exception.ComponentLookupException; +import com.dianping.cat.message.MessageProducer; /** * This is the main entry point to the system. @@ -9,7 +13,22 @@ import com.dianping.cat.message.MessageProducer; * @author Frankie Wu */ public class Cat { + private static PlexusContainer s_container; + + static { + try { + s_container = new DefaultPlexusContainer(); + } catch (PlexusContainerException e) { + e.printStackTrace(); + } + } + public static MessageProducer getProducer() { - return null; + try { + return (MessageProducer) s_container.lookup(MessageProducer.class); + } catch (ComponentLookupException e) { + throw new RuntimeException("Unable to get instance of MessageProducer, " + + "please make sure the environment wa setup correctly!", e); + } } } diff --git a/cat-core/src/main/java/com/dianping/cat/configuration/ComponentsConfigurator.java b/cat-core/src/main/java/com/dianping/cat/configuration/ComponentsConfigurator.java index b1be67704cb559cbc6c5b1fdcf9dbf97625c78ea..0327a15bb31facca94f353e4c88670a137695076 100644 --- a/cat-core/src/main/java/com/dianping/cat/configuration/ComponentsConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/configuration/ComponentsConfigurator.java @@ -3,10 +3,12 @@ package com.dianping.cat.configuration; import java.util.ArrayList; import java.util.List; +import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.broker.DefaultMessageBroker; import com.dianping.cat.message.broker.MessageBroker; import com.dianping.cat.message.handler.MessageDispatcher; import com.dianping.cat.message.handler.MessageHandler; +import com.dianping.cat.message.internal.DefaultMessageProducer; import com.dianping.cat.message.io.InMemoryQueue; import com.dianping.cat.message.io.InMemoryReceiver; import com.dianping.cat.message.io.InMemorySender; @@ -28,6 +30,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(MessageReceiver.class, inMemory, InMemoryReceiver.class) // .req(InMemoryQueue.class)); + all.add(C(MessageProducer.class, DefaultMessageProducer.class)); + + // the following are not used right now all.add(C(MessageHandler.class, MessageDispatcher.class) // .req(MessageReceiver.class, inMemory)); diff --git a/cat-core/src/main/java/com/dianping/cat/message/Message.java b/cat-core/src/main/java/com/dianping/cat/message/Message.java index 54d230f6fd0a810c7d4dcde8fb99746cb5616cef..c8601dafaf84c6013354098405ae8dd6d1e8ef0f 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/Message.java +++ b/cat-core/src/main/java/com/dianping/cat/message/Message.java @@ -79,6 +79,13 @@ public interface Message { */ public String getType(); + /** + * If the complete() method was called or not. + * + * @return true means the complete() method was called, false otherwise. + */ + public boolean isCompleted(); + /** * Set the message status. * diff --git a/cat-core/src/main/java/com/dianping/cat/message/Transaction.java b/cat-core/src/main/java/com/dianping/cat/message/Transaction.java index 0ff357146ce52519746ff1423143e6879e106816..03143910f6b0d8bcac7c838eb680c1b1fd4ad466 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/Transaction.java +++ b/cat-core/src/main/java/com/dianping/cat/message/Transaction.java @@ -1,5 +1,7 @@ package com.dianping.cat.message; +import java.util.List; + /** *

* Transaction is any interesting unit of work that takes time to @@ -34,5 +36,33 @@ package com.dianping.cat.message; * @author Frankie Wu */ public interface Transaction extends Message { + /** + * Add one nested child message to current transaction. + * + * @param message + * to be added + */ + public void addChild(Message message); + + /** + * Get all children message within current transaction. + * + *

+ * Typically, a Transaction can nest other + * Transactions, Events and Heartbeat + * s, while an Event or Heartbeat can't nest other + * messages. + *

+ * + * @return all children messages, empty if there is no nested children. + */ + public List getChildren(); + + /** + * How long the transaction took from construction to complete. Time unit is + * millisecond. + * + * @return duration time in millisecond + */ public long getDuration(); } diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/AbstractMessage.java b/cat-core/src/main/java/com/dianping/cat/message/internal/AbstractMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..6dff253e02c9f47a73aef53f77087571123a88ff --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/AbstractMessage.java @@ -0,0 +1,80 @@ +package com.dianping.cat.message.internal; + +import com.dianping.cat.message.Message; + +public abstract class AbstractMessage implements Message { + private String m_type; + + private String m_name; + + private String m_status; + + private long m_timestamp; + + private StringRope m_data; + + private boolean m_completed; + + public AbstractMessage(String type, String name) { + m_type = type; + m_name = name; + m_timestamp = (long) (System.nanoTime() / 1e6); + m_data = new StringRope(); + } + + @Override + public void addData(String keyValuePairs) { + m_data.append(keyValuePairs); + } + + @Override + public void addData(String key, Object value) { + if (m_data.isEmpty()) { + m_data.append("&"); + } + + m_data.append(key).append("=").append(String.valueOf(value)); + } + + public StringRope getData() { + return m_data; + } + + @Override + public String getName() { + return m_name; + } + + @Override + public String getStatus() { + return m_status; + } + + @Override + public long getTimestamp() { + return m_timestamp; + } + + @Override + public String getType() { + return m_type; + } + + public boolean isCompleted() { + return m_completed; + } + + protected void setCompleted(boolean completed) { + m_completed = completed; + } + + @Override + public void setStatus(String status) { + m_status = status; + } + + @Override + public void setStatus(Throwable e) { + m_status = e.getClass().getName(); + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultEvent.java b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..f507d84521a1e4f1c77cc60ebf75377a1e977a39 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultEvent.java @@ -0,0 +1,15 @@ +package com.dianping.cat.message.internal; + +import com.dianping.cat.message.Event; + +public class DefaultEvent extends AbstractMessage implements Event { + public DefaultEvent(String type, String name) { + super(type, name); + } + + @Override + public void complete() { + setCompleted(true); + MessageManager.INSTANCE.add(this); + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultHeartbeat.java b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultHeartbeat.java new file mode 100644 index 0000000000000000000000000000000000000000..f76ba4ed17ccee3a2eb1fdd2c0eba1d9f73aeb6a --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultHeartbeat.java @@ -0,0 +1,15 @@ +package com.dianping.cat.message.internal; + +import com.dianping.cat.message.Heartbeat; + +public class DefaultHeartbeat extends AbstractMessage implements Heartbeat { + public DefaultHeartbeat(String type, String name) { + super(type, name); + } + + @Override + public void complete() { + setCompleted(true); + MessageManager.INSTANCE.add(this); + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..747e6c5d0770cadb26cedef2604d8cd10f8e5d1e --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java @@ -0,0 +1,42 @@ +package com.dianping.cat.message.internal; + +import com.dianping.cat.message.Event; +import com.dianping.cat.message.Heartbeat; +import com.dianping.cat.message.Message; +import com.dianping.cat.message.MessageProducer; +import com.dianping.cat.message.Transaction; + +public class DefaultMessageProducer implements MessageProducer { + @Override + public void logEvent(String type, String name, String status, String nameValuePairs) { + Event event = newEvent(type, name); + + event.addData(nameValuePairs); + event.setStatus(Message.SUCCESS); + event.complete(); + } + + @Override + public void logHeartbeat(String type, String name, String status, String nameValuePairs) { + Heartbeat heartbeat = newHeartbeat(type, name); + + heartbeat.addData(nameValuePairs); + heartbeat.setStatus(Message.SUCCESS); + heartbeat.complete(); + } + + @Override + public Event newEvent(String type, String name) { + return new DefaultEvent(type, name); + } + + @Override + public Heartbeat newHeartbeat(String type, String name) { + return new DefaultHeartbeat(type, name); + } + + @Override + public Transaction newTransaction(String type, String name) { + return new DefaultTransaction(type, name); + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultTransaction.java b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultTransaction.java new file mode 100644 index 0000000000000000000000000000000000000000..5c6946c0abc686756b8a7bfe326df95e53c209e8 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/DefaultTransaction.java @@ -0,0 +1,51 @@ +package com.dianping.cat.message.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.dianping.cat.message.Message; +import com.dianping.cat.message.Transaction; + +public class DefaultTransaction extends AbstractMessage implements Transaction { + private long m_duration; + + private List m_children; + + public DefaultTransaction(String type, String name) { + super(type, name); + + MessageManager.INSTANCE.start(this); + } + + @Override + public void addChild(Message message) { + if (m_children == null) { + m_children = new ArrayList(); + } + + m_children.add(message); + } + + @Override + public void complete() { + m_duration = (long) (System.nanoTime() / 1e6) - getTimestamp(); + + setCompleted(true); + MessageManager.INSTANCE.end(this); + } + + @Override + public List getChildren() { + if (m_children == null) { + return Collections.emptyList(); + } + + return m_children; + } + + @Override + public long getDuration() { + return m_duration; + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/MessageManager.java b/cat-core/src/main/java/com/dianping/cat/message/internal/MessageManager.java new file mode 100644 index 0000000000000000000000000000000000000000..604d2690d827829f842ded9124f1727e53f1b9c4 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/MessageManager.java @@ -0,0 +1,131 @@ +package com.dianping.cat.message.internal; + +import java.util.Collections; +import java.util.List; +import java.util.Stack; + +import com.dianping.cat.message.Message; +import com.dianping.cat.message.Transaction; + +public enum MessageManager { + INSTANCE; + + private static final ThreadLocal s_context = new ThreadLocal() { + @Override + protected Context initialValue() { + return new Context(); + } + }; + + public void add(Message message) { + s_context.get().add(message); + } + + public void end(Transaction transaction) { + s_context.get().end(transaction); + } + + public void start(Transaction transaction) { + s_context.get().start(transaction); + } + + void handle(Transaction transaction) { + // TODO + System.out.println(transaction); + } + + static class Context { + private Stack m_stack = new Stack(); + + public void add(Message message) { + if (!m_stack.isEmpty()) { + Transaction entry = m_stack.peek(); + + entry.addChild(message); + } else { + // add a mock transaction as its parent + Transaction t = new FakeTransaction(); + + start(t); + t.addChild(message); + end(t); + } + } + + public void end(Transaction transaction) { + if (!m_stack.isEmpty()) { + Transaction current = m_stack.peek(); + + if (transaction.equals(current)) { + validateTransaction(current); + } else { + throw new RuntimeException("Internal error: Transaction logging mismatched!"); + } + + m_stack.pop(); + + if (m_stack.isEmpty()) { + INSTANCE.handle(transaction); + } + } + } + + public void start(Transaction transaction) { + if (!m_stack.isEmpty()) { + Transaction entry = m_stack.peek(); + + entry.addChild(transaction); + } + + m_stack.push(transaction); + } + + private void validateTransaction(Transaction transaction) { + for (Message message : transaction.getChildren()) { + if (message.getStatus() == null) { + message.setStatus("unset"); + } + + if (!message.isCompleted() && message instanceof DefaultTransaction) { + DefaultTransaction t = (DefaultTransaction) message; + + validateTransaction(t); + + // missing transaction end, log a BadInstrument event so that + // developer can fix the code + DefaultEvent notCompleteEvent = new DefaultEvent("CAT", "BadInstrument"); + + notCompleteEvent.setStatus("TransactionNotCompleted"); + notCompleteEvent.setCompleted(true); + transaction.addChild(notCompleteEvent); + + t.setCompleted(true); + } + } + } + } + + static class FakeTransaction extends AbstractMessage implements Transaction { + public FakeTransaction() { + super(null, null); + } + + @Override + public void complete() { + } + + @Override + public long getDuration() { + return -1; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public void addChild(Message message) { + } + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/message/internal/StringRope.java b/cat-core/src/main/java/com/dianping/cat/message/internal/StringRope.java new file mode 100644 index 0000000000000000000000000000000000000000..4846660d1ad546963ac0abeafd96a4e507f9db63 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/message/internal/StringRope.java @@ -0,0 +1,22 @@ +package com.dianping.cat.message.internal; + +import java.util.ArrayList; +import java.util.List; + +public class StringRope { + private List m_parts = new ArrayList(); + + public StringRope append(String str) { + if (str == null) { + m_parts.add("null"); + } else if (str.length() > 0) { + m_parts.add(str); + } + + return this; + } + + public boolean isEmpty() { + return m_parts.isEmpty(); + } +} diff --git a/cat-core/src/main/resources/META-INF/plexus/components.xml b/cat-core/src/main/resources/META-INF/plexus/components.xml index c3ff109f57e125245a8f293606854196942613e7..d8686944a9954c660e96000f3b3fb9424fcf2cab 100644 --- a/cat-core/src/main/resources/META-INF/plexus/components.xml +++ b/cat-core/src/main/resources/META-INF/plexus/components.xml @@ -1,15 +1,5 @@ - - com.dianping.cat.message.io.Transport - tcp-socket - com.dianping.cat.message.io.TcpSocketTransport - - - com.dianping.cat.message.io.Transport - udp-multicast - com.dianping.cat.message.io.UdpMulticastTransport - com.dianping.cat.message.io.InMemoryQueue com.dianping.cat.message.io.InMemoryQueue @@ -34,6 +24,10 @@ + + com.dianping.cat.message.MessageProducer + com.dianping.cat.message.internal.DefaultMessageProducer + com.dianping.cat.message.handler.MessageHandler com.dianping.cat.message.handler.MessageDispatcher diff --git a/cat-core/src/test/java/com/dianping/cat/message/io/InMemoryTest.java b/cat-core/src/test/java/com/dianping/cat/message/io/InMemoryTest.java index caaa1936672fc894a34cd225594ede7cd076f063..8ec98560ba9d32446999f9920f08b05bcacc3d5a 100644 --- a/cat-core/src/test/java/com/dianping/cat/message/io/InMemoryTest.java +++ b/cat-core/src/test/java/com/dianping/cat/message/io/InMemoryTest.java @@ -14,6 +14,7 @@ import org.junit.runners.JUnit4; import com.dianping.cat.message.Message; import com.dianping.cat.message.handler.MessageHandler; +import com.dianping.cat.message.internal.AbstractMessage; import com.site.lookup.ComponentTestCase; @RunWith(JUnit4.class) @@ -57,52 +58,14 @@ public class InMemoryTest extends ComponentTestCase { Assert.assertEquals(len, sb.length()); } - static class MockMessage implements Message { - private String m_name; - - @Override - public void addData(String keyValuePairs) { - } - - @Override - public void addData(String key, Object value) { + static class MockMessage extends AbstractMessage { + public MockMessage() { + super(null, null); } @Override public void complete() { } - - @Override - public String getName() { - return m_name; - } - - @Override - public String getStatus() { - return null; - } - - @Override - public long getTimestamp() { - return 0; - } - - @Override - public String getType() { - return null; - } - - public void setName(String name) { - m_name = name; - } - - @Override - public void setStatus(String status) { - } - - @Override - public void setStatus(Throwable e) { - } } static class MockMessageHandler implements MessageHandler { diff --git a/cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTest.java b/cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTest.java index 091b4d71c306fc5dd67de328a6c4545e130bb816..a9a8de3c89fd105413af82baa0e55e7104895920 100644 --- a/cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTest.java +++ b/cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTest.java @@ -15,6 +15,7 @@ import org.junit.runners.JUnit4; import com.dianping.cat.message.Message; import com.dianping.cat.message.codec.MessageCodec; import com.dianping.cat.message.handler.MessageHandler; +import com.dianping.cat.message.internal.AbstractMessage; import com.site.lookup.ComponentTestCase; @RunWith(JUnit4.class) @@ -59,52 +60,14 @@ public class TcpSocketTest extends ComponentTestCase { Assert.assertEquals(len, sb.length()); } - static class MockMessage implements Message { - private String m_name; - - @Override - public void addData(String keyValuePairs) { - } - - @Override - public void addData(String key, Object value) { + static class MockMessage extends AbstractMessage { + public MockMessage() { + super(null, null); } @Override public void complete() { } - - @Override - public String getName() { - return m_name; - } - - @Override - public String getStatus() { - return null; - } - - @Override - public long getTimestamp() { - return 0; - } - - @Override - public String getType() { - return null; - } - - public void setName(String name) { - m_name = name; - } - - @Override - public void setStatus(String status) { - } - - @Override - public void setStatus(Throwable e) { - } } public static class MockMessageCodec implements MessageCodec {