提交 74a20738 编写于 作者: F Frankie Wu

small change

上级 f5d66f17
......@@ -4,8 +4,6 @@ import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.broker.DefaultMessageBroker;
import com.dianping.cat.message.broker.MessageBroker;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemoryReceiver;
......@@ -13,9 +11,11 @@ import com.dianping.cat.message.io.InMemorySender;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.internal.MessageDispatcher;
import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageHandler;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -32,15 +32,13 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(InMemoryQueue.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class));
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class));
all.add(C(MessageCodec.class, "plain-text", PlainTextMessageCodec.class));
// the following are not used right now
all.add(C(MessageHandler.class, MessageDispatcher.class) //
.req(MessageReceiver.class, inMemory));
all.add(C(MessageBroker.class, inMemory, DefaultMessageBroker.class) //
.req(MessageSender.class, inMemory) //
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageConsumerRegistry.class) //
.req(MessageReceiver.class, inMemory));
return all;
......
......@@ -29,10 +29,18 @@ public class DefaultTransaction extends AbstractMessage implements Transaction {
@Override
public void complete() {
m_duration = (long) (System.nanoTime() / 1e6) - getTimestamp();
if (isCompleted()) {
// complete() was called more than once
DefaultEvent event = new DefaultEvent("CAT", "BadInstrument");
setCompleted(true);
MessageManager.INSTANCE.end(this);
event.setStatus("TransactionAlreadyCompleted");
event.complete();
} else {
m_duration = (long) (System.nanoTime() / 1e6) - getTimestamp();
setCompleted(true);
MessageManager.INSTANCE.end(this);
}
}
@Override
......
......@@ -112,6 +112,7 @@ public class TcpSocketReceiver implements MessageReceiver {
return null;
}
// TODO filter
return buffer.readBytes(length);
}
}
......
package com.dianping.cat.message.spi;
public interface MessageConsumer {
public String getId();
public String getConsumerId();
public String getDomain();
public void consume(MessageTree tree);
}
package com.dianping.cat.message.spi;
import java.util.Map;
import java.util.List;
public interface MessageConsumerRegistry {
public void registerFilter(MessageFilter filter);
public void registerConsumer(MessageConsumer consumer);
public Map<String, MessageConsumer> getConsumers();
public List<MessageConsumer> getConsumers();
}
\ No newline at end of file
package com.dianping.cat.message.spi.internal;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageFilter;
public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry {
private Map<String, MessageConsumer> m_consumers = new LinkedHashMap<String, MessageConsumer>();
private List<MessageConsumer> m_consumers = new ArrayList<MessageConsumer>();
@Override
public Map<String, MessageConsumer> getConsumers() {
public List<MessageConsumer> getConsumers() {
return m_consumers;
}
@Override
public void registerConsumer(MessageConsumer consumer) {
m_consumers.put(consumer.getId(), consumer);
}
@Override
public void registerFilter(MessageFilter filter) {
}
static class Entry {
private MessageConsumer m_message;
private List<MessageFilter> m_filters;
m_consumers.add(consumer);
}
}
package com.dianping.cat.message.spi.internal;
import java.util.Collection;
import java.util.List;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class MessageDispatcher implements MessageHandler, Runnable {
public class DefaultMessageHandler implements MessageHandler, Runnable {
@Inject
private MessageReceiver m_receiver;
@Inject
private DefaultMessageConsumerRegistry m_registry;
private MessageConsumerRegistry m_registry;
@Override
public void handle(MessageTree tree) {
Collection<MessageConsumer> comsumers = m_registry.getConsumers().values();
List<MessageConsumer> consumers = m_registry.getConsumers();
int size = consumers.size();
for (int i = 0; i < size; i++) {
MessageConsumer consumer = consumers.get(i);
for (MessageConsumer consumer : comsumers) {
try {
consumer.consume(tree);
} catch (Exception e) {
......@@ -37,7 +41,7 @@ public class MessageDispatcher implements MessageHandler, Runnable {
m_receiver = receiver;
}
public void setRegistry(DefaultMessageConsumerRegistry registry) {
public void setRegistry(MessageConsumerRegistry registry) {
m_registry = registry;
}
}
......@@ -28,6 +28,10 @@
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
......@@ -35,22 +39,10 @@
</component>
<component>
<role>com.dianping.cat.message.spi.MessageHandler</role>
<implementation>com.dianping.cat.message.spi.internal.MessageDispatcher</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.MessageReceiver</role>
<role-hint>in-memory</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.broker.MessageBroker</role>
<role-hint>in-memory</role-hint>
<implementation>com.dianping.cat.message.broker.DefaultMessageBroker</implementation>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageHandler</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>in-memory</role-hint>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.io.MessageReceiver</role>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册