提交 8daf9f2d 编写于 作者: F Frankie Wu

abstract bucket implementation

上级 5b19e3e3
......@@ -91,7 +91,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageManager.class, MessageConsumerRegistry.class));
all.add(C(Bucket.class, DefaultBucket.class));
all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, MessageTree.class.getName(), DefaultMessageBucket.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class));
......
package com.dianping.cat.storage;
import java.util.List;
import com.dianping.cat.message.spi.MessageTree;
/**
* Map to one HDFS directory for one report.
* <p>
*
* Sample tags: "thread:101", "session:abc", "request:xyz", "parent:xxx"
*/
public interface MessageBucket extends Bucket<MessageTree> {
public boolean storeById(String id, MessageTree value, String... tags);
public List<String> findAllIdsByTag(String tag);
public MessageTree findNextById(String id, Direction direction, String tag);
public static enum Direction {
FORWARD,
public interface MessageBucket extends Bucket<MessageTree>, TagThreadSupport<MessageTree> {
BACKWARD;
}
}
package com.dianping.cat.storage;
import java.util.List;
/**
* Map to one HDFS directory for one report.
* <p>
*
* Sample tags: "thread:101", "session:abc", "request:xyz", "parent:xxx"
*/
public interface TagThreadSupport<T> {
public boolean storeById(String id, T data, String... tags);
public List<String> findAllIdsByTag(String tag);
public T findNextById(String id, Direction direction, String tag);
public static enum Direction {
FORWARD,
BACKWARD;
}
}
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.TagThreadSupport;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T>, LogEnabled {
private static final String[] EMPTY = new String[0];
@Inject
private String m_baseDir;
// key => offset of record
private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();
// tag => list of ids
private Map<String, List<String>> m_tagToIds = new HashMap<String, List<String>>();
private File m_file;
private RandomAccessFile m_out;
private Logger m_logger;
@Override
public void close() {
try {
m_out.close();
m_idToOffsets.clear();
m_tagToIds.clear();
} catch (IOException e) {
// ignore it
}
}
protected abstract T decode(ChannelBuffer buf) throws IOException;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
protected abstract void encode(T data, ChannelBuffer buf) throws IOException;
@Override
public List<T> findAllByIds(List<String> ids) {
List<T> list = new ArrayList<T>(ids.size());
for (String id : ids) {
list.add(findById(id));
}
return list;
}
@Override
public List<String> findAllIdsByTag(String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
return Collections.emptyList();
} else {
return ids;
}
}
@Override
public T findById(String id) {
Long offset = m_idToOffsets.get(id);
if (offset != null) {
try {
long old = m_out.getFilePointer();
m_out.seek(offset);
m_out.readLine(); // first line is header, get rid of it
int num = Integer.parseInt(m_out.readLine());
byte[] bytes = new byte[num];
m_out.readFully(bytes);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bytes);
T data = decode(buf);
m_out.seek(old);
return data;
} catch (Exception e) {
m_logger.error(String.format("Error when reading file(%s)!", m_file), e);
}
}
return null;
}
@Override
public T findNextById(String id, Direction direction, String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids != null) {
int index = ids.indexOf(id);
switch (direction) {
case FORWARD:
index++;
break;
case BACKWARD:
index--;
break;
}
if (index >= 0 && index < ids.size()) {
String nextId = ids.get(index);
return findById(nextId);
}
}
return null;
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
m_file = new File(m_baseDir, path);
m_file.getParentFile().mkdirs();
m_out = new RandomAccessFile(m_file, "rw");
if (m_file.exists()) {
loadIndexes();
m_out.seek(m_out.length());
}
}
protected void loadIndexes() throws IOException {
byte[] data = new byte[8192];
while (true) {
long offset = m_out.getFilePointer();
String first = m_out.readLine();
if (first == null) { // EOF
break;
}
int num = Integer.parseInt(m_out.readLine());
if (num > data.length) {
int newSize = data.length;
while (newSize < num) {
newSize += newSize / 2;
}
data = new byte[newSize];
}
m_out.readFully(data, 0, num); // get rid of it
m_out.readLine(); // get rid of empty line
List<String> parts = Splitters.by('\t').split(first);
if (parts.size() > 0) {
String id = parts.get(0);
parts.remove(0);
updateIndex(id, parts.toArray(EMPTY), offset);
}
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
@Override
public boolean storeById(String id, T data) {
return storeById(id, data, EMPTY);
}
/**
* Store the message in the format of:<br>
*
* <xmp> <id>\t<tag1>\t<tag2>\t...\n <length of message>\n <message>\n </xmp>
*/
@Override
public boolean storeById(String id, T data, String... tags) {
try {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
encode(data, buf);
int length = buf.readInt();
String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
byte[] first = attributes.getBytes("utf-8");
byte[] num = String.valueOf(length).getBytes("utf-8");
long offset = m_out.getFilePointer();
m_out.write(first);
m_out.write(num);
m_out.write('\n');
m_out.write(buf.array(), buf.readerIndex(), length);
m_out.write('\n');
m_out.getChannel().force(true);
updateIndex(id, tags, offset);
return true;
} catch (Exception e) {
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
return false;
}
}
protected void updateIndex(String id, String[] tags, long offset) {
m_idToOffsets.put(id, offset);
for (String tag : tags) {
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
ids = new ArrayList<String>();
m_tagToIds.put(tag, ids);
}
ids.add(id);
}
}
}
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.nio.charset.Charset;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.lookup.annotation.Inject;
public class DefaultBucket<T> implements Bucket<T>, LogEnabled {
@Inject
private MessageCodec m_codec;
@Inject
private String m_baseDir;
// key => offset of record
private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();
private File m_file;
private RandomAccessFile m_out;
private Logger m_logger;
@Override
public void close() {
try {
m_out.close();
m_idToOffsets.clear();
} catch (IOException e) {
// ignore it
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
public class DefaultBucket<T> extends AbstractBucket<T> {
private Class<?> m_type;
@SuppressWarnings("unchecked")
@Override
public T findById(String id) {
Long offset = m_idToOffsets.get(id);
if (offset != null) {
try {
long old = m_out.getFilePointer();
m_out.seek(offset);
m_out.readLine(); // first line is header, get rid of it
int num = Integer.parseInt(m_out.readLine());
byte[] data = new byte[num];
m_out.readFully(data);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(data);
MessageTree tree = new DefaultMessageTree();
m_codec.decode(buf, tree);
m_out.seek(old);
return null;
} catch (Exception e) {
m_logger.error(String.format("Error when reading file(%s)!", m_file), e);
}
protected T decode(ChannelBuffer buf) throws IOException {
if (m_type == String.class) {
return (T) buf.toString(buf.readerIndex(), buf.readableBytes(), Charset.forName("utf-8"));
} else if (m_type == byte[].class) {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
return (T) bytes;
} else {
throw new UnsupportedOperationException(String.format(
"Only String or byte[] are supported so far, but was %s.", m_type));
}
return null;
}
@Override
public List<T> findAllByIds(List<String> ids) {
List<T> list = new ArrayList<T>(ids.size());
for (String id : ids) {
list.add(findById(id));
protected void encode(T data, ChannelBuffer buf) throws IOException {
if (m_type == String.class) {
String str = (String) data;
byte[] bytes = str.getBytes("utf-8");
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
} else if (m_type == byte[].class) {
byte[] bytes = (byte[]) data;
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
} else {
throw new UnsupportedOperationException(String.format(
"Only String or byte[] are supported so far, but was %s.", m_type));
}
return list;
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
m_file = new File(m_baseDir, path);
m_file.getParentFile().mkdirs();
m_out = new RandomAccessFile(m_file, "rw");
super.initialize(type, path);
if (m_file.exists()) {
loadIndexes();
m_out.seek(m_out.length());
}
}
protected void loadIndexes() throws IOException {
byte[] data = new byte[8192];
while (true) {
long offset = m_out.getFilePointer();
String first = m_out.readLine();
if (first == null) { // EOF
break;
}
int num = Integer.parseInt(m_out.readLine());
if (num > data.length) {
int newSize = data.length;
m_type = type;
while (newSize < num) {
newSize += newSize / 2;
}
data = new byte[newSize];
}
m_out.readFully(data, 0, num); // get rid of it
m_out.readLine(); // get rid of empty line
updateIndex(first, offset);
if (m_type != String.class && m_type != byte[].class) {
throw new UnsupportedOperationException(String.format(
"Only String or byte[] are supported so far, but was %s.", m_type));
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
/**
* Store the data in the format of:<br>
*
* <xmp>
* <id>\n
* <length of data>\n
* <data>\n
* </xmp>
*/
@Override
public boolean storeById(String id, T data) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
int length = buf.readInt();
String attributes = id + "\n";
try {
byte[] first = attributes.getBytes("utf-8");
byte[] num = String.valueOf(length).getBytes("utf-8");
long offset = m_out.getFilePointer();
m_out.write(first);
m_out.write(num);
m_out.write('\n');
m_out.write(buf.array(), buf.readerIndex(), length);
m_out.write('\n');
m_out.getChannel().force(false);
updateIndex(id, offset);
return true;
} catch (Exception e) {
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
return false;
}
}
protected void updateIndex(String id, long offset) {
m_idToOffsets.put(id, offset);
}
}
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.MessageBucket;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public class DefaultMessageBucket implements MessageBucket, LogEnabled {
private static final String[] EMPTY = new String[0];
public class DefaultMessageBucket extends AbstractBucket<MessageTree> implements MessageBucket {
@Inject
private MessageCodec m_codec;
@Inject
private String m_baseDir;
// key => offset of record
private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();
// tag => list of ids
private Map<String, List<String>> m_tagToIds = new HashMap<String, List<String>>();
private File m_file;
private RandomAccessFile m_out;
private Logger m_logger;
@Override
public void close() {
try {
m_out.close();
m_idToOffsets.clear();
m_tagToIds.clear();
} catch (IOException e) {
// ignore it
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public List<MessageTree> findAllByIds(List<String> ids) {
List<MessageTree> list = new ArrayList<MessageTree>(ids.size());
for (String id : ids) {
list.add(findById(id));
}
return list;
}
@Override
public List<String> findAllIdsByTag(String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
return Collections.emptyList();
} else {
return ids;
}
}
@Override
public MessageTree findById(String id) {
Long offset = m_idToOffsets.get(id);
if (offset != null) {
try {
long old = m_out.getFilePointer();
m_out.seek(offset);
m_out.readLine(); // first line is header, get rid of it
int num = Integer.parseInt(m_out.readLine());
byte[] data = new byte[num];
m_out.readFully(data);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(data);
MessageTree tree = new DefaultMessageTree();
m_codec.decode(buf, tree);
m_out.seek(old);
return tree;
} catch (Exception e) {
m_logger.error(String.format("Error when reading file(%s)!", m_file), e);
}
}
return null;
}
@Override
public MessageTree findNextById(String id, Direction direction, String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids != null) {
int index = ids.indexOf(id);
switch (direction) {
case FORWARD:
index++;
break;
case BACKWARD:
index--;
break;
}
protected MessageTree decode(ChannelBuffer buf) throws IOException {
MessageTree tree = new DefaultMessageTree();
if (index >= 0 && index < ids.size()) {
String nextId = ids.get(index);
return findById(nextId);
}
}
return null;
m_codec.decode(buf, tree);
return tree;
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
m_file = new File(m_baseDir, path);
m_file.getParentFile().mkdirs();
m_out = new RandomAccessFile(m_file, "rw");
if (m_file.exists()) {
loadIndexes();
m_out.seek(m_out.length());
}
}
protected void loadIndexes() throws IOException {
byte[] data = new byte[8192];
while (true) {
long offset = m_out.getFilePointer();
String first = m_out.readLine();
if (first == null) { // EOF
break;
}
int num = Integer.parseInt(m_out.readLine());
if (num > data.length) {
int newSize = data.length;
while (newSize < num) {
newSize += newSize / 2;
}
data = new byte[newSize];
}
m_out.readFully(data, 0, num); // get rid of it
m_out.readLine(); // get rid of empty line
List<String> parts = Splitters.by('\t').split(first);
if (parts.size() > 0) {
String id = parts.get(0);
parts.remove(0);
updateIndex(id, parts.toArray(new String[0]), offset);
}
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
protected void encode(MessageTree tree, ChannelBuffer buf) throws IOException {
m_codec.encode(tree, buf);
}
public void setCodec(MessageCodec codec) {
m_codec = codec;
}
@Override
public boolean storeById(String id, MessageTree tree) {
return storeById(id, tree, EMPTY);
}
/**
* Store the message in the format of:<br>
*
* <xmp>
* <id>\t<tag1>\t<tag2>\t...\n
* <length of message>\n
* <message>\n
* </xmp>
*/
@Override
public boolean storeById(String id, MessageTree tree, String... tags) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
int length = buf.readInt();
String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
try {
byte[] first = attributes.getBytes("utf-8");
byte[] num = String.valueOf(length).getBytes("utf-8");
long offset = m_out.getFilePointer();
m_out.write(first);
m_out.write(num);
m_out.write('\n');
m_out.write(buf.array(), buf.readerIndex(), length);
m_out.write('\n');
m_out.getChannel().force(false);
updateIndex(id, tags, offset);
return true;
} catch (Exception e) {
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
return false;
}
}
protected void updateIndex(String id, String[] tags, long offset) {
m_idToOffsets.put(id, offset);
for (String tag : tags) {
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
ids = new ArrayList<String>();
m_tagToIds.put(tag, ids);
}
ids.add(id);
}
}
}
......@@ -171,6 +171,12 @@
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>[B</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
</component>
<component>
......
......@@ -8,7 +8,7 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.MessageBucket.Direction;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
......@@ -28,6 +28,40 @@ public class BucketTest extends ComponentTestCase {
return tree;
}
@Test
public void testBytesBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getBucket(byte[].class, "target/bucket/bytes");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
boolean success = bucket.storeById(id, t1.getBytes());
if (success) {
String t2 = new String(bucket.findById(id));
Assert.assertEquals("Unable to find data after stored it.", t1, t2);
} else {
Assert.fail("Data failed to store at i=" + i + ".");
}
}
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(byte[].class, "target/bucket/bytes");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
String t2 = new String(bucket.findById(id));
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
}
@Test
public void testMessageBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
......@@ -75,4 +109,38 @@ public class BucketTest extends ComponentTestCase {
Assert.assertEquals("Unable to find next message in the thread " + i + ".", t1.toString(), t2.toString());
}
}
@Test
public void testStringBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<String> bucket = manager.getBucket(String.class, "target/bucket/data");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
boolean success = bucket.storeById(id, t1);
if (success) {
String t2 = bucket.findById(id);
Assert.assertEquals("Unable to find data after stored it.", t1, t2);
} else {
Assert.fail("Data failed to store at i=" + i + ".");
}
}
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(String.class, "target/bucket/data");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
String t2 = bucket.findById(id);
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册