提交 23f044b7 编写于 作者: F Frankie Wu

integrate configuration

上级 2a90459f
......@@ -26,6 +26,23 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.0-a1</version>
<executions>
<execution>
<id>default-cli</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
</goals>
<configuration>
<manifest>${basedir}/src/main/resources/META-INF/dal/model/manifest.xml</manifest>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.dianping.cat;
import java.io.File;
import java.io.InputStream;
import org.codehaus.plexus.DefaultPlexusContainer;
import org.codehaus.plexus.PlexusContainer;
import org.codehaus.plexus.PlexusContainerException;
import org.codehaus.plexus.component.repository.exception.ComponentLookupException;
import com.dianping.cat.configuration.model.ClientConfigValidator;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.configuration.model.transform.DefaultParser;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.spi.MessageManager;
import com.site.helper.Files;
/**
* This is the main entry point to the system.
......@@ -13,22 +21,86 @@ import com.dianping.cat.message.MessageProducer;
* @author Frankie Wu
*/
public class Cat {
public static final String CAT_CONFIG_XML = "/META-INF/cat/config.xml";
private static PlexusContainer s_container;
private static MessageProducer s_producer;
private static MessageManager s_manager;
static {
try {
s_container = new DefaultPlexusContainer();
} catch (PlexusContainerException e) {
e.printStackTrace();
throw new RuntimeException("Error when creating Plexus container, "
+ "please make sure the environment was setup correctly!", e);
}
}
public static MessageProducer getProducer() {
try {
return (MessageProducer) s_container.lookup(MessageProducer.class);
s_manager = (MessageManager) s_container.lookup(MessageManager.class);
} catch (ComponentLookupException e) {
throw new RuntimeException("Unable to get instance of MessageManager, "
+ "please make sure the environment was setup correctly!", e);
}
try {
s_producer = (MessageProducer) s_container.lookup(MessageProducer.class);
} catch (ComponentLookupException e) {
throw new RuntimeException("Unable to get instance of MessageProducer, "
+ "please make sure the environment wa setup correctly!", e);
+ "please make sure the environment was setup correctly!", e);
}
}
public static MessageProducer getProducer() {
return s_producer;
}
// this should be called during application initialization time
public static void initialize(File configFile) {
Config config = null;
// read config from local file system
try {
if (configFile != null) {
String xml = Files.forIO().readFrom(configFile.getCanonicalFile(), "utf-8");
config = new DefaultParser().parse(xml);
}
if (config == null) {
InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(CAT_CONFIG_XML);
if (in == null) {
in = Cat.class.getResourceAsStream(CAT_CONFIG_XML);
}
if (in != null) {
String xml = Files.forIO().readFrom(in, "utf-8");
config = new DefaultParser().parse(xml);
}
}
} catch (Exception e) {
throw new RuntimeException(String.format("Error when loading configuration file: %s!", configFile), e);
}
if (config != null) {
ClientConfigValidator validator = new ClientConfigValidator();
config.accept(validator);
s_manager.initialize(config);
}
}
// this should be called when a thread ends to clean some thread local data
public static void reset() {
s_manager.reset();
}
// this should be called when a thread starts to create some thread local
// data
public static void setup(String sessionToken, String requestToken) {
s_manager.setup(sessionToken, requestToken);
}
}
......@@ -4,18 +4,22 @@ import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageManager;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageManager;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemoryReceiver;
import com.dianping.cat.message.io.InMemorySender;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TcpSocketReceiver;
import com.dianping.cat.message.io.TcpSocketSender;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.consumer.DummyConsumer;
import com.dianping.cat.message.spi.consumer.DumpToFileConsumer;
......@@ -35,8 +39,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageReceiver.class, "in-memory", InMemoryReceiver.class) //
.req(InMemoryQueue.class));
// MessageSender of MessageManager should be provided by application
all.add(C(MessageManager.class));
all.add(C(MessageManager.class, DefaultMessageManager.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
.req(MessageManager.class));
......@@ -47,13 +50,18 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "dummy" }, "m_consumers"));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
all.add(C(MessageReceiver.class, "tcp-socket", TcpSocketReceiver.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
all.add(C(TransportManager.class, DefaultTransportManager.class) //
.req(MessageManager.class));
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageConsumerRegistry.class) //
.req(MessageReceiver.class, "tcp-socket"));
.req(TransportManager.class));
return all;
}
......
package com.dianping.cat.configuration.model;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.configuration.model.transform.DefaultValidator;
public class ClientConfigValidator extends DefaultValidator {
@Override
public void visitConfig(Config config) {
if (!"client".equals(config.getMode())) {
throw new RuntimeException(String.format("Attribute(%)s at path(%s) is required!", "mode", "/config"));
} else if (config.getApp() == null) {
throw new RuntimeException(String.format("Element(%s) at path(%s) is required!", "app", "/config"));
} else if (config.getServers().size() == 0) {
throw new RuntimeException(String.format("Element(%s) at path(%s) is required!", "servers", "/config"));
}
super.visitConfig(config);
}
}
package com.dianping.cat.message.internal;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Stack;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
import com.site.lookup.ContainerHolder;
public class MessageManager {
// we don't use static modifier since MessageManager is actual a singleton
private ThreadLocal<Context> m_context = new ThreadLocal<Context>() {
public class DefaultMessageManager extends ContainerHolder implements MessageManager {
private TransportManager m_manager;
// we don't use static modifier since MessageManager is a singleton in
// production actually
private InheritableThreadLocal<Context> m_context = new InheritableThreadLocal<Context>() {
@Override
protected Context initialValue() {
return new Context();
return null;
}
};
@Inject
private MessageSender m_sender;
private Config m_config;
private String m_domain;
private String m_hostName;
private String m_ipAddress;
@Override
public void add(Message message) {
m_context.get().add(this, message);
getContext().add(this, message);
}
@Override
public void end(Transaction transaction) {
m_context.get().end(this, transaction);
getContext().end(this, transaction);
}
void flush(MessageTree tree) {
if (m_sender != null) {
m_sender.send(tree);
// if (m_manager == null) {
// throw new
// RuntimeException("Cat has not been initialized successfully, please call Cat.initialize() first!");
// }
if (m_manager != null) {
MessageSender sender = m_manager.getSender();
if (sender != null) {
sender.send(tree);
}
}
}
@Override
public Config getConfig() {
return m_config;
}
Context getContext() {
Context ctx = m_context.get();
if (ctx == null) {
throw new RuntimeException(
"Cat has not been initialized successfully, please call Cal.setup(...) first for each thread.");
} else {
return ctx;
}
}
// destroy current thread data
m_context.remove();
@Override
public void initialize(Config config) {
m_config = config;
if (m_config != null && m_config.getApp() != null) {
m_domain = m_config.getApp().getDomain();
}
try {
InetAddress localHost = InetAddress.getLocalHost();
m_hostName = localHost.getHostName();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
m_manager = lookup(TransportManager.class);
}
public void setSender(MessageSender sender) {
m_sender = sender;
@Override
public void reset() {
// destroy current thread local data
m_context.remove();
}
@Override
public void setup(String sessionToken, String requestToken) {
Context ctx = new Context(m_domain, m_hostName, m_ipAddress, sessionToken, requestToken);
m_context.set(ctx);
}
@Override
public void start(Transaction transaction) {
m_context.get().start(transaction);
getContext().start(transaction);
}
static class Context {
private MessageTree m_tree = new DefaultMessageTree();
private MessageTree m_tree;
private Stack<Transaction> m_stack = new Stack<Transaction>();
private Stack<Transaction> m_stack;
public Context(String domain, String hostName, String ipAddress, String sessionToken, String requestToken) {
m_tree = new DefaultMessageTree();
m_stack = new Stack<Transaction>();
m_tree.setDomain(domain);
m_tree.setSessionToken(sessionToken);
m_tree.setRequestToken(requestToken);
m_tree.setThreadId(Long.toHexString(Thread.currentThread().getId()));
m_tree.setHostName(hostName);
m_tree.setIpAddress(ipAddress);
m_tree.setMessageId("?"); // TODO
}
public void add(MessageManager manager, Message message) {
public void add(DefaultMessageManager manager, Message message) {
if (m_stack.isEmpty()) {
m_tree.setMessage(message);
manager.flush(m_tree);
......@@ -62,7 +142,7 @@ public class MessageManager {
}
}
public void end(MessageManager manager, Transaction transaction) {
public void end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.peek();
......@@ -92,7 +172,7 @@ public class MessageManager {
m_stack.push(transaction);
}
private void validateTransaction(Transaction transaction) {
void validateTransaction(Transaction transaction) {
for (Message message : transaction.getChildren()) {
if (message.getStatus() == null) {
message.setStatus("unset");
......
......@@ -8,6 +8,7 @@ import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageManager;
import com.site.lookup.annotation.Inject;
public class DefaultMessageProducer implements MessageProducer {
......
......@@ -6,6 +6,7 @@ import java.util.List;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageManager;
public class DefaultTransaction extends AbstractMessage implements Transaction {
private long m_duration = -1;
......
package com.dianping.cat.message.io;
import java.util.List;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.configuration.model.entity.Bind;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.configuration.model.entity.Server;
import com.dianping.cat.message.spi.MessageManager;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultTransportManager extends ContainerHolder implements TransportManager, Initializable {
@Inject
private MessageManager m_manager;
private MessageSender m_sender;
private MessageReceiver m_receiver;
@Override
public MessageReceiver getReceiver() {
if (m_receiver == null) {
throw new RuntimeException("Client mode only, no receiver is provided!");
}
return m_receiver;
}
@Override
public MessageSender getSender() {
if (m_sender == null) {
throw new RuntimeException("Server mode only, no sender is provided!");
}
return m_sender;
}
@Override
public void initialize() throws InitializationException {
Config config = m_manager.getConfig();
if (config == null) {
// by default, no configuration needed in develop mode, all in memory
m_sender = lookup(MessageSender.class, "in-memory");
m_receiver = lookup(MessageReceiver.class, "in-memory");
} else {
String mode = config.getMode();
if ("client".equals(mode)) {
List<Server> servers = config.getServers();
if (servers.size() == 1) {
TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, "tcp-socket");
Server server = servers.get(0);
sender.setHost(server.getIp());
sender.setPort(server.getPort());
sender.initialize();
m_sender = sender;
} else {
throw new UnsupportedOperationException("Not implemented yet");
}
} else if ("server".equals(mode)) {
TcpSocketReceiver receiver = (TcpSocketReceiver) lookup(MessageReceiver.class, "tcp-socket");
Bind bind = config.getBind();
receiver.setHost(bind.getIp());
receiver.setPort(bind.getPort());
receiver.initialize();
m_receiver = receiver;
} else if ("broker".equals(mode)) {
throw new UnsupportedOperationException("Not implemented yet");
} else {
throw new IllegalArgumentException(String.format("Unsupported mode(%s)!", mode));
}
}
}
public void setMessageManager(MessageManager manager) {
m_manager = manager;
}
}
......@@ -75,6 +75,10 @@ public class TcpSocketReceiver implements MessageReceiver {
m_messageHandler = handler;
}
public void setCodec(MessageCodec codec) {
m_codec = codec;
}
public void setHost(String host) {
m_host = host;
}
......
......@@ -75,6 +75,10 @@ public class TcpSocketSender implements MessageSender {
m_future.getChannel().write(buf);
}
public void setCodec(MessageCodec codec) {
m_codec = codec;
}
public void setHost(String host) {
m_host = host;
}
......
package com.dianping.cat.message.io;
public interface TransportManager {
public MessageReceiver getReceiver();
public MessageSender getSender();
}
......@@ -15,6 +15,7 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
}
}
}
R result = generate();
store(result);
}
......
package com.dianping.cat.message.spi;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
public interface MessageManager {
public void add(Message message);
public void end(Transaction transaction);
public Config getConfig();
public void initialize(Config config);
public void reset();
public void setup(String sessionToken, String requestToken);
public void start(Transaction transaction);
}
\ No newline at end of file
......@@ -13,8 +13,6 @@ public interface MessageTree {
public String getMessageId();
public int getPort();
public String getRequestToken();
public String getSessionToken();
......@@ -31,8 +29,6 @@ public interface MessageTree {
public void setMessageId(String messageId);
public void setPort(int port);
public void setRequestToken(String requestToken);
public void setSessionToken(String sessionToken);
......
......@@ -44,7 +44,6 @@ public class PlainTextMessageCodec implements MessageCodec {
String id = helper.read(buf, TAB);
String domain = helper.read(buf, TAB);
String hostName = helper.read(buf, TAB);
String port = helper.read(buf, TAB);
String ipAddress = helper.read(buf, TAB);
String threadId = helper.read(buf, TAB);
String messageId = helper.read(buf, TAB);
......@@ -54,7 +53,6 @@ public class PlainTextMessageCodec implements MessageCodec {
if (ID.equals(id)) {
tree.setDomain(domain);
tree.setHostName(hostName);
tree.setPort(Integer.parseInt(port));
tree.setIpAddress(ipAddress);
tree.setThreadId(threadId);
tree.setMessageId(messageId);
......@@ -199,8 +197,6 @@ public class PlainTextMessageCodec implements MessageCodec {
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getHostName());
count += helper.write(buf, TAB);
count += helper.write(buf, String.valueOf(tree.getPort()));
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getIpAddress());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getThreadId());
......
......@@ -3,6 +3,7 @@ package com.dianping.cat.message.spi.internal;
import java.util.List;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
......@@ -11,7 +12,7 @@ import com.site.lookup.annotation.Inject;
public class DefaultMessageHandler implements MessageHandler, Runnable {
@Inject
private MessageReceiver m_receiver;
private TransportManager m_manager;
@Inject
private MessageConsumerRegistry m_registry;
......@@ -34,14 +35,16 @@ public class DefaultMessageHandler implements MessageHandler, Runnable {
@Override
public void run() {
m_receiver.onMessage(this);
}
MessageReceiver receiver = m_manager.getReceiver();
public void setReceiver(MessageReceiver receiver) {
m_receiver = receiver;
receiver.onMessage(this);
}
public void setRegistry(MessageConsumerRegistry registry) {
m_registry = registry;
}
public void setTransportManager(TransportManager manager) {
m_manager = manager;
}
}
......@@ -18,8 +18,6 @@ public class DefaultMessageTree implements MessageTree {
private String m_messageId;
private int m_port;
private String m_requestToken;
private String m_sessionToken;
......@@ -53,11 +51,6 @@ public class DefaultMessageTree implements MessageTree {
return m_messageId;
}
@Override
public int getPort() {
return m_port;
}
@Override
public String getRequestToken() {
return m_requestToken;
......@@ -98,11 +91,6 @@ public class DefaultMessageTree implements MessageTree {
m_messageId = messageId;
}
@Override
public void setPort(int port) {
m_port = port;
}
@Override
public void setRequestToken(String requestToken) {
m_requestToken = requestToken;
......
<?xml version="1.0" encoding="UTF-8"?>
<model>
<entity name="config" root="true">
<attribute name="mode" value-type="String" />
<entity-ref name="app" />
<entity-ref name="server" list="true" list-name="servers" xml-indent="true" />
<entity-ref name="bind" />
<entity-ref name="filter" />
</entity>
<entity name="app">
<attribute name="domain" value-type="String" />
<attribute name="ip" value-type="String" />
</entity>
<entity name="server">
<attribute name="ip" value-type="String" />
<attribute name="port" value-type="int" />
<attribute name="enabled" value-type="boolean" />
</entity>
<entity name="bind">
<attribute name="ip" value-type="String" />
<attribute name="port" value-type="int" />
</entity>
<entity name="filter">
<element name="domain" value-type="String" list="true" list-name="domains" />
</entity>
</model>
<?xml version="1.0" encoding="UTF-8"?>
<manifest>
<file path="codegen.xml" />
<file path="model.xml" />
</manifest>
<?xml version="1.0" encoding="UTF-8"?>
<model model-package="com.dianping.cat.configuration.model" enable-xml-parser="true" enable-validator="true"
enable-xml-schema="true" enable-xml-sample="true">
<entity name="config" root="true">
<attribute name="mode" required="true" />
</entity>
<entity name="app">
<attribute name="domain" required="true" />
</entity>
<entity name="server">
<attribute name="ip" required="true" />
<attribute name="port" default-value="2280" />
<attribute name="enabled" default-value="true" />
</entity>
<entity name="bind">
<attribute name="port" default-value="2280" />
</entity>
</model>
......@@ -25,15 +25,15 @@
</requirements>
</component>
<component>
<role>com.dianping.cat.message.internal.MessageManager</role>
<implementation>com.dianping.cat.message.internal.MessageManager</implementation>
<role>com.dianping.cat.message.spi.MessageManager</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageManager</implementation>
</component>
<component>
<role>com.dianping.cat.message.MessageProducer</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageProducer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.internal.MessageManager</role>
<role>com.dianping.cat.message.spi.MessageManager</role>
</requirement>
</requirements>
</component>
......@@ -47,6 +47,11 @@
<role-hint>dummy</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DummyConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dump-to-file</role-hint>
<implementation>com.dianping.cat.message.spi.consumer.DumpToFileConsumer</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry</implementation>
......@@ -60,6 +65,39 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>tcp-socket</role-hint>
<implementation>com.dianping.cat.message.io.TcpSocketSender</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.io.MessageReceiver</role>
<role-hint>tcp-socket</role-hint>
<implementation>com.dianping.cat.message.io.TcpSocketReceiver</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.io.TransportManager</role>
<implementation>com.dianping.cat.message.io.DefaultTransportManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageHandler</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageHandler</implementation>
......@@ -68,8 +106,7 @@
<role>com.dianping.cat.message.spi.MessageConsumerRegistry</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.io.MessageReceiver</role>
<role-hint>in-memory</role-hint>
<role>com.dianping.cat.message.io.TransportManager</role>
</requirement>
</requirements>
</component>
......
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dianping.cat.Cat;
......@@ -7,6 +9,16 @@ import com.dianping.cat.Cat;
public class EventTest {
public static MessageProducer CAT = Cat.getProducer();
@Before
public void before() {
Cat.setup(null, null);
}
@After
public void after() {
Cat.reset();
}
@Test
public void testNormal() {
Event event = CAT.newEvent("Review", "New");
......
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dianping.cat.Cat;
......@@ -7,6 +9,16 @@ import com.dianping.cat.Cat;
public class HeartbeatTest {
private static final MessageProducer CAT = Cat.getProducer();
@Before
public void before() {
Cat.setup(null, null);
}
@After
public void after() {
Cat.reset();
}
@Test
public void testInOneShot() {
CAT.logHeartbeat("System", "Status", "0",
......
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dianping.cat.Cat;
......@@ -7,6 +9,16 @@ import com.dianping.cat.Cat;
public class TransactionTest {
private static final MessageProducer CAT = Cat.getProducer();
@Before
public void before() {
Cat.setup(null, null);
}
@After
public void after() {
Cat.reset();
}
@Test
public void testNormal() {
Transaction t = CAT.newTransaction("URL", "MyPage");
......
package com.dianping.cat.message.configuration.model;
import java.io.InputStream;
import junit.framework.Assert;
import org.junit.Test;
import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.configuration.model.transform.DefaultParser;
import com.site.helper.Files;
public class ConfigTest {
@Test
public void testClient() throws Exception {
InputStream in = getClass().getResourceAsStream("client.xml");
String xml = Files.forIO().readFrom(in, "utf-8");
Config config = new DefaultParser().parse(xml);
Assert.assertEquals("client", config.getMode());
Assert.assertEquals("Review", config.getApp().getDomain());
Assert.assertEquals("192.168.8.1", config.getApp().getIp());
Assert.assertEquals(3, config.getServers().size());
Assert.assertEquals(2280, config.getServers().get(0).getPort().intValue());
Assert.assertEquals(true, config.getServers().get(0).isEnabled());
Assert.assertEquals(2281, config.getServers().get(1).getPort().intValue());
Assert.assertEquals(false, config.getServers().get(1).isEnabled());
Assert.assertEquals(2280, config.getServers().get(2).getPort().intValue());
Assert.assertEquals(true, config.getServers().get(2).isEnabled());
}
@Test
public void testServer() throws Exception {
InputStream in = getClass().getResourceAsStream("server.xml");
String xml = Files.forIO().readFrom(in, "utf-8");
Config config = new DefaultParser().parse(xml);
Assert.assertEquals("server", config.getMode());
Assert.assertEquals("192.168.8.21", config.getBind().getIp());
Assert.assertEquals(2280, config.getBind().getPort().intValue());
Assert.assertEquals("[Review, Group]", config.getFilter().getDomains().toString());
}
}
......@@ -2,6 +2,8 @@ package com.dianping.cat.message.internal;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
......@@ -10,15 +12,31 @@ import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class MessageProducerTest extends ComponentTestCase {
@Before
public void before() throws Exception {
MessageManager manager = lookup(MessageManager.class);
manager.initialize(null);
manager.setup(null, null);
}
@After
public void after() throws Exception {
MessageManager manager = lookup(MessageManager.class);
manager.reset();
}
@Test
public void testNormal() throws Exception {
MessageProducer producer = lookup(MessageProducer.class);
InMemoryQueue queue = lookup(InMemoryQueue.class, "mock");
InMemoryQueue queue = lookup(InMemoryQueue.class);
Transaction t = producer.newTransaction("URL", "MyPage");
try {
......
package com.dianping.cat.message.internal;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemorySender;
import com.dianping.cat.message.io.MessageSender;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class MessageProducerTestConfigurator extends AbstractResourceConfigurator {
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new MessageProducerTestConfigurator());
}
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(InMemoryQueue.class, "mock", InMemoryQueue.class));
all.add(C(MessageSender.class, "mock", InMemorySender.class) //
.req(InMemoryQueue.class, "mock"));
all.add(C(MessageManager.class) //
.req(MessageSender.class, "mock"));
return all;
}
@Override
protected File getConfigurationFile() {
return new File("src/test/resources/" + MessageProducerTest.class.getName().replace('.', '/') + ".xml");
}
}
......@@ -60,7 +60,6 @@ public class PlainTextMessageCodecTest {
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId("messageId");
tree.setPort(1234);
tree.setRequestToken("requestToken");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
......@@ -116,12 +115,12 @@ public class PlainTextMessageCodecTest {
DefaultMessageTree tree = newMessageTree();
long timestamp = 1325489621987L;
Assert.assertEquals("PT1\tdomain\thostName\t1234\tipAddress\tthreadId\tmessageId\trequestToken\tsessionToken\n",
Assert.assertEquals("PT1\tdomain\thostName\tipAddress\tthreadId\tmessageId\trequestToken\tsessionToken\n",
tree.toString());
tree.setMessage(newEvent("type", "name", timestamp, "0", "here is the data."));
Assert.assertEquals("PT1\tdomain\thostName\t1234\tipAddress\tthreadId\tmessageId\trequestToken\tsessionToken\n" + //
Assert.assertEquals("PT1\tdomain\thostName\tipAddress\tthreadId\tmessageId\trequestToken\tsessionToken\n" + //
"E2012-01-02 15:33:41.987\ttype\tname\t0\there is the data.\t\n", tree.toString());
}
......
<config mode="client" xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="config.xsd">
<app domain="Review" ip="192.168.8.1" />
<servers>
<server ip="192.168.8.21" port="2280" />
<server ip="192.168.8.22" port="2281" enabled="false" />
<server ip="192.168.8.23" />
</servers>
</config>
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified" attributeFormDefault="unqualified">
<xs:element name="config" type="ConfigType"/>
<xs:complexType name="ConfigType">
<xs:sequence>
<xs:element name="app" type="AppType" minOccurs="0" maxOccurs="1"/>
<xs:element name="servers">
<xs:complexType>
<xs:sequence minOccurs="0" maxOccurs="unbounded">
<xs:element name="server" type="ServerType"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="bind" type="BindType" minOccurs="0" maxOccurs="1"/>
<xs:element name="filter" type="FilterType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="mode" type="xs:string"/>
</xs:complexType>
<xs:complexType name="AppType">
<xs:attribute name="domain" type="xs:string"/>
<xs:attribute name="ip" type="xs:string"/>
</xs:complexType>
<xs:complexType name="ServerType">
<xs:attribute name="ip" type="xs:string"/>
<xs:attribute name="port" type="xs:string"/>
<xs:attribute name="enabled" type="xs:boolean" default="false"/>
</xs:complexType>
<xs:complexType name="BindType">
<xs:attribute name="ip" type="xs:string"/>
<xs:attribute name="port" type="xs:string"/>
</xs:complexType>
<xs:complexType name="FilterType">
<xs:sequence>
<xs:element name="domain" type="xs:string" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
</xs:schema>
<config mode="server" xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="config.xsd">
<bind ip="192.168.8.21" />
<filter>
<domain>Review</domain>
<domain>Group</domain>
</filter>
</config>
\ No newline at end of file
<plexus>
<components>
<component>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
<role-hint>mock</role-hint>
<implementation>com.dianping.cat.message.io.InMemoryQueue</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>mock</role-hint>
<implementation>com.dianping.cat.message.io.InMemorySender</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
<role-hint>mock</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.internal.MessageManager</role>
<implementation>com.dianping.cat.message.internal.MessageManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>mock</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册