From 3829e25aff64c00465beb76333a008928e8fc5f1 Mon Sep 17 00:00:00 2001 From: Frankie Wu Date: Mon, 12 Mar 2012 13:36:52 +0800 Subject: [PATCH] add test case for HDFS bucket --- .../dal/model/problem-report-model.xml | 2 +- .../cat/build/ComponentsConfigurator.java | 7 ++- .../dianping/cat/storage/BucketManager.java | 2 + .../internal/DefaultBucketManager.java | 38 +++++++++++----- .../resources/META-INF/plexus/components.xml | 3 ++ cat-job/src/main/java/META-INF/MANIFEST.MF | 3 -- .../cat/job/build/ComponentsConfigurator.java | 11 +++-- .../com/dianping/cat/job/hdfs/HdfsBucket.java | 1 - .../cat/job/hdfs/HdfsBucketManager.java | 14 ------ .../resources/META-INF/plexus/components.xml | 6 +++ cat-job/src/test/java/META-INF/MANIFEST.MF | 3 -- .../cat/storage/hdfs/HdfsBucketTest.java | 44 +++++++++++++++++++ 12 files changed, 97 insertions(+), 37 deletions(-) delete mode 100644 cat-job/src/main/java/META-INF/MANIFEST.MF delete mode 100644 cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java delete mode 100644 cat-job/src/test/java/META-INF/MANIFEST.MF create mode 100644 cat-job/src/test/java/com/dianping/cat/storage/hdfs/HdfsBucketTest.java diff --git a/cat-consumer/src/main/resources/META-INF/dal/model/problem-report-model.xml b/cat-consumer/src/main/resources/META-INF/dal/model/problem-report-model.xml index be19b9b74..67792ff72 100644 --- a/cat-consumer/src/main/resources/META-INF/dal/model/problem-report-model.xml +++ b/cat-consumer/src/main/resources/META-INF/dal/model/problem-report-model.xml @@ -1,6 +1,6 @@ + enable-xml-parser="true" enable-base-visitor="true" enable-model-test="false"> diff --git a/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java b/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java index 55a2988e8..0e96fb115 100644 --- a/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java @@ -92,9 +92,12 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(MessageHandler.class, DefaultMessageHandler.class) // .req(MessageManager.class, MessageConsumerRegistry.class)); - all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class)); - all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class)); + all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class) // + .is(PER_LOOKUP)); + all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class) // + .is(PER_LOOKUP)); all.add(C(Bucket.class, MessageTree.class.getName(), DefaultMessageBucket.class) // + .is(PER_LOOKUP) // .req(MessageCodec.class, "plain-text")); all.add(C(BucketManager.class, DefaultBucketManager.class) // .config(E("baseDir").value("target/bucket/"))); diff --git a/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java index 1d787d966..faaeff322 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java @@ -12,4 +12,6 @@ public interface BucketManager { public Bucket getStringBucket(String path) throws IOException; public Bucket getBytesBucket(String path) throws IOException; + + public Bucket getHdfsBucket(String path) throws IOException; } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java index cfa5519d2..4a36ff6c3 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java @@ -19,8 +19,14 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag private Map> m_map = new HashMap>(); - protected Bucket createBucket(String path, Class type) throws IOException { - Bucket bucket = lookup(Bucket.class, type.getName()); + protected Bucket createBucket(String path, Class type, String namespace) throws IOException { + Bucket bucket; + + if (namespace.equals("hdfs")) { + bucket = lookup(Bucket.class, "hdfs"); + } else { + bucket = lookup(Bucket.class, type.getName()); + } bucket.initialize(type, new File(m_baseDir), path); return bucket; @@ -34,12 +40,12 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag } @SuppressWarnings("unchecked") - protected Bucket getBucket(Class type, String path) throws IOException { + protected Bucket getBucket(Class type, String path, String namespace) throws IOException { if (type == null || path == null) { throw new IllegalArgumentException(String.format("Type(%s) or path(%s) can't be null.", type, path)); } - Entry entry = new Entry(type, path); + Entry entry = new Entry(type, path, namespace); Bucket bucket = m_map.get(entry); if (bucket == null) { @@ -47,7 +53,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag bucket = m_map.get(entry); if (bucket == null) { - bucket = createBucket(path, type); + bucket = createBucket(path, type, namespace); m_map.put(entry, bucket); } } @@ -58,17 +64,22 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag @Override public Bucket getBytesBucket(String path) throws IOException { - return getBucket(byte[].class, path); + return getBucket(byte[].class, path, "file"); + } + + @Override + public Bucket getHdfsBucket(String path) throws IOException { + return getBucket(byte[].class, path, "hdfs"); } @Override public Bucket getMessageBucket(String path) throws IOException { - return getBucket(MessageTree.class, path); + return getBucket(MessageTree.class, path, "file"); } @Override public Bucket getStringBucket(String path) throws IOException { - return getBucket(String.class, path); + return getBucket(String.class, path, "file"); } public void setBaseDir(String baseDir) { @@ -80,9 +91,12 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag private String m_path; - public Entry(Class type, String path) { + private String m_namespace; + + public Entry(Class type, String path, String namespace) { m_type = type; m_path = path; + m_namespace = namespace; } @Override @@ -90,7 +104,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag if (obj instanceof Entry) { Entry e = (Entry) obj; - return e.getClass() == m_type && e.getPath().equals(m_path); + return e.getClass() == m_type && e.getPath().equals(m_path) && e.getNamespace().equals(m_namespace); } return false; @@ -104,6 +118,10 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag return m_type; } + public String getNamespace() { + return m_namespace; + } + @Override public int hashCode() { int hashcode = m_type.hashCode(); diff --git a/cat-core/src/main/resources/META-INF/plexus/components.xml b/cat-core/src/main/resources/META-INF/plexus/components.xml index 524267a70..4548c9206 100644 --- a/cat-core/src/main/resources/META-INF/plexus/components.xml +++ b/cat-core/src/main/resources/META-INF/plexus/components.xml @@ -183,16 +183,19 @@ com.dianping.cat.storage.Bucket java.lang.String com.dianping.cat.storage.internal.DefaultBucket + per-lookup com.dianping.cat.storage.Bucket [B com.dianping.cat.storage.internal.DefaultBucket + per-lookup com.dianping.cat.storage.Bucket com.dianping.cat.message.spi.MessageTree com.dianping.cat.storage.internal.DefaultMessageBucket + per-lookup com.dianping.cat.message.spi.MessageCodec diff --git a/cat-job/src/main/java/META-INF/MANIFEST.MF b/cat-job/src/main/java/META-INF/MANIFEST.MF deleted file mode 100644 index 254272e1c..000000000 --- a/cat-job/src/main/java/META-INF/MANIFEST.MF +++ /dev/null @@ -1,3 +0,0 @@ -Manifest-Version: 1.0 -Class-Path: - diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java index 17eec88ff..1ac995bbc 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java +++ b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java @@ -4,15 +4,17 @@ import java.util.ArrayList; import java.util.List; import com.dianping.cat.job.HdfsDumpConsumer; -import com.dianping.cat.job.hdfs.OutputChannelManager; -import com.dianping.cat.job.hdfs.DefaultOutputChannelManager; import com.dianping.cat.job.hdfs.DefaultOutputChannel; +import com.dianping.cat.job.hdfs.DefaultOutputChannelManager; +import com.dianping.cat.job.hdfs.HdfsBucket; import com.dianping.cat.job.hdfs.HdfsMessageStorage; import com.dianping.cat.job.hdfs.OutputChannel; +import com.dianping.cat.job.hdfs.OutputChannelManager; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessageConsumer; import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageStorage; +import com.dianping.cat.storage.Bucket; import com.site.lookup.configuration.AbstractResourceConfigurator; import com.site.lookup.configuration.Component; @@ -37,13 +39,16 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { E("serverUri").value("/catlog"))); } + all.add(C(Bucket.class, "hdfs", HdfsBucket.class) // + .is(PER_LOOKUP)); + all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) // .req(OutputChannelManager.class)); all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) // .req(MessageStorage.class, "hdfs")); all.addAll(new DatabaseConfigurator().defineComponents()); - + return all; } diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java index c7559f47e..d1598ebe7 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java @@ -16,7 +16,6 @@ import com.dianping.tkv.hdfs.HdfsImpl; * @since Mar 9, 2012 */ public class HdfsBucket implements Bucket { - private HdfsImpl hdfs; private int keyLength = 32; diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java deleted file mode 100644 index 11b0133d2..000000000 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java +++ /dev/null @@ -1,14 +0,0 @@ -/** - * - */ -package com.dianping.cat.job.hdfs; - -import com.dianping.cat.storage.internal.DefaultBucketManager; - -/** - * @author sean.wang - * @since Mar 9, 2012 - */ -public class HdfsBucketManager extends DefaultBucketManager { - -} diff --git a/cat-job/src/main/resources/META-INF/plexus/components.xml b/cat-job/src/main/resources/META-INF/plexus/components.xml index f2ef483e9..268dd121b 100644 --- a/cat-job/src/main/resources/META-INF/plexus/components.xml +++ b/cat-job/src/main/resources/META-INF/plexus/components.xml @@ -23,6 +23,12 @@ + + com.dianping.cat.storage.Bucket + hdfs + com.dianping.cat.job.hdfs.HdfsBucket + per-lookup + com.dianping.cat.message.spi.MessageStorage hdfs diff --git a/cat-job/src/test/java/META-INF/MANIFEST.MF b/cat-job/src/test/java/META-INF/MANIFEST.MF deleted file mode 100644 index 254272e1c..000000000 --- a/cat-job/src/test/java/META-INF/MANIFEST.MF +++ /dev/null @@ -1,3 +0,0 @@ -Manifest-Version: 1.0 -Class-Path: - diff --git a/cat-job/src/test/java/com/dianping/cat/storage/hdfs/HdfsBucketTest.java b/cat-job/src/test/java/com/dianping/cat/storage/hdfs/HdfsBucketTest.java new file mode 100644 index 000000000..8e4247eae --- /dev/null +++ b/cat-job/src/test/java/com/dianping/cat/storage/hdfs/HdfsBucketTest.java @@ -0,0 +1,44 @@ +package com.dianping.cat.storage.hdfs; + +import junit.framework.Assert; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import com.dianping.cat.storage.Bucket; +import com.dianping.cat.storage.BucketManager; +import com.dianping.cat.storage.TagThreadSupport.Direction; +import com.site.lookup.ComponentTestCase; + +@RunWith(JUnit4.class) +public class HdfsBucketTest extends ComponentTestCase { + @Test + public void testLookup() throws Exception { + BucketManager manager = lookup(BucketManager.class); + Bucket bucket = manager.getHdfsBucket("/a/b/c"); + + bucket.storeById("a", "It's A.".getBytes(), "t1"); + bucket.storeById("b", "It's B.".getBytes(), "t1"); + + Assert.assertEquals("It's A.", new String(bucket.findById("a"))); + Assert.assertEquals("It's B.", new String(bucket.findById("b"))); + Assert.assertEquals("It's B.", new String(bucket.findNextById("a", Direction.BACKWARD, "t1"))); + + bucket.close(); + + bucket = manager.getHdfsBucket("/a/b/c"); + + Assert.assertEquals("It's A.", new String(bucket.findById("a"))); + Assert.assertEquals("It's B.", new String(bucket.findById("b"))); + Assert.assertEquals("It's B.", new String(bucket.findNextById("a", Direction.BACKWARD, "t1"))); + + bucket.deleteAndCreate(); + + bucket = manager.getHdfsBucket("/a/b/c"); + + Assert.assertEquals(null, bucket.findById("a")); + Assert.assertEquals(null, bucket.findById("b")); + + } +} -- GitLab