提交 52db9d2b 编写于 作者: F Frankie Wu

bug fix

上级 e6c574c9
......@@ -5,6 +5,7 @@ import java.util.List;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageManager;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemoryReceiver;
import com.dianping.cat.message.io.InMemorySender;
......@@ -33,7 +34,10 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageReceiver.class, inMemory, InMemoryReceiver.class) //
.req(InMemoryQueue.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class));
all.add(C(MessageManager.class) //
.req(MessageSender.class, inMemory));
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
.req(MessageManager.class));
all.add(C(MessageCodec.class, "plain-text", PlainTextMessageCodec.class));
......
......@@ -75,8 +75,7 @@ public interface Message {
* <li>Search: maps to one method of search call</li>
* <li>SQL: maps to one SQL statement</li>
* <li>Cache: maps to one cache access</li>
* <li>Exception: maps to java.lang.Exception</li>
* <li>Error: maps to java.lang.Error</li>
* <li>Error: maps to java.lang.Throwable (java.lang.Exception and java.lang.Error)</li>
* </ul>
* </p>
*
......
package com.dianping.cat.message;
/**
* <p>
* Message factory is used to create new transaction,event and/or heartbeat.
......@@ -132,6 +131,14 @@ public interface MessageProducer {
*/
public void logEvent(String type, String name, String status, String nameValuePairs);
/**
* Log an error.
*
* @param cause
* root cause exception
*/
public void logError(Throwable cause);
/**
* Log a heartbeat in one shot.
*
......
......@@ -42,7 +42,7 @@ public interface Transaction extends Message {
* @param message
* to be added
*/
public void addChild(Message message);
public Transaction addChild(Message message);
/**
* Get all children message within current transaction.
......
package com.dianping.cat.message.internal;
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.StringRope;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
public abstract class AbstractMessage implements Message {
private String m_type;
......@@ -25,16 +31,16 @@ public abstract class AbstractMessage implements Message {
@Override
public void addData(String keyValuePairs) {
m_data.add(keyValuePairs, true);
m_data.addRaw(keyValuePairs);
}
@Override
public void addData(String key, Object value) {
if (m_data.isEmpty()) {
if (!m_data.isEmpty()) {
m_data.add("&");
}
m_data.add(key).add("=").add(String.valueOf(value), true);
m_data.add(key).add("=").addRaw(String.valueOf(value));
}
@Override
......@@ -81,6 +87,15 @@ public abstract class AbstractMessage implements Message {
}
public void setTimestamp(long timestamp) {
m_timestamp = timestamp;
}
m_timestamp = timestamp;
}
@Override
public String toString() {
PlainTextMessageCodec codec = new PlainTextMessageCodec();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
codec.encodeMessage(this, buf);
return buf.toString(Charset.forName("utf-8"));
}
}
......@@ -10,6 +10,5 @@ public class DefaultEvent extends AbstractMessage implements Event {
@Override
public void complete() {
setCompleted(true);
MessageManager.INSTANCE.add(this);
}
}
......@@ -10,6 +10,5 @@ public class DefaultHeartbeat extends AbstractMessage implements Heartbeat {
@Override
public void complete() {
setCompleted(true);
MessageManager.INSTANCE.add(this);
}
}
package com.dianping.cat.message.internal;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import java.io.PrintWriter;
import java.io.StringWriter;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.MessageSender;
import com.site.lookup.annotation.Inject;
public class DefaultMessageProducer implements MessageProducer, Initializable {
public class DefaultMessageProducer implements MessageProducer {
@Inject
private MessageSender m_sender;
private MessageManager m_manager;
@Override
public void initialize() throws InitializationException {
MessageManager.INSTANCE.initialize(m_sender);
public void logError(Throwable cause) {
StringWriter writer = new StringWriter(2048);
cause.printStackTrace(new PrintWriter(writer));
logEvent("Error", cause.getClass().getName(), cause.getClass().getSimpleName(), writer.toString());
}
@Override
......@@ -40,16 +43,25 @@ public class DefaultMessageProducer implements MessageProducer, Initializable {
@Override
public Event newEvent(String type, String name) {
return new DefaultEvent(type, name);
DefaultEvent event = new DefaultEvent(type, name);
m_manager.add(event);
return event;
}
@Override
public Heartbeat newHeartbeat(String type, String name) {
return new DefaultHeartbeat(type, name);
DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
m_manager.add(heartbeat);
return heartbeat;
}
@Override
public Transaction newTransaction(String type, String name) {
return new DefaultTransaction(type, name);
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
m_manager.start(transaction);
return transaction;
}
}
......@@ -12,19 +12,22 @@ public class DefaultTransaction extends AbstractMessage implements Transaction {
private List<Message> m_children;
public DefaultTransaction(String type, String name) {
private MessageManager m_manager;
public DefaultTransaction(String type, String name, MessageManager manager) {
super(type, name);
MessageManager.INSTANCE.start(this);
m_manager = manager;
}
@Override
public void addChild(Message message) {
public DefaultTransaction addChild(Message message) {
if (m_children == null) {
m_children = new ArrayList<Message>();
}
m_children.add(message);
return this;
}
@Override
......@@ -35,11 +38,15 @@ public class DefaultTransaction extends AbstractMessage implements Transaction {
event.setStatus("TransactionAlreadyCompleted");
event.complete();
addChild(event);
} else {
m_duration = (long) (System.nanoTime() / 1e6) - getTimestamp();
setCompleted(true);
MessageManager.INSTANCE.end(this);
if (m_manager != null) {
m_manager.end(this);
}
}
}
......
package com.dianping.cat.message.internal;
import java.util.Collections;
import java.util.List;
import java.util.Stack;
import com.dianping.cat.message.Message;
......@@ -11,10 +9,9 @@ import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
public enum MessageManager {
INSTANCE;
private static final ThreadLocal<Context> s_context = new ThreadLocal<Context>() {
public class MessageManager {
// we don't use static modifier since MessageManager is actual a singleton
private ThreadLocal<Context> m_context = new ThreadLocal<Context>() {
@Override
protected Context initialValue() {
return new Context();
......@@ -24,34 +21,29 @@ public enum MessageManager {
@Inject
private MessageSender m_sender;
private boolean m_initialized;
public void add(Message message) {
s_context.get().add(this, message);
m_context.get().add(this, message);
}
public void end(Transaction transaction) {
s_context.get().end(this, transaction);
m_context.get().end(this, transaction);
}
void flush(MessageTree tree) {
if (m_sender != null) {
m_sender.send(tree);
// destroy current thread data
s_context.remove();
} else if (!m_initialized) {
throw new IllegalStateException("MessageManager is not initialized yet!");
m_context.remove();
}
}
public void initialize(MessageSender sender) {
public void setSender(MessageSender sender) {
m_sender = sender;
m_initialized = true;
}
public void start(Transaction transaction) {
s_context.get().start(transaction);
m_context.get().start(transaction);
}
static class Context {
......@@ -62,6 +54,7 @@ public enum MessageManager {
public void add(MessageManager manager, Message message) {
if (m_stack.isEmpty()) {
m_tree.setMessage(message);
manager.flush(m_tree);
} else {
Transaction entry = m_stack.peek();
......@@ -92,6 +85,8 @@ public enum MessageManager {
Transaction entry = m_stack.peek();
entry.addChild(transaction);
} else {
m_tree.setMessage(transaction);
}
m_stack.push(transaction);
......@@ -115,34 +110,9 @@ public enum MessageManager {
notCompleteEvent.setStatus("TransactionNotCompleted");
notCompleteEvent.setCompleted(true);
transaction.addChild(notCompleteEvent);
t.setCompleted(true);
}
}
}
}
static class FakeTransaction extends AbstractMessage implements Transaction {
public FakeTransaction() {
super(null, null);
}
@Override
public void addChild(Message message) {
}
@Override
public void complete() {
}
@Override
public List<Message> getChildren() {
return Collections.emptyList();
}
@Override
public long getDuration() {
return -1;
}
}
}
package com.dianping.cat.message.io;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.MessageHandler;
/**
* <p>
* Transport is a broker that listens to message from previous phase and relay
* (or re-send) it to next phase.
* </p>
*
* Typically, different environment has different transport configuration
* <ul>
* <li>Dev environment: [collector] ====> [in memory transport] ====> [message handler] ====> [consumers]</li>
* <li>QA environment: [collector] ====> [tcp socket transport] ==network==> [tcp socket transport] ====> [message handler] ====> [consumers]</li>
* </ul>
*
* @author Frankie Wu
*/
public interface Transport {
public void onMessage(MessageHandler handler);
public void send(Message message);
public void shutdown();
}
package com.dianping.cat.message.io;
public class TransportManager {
private static TransportManager s_instance;
private Transport m_transport;
public static Transport getTransport() {
if (s_instance == null) {
throw new RuntimeException("Please call method setTransport() to initialize first!");
}
return s_instance.m_transport;
}
public void setTransport(Transport transport) {
if (transport == null) {
s_instance = null;
} else if (s_instance != null) {
throw new RuntimeException("TransportManager is already initialized!");
} else {
s_instance = new TransportManager();
s_instance.m_transport = transport;
}
}
}
......@@ -22,27 +22,82 @@ public class StringRope {
}
public StringRope add(String str) {
return add(str, false);
return addObject(str, false);
}
public StringRope add(String str, boolean utf8) {
return add(str, utf8);
public StringRope add(StringRope rope) {
int size = rope.size();
for (int i = 0; i < size; i++) {
addObject(rope.m_parts.get(i), rope.m_flags.get(i));
}
return this;
}
protected StringRope add(Object obj, boolean utf8) {
m_flags.set(m_parts.size(), utf8);
protected StringRope addObject(Object obj, boolean isRaw) {
m_flags.set(m_parts.size(), isRaw);
m_parts.add(String.valueOf(obj));
return this;
}
public int size() {
return m_parts.size();
public StringRope addRaw(String str) {
return addObject(str, true);
}
public boolean isEmpty() {
return m_parts.isEmpty();
}
public int size() {
return m_parts.size();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(256);
for (String part : m_parts) {
sb.append(part);
}
return sb.toString();
}
private int writeRaw(ChannelBuffer buffer, byte[] data) {
int len = data.length;
int count = len;
int offset = 0;
for (int i = 0; i < len; i++) {
byte b = data[i];
if (b == '\t' || b == '\r' || b == '\n' || b == '\\') {
buffer.writeBytes(data, offset, i - offset);
buffer.writeByte('\\');
if (b == '\t') {
buffer.writeByte('t');
} else if (b == '\r') {
buffer.writeByte('r');
} else if (b == '\n') {
buffer.writeByte('n');
} else {
buffer.writeByte(b);
}
count++;
offset = i + 1;
}
}
if (len > offset) {
buffer.writeBytes(data, offset, len - offset);
}
return count;
}
public int writeTo(ChannelBuffer buffer) {
int size = m_parts.size();
int count = 0;
......@@ -51,7 +106,7 @@ public class StringRope {
String part = m_parts.get(i);
byte[] data;
if (!m_flags.get(i)) { // no need to encode
if (!m_flags.get(i)) { // no need to escape
data = part.getBytes();
} else {
try {
......@@ -61,20 +116,9 @@ public class StringRope {
}
}
buffer.writeBytes(data);
count += data.length;
count += writeRaw(buffer, data);
}
return count;
}
public StringRope add(StringRope rope) {
int size = rope.size();
for (int i = 0; i < size; i++) {
add(rope.m_parts.get(i), rope.m_flags.get(i));
}
return this;
}
}
......@@ -75,7 +75,7 @@ public class PlainTextMessageCodec implements MessageCodec {
if (identifier == 'E') {
DefaultEvent event = new DefaultEvent(type, name);
String status = helper.read(buf, TAB);
String data = helper.readUtf8(buf, TAB);
String data = helper.readRaw(buf, TAB);
event.setTimestamp(m_dateHelper.parse(timestamp));
event.setStatus(status);
......@@ -84,23 +84,23 @@ public class PlainTextMessageCodec implements MessageCodec {
} else if (identifier == 'H') {
DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
String status = helper.read(buf, TAB);
String data = helper.readUtf8(buf, TAB);
String data = helper.readRaw(buf, TAB);
heartbeat.setTimestamp(m_dateHelper.parse(timestamp));
heartbeat.setStatus(status);
heartbeat.addData(data);
return heartbeat;
} else if (identifier == 't') {
DefaultTransaction transaction = new DefaultTransaction(type, name);
DefaultTransaction transaction = new DefaultTransaction(type, name, null);
helper.read(buf, LF); // get rid of line feed
transaction.setTimestamp(m_dateHelper.parse(timestamp));
return transaction;
} else if (identifier == 'A') {
DefaultTransaction transaction = new DefaultTransaction(type, name);
DefaultTransaction transaction = new DefaultTransaction(type, name, null);
String status = helper.read(buf, TAB);
String duration = helper.read(buf, TAB);
String data = helper.readUtf8(buf, TAB);
String data = helper.readRaw(buf, TAB);
transaction.setTimestamp(m_dateHelper.parse(timestamp));
transaction.setStatus(status);
......@@ -110,7 +110,7 @@ public class PlainTextMessageCodec implements MessageCodec {
} else if (identifier == 'T') {
String status = helper.read(buf, TAB);
String duration = helper.read(buf, TAB);
String data = helper.readUtf8(buf, TAB);
String data = helper.readRaw(buf, TAB);
parent.setStatus(status);
parent.setDuration(Long.parseLong(duration.substring(0, duration.length() - 2)));
......@@ -161,7 +161,10 @@ public class PlainTextMessageCodec implements MessageCodec {
buf.writeInt(0); // place-holder
count += encodeHeader(tree, buf);
count += encodeMessage(tree.getMessage(), buf);
if (tree.getMessage() != null) {
count += encodeMessage(tree.getMessage(), buf);
}
buf.setInt(index, count);
}
......@@ -171,6 +174,7 @@ public class PlainTextMessageCodec implements MessageCodec {
int count = 0;
count += helper.write(buf, ID);
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getDomain());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getHostName());
......@@ -196,7 +200,15 @@ public class PlainTextMessageCodec implements MessageCodec {
int count = 0;
count += helper.write(buf, (byte) type);
count += helper.write(buf, m_dateHelper.format(message.getTimestamp()));
if (type == 'T' && message instanceof Transaction) {
long duration = ((Transaction) message).getDuration();
count += helper.write(buf, m_dateHelper.format(message.getTimestamp() + duration));
} else {
count += helper.write(buf, m_dateHelper.format(message.getTimestamp()));
}
count += helper.write(buf, TAB);
count += helper.write(buf, message.getType());
count += helper.write(buf, TAB);
......@@ -223,7 +235,7 @@ public class PlainTextMessageCodec implements MessageCodec {
count += rope.writeTo(buf);
count += helper.write(buf, TAB);
} else {
count += helper.writeUtf8(buf, String.valueOf(data));
count += helper.writeRaw(buf, String.valueOf(data));
count += helper.write(buf, TAB);
}
}
......@@ -233,7 +245,7 @@ public class PlainTextMessageCodec implements MessageCodec {
return count;
}
protected int encodeMessage(Message message, ChannelBuffer buf) {
public int encodeMessage(Message message, ChannelBuffer buf) {
if (message instanceof Event) {
return encodeLine(message, buf, 'E', Policy.DEFAULT);
} else if (message instanceof Transaction) {
......@@ -277,20 +289,47 @@ public class PlainTextMessageCodec implements MessageCodec {
}
}
public String readUtf8(ChannelBuffer buf, byte separator) {
public String readRaw(ChannelBuffer buf, byte separator) {
int count = buf.bytesBefore(separator);
if (count < 0) {
return null;
} else {
byte[] data = new byte[count];
String str;
buf.readBytes(data);
int length = data.length;
for (int i = 0; i < length; i++) {
if (data[i] == '\\') {
if (i + 1 < length) {
byte b = data[i + 1];
if (b == 't') {
data[i] = '\t';
} else if (b == 'r') {
data[i] = '\r';
} else if (b == 'n') {
data[i] = '\n';
} else {
data[i] = b;
}
System.arraycopy(data, i + 2, data, i + 1, length - i - 2);
length--;
}
}
}
try {
return new String(data, "utf-8");
str = new String(data, 0, length, "utf-8");
} catch (UnsupportedEncodingException e) {
return new String(data);
str = new String(data, 0, length);
}
return str;
}
}
......@@ -300,13 +339,52 @@ public class PlainTextMessageCodec implements MessageCodec {
}
public int write(ChannelBuffer buf, String str) {
if (str == null) {
str = "null";
}
byte[] data = str.getBytes();
buf.writeBytes(data);
return data.length;
}
public int writeUtf8(ChannelBuffer buf, String str) {
private int writeRaw(ChannelBuffer buffer, byte[] data) {
int len = data.length;
int count = len;
int offset = 0;
for (int i = 0; i < len; i++) {
byte b = data[i];
if (b == '\t' || b == '\r' || b == '\n' || b == '\\') {
buffer.writeBytes(data, offset, i - offset);
buffer.writeByte('\\');
if (b == '\t') {
buffer.writeByte('t');
} else if (b == '\r') {
buffer.writeByte('r');
} else if (b == '\n') {
buffer.writeByte('n');
} else {
buffer.writeByte(b);
}
count++;
offset = i + 1;
}
}
buffer.writeBytes(data, offset, len - offset);
return count;
}
public int writeRaw(ChannelBuffer buf, String str) {
if (str == null) {
str = "null";
}
byte[] data;
try {
......@@ -315,8 +393,7 @@ public class PlainTextMessageCodec implements MessageCodec {
data = str.getBytes();
}
buf.writeBytes(data);
return data.length;
return writeRaw(buf, data);
}
}
......@@ -330,7 +407,7 @@ public class PlainTextMessageCodec implements MessageCodec {
SimpleDateFormat format = m_queue.poll();
if (format == null) {
format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.sss");
format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
}
try {
......@@ -346,7 +423,7 @@ public class PlainTextMessageCodec implements MessageCodec {
SimpleDateFormat format = m_queue.poll();
if (format == null) {
format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.sss");
format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss");
}
try {
......
package com.dianping.cat.message.spi.internal;
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
public class DefaultMessageTree implements MessageTree {
private String m_domain;
......@@ -111,4 +117,14 @@ public class DefaultMessageTree implements MessageTree {
public void setThreadId(String threadId) {
m_threadId = threadId;
}
@Override
public String toString() {
PlainTextMessageCodec codec = new PlainTextMessageCodec();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
codec.encode(this, buf);
buf.readInt(); // get rid of length
return buf.toString(Charset.forName("utf-8"));
}
}
......@@ -24,9 +24,24 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.internal.MessageManager</role>
<implementation>com.dianping.cat.message.internal.MessageManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>in-memory</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.internal.MessageManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageCodec</role>
......@@ -38,16 +53,6 @@
<role-hint>dummy</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dummy2</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dummy3</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry</implementation>
......@@ -56,8 +61,6 @@
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hints>
<role-hint>dummy</role-hint>
<role-hint>dummy2</role-hint>
<role-hint>dummy3</role-hint>
</role-hints>
<field-name>m_consumers</field-name>
</requirement>
......
package com.dianping.cat.message;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.junit.Test;
import com.dianping.cat.Cat;
......@@ -22,12 +19,7 @@ public class EventTest {
@Test
public void testException() {
Exception e = new RuntimeException();
Event event = CAT.newEvent("ERROR", e.getClass().getName());
event.addData(toString(e));
event.setStatus("-1");
event.complete();
CAT.logError(new RuntimeException());
}
@Test
......@@ -36,15 +28,6 @@ public class EventTest {
CAT.logEvent("Review", "New", Message.SUCCESS, "id=12345&user=john");
// Exception case
Exception e = new RuntimeException();
CAT.logEvent("Exception", e.getClass().getName(), Message.SUCCESS, toString(e));
}
private String toString(Exception e) {
StringWriter writer = new StringWriter(2048);
e.printStackTrace(new PrintWriter(writer));
return writer.toString();
CAT.logError(new RuntimeException());
}
}
......@@ -5,20 +5,12 @@ import org.junit.Test;
import com.dianping.cat.Cat;
public class HeartbeatTest {
public static MessageProducer CAT = Cat.getProducer();
private static final MessageProducer CAT = Cat.getProducer();
@Test
public void testStatus() {
Heartbeat heartbeat = CAT.newHeartbeat("System", "Status");
heartbeat.addData("ip", "192.168.10.111");
heartbeat.addData("host", "host-1");
heartbeat.addData("load", "2.1");
heartbeat.addData("cpu", "0.12,0.10");
heartbeat.addData("memory.total", "2G");
heartbeat.addData("memory.free", "456M");
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
public void testInOneShot() {
CAT.logHeartbeat("System", "Status", "0",
"ip=192.168.10.111&host=host-1&load=2.1&cpu=0.12,0.10&memory.total=2G&memory.free=456M");
}
@Test
......@@ -35,8 +27,16 @@ public class HeartbeatTest {
}
@Test
public void testInOneShot() {
CAT.logHeartbeat("System", "Status", "0",
"ip=192.168.10.111&host=host-1&load=2.1&cpu=0.12,0.10&memory.total=2G&memory.free=456M");
public void testStatus() {
Heartbeat heartbeat = CAT.newHeartbeat("System", "Status");
heartbeat.addData("ip", "192.168.10.111");
heartbeat.addData("host", "host-1");
heartbeat.addData("load", "2.1");
heartbeat.addData("cpu", "0.12,0.10");
heartbeat.addData("memory.total", "2G");
heartbeat.addData("memory.free", "456M");
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
}
}
......@@ -5,7 +5,7 @@ import org.junit.Test;
import com.dianping.cat.Cat;
public class TransactionTest {
public static MessageProducer CAT = Cat.getProducer();
private static final MessageProducer CAT = Cat.getProducer();
@Test
public void testNormal() {
......
package com.dianping.cat.message.internal;
import java.nio.charset.Charset;
import junit.framework.Assert;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
......@@ -14,7 +10,6 @@ import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ComponentTestCase;
......@@ -23,8 +18,7 @@ public class MessageProducerTest extends ComponentTestCase {
@Test
public void testNormal() throws Exception {
MessageProducer producer = lookup(MessageProducer.class);
InMemoryQueue queue = lookup(InMemoryQueue.class);
MessageCodec codec = lookup(MessageCodec.class, "plain-text");
InMemoryQueue queue = lookup(InMemoryQueue.class, "mock");
Transaction t = producer.newTransaction("URL", "MyPage");
try {
......@@ -32,7 +26,8 @@ public class MessageProducerTest extends ComponentTestCase {
t.addData("k1", "v1");
t.addData("k2", "v2");
t.addData("k3", "v3");
Thread.sleep(30);
Thread.sleep(20);
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
t.setStatus(Message.SUCCESS);
......@@ -45,9 +40,25 @@ public class MessageProducerTest extends ComponentTestCase {
Assert.assertEquals("One message should be in the queue.", 1, queue.size());
MessageTree tree = queue.peek();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
Message m = tree.getMessage();
Assert.assertTrue(Transaction.class.isAssignableFrom(m.getClass()));
Transaction trans = (Transaction) m;
Assert.assertEquals("URL", trans.getType());
Assert.assertEquals("MyPage", trans.getName());
Assert.assertEquals("0", trans.getStatus());
Assert.assertTrue(trans.getDuration() > 0);
Assert.assertEquals("k1=v1&k2=v2&k3=v3", trans.getData().toString());
Assert.assertEquals(1, trans.getChildren().size());
Message c = trans.getChildren().get(0);
codec.encode(tree, buf);
Assert.assertEquals("...", buf.toString(Charset.forName("utf-8")));
Assert.assertEquals("URL", c.getType());
Assert.assertEquals("Payload", c.getName());
Assert.assertEquals("0", c.getStatus());
Assert.assertEquals("host=my-host&ip=127.0.0.1&agent=...", c.getData().toString());
}
}
......@@ -4,7 +4,8 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemorySender;
import com.dianping.cat.message.io.MessageSender;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -17,10 +18,12 @@ public class MessageProducerTestConfigurator extends AbstractResourceConfigurato
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
String inMemory = "in-memory";
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
.req(MessageSender.class, inMemory));
all.add(C(InMemoryQueue.class, "mock", InMemoryQueue.class));
all.add(C(MessageSender.class, "mock", InMemorySender.class) //
.req(InMemoryQueue.class, "mock"));
all.add(C(MessageManager.class) //
.req(MessageSender.class, "mock"));
return all;
}
......
package com.dianping.cat.message.io;
import junit.framework.Assert;
import org.junit.Test;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.io.Transport;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageHandler;
public class TransportManagerTest {
@Test
public void testInitailized() {
new TransportManager().setTransport(new MockTransport());
Assert.assertNotNull(TransportManager.getTransport());
new TransportManager().setTransport(null);
}
@Test
public void testNotInitailized() {
try {
Assert.assertNotNull(TransportManager.getTransport());
Assert.fail("TransportManager should be initialized first before call getTransport()!");
} catch (RuntimeException e) {
// expected
}
}
@Test
public void testDoubleInitailization() {
new TransportManager().setTransport(new MockTransport());
try {
new TransportManager().setTransport(new MockTransport());
Assert.fail("Double initailization of TransportManager should not be allowed!");
} catch (RuntimeException e) {
// expected
}
}
static class MockTransport implements Transport {
@Override
public void onMessage(MessageHandler handler) {
}
@Override
public void send(Message message) {
}
@Override
public void shutdown() {
}
}
}
package com.dianping.cat.message.spi.codec;
import java.nio.charset.Charset;
import junit.framework.Assert;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.Test;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultEvent;
import com.dianping.cat.message.internal.DefaultHeartbeat;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class PlainTextMessageCodecTest {
private void check(Message message, String expected) {
PlainTextMessageCodec codec = new PlainTextMessageCodec();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
codec.encodeMessage(message, buf);
String actual = buf.toString(Charset.forName("utf-8"));
Assert.assertEquals(expected, actual);
MessageTree tree = new DefaultMessageTree();
codec.decodeMessage(buf, tree);
Assert.assertEquals(expected, tree.getMessage().toString());
}
private Event newEvent(String type, String name, long timestamp, String status, String data) {
DefaultEvent event = new DefaultEvent(type, name);
event.setStatus(status);
event.addData(data);
event.setTimestamp(timestamp);
return event;
}
private Heartbeat newHeartbeat(String type, String name, long timestamp, String status, String data) {
DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
heartbeat.setStatus(status);
heartbeat.addData(data);
heartbeat.setTimestamp(timestamp);
return heartbeat;
}
private DefaultMessageTree newMessageTree() {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain("domain");
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId("messageId");
tree.setPort(1234);
tree.setRequestToken("requestToken");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
return tree;
}
private Transaction newTransaction(String type, String name, long timestamp, String status, int duration, String data) {
DefaultTransaction transaction = new DefaultTransaction(type, name, null);
transaction.setStatus(status);
transaction.addData(data);
transaction.complete();
transaction.setTimestamp(timestamp);
transaction.setDuration(duration);
return transaction;
}
@Test
public void testEvent() {
long timestamp = 1325489621987L;
Event event = newEvent("type", "name", timestamp, "0", "here is the data.");
check(event, "E2012-01-02 15:33:41.987\ttype\tname\t0\there is the data.\t\n");
}
@Test
public void testEventForRawData() {
long timestamp = 1325489621987L;
Event event = newEvent(
"Exception",
Exception.class.getName(),
timestamp,
"ERROR",
"java.lang.Exception\n\tat com.dianping.cat.message.spi.codec.PlainTextMessageCodecTest.testEventForException(PlainTextMessageCodecTest.java:112)\n");// toString(e)
check(event,
"E2012-01-02 15:33:41.987\tException\tjava.lang.Exception\tERROR\t" + //
"java.lang.Exception\\n\\tat com.dianping.cat.message.spi.codec.PlainTextMessageCodecTest.testEventForException(PlainTextMessageCodecTest.java:112)\\n\t\n");
}
@Test
public void testHeartbeat() {
long timestamp = 1325489621987L;
Heartbeat heartbeat = newHeartbeat("type", "name", timestamp, "0", "here is the data.");
check(heartbeat, "H2012-01-02 15:33:41.987\ttype\tname\t0\there is the data.\t\n");
}
@Test
public void testMessageTree() {
DefaultMessageTree tree = newMessageTree();
long timestamp = 1325489621987L;
Assert.assertEquals("PT1\tdomain\thostName\t1234\tipAddress\tthreadId\tmessageId\trequestToken\tsessionToken\n",
tree.toString());
tree.setMessage(newEvent("type", "name", timestamp, "0", "here is the data."));
Assert.assertEquals("PT1\tdomain\thostName\t1234\tipAddress\tthreadId\tmessageId\trequestToken\tsessionToken\n" + //
"E2012-01-02 15:33:41.987\ttype\tname\t0\there is the data.\t\n", tree.toString());
}
@Test
public void testTransactionNormal() {
long timestamp = 1325489621987L;
Transaction root = newTransaction("URL", "Review", timestamp, "0", 100, "/review/2468");
root.addChild(newEvent("URL", "Payload", timestamp, "0", "ip=127.0.0.1&ua=Mozilla 5.0...&refer=...&..."));
root.addChild(newTransaction("Service", "Auth", timestamp, "0", 20, "userId=1357&token=..."));
root.addChild(newTransaction("Cache", "findReviewByPK", timestamp + 22, "Missing", 1, "2468") //
.addChild(newEvent("CacheHost", "host-1", timestamp + 22, "0", "ip=192.168.8.123")));
root.addChild(newTransaction("DAL", "findReviewByPK", timestamp + 25, "0", 5,
"select title,content from Review where id = ?"));
root.addChild(newEvent("URL", "View", timestamp + 40, "0", "view=HTML"));
check(root, "t2012-01-02 15:33:41.987\tURL\tReview\t\n" + //
"E2012-01-02 15:33:41.987\tURL\tPayload\t0\tip=127.0.0.1&ua=Mozilla 5.0...&refer=...&...\t\n" + //
"A2012-01-02 15:33:41.987\tService\tAuth\t0\t20ms\tuserId=1357&token=...\t\n" + //
"t2012-01-02 15:33:42.009\tCache\tfindReviewByPK\t\n" + //
"E2012-01-02 15:33:42.009\tCacheHost\thost-1\t0\tip=192.168.8.123\t\n" + //
"T2012-01-02 15:33:42.010\tCache\tfindReviewByPK\tMissing\t1ms\t2468\t\n" + //
"A2012-01-02 15:33:42.012\tDAL\tfindReviewByPK\t0\t5ms\tselect title,content from Review where id = ?\t\n" + //
"E2012-01-02 15:33:42.027\tURL\tView\t0\tview=HTML\t\n" + //
"T2012-01-02 15:33:42.087\tURL\tReview\t0\t100ms\t/review/2468\t\n");
}
@Test
public void testTransactionSimple() {
long timestamp = 1325489621987L;
Transaction transaction = newTransaction("type", "name", timestamp, "0", 10, "here is the data.");
check(transaction, "A2012-01-02 15:33:41.987\ttype\tname\t0\t10ms\there is the data.\t\n");
}
}
package com.dianping.cat.message.spi.codec;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.io.MessageSender;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class PlainTextMessageCodecTestConfigurator extends AbstractResourceConfigurator {
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new PlainTextMessageCodecTestConfigurator());
}
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
String inMemory = "in-memory";
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
.req(MessageSender.class, inMemory));
return all;
}
@Override
protected File getConfigurationFile() {
return new File("src/test/resources/" + PlainTextMessageCodecTest.class.getName().replace('.', '/') + ".xml");
}
}
<plexus>
<components>
<component>
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
<role-hint>mock</role-hint>
<implementation>com.dianping.cat.message.io.InMemoryQueue</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>mock</role-hint>
<implementation>com.dianping.cat.message.io.InMemorySender</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
<role-hint>mock</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.internal.MessageManager</role>
<implementation>com.dianping.cat.message.internal.MessageManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>in-memory</role-hint>
<role-hint>mock</role-hint>
</requirement>
</requirements>
</component>
......
<plexus>
<components>
<component>
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>in-memory</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册