提交 7ae45388 编写于 作者: Y You Yong

modify the cat-core, change the tcp send mode

上级 f1e47b9c
......@@ -8,7 +8,6 @@ import java.util.List;
import com.dianping.cat.consumer.AnalyzerFactory;
import com.dianping.cat.consumer.DefaultAnalyzerFactory;
import com.dianping.cat.consumer.DefaultMessageQueue;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
......@@ -16,6 +15,7 @@ import com.dianping.cat.consumer.problem.handler.FailureHandler;
import com.dianping.cat.consumer.problem.handler.Handler;
import com.dianping.cat.consumer.problem.handler.LongUrlHandler;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageQueue;
......@@ -28,8 +28,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(MessageQueue.class, DefaultMessageQueue.class).is(PER_LOOKUP));
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactory.class));
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
......
<plexus>
<components>
<component>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<implementation>com.dianping.cat.consumer.DefaultMessageQueue</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.consumer.AnalyzerFactory</role>
<implementation>com.dianping.cat.consumer.DefaultAnalyzerFactory</implementation>
......
......@@ -7,6 +7,7 @@ import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageManager;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemoryReceiver;
......@@ -22,6 +23,7 @@ import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.consumer.DummyConsumer;
import com.dianping.cat.message.spi.consumer.DumpToHtmlConsumer;
......@@ -49,7 +51,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageIdFactory.class));
all.add(C(MessagePathBuilder.class, DefaultMessagePathBuilder.class) //
.req(MessageManager.class));
all.add(C(MessageStorage.class, "html", DefaultMessageStorage.class) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "html"));
......@@ -60,9 +62,12 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
all.add(C(MessageQueue.class, DefaultMessageQueue.class).config(E("size").value("1000")).is(PER_LOOKUP));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
.req(MessageCodec.class, "plain-text", "m_codec")//
.req(MessageQueue.class, "default", "m_queue"));
all.add(C(MessageReceiver.class, "tcp-socket", TcpSocketReceiver.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
......
package com.dianping.cat.consumer;
package com.dianping.cat.message.io;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class DefaultMessageQueue implements MessageQueue, Initializable {
private BlockingQueue<MessageTree> m_queue;
/**
* @author yong.you
* @since Jan 5, 2012
*/
public class DefaultMessageQueue implements MessageQueue {
private BlockingQueue<MessageTree> queue = new LinkedBlockingQueue<MessageTree>();
@Inject
private int m_size;
@Override
public void initialize() throws InitializationException {
if (m_size > 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(m_size);
}else{
m_queue = new LinkedBlockingQueue<MessageTree>();
}
}
@Override
public MessageTree poll() {
try {
return queue.poll(1, TimeUnit.MILLISECONDS);
return m_queue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
}
@Override
public void offer(MessageTree tree) {
queue.add(tree);
public boolean offer(MessageTree tree) {
return m_queue.offer(tree);
}
@Override
public int size() {
return queue.size();
return m_queue.size();
}
public void setSize(int size) {
m_size = size;
}
}
......@@ -21,10 +21,11 @@ import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class TcpSocketSender implements MessageSender, LogEnabled {
public class TcpSocketSender extends Thread implements MessageSender, LogEnabled {
@Inject
private String m_host;
......@@ -34,6 +35,9 @@ public class TcpSocketSender implements MessageSender, LogEnabled {
@Inject
private MessageCodec m_codec;
@Inject
private MessageQueue m_queue;
private ChannelFactory m_factory;
private ChannelFuture m_future;
......@@ -46,6 +50,20 @@ public class TcpSocketSender implements MessageSender, LogEnabled {
private Logger m_logger;
public void run() {
while (true) {
try {
MessageTree tree = m_queue.poll();
if (tree != null) {
sendReal(tree);
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
}
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
......@@ -84,6 +102,8 @@ public class TcpSocketSender implements MessageSender, LogEnabled {
}
m_bootstrap = bootstrap;
this.start();
}
public void reconnect() {
......@@ -110,6 +130,14 @@ public class TcpSocketSender implements MessageSender, LogEnabled {
@Override
public void send(MessageTree tree) {
boolean result = m_queue.offer(tree);
if (!result) {
m_logger.error("Message queue is full in tcp socket sender!");
}
}
private void sendReal(MessageTree tree) {
if (m_future == null || !m_future.getChannel().isOpen()) {
reconnect();
}
......
......@@ -2,9 +2,10 @@ package com.dianping.cat.message.spi;
public interface MessageQueue {
//the current size of the queue
public int size();
public MessageTree poll();
public void offer(MessageTree tree);
public boolean offer(MessageTree tree);
}
......@@ -55,7 +55,9 @@ public class DefaultMessageTree implements MessageTree {
@Override
public void setParentMessageId(String parentMessageId) {
m_parentMessageId = parentMessageId;
if (parentMessageId != null && parentMessageId.length() > 0) {
m_parentMessageId = parentMessageId;
}
}
@Override
......@@ -65,7 +67,9 @@ public class DefaultMessageTree implements MessageTree {
@Override
public void setRootMessageId(String rootMessageId) {
m_rootMessageId = rootMessageId;
if (rootMessageId != null && rootMessageId.length() > 0) {
m_rootMessageId = rootMessageId;
}
}
@Override
......@@ -129,7 +133,9 @@ public class DefaultMessageTree implements MessageTree {
@Override
public void setMessageId(String messageId) {
m_messageId = messageId;
if (messageId != null && messageId.length() > 0) {
m_messageId = messageId;
}
}
@Override
......
......@@ -99,6 +99,14 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<implementation>com.dianping.cat.message.io.DefaultMessageQueue</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<size>1000</size>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>tcp-socket</role-hint>
......@@ -108,6 +116,11 @@
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
<field-name>m_codec</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<field-name>m_queue</field-name>
</requirement>
</requirements>
</component>
......
......@@ -6,6 +6,7 @@ import java.util.List;
import com.dianping.cat.message.io.TcpSocketTest.MockMessageCodec;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -22,7 +23,8 @@ public class TcpSocketTestConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageCodec.class, tcpSocket, MockMessageCodec.class));
all.add(C(MessageSender.class, tcpSocket, TcpSocketSender.class).is(PER_LOOKUP) //
.req(MessageCodec.class, tcpSocket) //
.req(MessageCodec.class, tcpSocket, "m_codec") //
.req(MessageQueue.class, "default", "m_queue") //
.config(E("host").value("localhost")));
all.add(C(MessageReceiver.class, tcpSocket, TcpSocketReceiver.class).is(PER_LOOKUP) //
.req(MessageCodec.class, tcpSocket) //
......
......@@ -17,6 +17,11 @@
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>tcp-socket</role-hint>
<field-name>m_codec</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<field-name>m_queue</field-name>
</requirement>
</requirements>
</component>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册