提交 a802ca61 编写于 作者: F Frankie Wu

message coding and decoding

上级 74a20738
......@@ -11,9 +11,11 @@ import com.dianping.cat.message.io.InMemorySender;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.consumer.DummyConsumer;
import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageHandler;
import com.site.lookup.configuration.AbstractResourceConfigurator;
......@@ -32,10 +34,13 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(InMemoryQueue.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class));
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class));
all.add(C(MessageCodec.class, "plain-text", PlainTextMessageCodec.class));
all.add(C(MessageConsumer.class, "dummy", DummyConsumer.class));
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "dummy" }, "m_consumers"));
// the following are not used right now
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageConsumerRegistry.class) //
......
......@@ -38,6 +38,11 @@ public interface Message {
*/
public void complete();
/**
* @return key value pairs data
*/
public Object getData();
/**
* Message name.
*
......
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.StringRope;
public abstract class AbstractMessage implements Message {
private String m_type;
......@@ -24,18 +25,19 @@ public abstract class AbstractMessage implements Message {
@Override
public void addData(String keyValuePairs) {
m_data.append(keyValuePairs);
m_data.add(keyValuePairs, true);
}
@Override
public void addData(String key, Object value) {
if (m_data.isEmpty()) {
m_data.append("&");
m_data.add("&");
}
m_data.append(key).append("=").append(String.valueOf(value));
m_data.add(key).add("=").add(String.valueOf(value), true);
}
@Override
public StringRope getData() {
return m_data;
}
......
package com.dianping.cat.message.internal;
import java.util.ArrayList;
import java.util.List;
public class StringRope {
private List<String> m_parts = new ArrayList<String>();
public StringRope append(String str) {
if (str == null) {
m_parts.add("null");
} else if (str.length() > 0) {
m_parts.add(str);
}
return this;
}
public boolean isEmpty() {
return m_parts.isEmpty();
}
}
......@@ -24,6 +24,7 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
public class TcpSocketReceiver implements MessageReceiver {
......@@ -42,12 +43,6 @@ public class TcpSocketReceiver implements MessageReceiver {
private MessageHandler m_messageHandler;
void handleMessage(byte[] data) {
MessageTree tree = m_codec.decode(data);
m_messageHandler.handle(tree);
}
@Override
public void initialize() {
InetSocketAddress address;
......@@ -98,6 +93,9 @@ public class TcpSocketReceiver implements MessageReceiver {
public class MyDecoder extends FrameDecoder {
@Override
/**
* return null means not all data is ready, so waiting for next network package.
*/
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
if (buffer.readableBytes() < 4) {
return null;
......@@ -107,37 +105,38 @@ public class TcpSocketReceiver implements MessageReceiver {
int length = buffer.readInt();
buffer.resetReaderIndex();
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
// TODO filter
return buffer.readBytes(length);
return buffer.readBytes(length + 4);
}
}
class MyHandler extends SimpleChannelHandler {
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
m_channelGroup.add(e.getChannel());
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
m_channelGroup.add(event.getChannel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
event.getCause().printStackTrace();
e.getChannel().close();
event.getChannel().close();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
int length = buf.readableBytes();
byte[] data = new byte[length];
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
ChannelBuffer buf = (ChannelBuffer) event.getMessage();
MessageTree tree = new DefaultMessageTree();
buf.readBytes(data);
handleMessage(data);
m_codec.decode(buf, tree);
m_messageHandler.handle(tree);
}
}
}
......@@ -5,8 +5,7 @@ import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
......@@ -70,15 +69,10 @@ public class TcpSocketSender implements MessageSender {
@Override
public void send(MessageTree tree) {
byte[] data = m_codec.encode(tree);
Channel channel = m_future.getChannel();
ChannelBufferFactory factory = channel.getConfig().getBufferFactory();
ChannelBuffer buf = factory.getBuffer(data.length + 4);
buf.writeInt(data.length);
buf.writeBytes(data);
channel.write(buf);
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(20 * 1024); // 20K
m_codec.encode(tree, buf);
m_future.getChannel().write(buf);
}
public void setHost(String host) {
......
package com.dianping.cat.message.spi;
import org.jboss.netty.buffer.ChannelBuffer;
public interface MessageCodec {
public byte[] encode(MessageTree tree);
public void decode(ChannelBuffer buf, MessageTree tree);
public MessageTree decode(byte[] data);
public void encode(MessageTree tree, ChannelBuffer buf);
}
package com.dianping.cat.message.spi;
import com.dianping.cat.message.Message;
public interface MessageTree {
public String getDomain();
......@@ -7,7 +9,7 @@ public interface MessageTree {
public String getIpAddress();
public MessageTree getMessage();
public Message getMessage();
public String getMessageId();
......
package com.dianping.cat.message.spi;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.jboss.netty.buffer.ChannelBuffer;
public class StringRope {
private List<String> m_parts;
private BitSet m_flags;
public StringRope() {
this(10);
}
public StringRope(int initialSize) {
m_parts = new ArrayList<String>(initialSize);
m_flags = new BitSet(1024);
}
public StringRope add(String str) {
return add(str, false);
}
public StringRope add(String str, boolean utf8) {
return add(str, utf8);
}
protected StringRope add(Object obj, boolean utf8) {
m_flags.set(m_parts.size(), utf8);
m_parts.add(String.valueOf(obj));
return this;
}
public int size() {
return m_parts.size();
}
public boolean isEmpty() {
return m_parts.isEmpty();
}
public void writeTo(ChannelBuffer buffer) {
int size = m_parts.size();
int writeIndex = buffer.writerIndex();
int total = 0;
buffer.writeInt(0); // place-holder
for (int i = 0; i < size; i++) {
String part = m_parts.get(i);
byte[] data;
if (!m_flags.get(i)) { // no need to encode
data = part.getBytes();
} else {
try {
data = part.getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
data = part.getBytes();
}
}
buffer.writeBytes(data);
total += data.length;
}
buffer.setInt(writeIndex, total);
}
public StringRope add(StringRope rope) {
int size = rope.size();
for (int i = 0; i < size; i++) {
add(rope.m_parts.get(i), rope.m_flags.get(i));
}
return this;
}
}
package com.dianping.cat.message.spi.codec;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.StringRope;
public class CopyOfPlainTextMessageCodec implements MessageCodec {
private static final String ID = "PT1"; // plain text version 1
private static final String TAB = "\t";
private static final String LF = "\n";
private SimpleDateFormat m_dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.sss");
@Override
public void decode(ChannelBuffer buf, MessageTree tree) {
// TODO Auto-generated method stub
}
public StringRope encode(MessageTree tree) {
StringRope rope = new StringRope(1024);
encodeHeader(rope, tree);
encodeMessage(rope, tree.getMessage());
return rope;
}
protected void encodeHeader(StringRope rope, MessageTree tree) {
rope.add(ID);
rope.add(tree.getDomain()).add(TAB);
rope.add(tree.getHostName()).add(TAB);
rope.add(String.valueOf(tree.getPort())).add(TAB);
rope.add(tree.getIpAddress()).add(TAB);
rope.add(tree.getThreadId()).add(TAB);
rope.add(tree.getMessageId()).add(TAB);
rope.add(tree.getRequestToken()).add(TAB);
rope.add(tree.getSessionToken()).add(TAB);
rope.add(LF);
}
protected void encodeLine(StringRope rope, Message message, String type, Policy policy) {
rope.add(type);
rope.add(m_dateFormat.format(new Date(message.getTimestamp()))).add(TAB);
rope.add(message.getType()).add(TAB);
rope.add(message.getName()).add(TAB);
if (policy != Policy.WITHOUT_STATUS) {
rope.add(message.getStatus()).add(TAB);
Object data = message.getData();
if (policy == Policy.WITH_DURATION && message instanceof Transaction) {
long duration = ((Transaction) message).getDuration();
rope.add(String.valueOf(duration)).add("ms").add(TAB);
}
if (data instanceof StringRope) {
rope.add((StringRope) message.getData()).add(TAB);
} else {
rope.add(String.valueOf(data), true).add(TAB);
}
}
rope.add(LF);
}
protected void encodeMessage(StringRope rope, Message message) {
if (message instanceof Event) {
encodeLine(rope, message, "E", Policy.DEFAULT);
} else if (message instanceof Transaction) {
Transaction transaction = (Transaction) message;
List<Message> children = transaction.getChildren();
if (children.isEmpty()) {
encodeLine(rope, message, "A", Policy.WITH_DURATION);
} else {
encodeLine(rope, message, "t", Policy.WITHOUT_STATUS);
for (Message child : children) {
encodeMessage(rope, child);
}
encodeLine(rope, message, "T", Policy.WITH_DURATION);
}
} else if (message instanceof Heartbeat) {
encodeLine(rope, message, "H", Policy.DEFAULT);
} else {
throw new RuntimeException(String.format("Unsupported message type: %s.", message.getClass()));
}
}
protected static enum Policy {
DEFAULT,
WITHOUT_STATUS,
WITH_DURATION;
}
@Override
public void encode(MessageTree tree, ChannelBuffer buf) {
}
}
package com.dianping.cat.message.spi.codec;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.StringRope;
public class PlainTextMessageCodec implements MessageCodec {
private static final String ID = "PT1"; // plain text version 1
private static final String TAB = "\t";
private static final String LF = "\n";
private SimpleDateFormat m_dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.sss");
@Override
public byte[] encode(MessageTree tree) {
return null;
public void decode(ChannelBuffer buf, MessageTree tree) {
// TODO Auto-generated method stub
}
public StringRope encode(MessageTree tree) {
StringRope rope = new StringRope(1024);
encodeHeader(rope, tree);
encodeMessage(rope, tree.getMessage());
return rope;
}
@Override
public MessageTree decode(byte[] data) {
return null;
public void encode(MessageTree tree, ChannelBuffer buf) {
encodeHeader(tree, buf);
}
private void encodeHeader(MessageTree tree, ChannelBuffer buf) {
}
protected void encodeHeader(StringRope rope, MessageTree tree) {
rope.add(ID);
rope.add(tree.getDomain()).add(TAB);
rope.add(tree.getHostName()).add(TAB);
rope.add(String.valueOf(tree.getPort())).add(TAB);
rope.add(tree.getIpAddress()).add(TAB);
rope.add(tree.getThreadId()).add(TAB);
rope.add(tree.getMessageId()).add(TAB);
rope.add(tree.getRequestToken()).add(TAB);
rope.add(tree.getSessionToken()).add(TAB);
rope.add(LF);
}
protected void encodeLine(StringRope rope, Message message, String type, Policy policy) {
rope.add(type);
rope.add(m_dateFormat.format(new Date(message.getTimestamp()))).add(TAB);
rope.add(message.getType()).add(TAB);
rope.add(message.getName()).add(TAB);
if (policy != Policy.WITHOUT_STATUS) {
rope.add(message.getStatus()).add(TAB);
Object data = message.getData();
if (policy == Policy.WITH_DURATION && message instanceof Transaction) {
long duration = ((Transaction) message).getDuration();
rope.add(String.valueOf(duration)).add("ms").add(TAB);
}
if (data instanceof StringRope) {
rope.add((StringRope) message.getData()).add(TAB);
} else {
rope.add(String.valueOf(data), true).add(TAB);
}
}
rope.add(LF);
}
protected void encodeMessage(StringRope rope, Message message) {
if (message instanceof Event) {
encodeLine(rope, message, "E", Policy.DEFAULT);
} else if (message instanceof Transaction) {
Transaction transaction = (Transaction) message;
List<Message> children = transaction.getChildren();
if (children.isEmpty()) {
encodeLine(rope, message, "A", Policy.WITH_DURATION);
} else {
encodeLine(rope, message, "t", Policy.WITHOUT_STATUS);
for (Message child : children) {
encodeMessage(rope, child);
}
encodeLine(rope, message, "T", Policy.WITH_DURATION);
}
} else if (message instanceof Heartbeat) {
encodeLine(rope, message, "H", Policy.DEFAULT);
} else {
throw new RuntimeException(String.format("Unsupported message type: %s.", message.getClass()));
}
}
protected static enum Policy {
DEFAULT,
WITHOUT_STATUS,
WITH_DURATION;
}
}
package com.dianping.cat.message.spi.consumer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageTree;
public class DummyConsumer implements MessageConsumer {
@Override
public String getConsumerId() {
return "dummy";
}
@Override
public String getDomain() {
return null;
}
@Override
public void consume(MessageTree tree) {
// Do nothing here
}
}
......@@ -5,8 +5,10 @@ import java.util.List;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.site.lookup.annotation.Inject;
public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry {
@Inject
private List<MessageConsumer> m_consumers = new ArrayList<MessageConsumer>();
@Override
......
......@@ -20,7 +20,7 @@ public class DefaultMessageTree implements MessageTree {
}
@Override
public MessageTree getMessage() {
public Message getMessage() {
return null;
}
......@@ -50,6 +50,6 @@ public class DefaultMessageTree implements MessageTree {
}
public void setMessage(Message message) {
}
}
}
......@@ -28,15 +28,41 @@
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
<implementation>com.dianping.cat.message.spi.codec.PlainTextMessageCodec</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dummy</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dummy2</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dummy3</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hints>
<role-hint>dummy</role-hint>
<role-hint>dummy2</role-hint>
<role-hint>dummy3</role-hint>
</role-hints>
<field-name>m_consumers</field-name>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageHandler</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageHandler</implementation>
......
package com.dianping.cat.message.internal;
import java.nio.charset.Charset;
import junit.framework.Assert;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
......@@ -41,7 +45,9 @@ public class MessageProducerTest extends ComponentTestCase {
Assert.assertEquals("One message should be in the queue.", 1, queue.size());
MessageTree tree = queue.peek();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
Assert.assertEquals("...", codec.encode(tree));
codec.encode(tree, buf);
Assert.assertEquals("...", buf.toString(Charset.forName("utf-8")));
}
}
......@@ -8,6 +8,7 @@ import java.util.concurrent.Future;
import junit.framework.Assert;
import org.jboss.netty.buffer.ChannelBuffer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
......@@ -73,13 +74,14 @@ public class TcpSocketTest extends ComponentTestCase {
public static class MockMessageCodec implements MessageCodec {
@Override
public MessageTree decode(byte[] bytes) {
return new DefaultMessageTree();
public void decode(ChannelBuffer buf, MessageTree tree) {
// do nothing here
}
@Override
public byte[] encode(MessageTree message) {
return "mock".getBytes();
public void encode(MessageTree tree, ChannelBuffer buf) {
buf.writeInt(4);
buf.writeBytes("mock".getBytes());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册