diff --git a/cat-core/src/main/java/com/dianping/cat/message/spi/MessageStorage.java b/cat-core/src/main/java/com/dianping/cat/message/spi/MessageStorage.java index 53ddb92198cacfdb1b31d45f548abd02e86c8201..398c040d7f54788d58c3d681e3cfd2bb468fa5b7 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/spi/MessageStorage.java +++ b/cat-core/src/main/java/com/dianping/cat/message/spi/MessageStorage.java @@ -1,12 +1,25 @@ package com.dianping.cat.message.spi; +/** + * @author franke.wu + * @author sean.wang + * @since Mar 6, 2012 + */ public interface MessageStorage { /** * Store a message tree to the storage. * * @param tree - * message tree to store + * message tree to store * @return relative path to base directory or base URL */ public String store(MessageTree tree); + + /** + * Fetch a message tree from the store. + * + * @param messageId + * @return + */ + public MessageTree get(String messageId); } diff --git a/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageStorage.java b/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageStorage.java index e73eea2cd24a8c70e3a903f142cdb2ceff740ea0..a3c3289b84996b64182fc5f600e257f1ece976d3 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageStorage.java +++ b/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageStorage.java @@ -132,4 +132,10 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis m_active = false; } } + + @Override + public MessageTree get(String messageId) { + // TODO Auto-generated method stub + return null; + } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java index 695e3b3275f9298b59903e6b876658884dcfc003..99dce1449c128cb336fd0b6d48fd7a285cfb6a59 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java @@ -80,4 +80,5 @@ public class DefaultOutputChannel implements OutputChannel { return true; } + } diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java index b833f389596632f125aaebf29668ada9efafd23e..026ef03236200ed337e9487f040c23c2409b604d 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java @@ -138,4 +138,10 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos } } } + + @Override + public MessageTree get(String messageId) { + // TODO Auto-generated method stub + return null; + } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannel.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannel.java index 465b33482cc015216616dbda3cac65111e079c0d..79b7f999bdf68457bfde5baea048b24f81faf59f 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannel.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannel.java @@ -33,4 +33,5 @@ public interface OutputChannel { * @throws IOException */ public boolean write(MessageTree tree) throws IOException; + } diff --git a/cat-job/src/test/java/com/dianping/cat/job/hdfs/HdfsMessageStorageTest.java b/cat-job/src/test/java/com/dianping/cat/job/hdfs/HdfsMessageStorageTest.java index fdd2fedfd04ec7feab5ac43e040818095d0ac86f..895cfee18123ce7978a9c1b7146e64afa1b8f5a9 100644 --- a/cat-job/src/test/java/com/dianping/cat/job/hdfs/HdfsMessageStorageTest.java +++ b/cat-job/src/test/java/com/dianping/cat/job/hdfs/HdfsMessageStorageTest.java @@ -1,52 +1,44 @@ package com.dianping.cat.job.hdfs; +import junit.framework.Assert; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import com.dianping.cat.job.hdfs.HdfsMessageStorage; -import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.Transaction; -import com.dianping.cat.message.io.InMemoryQueue; +import com.dianping.cat.message.internal.DefaultTransaction; +import com.dianping.cat.message.spi.MessageManager; import com.dianping.cat.message.spi.MessageStorage; import com.dianping.cat.message.spi.MessageTree; +import com.dianping.cat.message.spi.internal.DefaultMessageTree; @RunWith(JUnit4.class) public class HdfsMessageStorageTest extends CatTestCase { + private DefaultMessageTree newMessageTree(String id) { + DefaultMessageTree tree = new DefaultMessageTree(); + tree.setDomain("domain"); + tree.setHostName("hostName"); + tree.setIpAddress("ipAddress"); + tree.setMessageId(id); + tree.setParentMessageId("parentMessageId"); + tree.setRootMessageId("rootMessageId"); + tree.setSessionToken("sessionToken"); + tree.setThreadId("threadId"); + tree.setThreadName("threadName"); + return tree; + } + @Test public void test() throws Exception { MessageStorage storage = lookup(MessageStorage.class, "hdfs"); - MessageProducer producer = lookup(MessageProducer.class); - InMemoryQueue queue = lookup(InMemoryQueue.class); - - for (int i = 0; i < 10000; i++) { - Transaction t = producer.newTransaction("URL", "MyPage" + (int) (i / 500)); - - try { - // do your business here - t.addData("k1", "v1"); - t.addData("k2", "v2"); - t.addData("k3", "v3"); - - Thread.sleep(1); - - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - t.setStatus(Message.SUCCESS); - } catch (Exception e) { - t.setStatus(e); - } finally { - t.complete(); - } - - MessageTree tree = queue.poll(0); - - storage.store(tree); - } - + MessageManager manager = lookup(MessageManager.class); + MessageTree tree = newMessageTree("abcdef"); + Transaction t = new DefaultTransaction("t", "n", manager); + tree.setMessage(t); + storage.store(tree); + MessageTree actual = storage.get(tree.getMessageId()); + Assert.assertTrue(tree.equals(actual)); ((HdfsMessageStorage) storage).dispose(); } }