提交 3863bfd8 编写于 作者: F Frankie Wu

message implementation, to be continue

上级 9a6b786b
package com.dianping.cat;
import com.dianping.cat.message.MessageProducer;
import org.codehaus.plexus.DefaultPlexusContainer;
import org.codehaus.plexus.PlexusContainer;
import org.codehaus.plexus.PlexusContainerException;
import org.codehaus.plexus.component.repository.exception.ComponentLookupException;
import com.dianping.cat.message.MessageProducer;
/**
* This is the main entry point to the system.
......@@ -9,7 +13,22 @@ import com.dianping.cat.message.MessageProducer;
* @author Frankie Wu
*/
public class Cat {
private static PlexusContainer s_container;
static {
try {
s_container = new DefaultPlexusContainer();
} catch (PlexusContainerException e) {
e.printStackTrace();
}
}
public static MessageProducer getProducer() {
return null;
try {
return (MessageProducer) s_container.lookup(MessageProducer.class);
} catch (ComponentLookupException e) {
throw new RuntimeException("Unable to get instance of MessageProducer, "
+ "please make sure the environment wa setup correctly!", e);
}
}
}
......@@ -3,10 +3,12 @@ package com.dianping.cat.configuration;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.broker.DefaultMessageBroker;
import com.dianping.cat.message.broker.MessageBroker;
import com.dianping.cat.message.handler.MessageDispatcher;
import com.dianping.cat.message.handler.MessageHandler;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemoryReceiver;
import com.dianping.cat.message.io.InMemorySender;
......@@ -28,6 +30,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageReceiver.class, inMemory, InMemoryReceiver.class) //
.req(InMemoryQueue.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class));
// the following are not used right now
all.add(C(MessageHandler.class, MessageDispatcher.class) //
.req(MessageReceiver.class, inMemory));
......
......@@ -79,6 +79,13 @@ public interface Message {
*/
public String getType();
/**
* If the complete() method was called or not.
*
* @return true means the complete() method was called, false otherwise.
*/
public boolean isCompleted();
/**
* Set the message status.
*
......
package com.dianping.cat.message;
import java.util.List;
/**
* <p>
* <code>Transaction</code> is any interesting unit of work that takes time to
......@@ -34,5 +36,33 @@ package com.dianping.cat.message;
* @author Frankie Wu
*/
public interface Transaction extends Message {
/**
* Add one nested child message to current transaction.
*
* @param message
* to be added
*/
public void addChild(Message message);
/**
* Get all children message within current transaction.
*
* <p>
* Typically, a <code>Transaction</code> can nest other
* <code>Transaction</code>s, <code>Event</code>s and <code>Heartbeat</code>
* s, while an <code>Event</code> or <code>Heartbeat</code> can't nest other
* messages.
* </p>
*
* @return all children messages, empty if there is no nested children.
*/
public List<Message> getChildren();
/**
* How long the transaction took from construction to complete. Time unit is
* millisecond.
*
* @return duration time in millisecond
*/
public long getDuration();
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Message;
public abstract class AbstractMessage implements Message {
private String m_type;
private String m_name;
private String m_status;
private long m_timestamp;
private StringRope m_data;
private boolean m_completed;
public AbstractMessage(String type, String name) {
m_type = type;
m_name = name;
m_timestamp = (long) (System.nanoTime() / 1e6);
m_data = new StringRope();
}
@Override
public void addData(String keyValuePairs) {
m_data.append(keyValuePairs);
}
@Override
public void addData(String key, Object value) {
if (m_data.isEmpty()) {
m_data.append("&");
}
m_data.append(key).append("=").append(String.valueOf(value));
}
public StringRope getData() {
return m_data;
}
@Override
public String getName() {
return m_name;
}
@Override
public String getStatus() {
return m_status;
}
@Override
public long getTimestamp() {
return m_timestamp;
}
@Override
public String getType() {
return m_type;
}
public boolean isCompleted() {
return m_completed;
}
protected void setCompleted(boolean completed) {
m_completed = completed;
}
@Override
public void setStatus(String status) {
m_status = status;
}
@Override
public void setStatus(Throwable e) {
m_status = e.getClass().getName();
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Event;
public class DefaultEvent extends AbstractMessage implements Event {
public DefaultEvent(String type, String name) {
super(type, name);
}
@Override
public void complete() {
setCompleted(true);
MessageManager.INSTANCE.add(this);
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Heartbeat;
public class DefaultHeartbeat extends AbstractMessage implements Heartbeat {
public DefaultHeartbeat(String type, String name) {
super(type, name);
}
@Override
public void complete() {
setCompleted(true);
MessageManager.INSTANCE.add(this);
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
public class DefaultMessageProducer implements MessageProducer {
@Override
public void logEvent(String type, String name, String status, String nameValuePairs) {
Event event = newEvent(type, name);
event.addData(nameValuePairs);
event.setStatus(Message.SUCCESS);
event.complete();
}
@Override
public void logHeartbeat(String type, String name, String status, String nameValuePairs) {
Heartbeat heartbeat = newHeartbeat(type, name);
heartbeat.addData(nameValuePairs);
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
}
@Override
public Event newEvent(String type, String name) {
return new DefaultEvent(type, name);
}
@Override
public Heartbeat newHeartbeat(String type, String name) {
return new DefaultHeartbeat(type, name);
}
@Override
public Transaction newTransaction(String type, String name) {
return new DefaultTransaction(type, name);
}
}
package com.dianping.cat.message.internal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
public class DefaultTransaction extends AbstractMessage implements Transaction {
private long m_duration;
private List<Message> m_children;
public DefaultTransaction(String type, String name) {
super(type, name);
MessageManager.INSTANCE.start(this);
}
@Override
public void addChild(Message message) {
if (m_children == null) {
m_children = new ArrayList<Message>();
}
m_children.add(message);
}
@Override
public void complete() {
m_duration = (long) (System.nanoTime() / 1e6) - getTimestamp();
setCompleted(true);
MessageManager.INSTANCE.end(this);
}
@Override
public List<Message> getChildren() {
if (m_children == null) {
return Collections.emptyList();
}
return m_children;
}
@Override
public long getDuration() {
return m_duration;
}
}
package com.dianping.cat.message.internal;
import java.util.Collections;
import java.util.List;
import java.util.Stack;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
public enum MessageManager {
INSTANCE;
private static final ThreadLocal<Context> s_context = new ThreadLocal<Context>() {
@Override
protected Context initialValue() {
return new Context();
}
};
public void add(Message message) {
s_context.get().add(message);
}
public void end(Transaction transaction) {
s_context.get().end(transaction);
}
public void start(Transaction transaction) {
s_context.get().start(transaction);
}
void handle(Transaction transaction) {
// TODO
System.out.println(transaction);
}
static class Context {
private Stack<Transaction> m_stack = new Stack<Transaction>();
public void add(Message message) {
if (!m_stack.isEmpty()) {
Transaction entry = m_stack.peek();
entry.addChild(message);
} else {
// add a mock transaction as its parent
Transaction t = new FakeTransaction();
start(t);
t.addChild(message);
end(t);
}
}
public void end(Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.peek();
if (transaction.equals(current)) {
validateTransaction(current);
} else {
throw new RuntimeException("Internal error: Transaction logging mismatched!");
}
m_stack.pop();
if (m_stack.isEmpty()) {
INSTANCE.handle(transaction);
}
}
}
public void start(Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction entry = m_stack.peek();
entry.addChild(transaction);
}
m_stack.push(transaction);
}
private void validateTransaction(Transaction transaction) {
for (Message message : transaction.getChildren()) {
if (message.getStatus() == null) {
message.setStatus("unset");
}
if (!message.isCompleted() && message instanceof DefaultTransaction) {
DefaultTransaction t = (DefaultTransaction) message;
validateTransaction(t);
// missing transaction end, log a BadInstrument event so that
// developer can fix the code
DefaultEvent notCompleteEvent = new DefaultEvent("CAT", "BadInstrument");
notCompleteEvent.setStatus("TransactionNotCompleted");
notCompleteEvent.setCompleted(true);
transaction.addChild(notCompleteEvent);
t.setCompleted(true);
}
}
}
}
static class FakeTransaction extends AbstractMessage implements Transaction {
public FakeTransaction() {
super(null, null);
}
@Override
public void complete() {
}
@Override
public long getDuration() {
return -1;
}
@Override
public List<Message> getChildren() {
return Collections.emptyList();
}
@Override
public void addChild(Message message) {
}
}
}
package com.dianping.cat.message.internal;
import java.util.ArrayList;
import java.util.List;
public class StringRope {
private List<String> m_parts = new ArrayList<String>();
public StringRope append(String str) {
if (str == null) {
m_parts.add("null");
} else if (str.length() > 0) {
m_parts.add(str);
}
return this;
}
public boolean isEmpty() {
return m_parts.isEmpty();
}
}
<plexus>
<components>
<component>
<role>com.dianping.cat.message.io.Transport</role>
<role-hint>tcp-socket</role-hint>
<implementation>com.dianping.cat.message.io.TcpSocketTransport</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.Transport</role>
<role-hint>udp-multicast</role-hint>
<implementation>com.dianping.cat.message.io.UdpMulticastTransport</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
<implementation>com.dianping.cat.message.io.InMemoryQueue</implementation>
......@@ -34,6 +24,10 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
</component>
<component>
<role>com.dianping.cat.message.handler.MessageHandler</role>
<implementation>com.dianping.cat.message.handler.MessageDispatcher</implementation>
......
......@@ -14,6 +14,7 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
import com.dianping.cat.message.internal.AbstractMessage;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
......@@ -57,52 +58,14 @@ public class InMemoryTest extends ComponentTestCase {
Assert.assertEquals(len, sb.length());
}
static class MockMessage implements Message {
private String m_name;
@Override
public void addData(String keyValuePairs) {
}
@Override
public void addData(String key, Object value) {
static class MockMessage extends AbstractMessage {
public MockMessage() {
super(null, null);
}
@Override
public void complete() {
}
@Override
public String getName() {
return m_name;
}
@Override
public String getStatus() {
return null;
}
@Override
public long getTimestamp() {
return 0;
}
@Override
public String getType() {
return null;
}
public void setName(String name) {
m_name = name;
}
@Override
public void setStatus(String status) {
}
@Override
public void setStatus(Throwable e) {
}
}
static class MockMessageHandler implements MessageHandler {
......
......@@ -15,6 +15,7 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.codec.MessageCodec;
import com.dianping.cat.message.handler.MessageHandler;
import com.dianping.cat.message.internal.AbstractMessage;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
......@@ -59,52 +60,14 @@ public class TcpSocketTest extends ComponentTestCase {
Assert.assertEquals(len, sb.length());
}
static class MockMessage implements Message {
private String m_name;
@Override
public void addData(String keyValuePairs) {
}
@Override
public void addData(String key, Object value) {
static class MockMessage extends AbstractMessage {
public MockMessage() {
super(null, null);
}
@Override
public void complete() {
}
@Override
public String getName() {
return m_name;
}
@Override
public String getStatus() {
return null;
}
@Override
public long getTimestamp() {
return 0;
}
@Override
public String getType() {
return null;
}
public void setName(String name) {
m_name = name;
}
@Override
public void setStatus(String status) {
}
@Override
public void setStatus(Throwable e) {
}
}
public static class MockMessageCodec implements MessageCodec {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册