提交 53c758a1 编写于 作者: S sean.wang

dev...

上级 5b8a62c1
package com.dianping.cat.message.spi;
/**
* @author franke.wu
* @author sean.wang
* @since Mar 6, 2012
*/
public interface MessageStorage {
/**
* Store a message tree to the storage.
......@@ -9,4 +14,12 @@ public interface MessageStorage {
* @return relative path to base directory or base URL
*/
public String store(MessageTree tree);
/**
* Fetch a message tree from the store.
*
* @param messageId
* @return
*/
public MessageTree get(String messageId);
}
......@@ -132,4 +132,10 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
m_active = false;
}
}
@Override
public MessageTree get(String messageId) {
// TODO Auto-generated method stub
return null;
}
}
......@@ -80,4 +80,5 @@ public class DefaultOutputChannel implements OutputChannel {
return true;
}
}
......@@ -138,4 +138,10 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
}
}
}
@Override
public MessageTree get(String messageId) {
// TODO Auto-generated method stub
return null;
}
}
......@@ -33,4 +33,5 @@ public interface OutputChannel {
* @throws IOException
*/
public boolean write(MessageTree tree) throws IOException;
}
package com.dianping.cat.job.hdfs;
import junit.framework.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
@RunWith(JUnit4.class)
public class HdfsMessageStorageTest extends CatTestCase {
private DefaultMessageTree newMessageTree(String id) {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain("domain");
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId(id);
tree.setParentMessageId("parentMessageId");
tree.setRootMessageId("rootMessageId");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
tree.setThreadName("threadName");
return tree;
}
@Test
public void test() throws Exception {
MessageStorage storage = lookup(MessageStorage.class, "hdfs");
MessageProducer producer = lookup(MessageProducer.class);
InMemoryQueue queue = lookup(InMemoryQueue.class);
for (int i = 0; i < 10000; i++) {
Transaction t = producer.newTransaction("URL", "MyPage" + (int) (i / 500));
try {
// do your business here
t.addData("k1", "v1");
t.addData("k2", "v2");
t.addData("k3", "v3");
Thread.sleep(1);
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
} finally {
t.complete();
}
MessageTree tree = queue.poll(0);
MessageManager manager = lookup(MessageManager.class);
MessageTree tree = newMessageTree("abcdef");
Transaction t = new DefaultTransaction("t", "n", manager);
tree.setMessage(t);
storage.store(tree);
}
MessageTree actual = storage.get(tree.getMessageId());
Assert.assertTrue(tree.equals(actual));
((HdfsMessageStorage) storage).dispose();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册