From 62883bb134acacfa9f14c8d50e59094d42b2de56 Mon Sep 17 00:00:00 2001 From: "sean.wang" Date: Mon, 26 Mar 2012 16:27:05 +0800 Subject: [PATCH] bugfix --- .../cat/job/build/ComponentsConfigurator.java | 2 +- .../cat/job/hdfs/HdfsMessageStorage.java | 16 +++++++++------- .../resources/META-INF/plexus/components.xml | 3 +++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java index 4c958f4b4..3e10ad848 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java +++ b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java @@ -50,7 +50,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .config(E("baseDir").value("dump"), // E("serverUri").value(serverUri))); all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) // - .req(OutputChannelManager.class, "dump")); + .req(OutputChannelManager.class, "dump").req(MessagePathBuilder.class)); all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) // .req(MessageStorage.class, "hdfs")); diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java index dfa370926..1164dede9 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java @@ -1,6 +1,8 @@ package com.dianping.cat.job.hdfs; import java.io.IOException; +import java.net.InetAddress; +import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -87,7 +89,10 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos private void handle(MessageTree tree) { try { - OutputChannel channel = m_manager.openChannel(tree, false); + String tmp = m_builder.getMessagePath(tree.getDomain(), new Date(tree.getMessage().getTimestamp())); + String ipAddress = InetAddress.getLocalHost().getHostAddress(); + String path = tmp + "-" + ipAddress; + OutputChannel channel = m_manager.openChannel(path, false); int length = channel.write(tree); if (length <= 0) { @@ -151,19 +156,16 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos } public MessageTree get(String messageId) { - // TODO - return null; + throw new UnsupportedOperationException(); } @Override public MessageTree next(String messageId, String tag) { - // TODO Auto-generated method stub - return null; + throw new UnsupportedOperationException(); } @Override public MessageTree previous(String messageId, String tag) { - // TODO Auto-generated method stub - return null; + throw new UnsupportedOperationException(); } } diff --git a/cat-job/src/main/resources/META-INF/plexus/components.xml b/cat-job/src/main/resources/META-INF/plexus/components.xml index e6bff2c1e..6c1d437cc 100644 --- a/cat-job/src/main/resources/META-INF/plexus/components.xml +++ b/cat-job/src/main/resources/META-INF/plexus/components.xml @@ -69,6 +69,9 @@ com.dianping.cat.job.hdfs.OutputChannelManager dump + + com.dianping.cat.message.spi.MessagePathBuilder + -- GitLab