提交 0b308eae 编写于 作者: F Frankie Wu

separate client and server config

上级 c2985035
......@@ -268,7 +268,7 @@ public class FailureReportAnalyzer extends AbstractMessageAnalyzer<FailureReport
@Override
public void initialize() throws InitializationException {
Config config = m_manager.getConfig();
Config config = m_manager.getClientConfig();
if (config != null) {
Property property = config.findProperty("failure-base-dir");
......
......@@ -126,7 +126,7 @@ public class TransactionReportAnalyzer extends AbstractMessageAnalyzer<Transacti
@Override
public void initialize() throws InitializationException {
Config config = m_manager.getConfig();
Config config = m_manager.getClientConfig();
if (config != null) {
Property property = config.findProperty("transaction-base-dir");
......
......@@ -21,7 +21,7 @@ import com.site.lookup.ComponentTestCase;
public class FailureAnalyzerStoreTest extends ComponentTestCase {
@Test
public void testJson() throws Exception {
long current = System.currentTimeMillis();
long current = 1327470645035L;
long duration = 60 * 60 * 1000;
long extraTime = 5 * 60 * 1000;
long start = current - current % (60 * 60 * 1000);
......@@ -56,7 +56,7 @@ public class FailureAnalyzerStoreTest extends ComponentTestCase {
String json = builder.getString();
String expected = Files.forIO().readFrom(getResourceFile("failure.json"), "utf-8");
Assert.assertEquals("Check json content!", expected, json);
Assert.assertEquals("Check json content!", expected.replace("\r", ""), json.replace("\r", ""));
}
@Test
......
{
"domain": "domain1",
"startTime": "2012-01-21 21:00:00",
"endTime": "2012-01-21 21:59:00",
"startTime": "2012-01-25 13:00:00",
"endTime": "2012-01-25 13:59:00",
"machines": {
"machines": [
"192.168.8.0",
......@@ -20,116 +20,116 @@
]
},
"segments": {
"2012-01-21 21:00": {
"id": "2012-01-21 21:00",
"2012-01-25 13:00": {
"id": "2012-01-25 13:00",
"entries": [
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId0",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId0",
"threadId": "Thread0",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId0",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId0",
"threadId": "Thread0",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId0",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId0",
"threadId": "Thread0",
"text": "A1B1"
}
]
},
"2012-01-21 21:01": {
"id": "2012-01-21 21:01",
"2012-01-25 13:01": {
"id": "2012-01-25 13:01",
"entries": [
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId1",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId1",
"threadId": "Thread1",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId1",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId1",
"threadId": "Thread1",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId1",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId1",
"threadId": "Thread1",
"text": "A1B1"
}
]
},
"2012-01-21 21:02": {
"id": "2012-01-21 21:02",
"2012-01-25 13:02": {
"id": "2012-01-25 13:02",
"entries": [
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId2",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId2",
"threadId": "Thread2",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId2",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId2",
"threadId": "Thread2",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId2",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId2",
"threadId": "Thread2",
"text": "A1B1"
}
]
},
"2012-01-21 21:03": {
"id": "2012-01-21 21:03",
"2012-01-25 13:03": {
"id": "2012-01-25 13:03",
"entries": [
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId3",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId3",
"threadId": "Thread3",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId3",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId3",
"threadId": "Thread3",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId3",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId3",
"threadId": "Thread3",
"text": "A1B1"
}
]
},
"2012-01-21 21:04": {
"id": "2012-01-21 21:04",
"2012-01-25 13:04": {
"id": "2012-01-25 13:04",
"entries": [
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId4",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId4",
"threadId": "Thread4",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId4",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId4",
"threadId": "Thread4",
"text": "A1B1"
},
{
"type": "LongUrl",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120121/middleware/MessageId4",
"path": "file:/Users/qmwu/project/tracking/cat-consumer/target/catlog/20120125/middleware/MessageId4",
"threadId": "Thread4",
"text": "A1B1"
}
......
......@@ -96,7 +96,7 @@ public class Cat {
ClientConfigValidator validator = new ClientConfigValidator();
config.accept(validator);
getInstance().m_manager.initialize(config);
getInstance().m_manager.initializeClient(config);
} else {
System.out.println("[WARN] Cat client is disabled due to no config file found!");
}
......
......@@ -81,8 +81,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(MessageManager.class));
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageConsumerRegistry.class) //
.req(TransportManager.class));
.req(MessageManager.class, MessageConsumerRegistry.class));
return all;
}
......
......@@ -27,7 +27,9 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
};
private Config m_config = new Config();
private Config m_clientConfig;
private Config m_serverConfig;
private String m_domain;
......@@ -61,8 +63,13 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
@Override
public Config getConfig() {
return m_config;
public Config getClientConfig() {
return m_clientConfig;
}
@Override
public Config getServerConfig() {
return m_serverConfig;
}
Context getContext() {
......@@ -77,11 +84,16 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
@Override
public void initialize(Config config) {
m_config = config;
public void initializeClient(Config clientConfig) {
if (clientConfig != null) {
m_clientConfig = clientConfig;
} else {
m_clientConfig = new Config();
m_clientConfig.setMode("client");
}
if (m_config.getApp() != null) {
m_domain = m_config.getApp().getDomain();
if (m_clientConfig.getApp() != null) {
m_domain = m_clientConfig.getApp().getDomain();
}
try {
......@@ -96,12 +108,15 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
m_manager = lookup(TransportManager.class);
}
@Override
public void initializeServer(Config serverConfig) {
m_serverConfig = serverConfig;
}
@Override
public void reset() {
System.out.println(m_context);
// destroy current thread local data
m_context.remove();
System.out.println(m_context.get());
}
@Override
......
......@@ -5,7 +5,6 @@ import java.util.List;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.configuration.model.entity.Bind;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.configuration.model.entity.Server;
import com.dianping.cat.message.spi.MessageManager;
......@@ -18,17 +17,6 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor
private MessageSender m_sender;
private MessageReceiver m_receiver;
@Override
public MessageReceiver getReceiver() {
if (m_receiver == null) {
throw new RuntimeException("Client mode only, no receiver is provided!");
}
return m_receiver;
}
@Override
public MessageSender getSender() {
if (m_sender == null) {
......@@ -40,19 +28,19 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor
@Override
public void initialize() throws InitializationException {
Config config = m_manager.getConfig();
Config config = m_manager.getClientConfig();
if (config == null) {
// by default, no configuration needed in develop mode, all in memory
m_sender = lookup(MessageSender.class, "in-memory");
m_receiver = lookup(MessageReceiver.class, "in-memory");
} else {
String mode = config.getMode();
if ("client".equals(mode)) {
List<Server> servers = config.getServers();
int size = servers.size();
if (servers.size() == 1) {
if (size == 1) {
TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, "tcp-socket");
Server server = servers.get(0);
......@@ -61,22 +49,14 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor
sender.initialize();
m_sender = sender;
} else if (size == 0) {
m_sender = lookup(MessageSender.class, "in-memory");
} else {
throw new UnsupportedOperationException("Not implemented yet");
}
} else if ("server".equals(mode)) {
TcpSocketReceiver receiver = (TcpSocketReceiver) lookup(MessageReceiver.class, "tcp-socket");
Bind bind = config.getBind();
receiver.setHost(bind.getIp());
receiver.setPort(bind.getPort());
receiver.initialize();
m_receiver = receiver;
} else if ("broker".equals(mode)) {
throw new UnsupportedOperationException("Not implemented yet");
} else {
throw new IllegalArgumentException(String.format("Unsupported mode(%s)!", mode));
throw new IllegalArgumentException(String.format(
"Only mode(client) was supported in transport manager, but was mode(%s)!", mode));
}
}
}
......
......@@ -106,6 +106,7 @@ public class TcpSocketSender implements MessageSender {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
while (buf.readable()) {
// TODO do something here
System.out.println((char) buf.readByte());
System.out.flush();
}
......
package com.dianping.cat.message.io;
public interface TransportManager {
public MessageReceiver getReceiver();
public MessageSender getSender();
}
......@@ -9,9 +9,13 @@ public interface MessageManager {
public void end(Transaction transaction);
public Config getConfig();
public Config getClientConfig();
public void initialize(Config config);
public Config getServerConfig();
public void initializeClient(Config config);
public void initializeServer(Config config);
public void reset();
......
......@@ -2,21 +2,30 @@ package com.dianping.cat.message.spi.internal;
import java.util.List;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.configuration.model.entity.Bind;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.io.TcpSocketReceiver;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultMessageHandler implements MessageHandler, Runnable {
public class DefaultMessageHandler extends ContainerHolder implements MessageHandler, Initializable, Runnable {
@Inject
private TransportManager m_manager;
private MessageManager m_manager;
@Inject
private MessageConsumerRegistry m_registry;
private MessageReceiver m_receiver;
@Override
public void handle(MessageTree tree) {
List<MessageConsumer> consumers = m_registry.getConsumers();
......@@ -34,17 +43,38 @@ public class DefaultMessageHandler implements MessageHandler, Runnable {
}
@Override
public void run() {
MessageReceiver receiver = m_manager.getReceiver();
public void initialize() throws InitializationException {
Config config = m_manager.getServerConfig();
receiver.onMessage(this);
if (config == null) {
// by default, no configuration needed in develop mode, all in memory
m_receiver = lookup(MessageReceiver.class, "in-memory");
} else {
String mode = config.getMode();
if ("server".equals(mode)) {
TcpSocketReceiver receiver = (TcpSocketReceiver) lookup(MessageReceiver.class, "tcp-socket");
Bind bind = config.getBind();
receiver.setHost(bind.getIp());
receiver.setPort(bind.getPort());
receiver.initialize();
m_receiver = receiver;
} else if ("broker".equals(mode)) {
throw new UnsupportedOperationException("Not implemented yet");
} else {
throw new IllegalArgumentException(String.format("Unsupported mode(%s) in message handler!", mode));
}
}
}
public void setRegistry(MessageConsumerRegistry registry) {
m_registry = registry;
@Override
public void run() {
m_receiver.onMessage(this);
}
public void setTransportManager(TransportManager manager) {
m_manager = manager;
public void setRegistry(MessageConsumerRegistry registry) {
m_registry = registry;
}
}
......@@ -45,7 +45,7 @@ public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializa
@Override
public void initialize() throws InitializationException {
Config config = m_manager.getConfig();
Config config = m_manager.getClientConfig();
String baseLogDir = config.getBaseLogDir();
String baseLogUrl = config.getBaseLogUrl();
......
......@@ -162,10 +162,10 @@
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageHandler</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
<role>com.dianping.cat.message.spi.MessageManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.io.TransportManager</role>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
</requirement>
</requirements>
</component>
......
......@@ -22,7 +22,7 @@ public class MessageProducerTest extends ComponentTestCase {
public void before() throws Exception {
MessageManager manager = lookup(MessageManager.class);
manager.initialize(null);
manager.initializeClient(null);
manager.setup(null, null);
}
......
......@@ -13,8 +13,7 @@ com.dianping.cat.report.page.transaction.Handler.class,
com.dianping.cat.report.page.failure.Handler.class,
com.dianping.cat.report.page.logview.Handler.class,
com.dianping.cat.report.page.logview.Handler.class
})
public class ReportModule extends AbstractModule {
......
......@@ -7,13 +7,11 @@ public enum ReportPage implements Page {
HOME("home", "Home", true),
TRANSACTION("t", "Transaction", true),
TRANSACTION("transaction", "Transaction", true),
FAILURE("f", "Failure", true),
FAILURE("failure", "Failure", true),
LOGVIEW("m", "Logview", true),
;
LOGVIEW("logview", "Logview", true);
private String m_name;
......
......@@ -34,7 +34,7 @@ public class CatServlet extends AbstractContainerServlet {
try {
MessageManager manager = lookup(MessageManager.class);
manager.initialize(config);
manager.initializeServer(config);
DefaultMessageHandler handler = (DefaultMessageHandler) lookup(MessageHandler.class);
......
<config mode="client" xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="config.xsd">
<app domain="Cat" />
<servers>
<server ip="127.0.0.1" port="2280" />
</servers>
</config>
\ No newline at end of file
......@@ -73,9 +73,9 @@ public class SimpleServer extends SimpleServerSupport {
protected void postConfigure(Context ctx) {
ServletHolder holder = new ServletHolder(s_mvc);
ctx.addServlet(new ServletHolder(s_cat), "/s/*");
ctx.addServlet(holder, "/");
ctx.addServlet(holder, "/r/*");
ctx.addServlet(new ServletHolder(s_cat), "/s/*");
ctx.addFilter(GzipFilter.class, "/r/*", Handler.ALL);
super.postConfigure(ctx);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册