From 8daf9f2df8d649e00141a67b8dcf9e8125ac23bb Mon Sep 17 00:00:00 2001 From: Frankie Wu Date: Tue, 21 Feb 2012 21:23:54 +0800 Subject: [PATCH] abstract bucket implementation --- .../cat/build/ComponentsConfigurator.java | 3 +- .../dianping/cat/storage/MessageBucket.java | 20 +- .../cat/storage/TagThreadSupport.java | 23 ++ .../cat/storage/internal/AbstractBucket.java | 246 ++++++++++++++++++ .../cat/storage/internal/DefaultBucket.java | 196 +++----------- .../internal/DefaultMessageBucket.java | 237 +---------------- .../resources/META-INF/plexus/components.xml | 6 + .../com/dianping/cat/storage/BucketTest.java | 70 ++++- 8 files changed, 390 insertions(+), 411 deletions(-) create mode 100644 cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java create mode 100644 cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractBucket.java diff --git a/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java b/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java index 00f036859..a42ff16e7 100644 --- a/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java @@ -91,7 +91,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(MessageHandler.class, DefaultMessageHandler.class) // .req(MessageManager.class, MessageConsumerRegistry.class)); - all.add(C(Bucket.class, DefaultBucket.class)); + all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class)); + all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class)); all.add(C(Bucket.class, MessageTree.class.getName(), DefaultMessageBucket.class) // .req(MessageCodec.class, "plain-text")); all.add(C(BucketManager.class, DefaultBucketManager.class)); diff --git a/cat-core/src/main/java/com/dianping/cat/storage/MessageBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/MessageBucket.java index 3255bdb39..c46d427a7 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/MessageBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/MessageBucket.java @@ -1,25 +1,7 @@ package com.dianping.cat.storage; -import java.util.List; - import com.dianping.cat.message.spi.MessageTree; -/** - * Map to one HDFS directory for one report. - *

- * - * Sample tags: "thread:101", "session:abc", "request:xyz", "parent:xxx" - */ -public interface MessageBucket extends Bucket { - public boolean storeById(String id, MessageTree value, String... tags); - - public List findAllIdsByTag(String tag); - - public MessageTree findNextById(String id, Direction direction, String tag); - - public static enum Direction { - FORWARD, +public interface MessageBucket extends Bucket, TagThreadSupport { - BACKWARD; - } } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java b/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java new file mode 100644 index 000000000..6b15b83dc --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java @@ -0,0 +1,23 @@ +package com.dianping.cat.storage; + +import java.util.List; + +/** + * Map to one HDFS directory for one report. + *

+ * + * Sample tags: "thread:101", "session:abc", "request:xyz", "parent:xxx" + */ +public interface TagThreadSupport { + public boolean storeById(String id, T data, String... tags); + + public List findAllIdsByTag(String tag); + + public T findNextById(String id, Direction direction, String tag); + + public static enum Direction { + FORWARD, + + BACKWARD; + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractBucket.java new file mode 100644 index 000000000..6e11fa0e3 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractBucket.java @@ -0,0 +1,246 @@ +package com.dianping.cat.storage.internal; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.codehaus.plexus.logging.LogEnabled; +import org.codehaus.plexus.logging.Logger; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import com.dianping.cat.storage.Bucket; +import com.dianping.cat.storage.TagThreadSupport; +import com.site.helper.Joiners; +import com.site.helper.Splitters; +import com.site.lookup.annotation.Inject; + +public abstract class AbstractBucket implements Bucket, TagThreadSupport, LogEnabled { + private static final String[] EMPTY = new String[0]; + + @Inject + private String m_baseDir; + + // key => offset of record + private Map m_idToOffsets = new HashMap(); + + // tag => list of ids + private Map> m_tagToIds = new HashMap>(); + + private File m_file; + + private RandomAccessFile m_out; + + private Logger m_logger; + + @Override + public void close() { + try { + m_out.close(); + m_idToOffsets.clear(); + m_tagToIds.clear(); + } catch (IOException e) { + // ignore it + } + } + + protected abstract T decode(ChannelBuffer buf) throws IOException; + + @Override + public void enableLogging(Logger logger) { + m_logger = logger; + } + + protected abstract void encode(T data, ChannelBuffer buf) throws IOException; + + @Override + public List findAllByIds(List ids) { + List list = new ArrayList(ids.size()); + + for (String id : ids) { + list.add(findById(id)); + } + + return list; + } + + @Override + public List findAllIdsByTag(String tag) { + List ids = m_tagToIds.get(tag); + + if (ids == null) { + return Collections.emptyList(); + } else { + return ids; + } + } + + @Override + public T findById(String id) { + Long offset = m_idToOffsets.get(id); + + if (offset != null) { + try { + long old = m_out.getFilePointer(); + + m_out.seek(offset); + m_out.readLine(); // first line is header, get rid of it + + int num = Integer.parseInt(m_out.readLine()); + byte[] bytes = new byte[num]; + + m_out.readFully(bytes); + + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bytes); + + T data = decode(buf); + m_out.seek(old); + + return data; + } catch (Exception e) { + m_logger.error(String.format("Error when reading file(%s)!", m_file), e); + } + } + + return null; + } + + @Override + public T findNextById(String id, Direction direction, String tag) { + List ids = m_tagToIds.get(tag); + + if (ids != null) { + int index = ids.indexOf(id); + + switch (direction) { + case FORWARD: + index++; + break; + case BACKWARD: + index--; + break; + } + + if (index >= 0 && index < ids.size()) { + String nextId = ids.get(index); + + return findById(nextId); + } + } + + return null; + } + + @Override + public void initialize(Class type, String path) throws IOException { + m_file = new File(m_baseDir, path); + m_file.getParentFile().mkdirs(); + m_out = new RandomAccessFile(m_file, "rw"); + + if (m_file.exists()) { + loadIndexes(); + m_out.seek(m_out.length()); + } + } + + protected void loadIndexes() throws IOException { + byte[] data = new byte[8192]; + + while (true) { + long offset = m_out.getFilePointer(); + String first = m_out.readLine(); + + if (first == null) { // EOF + break; + } + + int num = Integer.parseInt(m_out.readLine()); + + if (num > data.length) { + int newSize = data.length; + + while (newSize < num) { + newSize += newSize / 2; + } + + data = new byte[newSize]; + } + + m_out.readFully(data, 0, num); // get rid of it + m_out.readLine(); // get rid of empty line + + List parts = Splitters.by('\t').split(first); + if (parts.size() > 0) { + String id = parts.get(0); + + parts.remove(0); + updateIndex(id, parts.toArray(EMPTY), offset); + } + } + } + + public void setBaseDir(String baseDir) { + m_baseDir = baseDir; + } + + @Override + public boolean storeById(String id, T data) { + return storeById(id, data, EMPTY); + } + + /** + * Store the message in the format of:
+ * + *

<id>\t<tag1>\t<tag2>\t...\n <length of message>\n <message>\n + */ + @Override + public boolean storeById(String id, T data, String... tags) { + try { + ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); + + encode(data, buf); + + int length = buf.readInt(); + String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n"; + + byte[] first = attributes.getBytes("utf-8"); + byte[] num = String.valueOf(length).getBytes("utf-8"); + long offset = m_out.getFilePointer(); + + m_out.write(first); + m_out.write(num); + m_out.write('\n'); + m_out.write(buf.array(), buf.readerIndex(), length); + m_out.write('\n'); + m_out.getChannel().force(true); + + updateIndex(id, tags, offset); + + return true; + } catch (Exception e) { + m_logger.error(String.format("Error when writing to file(%s)!", m_file), e); + + return false; + } + } + + protected void updateIndex(String id, String[] tags, long offset) { + m_idToOffsets.put(id, offset); + + for (String tag : tags) { + List ids = m_tagToIds.get(tag); + + if (ids == null) { + ids = new ArrayList(); + m_tagToIds.put(tag, ids); + } + + ids.add(id); + } + } +} diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java index 252fb5183..31ba88300 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java @@ -1,182 +1,58 @@ package com.dianping.cat.storage.internal; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.nio.charset.Charset; -import org.codehaus.plexus.logging.LogEnabled; -import org.codehaus.plexus.logging.Logger; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import com.dianping.cat.message.spi.MessageCodec; -import com.dianping.cat.message.spi.MessageTree; -import com.dianping.cat.message.spi.internal.DefaultMessageTree; -import com.dianping.cat.storage.Bucket; -import com.site.lookup.annotation.Inject; - -public class DefaultBucket implements Bucket, LogEnabled { - @Inject - private MessageCodec m_codec; - - @Inject - private String m_baseDir; - - // key => offset of record - private Map m_idToOffsets = new HashMap(); - - private File m_file; - - private RandomAccessFile m_out; - - private Logger m_logger; - - @Override - public void close() { - try { - m_out.close(); - m_idToOffsets.clear(); - } catch (IOException e) { - // ignore it - } - } - - @Override - public void enableLogging(Logger logger) { - m_logger = logger; - } +public class DefaultBucket extends AbstractBucket { + private Class m_type; + @SuppressWarnings("unchecked") @Override - public T findById(String id) { - Long offset = m_idToOffsets.get(id); - - if (offset != null) { - try { - long old = m_out.getFilePointer(); - - m_out.seek(offset); - m_out.readLine(); // first line is header, get rid of it - - int num = Integer.parseInt(m_out.readLine()); - byte[] data = new byte[num]; - - m_out.readFully(data); - - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(data); - MessageTree tree = new DefaultMessageTree(); - - m_codec.decode(buf, tree); - m_out.seek(old); - - return null; - } catch (Exception e) { - m_logger.error(String.format("Error when reading file(%s)!", m_file), e); - } + protected T decode(ChannelBuffer buf) throws IOException { + if (m_type == String.class) { + return (T) buf.toString(buf.readerIndex(), buf.readableBytes(), Charset.forName("utf-8")); + } else if (m_type == byte[].class) { + byte[] bytes = new byte[buf.readableBytes()]; + + buf.readBytes(bytes); + + return (T) bytes; + } else { + throw new UnsupportedOperationException(String.format( + "Only String or byte[] are supported so far, but was %s.", m_type)); } - - return null; } @Override - public List findAllByIds(List ids) { - List list = new ArrayList(ids.size()); - - for (String id : ids) { - list.add(findById(id)); + protected void encode(T data, ChannelBuffer buf) throws IOException { + if (m_type == String.class) { + String str = (String) data; + byte[] bytes = str.getBytes("utf-8"); + + buf.writeInt(bytes.length); + buf.writeBytes(bytes); + } else if (m_type == byte[].class) { + byte[] bytes = (byte[]) data; + + buf.writeInt(bytes.length); + buf.writeBytes(bytes); + } else { + throw new UnsupportedOperationException(String.format( + "Only String or byte[] are supported so far, but was %s.", m_type)); } - - return list; } @Override public void initialize(Class type, String path) throws IOException { - m_file = new File(m_baseDir, path); - m_file.getParentFile().mkdirs(); - m_out = new RandomAccessFile(m_file, "rw"); + super.initialize(type, path); - if (m_file.exists()) { - loadIndexes(); - m_out.seek(m_out.length()); - } - } - - protected void loadIndexes() throws IOException { - byte[] data = new byte[8192]; - - while (true) { - long offset = m_out.getFilePointer(); - String first = m_out.readLine(); - - if (first == null) { // EOF - break; - } - - int num = Integer.parseInt(m_out.readLine()); - - if (num > data.length) { - int newSize = data.length; + m_type = type; - while (newSize < num) { - newSize += newSize / 2; - } - - data = new byte[newSize]; - } - - m_out.readFully(data, 0, num); // get rid of it - m_out.readLine(); // get rid of empty line - - updateIndex(first, offset); + if (m_type != String.class && m_type != byte[].class) { + throw new UnsupportedOperationException(String.format( + "Only String or byte[] are supported so far, but was %s.", m_type)); } } - - public void setBaseDir(String baseDir) { - m_baseDir = baseDir; - } - - /** - * Store the data in the format of:
- * - * - * <id>\n - * <length of data>\n - * <data>\n - * - */ - @Override - public boolean storeById(String id, T data) { - ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); - - int length = buf.readInt(); - String attributes = id + "\n"; - - try { - byte[] first = attributes.getBytes("utf-8"); - byte[] num = String.valueOf(length).getBytes("utf-8"); - long offset = m_out.getFilePointer(); - - m_out.write(first); - m_out.write(num); - m_out.write('\n'); - m_out.write(buf.array(), buf.readerIndex(), length); - m_out.write('\n'); - m_out.getChannel().force(false); - - updateIndex(id, offset); - - return true; - } catch (Exception e) { - m_logger.error(String.format("Error when writing to file(%s)!", m_file), e); - - return false; - } - } - - protected void updateIndex(String id, long offset) { - m_idToOffsets.put(id, offset); - } } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultMessageBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultMessageBucket.java index 6a5816242..7efbbb4a5 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultMessageBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultMessageBucket.java @@ -1,256 +1,33 @@ package com.dianping.cat.storage.internal; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.codehaus.plexus.logging.LogEnabled; -import org.codehaus.plexus.logging.Logger; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.message.spi.internal.DefaultMessageTree; import com.dianping.cat.storage.MessageBucket; -import com.site.helper.Joiners; -import com.site.helper.Splitters; import com.site.lookup.annotation.Inject; -public class DefaultMessageBucket implements MessageBucket, LogEnabled { - private static final String[] EMPTY = new String[0]; - +public class DefaultMessageBucket extends AbstractBucket implements MessageBucket { @Inject private MessageCodec m_codec; - @Inject - private String m_baseDir; - - // key => offset of record - private Map m_idToOffsets = new HashMap(); - - // tag => list of ids - private Map> m_tagToIds = new HashMap>(); - - private File m_file; - - private RandomAccessFile m_out; - - private Logger m_logger; - - @Override - public void close() { - try { - m_out.close(); - m_idToOffsets.clear(); - m_tagToIds.clear(); - } catch (IOException e) { - // ignore it - } - } - - @Override - public void enableLogging(Logger logger) { - m_logger = logger; - } - - @Override - public List findAllByIds(List ids) { - List list = new ArrayList(ids.size()); - - for (String id : ids) { - list.add(findById(id)); - } - - return list; - } - - @Override - public List findAllIdsByTag(String tag) { - List ids = m_tagToIds.get(tag); - - if (ids == null) { - return Collections.emptyList(); - } else { - return ids; - } - } - - @Override - public MessageTree findById(String id) { - Long offset = m_idToOffsets.get(id); - - if (offset != null) { - try { - long old = m_out.getFilePointer(); - - m_out.seek(offset); - m_out.readLine(); // first line is header, get rid of it - - int num = Integer.parseInt(m_out.readLine()); - byte[] data = new byte[num]; - - m_out.readFully(data); - - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(data); - MessageTree tree = new DefaultMessageTree(); - - m_codec.decode(buf, tree); - m_out.seek(old); - - return tree; - } catch (Exception e) { - m_logger.error(String.format("Error when reading file(%s)!", m_file), e); - } - } - - return null; - } - @Override - public MessageTree findNextById(String id, Direction direction, String tag) { - List ids = m_tagToIds.get(tag); - - if (ids != null) { - int index = ids.indexOf(id); - - switch (direction) { - case FORWARD: - index++; - break; - case BACKWARD: - index--; - break; - } + protected MessageTree decode(ChannelBuffer buf) throws IOException { + MessageTree tree = new DefaultMessageTree(); - if (index >= 0 && index < ids.size()) { - String nextId = ids.get(index); - - return findById(nextId); - } - } - - return null; + m_codec.decode(buf, tree); + return tree; } @Override - public void initialize(Class type, String path) throws IOException { - m_file = new File(m_baseDir, path); - m_file.getParentFile().mkdirs(); - m_out = new RandomAccessFile(m_file, "rw"); - - if (m_file.exists()) { - loadIndexes(); - m_out.seek(m_out.length()); - } - } - - protected void loadIndexes() throws IOException { - byte[] data = new byte[8192]; - - while (true) { - long offset = m_out.getFilePointer(); - String first = m_out.readLine(); - - if (first == null) { // EOF - break; - } - - int num = Integer.parseInt(m_out.readLine()); - - if (num > data.length) { - int newSize = data.length; - - while (newSize < num) { - newSize += newSize / 2; - } - - data = new byte[newSize]; - } - - m_out.readFully(data, 0, num); // get rid of it - m_out.readLine(); // get rid of empty line - - List parts = Splitters.by('\t').split(first); - if (parts.size() > 0) { - String id = parts.get(0); - - parts.remove(0); - updateIndex(id, parts.toArray(new String[0]), offset); - } - } - } - - public void setBaseDir(String baseDir) { - m_baseDir = baseDir; + protected void encode(MessageTree tree, ChannelBuffer buf) throws IOException { + m_codec.encode(tree, buf); } public void setCodec(MessageCodec codec) { m_codec = codec; } - - @Override - public boolean storeById(String id, MessageTree tree) { - return storeById(id, tree, EMPTY); - } - - /** - * Store the message in the format of:
- * - * - * <id>\t<tag1>\t<tag2>\t...\n - * <length of message>\n - * <message>\n - * - */ - @Override - public boolean storeById(String id, MessageTree tree, String... tags) { - ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); - - m_codec.encode(tree, buf); - - int length = buf.readInt(); - String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n"; - - try { - byte[] first = attributes.getBytes("utf-8"); - byte[] num = String.valueOf(length).getBytes("utf-8"); - long offset = m_out.getFilePointer(); - - m_out.write(first); - m_out.write(num); - m_out.write('\n'); - m_out.write(buf.array(), buf.readerIndex(), length); - m_out.write('\n'); - m_out.getChannel().force(false); - - updateIndex(id, tags, offset); - - return true; - } catch (Exception e) { - m_logger.error(String.format("Error when writing to file(%s)!", m_file), e); - - return false; - } - } - - protected void updateIndex(String id, String[] tags, long offset) { - m_idToOffsets.put(id, offset); - - for (String tag : tags) { - List ids = m_tagToIds.get(tag); - - if (ids == null) { - ids = new ArrayList(); - m_tagToIds.put(tag, ids); - } - - ids.add(id); - } - } } diff --git a/cat-core/src/main/resources/META-INF/plexus/components.xml b/cat-core/src/main/resources/META-INF/plexus/components.xml index e7247cc1f..789a2af5b 100644 --- a/cat-core/src/main/resources/META-INF/plexus/components.xml +++ b/cat-core/src/main/resources/META-INF/plexus/components.xml @@ -171,6 +171,12 @@ com.dianping.cat.storage.Bucket + java.lang.String + com.dianping.cat.storage.internal.DefaultBucket + + + com.dianping.cat.storage.Bucket + [B com.dianping.cat.storage.internal.DefaultBucket diff --git a/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java b/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java index c88a16612..65154cba9 100644 --- a/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java +++ b/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java @@ -8,7 +8,7 @@ import org.junit.runners.JUnit4; import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.message.spi.internal.DefaultMessageTree; -import com.dianping.cat.storage.MessageBucket.Direction; +import com.dianping.cat.storage.TagThreadSupport.Direction; import com.site.lookup.ComponentTestCase; @RunWith(JUnit4.class) @@ -28,6 +28,40 @@ public class BucketTest extends ComponentTestCase { return tree; } + @Test + public void testBytesBucket() throws Exception { + BucketManager manager = lookup(BucketManager.class); + Bucket bucket = manager.getBucket(byte[].class, "target/bucket/bytes"); + + // store it and load it + for (int i = 0; i < 100; i++) { + String id = "id" + i; + String t1 = "value" + i; + boolean success = bucket.storeById(id, t1.getBytes()); + + if (success) { + String t2 = new String(bucket.findById(id)); + + Assert.assertEquals("Unable to find data after stored it.", t1, t2); + } else { + Assert.fail("Data failed to store at i=" + i + "."); + } + } + + // close and reload it, check if everything is okay + bucket.close(); + bucket.initialize(byte[].class, "target/bucket/bytes"); + + // store it and load it + for (int i = 0; i < 100; i++) { + String id = "id" + i; + String t1 = "value" + i; + String t2 = new String(bucket.findById(id)); + + Assert.assertEquals("Unable to find data by id.", t1, t2); + } + } + @Test public void testMessageBucket() throws Exception { BucketManager manager = lookup(BucketManager.class); @@ -75,4 +109,38 @@ public class BucketTest extends ComponentTestCase { Assert.assertEquals("Unable to find next message in the thread " + i + ".", t1.toString(), t2.toString()); } } + + @Test + public void testStringBucket() throws Exception { + BucketManager manager = lookup(BucketManager.class); + Bucket bucket = manager.getBucket(String.class, "target/bucket/data"); + + // store it and load it + for (int i = 0; i < 100; i++) { + String id = "id" + i; + String t1 = "value" + i; + boolean success = bucket.storeById(id, t1); + + if (success) { + String t2 = bucket.findById(id); + + Assert.assertEquals("Unable to find data after stored it.", t1, t2); + } else { + Assert.fail("Data failed to store at i=" + i + "."); + } + } + + // close and reload it, check if everything is okay + bucket.close(); + bucket.initialize(String.class, "target/bucket/data"); + + // store it and load it + for (int i = 0; i < 100; i++) { + String id = "id" + i; + String t1 = "value" + i; + String t2 = bucket.findById(id); + + Assert.assertEquals("Unable to find data by id.", t1, t2); + } + } } -- GitLab