提交 a29fd94b 编写于 作者: Y youyong205

modify the plaintext decode

上级 5e402cec
......@@ -51,6 +51,8 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
private Logger m_logger;
private int m_maxDecodeNumber = 5000;
@Override
public MessageTree decode(ChannelBuffer buf) {
MessageTree tree = new DefaultMessageTree();
......@@ -61,8 +63,9 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
@Override
public void decode(ChannelBuffer buf, MessageTree tree) {
decodeHeader(buf, tree);
buf.markReaderIndex();
decodeHeader(buf, tree);
if (buf.readableBytes() > 0) {
decodeMessage(buf, tree);
}
......@@ -121,7 +124,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
return transaction;
case 'A':
DefaultTransaction tran = new DefaultTransaction(type, name, null);
String status = helper.read(buf, TAB);
String status = helper.readRaw(buf, TAB);
String duration = helper.read(buf, TAB);
String data = helper.readRaw(buf, TAB);
......@@ -140,7 +143,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
return tran;
}
case 'T':
String transactionStatus = helper.read(buf, TAB);
String transactionStatus = helper.readRaw(buf, TAB);
String transactionDuration = helper.read(buf, TAB);
String transactionData = helper.readRaw(buf, TAB);
......@@ -149,12 +152,13 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
parent.addData(transactionData);
long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2));
parent.setDurationInMicros(transactionD);
return stack.pop();
case 'E':
DefaultEvent event = new DefaultEvent(type, name);
String eventStatus = helper.read(buf, TAB);
String eventStatus = helper.readRaw(buf, TAB);
String eventData = helper.readRaw(buf, TAB);
helper.read(buf, LF); // get rid of line feed
......@@ -170,7 +174,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
}
case 'M':
DefaultMetric metric = new DefaultMetric(type, name);
String metricStatus = helper.read(buf, TAB);
String metricStatus = helper.readRaw(buf, TAB);
String metricData = helper.readRaw(buf, TAB);
helper.read(buf, LF); // get rid of line feed
......@@ -186,7 +190,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
}
case 'L':
DefaultTrace trace = new DefaultTrace(type, name);
String traceStatus = helper.read(buf, TAB);
String traceStatus = helper.readRaw(buf, TAB);
String traceData = helper.readRaw(buf, TAB);
helper.read(buf, LF); // get rid of line feed
......@@ -202,7 +206,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
}
case 'H':
DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
String heartbeatStatus = helper.read(buf, TAB);
String heartbeatStatus = helper.readRaw(buf, TAB);
String heartbeatData = helper.readRaw(buf, TAB);
helper.read(buf, LF); // get rid of line feed
......@@ -230,9 +234,19 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
tree.setMessage(parent);
int total = m_maxDecodeNumber;
while (buf.readableBytes() > 0) {
Message message = decodeLine(buf, (DefaultTransaction) parent, stack, tree);
total--;
if (total == 0) {
buf.resetReaderIndex();
String messageTree = buf.toString(Charset.forName("utf-8"));
m_logger.warn("Decode message in a dead loop" + messageTree);
throw new RuntimeException("Error when decoding cat message! message tree:" + messageTree);
}
if (message instanceof DefaultTransaction) {
parent = message;
} else {
......@@ -312,7 +326,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
count += helper.write(buf, TAB);
if (policy != Policy.WITHOUT_STATUS) {
count += helper.write(buf, message.getStatus());
count += helper.writeRaw(buf, message.getStatus());
count += helper.write(buf, TAB);
Object data = message.getData();
......
package com.dianping.cat.message.spi.core;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.Test;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MockMessageBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class PlainTextCodecTest {
@Test
public void test() throws InterruptedException {
MessageTree tree = buildMessages();
PlainTextMessageCodec codec = new PlainTextMessageCodec();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
codec.encode(tree, buf);
int i = buf.readInt(); // get rid of length
System.out.println(i);
String str = buf.toString(Charset.forName("utf-8"));
System.out.println(str);
MessageTree tree2 = new DefaultMessageTree();
codec.decode(buf, tree2);
System.out.println(tree2);
Thread.sleep(1000);
}
public MessageTree buildMessages() {
Transaction t = Cat.newTransaction("type1", "name1");
Transaction t2 = Cat.newTransaction("type2", "name2");
Transaction t3 = Cat.newTransaction("type3", "name3");
Transaction t4 = Cat.newTransaction("type4", "name4");
Cat.logEvent("type1\t\n", "name\t\n", "sdfsdf\t\n", convertException(new NullPointerException()));
Cat.logHeartbeat("type1\t\n", "name\t\n", "sdfsdf\t\n", convertException(new NullPointerException()));
Cat.logError(new RuntimeException());
t2.addData(convertException(new NullPointerException()));
t2.setStatus(convertException(new NullPointerException()));
t2.complete();
t3.complete();
t4.complete();
MessageTree tree = Cat.getManager().getThreadLocalMessageTree();
t.setStatus("sfsf\t\n");
((DefaultTransaction) t).setDurationInMicros(1000);
System.err.println(tree);
return tree;
}
public static String convertException(Throwable cause) {
StringWriter writer = new StringWriter(2048);
cause.printStackTrace(new PrintWriter(writer));
return writer.toString();
}
public MessageTree buildMessage() {
Message message = new MockMessageBuilder() {
@Override
public MessageHolder define() {
TransactionHolder t = t("URL\t\n", "GET\t\n", 112819).child(
t("PigeonCall\t\n",
"groupService:groupNoteService_1.0.0:updateNoteDraft(Integer,Integer,String,String)\t\n",
"testtest\t\ntest\t\ntest\t\n", 100).child(
e("PigeonCall.server\t\n", "10.1.2.99:2011\t\n",
"Execute[34796272]testtest\t\ntest\t\ntest\t\n"))).child(
t("PigeonCall\t\n",
"groupService:groupNoteService_1.0.1:updateNoteDraft2(Integer,Integer,String,String)\t\n", "",
100).child(
e("PigeonCall.server\t\n", "10.1.2.199:2011\t\n",
"Execute[34796272]testtest\t\ntest\t\ntest\t\n")));
return t;
}
}.build();
MessageTree tree = new DefaultMessageTree();
tree.setDomain("cat test");
tree.setHostName("test test");
tree.setIpAddress("test test");
tree.setThreadGroupName("test test");
tree.setThreadId("test test");
tree.setThreadName("test test");
tree.setMessage(message);
tree.setMessageId("MobileApi-0a01077f-379304-1362256");
return tree;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册