提交 3829e25a 编写于 作者: F Frankie Wu

add test case for HDFS bucket

上级 e16aee58
<?xml version="1.0" encoding="UTF-8"?>
<model model-package="com.dianping.cat.consumer.problem.model" enable-merger="true" enable-json-builder="true"
enable-xml-parser="true" enable-base-visitor="true" enable-model-test="true">
enable-xml-parser="true" enable-base-visitor="true" enable-model-test="false">
<entity name="problem-report" root="true">
<attribute name="domain" value-type="String" key="true" />
<entity-ref name="all-domains" />
......
......@@ -92,9 +92,12 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageManager.class, MessageConsumerRegistry.class));
all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class) //
.is(PER_LOOKUP));
all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class) //
.is(PER_LOOKUP));
all.add(C(Bucket.class, MessageTree.class.getName(), DefaultMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class) //
.config(E("baseDir").value("target/bucket/")));
......
......@@ -12,4 +12,6 @@ public interface BucketManager {
public Bucket<String> getStringBucket(String path) throws IOException;
public Bucket<byte[]> getBytesBucket(String path) throws IOException;
public Bucket<byte[]> getHdfsBucket(String path) throws IOException;
}
......@@ -19,8 +19,14 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
private Map<Entry, Bucket<?>> m_map = new HashMap<Entry, Bucket<?>>();
protected Bucket<?> createBucket(String path, Class<?> type) throws IOException {
Bucket<?> bucket = lookup(Bucket.class, type.getName());
protected Bucket<?> createBucket(String path, Class<?> type, String namespace) throws IOException {
Bucket<?> bucket;
if (namespace.equals("hdfs")) {
bucket = lookup(Bucket.class, "hdfs");
} else {
bucket = lookup(Bucket.class, type.getName());
}
bucket.initialize(type, new File(m_baseDir), path);
return bucket;
......@@ -34,12 +40,12 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@SuppressWarnings("unchecked")
protected <T> Bucket<T> getBucket(Class<T> type, String path) throws IOException {
protected <T> Bucket<T> getBucket(Class<T> type, String path, String namespace) throws IOException {
if (type == null || path == null) {
throw new IllegalArgumentException(String.format("Type(%s) or path(%s) can't be null.", type, path));
}
Entry entry = new Entry(type, path);
Entry entry = new Entry(type, path, namespace);
Bucket<?> bucket = m_map.get(entry);
if (bucket == null) {
......@@ -47,7 +53,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
bucket = m_map.get(entry);
if (bucket == null) {
bucket = createBucket(path, type);
bucket = createBucket(path, type, namespace);
m_map.put(entry, bucket);
}
}
......@@ -58,17 +64,22 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
@Override
public Bucket<byte[]> getBytesBucket(String path) throws IOException {
return getBucket(byte[].class, path);
return getBucket(byte[].class, path, "file");
}
@Override
public Bucket<byte[]> getHdfsBucket(String path) throws IOException {
return getBucket(byte[].class, path, "hdfs");
}
@Override
public Bucket<MessageTree> getMessageBucket(String path) throws IOException {
return getBucket(MessageTree.class, path);
return getBucket(MessageTree.class, path, "file");
}
@Override
public Bucket<String> getStringBucket(String path) throws IOException {
return getBucket(String.class, path);
return getBucket(String.class, path, "file");
}
public void setBaseDir(String baseDir) {
......@@ -80,9 +91,12 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
private String m_path;
public Entry(Class<?> type, String path) {
private String m_namespace;
public Entry(Class<?> type, String path, String namespace) {
m_type = type;
m_path = path;
m_namespace = namespace;
}
@Override
......@@ -90,7 +104,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
if (obj instanceof Entry) {
Entry e = (Entry) obj;
return e.getClass() == m_type && e.getPath().equals(m_path);
return e.getClass() == m_type && e.getPath().equals(m_path) && e.getNamespace().equals(m_namespace);
}
return false;
......@@ -104,6 +118,10 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
return m_type;
}
public String getNamespace() {
return m_namespace;
}
@Override
public int hashCode() {
int hashcode = m_type.hashCode();
......
......@@ -183,16 +183,19 @@
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>[B</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
......
......@@ -4,15 +4,17 @@ import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.job.HdfsDumpConsumer;
import com.dianping.cat.job.hdfs.OutputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannel;
import com.dianping.cat.job.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.job.hdfs.HdfsBucket;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.job.hdfs.OutputChannel;
import com.dianping.cat.job.hdfs.OutputChannelManager;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.storage.Bucket;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -37,13 +39,16 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
E("serverUri").value("/catlog")));
}
all.add(C(Bucket.class, "hdfs", HdfsBucket.class) //
.is(PER_LOOKUP));
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
.req(OutputChannelManager.class));
all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) //
.req(MessageStorage.class, "hdfs"));
all.addAll(new DatabaseConfigurator().defineComponents());
return all;
}
......
......@@ -16,7 +16,6 @@ import com.dianping.tkv.hdfs.HdfsImpl;
* @since Mar 9, 2012
*/
public class HdfsBucket implements Bucket<byte[]> {
private HdfsImpl hdfs;
private int keyLength = 32;
......
/**
*
*/
package com.dianping.cat.job.hdfs;
import com.dianping.cat.storage.internal.DefaultBucketManager;
/**
* @author sean.wang
* @since Mar 9, 2012
*/
public class HdfsBucketManager extends DefaultBucketManager {
}
......@@ -23,6 +23,12 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>hdfs</role-hint>
<implementation>com.dianping.cat.job.hdfs.HdfsBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
......
package com.dianping.cat.storage.hdfs;
import junit.framework.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class HdfsBucketTest extends ComponentTestCase {
@Test
public void testLookup() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getHdfsBucket("/a/b/c");
bucket.storeById("a", "It's A.".getBytes(), "t1");
bucket.storeById("b", "It's B.".getBytes(), "t1");
Assert.assertEquals("It's A.", new String(bucket.findById("a")));
Assert.assertEquals("It's B.", new String(bucket.findById("b")));
Assert.assertEquals("It's B.", new String(bucket.findNextById("a", Direction.BACKWARD, "t1")));
bucket.close();
bucket = manager.getHdfsBucket("/a/b/c");
Assert.assertEquals("It's A.", new String(bucket.findById("a")));
Assert.assertEquals("It's B.", new String(bucket.findById("b")));
Assert.assertEquals("It's B.", new String(bucket.findNextById("a", Direction.BACKWARD, "t1")));
bucket.deleteAndCreate();
bucket = manager.getHdfsBucket("/a/b/c");
Assert.assertEquals(null, bucket.findById("a"));
Assert.assertEquals(null, bucket.findById("b"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册