提交 4247fce7 编写于 作者: F Frankie Wu

after review

上级 ea77001e
......@@ -3,30 +3,30 @@ package com.dianping.cat.configuration;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.transport.InMemoryTransport;
import com.dianping.cat.transport.TcpSocketTransport;
import com.dianping.cat.transport.Transport;
import com.dianping.cat.transport.TransportManager;
import com.dianping.cat.transport.UdpMulticastTransport;
import com.dianping.cat.message.transport.InMemoryTransport;
import com.dianping.cat.message.transport.TcpSocketTransport;
import com.dianping.cat.message.transport.Transport;
import com.dianping.cat.message.transport.TransportManager;
import com.dianping.cat.message.transport.UdpMulticastTransport;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(Transport.class, "in-memory", InMemoryTransport.class));
all.add(C(Transport.class, "tcp-socket", TcpSocketTransport.class));
all.add(C(Transport.class, "udp-multicast", UdpMulticastTransport.class));
all.add(C(Transport.class, "in-memory", InMemoryTransport.class));
all.add(C(Transport.class, "tcp-socket", TcpSocketTransport.class));
all.add(C(Transport.class, "udp-multicast", UdpMulticastTransport.class));
all.add(C(TransportManager.class) //
.req(Transport.class, "im-memory"));
all.add(C(TransportManager.class) //
.req(Transport.class, "in-memory"));
return all;
}
return all;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
}
......@@ -15,60 +15,70 @@ package com.dianping.cat.message;
* @author Frankie Wu
*/
public interface Message {
/**
* add one or multiple key-value pairs to the message.
*
* @param keyValuePairs
* key-value pairs like 'a=1&b=2&...'
*/
public void addData(String keyValuePairs);
public static final String SUCCESS = "0";
/**
* add one key-value pair to the message.
*
* @param key
* @param value
*/
public void addData(String key, Object value);
/**
* add one or multiple key-value pairs to the message.
*
* @param keyValuePairs
* key-value pairs like 'a=1&b=2&...'
*/
public void addData(String keyValuePairs);
/**
* Complete the message construction.
*/
public void complete();
/**
* add one key-value pair to the message.
*
* @param key
* @param value
*/
public void addData(String key, Object value);
/**
* Message name.
*
* @return message name
*/
public String getName();
/**
* Complete the message construction.
*/
public void complete();
/**
* Get the message status.
*
* @return message status. "0" means success, otherwise error code.
*/
public String getStatus();
/**
* Message name.
*
* @return message name
*/
public String getName();
/**
* The time stamp the message was created.
*
* @return message creation time stamp in milliseconds
*/
public long getTimestamp();
/**
* Get the message status.
*
* @return message status. "0" means success, otherwise error code.
*/
public String getStatus();
/**
* Message type.
*
* @return message type
*/
public String getType();
/**
* The time stamp the message was created.
*
* @return message creation time stamp in milliseconds
*/
public long getTimestamp();
/**
* Set the message status.
*
* @param status
* message status. "0" means success, otherwise error code.
*/
public void setStatus(String status);
/**
* Message type.
*
* @return message type
*/
public String getType();
/**
* Set the message status.
*
* @param status
* message status. "0" means success, otherwise error code.
*/
public void setStatus(String status);
/**
* Set the message status with exception class name.
*
* @param e
* exception.
*/
public void setStatus(Throwable e);
}
......@@ -34,5 +34,5 @@ package com.dianping.cat.message;
* @author Frankie Wu
*/
public interface Transaction extends Message {
public void setStatus(Throwable e);
public long getDuration();
}
package com.dianping.cat.message.broker;
public interface MessageBroker extends Runnable {
}
package com.dianping.cat.message.broker;
public class MulticastToSocketBroker implements MessageBroker {
@Override
public void run() {
}
}
package com.dianping.cat.message.broker;
public class SocketToMulticastBroker implements MessageBroker {
@Override
public void run() {
}
}
package com.dianping.cat.message.broker;
public class SocketToSocketBroker implements MessageBroker {
@Override
public void run() {
}
}
package com.dianping.cat.message.consumer;
import com.dianping.cat.message.Message;
public interface MessageConsumer {
public void consume(Message message);
}
package com.dianping.cat.message.handler;
import java.util.List;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.consumer.MessageConsumer;
public class MessageDispatcher implements MessageHandler {
private List<MessageConsumer> m_comsumers;
@Override
public void handle(Message message) {
}
}
package com.dianping.cat.transport;
package com.dianping.cat.message.handler;
import com.dianping.cat.message.Message;
......
package com.dianping.cat.transport;
package com.dianping.cat.message.transport;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
public class InMemoryTransport implements Transport {
private BlockingQueue<Message> m_queue;
......
package com.dianping.cat.transport;
package com.dianping.cat.message.transport;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
public class TcpSocketTransport implements Transport {
@Override
......
package com.dianping.cat.message.transport;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
/**
* <p>
* Transport is a broker that listens to message from previous phase and relay
* (or re-send) it to next phase.
* </p>
*
* Typically, different environment has different transport configuration
* <ul>
* <li>Dev environment: [collector] ====> [in memory transport] ====> [message handler] ====> [consumers]</li>
* <li>QA environment: [collector] ====> [tcp socket transport] ==network==> [tcp socket transport] ====> [message handler] ====> [consumers]</li>
* </ul>
*
* @author Frankie Wu
*/
public interface Transport {
public void onMessage(MessageHandler handler);
public void send(Message message);
public void shutdown();
}
package com.dianping.cat.message.transport;
public class TransportManager {
private static TransportManager s_instance;
private Transport m_transport;
public static Transport getTransport() {
if (s_instance == null) {
throw new RuntimeException("Please call method setTransport() to initialize first!");
}
return s_instance.m_transport;
}
public void setTransport(Transport transport) {
if (transport == null) {
s_instance = null;
} else if (s_instance != null) {
throw new RuntimeException("TransportManager is already initialized!");
} else {
s_instance = new TransportManager();
s_instance.m_transport = transport;
}
}
}
package com.dianping.cat.transport;
package com.dianping.cat.message.transport;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
public class UdpMulticastTransport implements Transport {
@Override
......
package com.dianping.cat.transport;
import com.dianping.cat.message.Message;
public interface Transport {
public void onMessage(MessageHandler handler);
public void send(Message message);
public void shutdown();
}
package com.dianping.cat.transport;
public class TransportManager {
private static TransportManager s_instance;
private Transport m_transport;
public static Transport getTransport() {
if (s_instance == null) {
throw new RuntimeException("Please call method setTransport() to initialize first!");
}
return s_instance.m_transport;
}
public void setTransport(Transport transport) {
if (transport == null) {
s_instance = null;
} else {
s_instance = new TransportManager();
s_instance.m_transport = transport;
}
}
}
......@@ -16,29 +16,29 @@ public class EventTest {
event.addData("id", 12345);
event.addData("user", "john");
event.setStatus("0");
event.setStatus(Message.SUCCESS);
event.complete();
}
@Test
public void testException() {
Exception e = new RuntimeException();
Event event = CAT.newEvent("Exception", e.getClass().getName());
Event event = CAT.newEvent("ERROR", e.getClass().getName());
event.addData(toString(e));
event.setStatus("0");
event.setStatus("-1");
event.complete();
}
@Test
public void testInOneShot() {
// Normal case
CAT.logEvent("Review", "New", "0", "id=12345&user=john");
CAT.logEvent("Review", "New", Message.SUCCESS, "id=12345&user=john");
// Exception case
Exception e = new RuntimeException();
CAT.logEvent("Exception", e.getClass().getName(), "0", toString(e));
CAT.logEvent("Exception", e.getClass().getName(), Message.SUCCESS, toString(e));
}
private String toString(Exception e) {
......
......@@ -9,29 +9,29 @@ public class HeartbeatTest {
@Test
public void testStatus() {
Heartbeat event = CAT.newHeartbeat("System", "Status");
event.addData("ip", "192.168.10.111");
event.addData("host", "host-1");
event.addData("load", "2.1");
event.addData("cpu", "0.12,0.10");
event.addData("memory.total", "2G");
event.addData("memory.free", "456M");
event.setStatus("0");
event.complete();
Heartbeat heartbeat = CAT.newHeartbeat("System", "Status");
heartbeat.addData("ip", "192.168.10.111");
heartbeat.addData("host", "host-1");
heartbeat.addData("load", "2.1");
heartbeat.addData("cpu", "0.12,0.10");
heartbeat.addData("memory.total", "2G");
heartbeat.addData("memory.free", "456M");
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
}
@Test
public void testService() {
Heartbeat event = CAT.newHeartbeat("Service", "ReviewService");
event.addData("host", "192.168.10.112:1234");
event.addData("weight", "20");
event.addData("visits", "12345");
event.addData("manifest", "addReview,getReview,getShopReviews");
event.addData("more", "...");
event.setStatus("0");
event.complete();
Heartbeat heartbeat = CAT.newHeartbeat("Service", "ReviewService");
heartbeat.addData("host", "192.168.10.112:1234");
heartbeat.addData("weight", "20");
heartbeat.addData("visits", "12345");
heartbeat.addData("manifest", "addReview,getReview,getShopReviews");
heartbeat.addData("more", "...");
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
}
@Test
......
......@@ -18,7 +18,7 @@ public class TransactionTest {
t.addData("k3", "v3");
Thread.sleep(30);
t.setStatus("0");
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
} finally {
......
package com.dianping.cat.transport;
package com.dianping.cat.message.transport;
import java.util.ArrayList;
import java.util.List;
......@@ -12,6 +12,7 @@ import junit.framework.Assert;
import org.junit.Test;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
public class InMemoryTransportTest {
@Test
......@@ -53,10 +54,6 @@ public class InMemoryTransportTest {
static class MockMessage implements Message {
private String m_name;
public void setName(String name) {
m_name = name;
}
@Override
public void addData(String keyValuePairs) {
}
......@@ -89,9 +86,17 @@ public class InMemoryTransportTest {
return null;
}
public void setName(String name) {
m_name = name;
}
@Override
public void setStatus(String status) {
}
@Override
public void setStatus(Throwable e) {
}
}
static class MockMessageHandler implements MessageHandler {
......
package com.dianping.cat.message.transport;
import junit.framework.Assert;
import org.junit.Test;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.handler.MessageHandler;
public class TransportManagerTest {
@Test
public void testInitailized() {
new TransportManager().setTransport(new MockTransport());
Assert.assertNotNull(TransportManager.getTransport());
new TransportManager().setTransport(null);
}
@Test
public void testNotInitailized() {
try {
Assert.assertNotNull(TransportManager.getTransport());
Assert.fail("TransportManager should be initialized first before call getTransport()!");
} catch (RuntimeException e) {
// expected
}
}
@Test
public void testDoubleInitailization() {
new TransportManager().setTransport(new MockTransport());
try {
new TransportManager().setTransport(new MockTransport());
Assert.fail("Double initailization of TransportManager should not be allowed!");
} catch (RuntimeException e) {
// expected
}
}
static class MockTransport implements Transport {
@Override
public void onMessage(MessageHandler handler) {
}
@Override
public void send(Message message) {
}
@Override
public void shutdown() {
}
}
}
package com.dianping.cat.transport;
import junit.framework.Assert;
import org.junit.Test;
public class TransportManagerTest {
@Test
public void testInitailized() {
new TransportManager().setTransport(new InMemoryTransport());
Assert.assertNotNull(TransportManager.getTransport());
new TransportManager().setTransport(null);
}
@Test(expected = RuntimeException.class)
public void testNotInitailized() {
Assert.assertNotNull(TransportManager.getTransport());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册