diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java index 4c958f4b44f917d5e69c64e849c02194da5c7344..3e10ad848a0fccbe5cf071cc48a26c1d0370f31e 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java +++ b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java @@ -50,7 +50,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .config(E("baseDir").value("dump"), // E("serverUri").value(serverUri))); all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) // - .req(OutputChannelManager.class, "dump")); + .req(OutputChannelManager.class, "dump").req(MessagePathBuilder.class)); all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) // .req(MessageStorage.class, "hdfs")); diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java index dfa370926051471fae24798885021dab03f9b504..1164dede9ccfa438f48b4fcf6aefd777158a911b 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java @@ -1,6 +1,8 @@ package com.dianping.cat.job.hdfs; import java.io.IOException; +import java.net.InetAddress; +import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -87,7 +89,10 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos private void handle(MessageTree tree) { try { - OutputChannel channel = m_manager.openChannel(tree, false); + String tmp = m_builder.getMessagePath(tree.getDomain(), new Date(tree.getMessage().getTimestamp())); + String ipAddress = InetAddress.getLocalHost().getHostAddress(); + String path = tmp + "-" + ipAddress; + OutputChannel channel = m_manager.openChannel(path, false); int length = channel.write(tree); if (length <= 0) { @@ -151,19 +156,16 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos } public MessageTree get(String messageId) { - // TODO - return null; + throw new UnsupportedOperationException(); } @Override public MessageTree next(String messageId, String tag) { - // TODO Auto-generated method stub - return null; + throw new UnsupportedOperationException(); } @Override public MessageTree previous(String messageId, String tag) { - // TODO Auto-generated method stub - return null; + throw new UnsupportedOperationException(); } } diff --git a/cat-job/src/main/resources/META-INF/plexus/components.xml b/cat-job/src/main/resources/META-INF/plexus/components.xml index e6bff2c1e12e50eb7f09d493a9929c6279c91ee0..6c1d437cc5646d8c05626382a8e851af0771221f 100644 --- a/cat-job/src/main/resources/META-INF/plexus/components.xml +++ b/cat-job/src/main/resources/META-INF/plexus/components.xml @@ -69,6 +69,9 @@ com.dianping.cat.job.hdfs.OutputChannelManager dump + + com.dianping.cat.message.spi.MessagePathBuilder +