diff --git a/java/Makefile b/java/Makefile index 42f465e10c3714b049840dabc7becfbf6730758a..97f0b0244d2c79382861e688e2f4f3fccb4dfae3 100644 --- a/java/Makefile +++ b/java/Makefile @@ -29,6 +29,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.SkipListMemTableConfig\ org.rocksdb.Slice\ org.rocksdb.Statistics\ + org.rocksdb.TransactionLogIterator\ org.rocksdb.TtlDB\ org.rocksdb.VectorMemTableConfig\ org.rocksdb.StringAppendOperator\ @@ -81,6 +82,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\ org.rocksdb.test.SizeUnitTest\ org.rocksdb.test.SliceTest\ org.rocksdb.test.SnapshotTest\ + org.rocksdb.test.TransactionLogIteratorTest\ org.rocksdb.test.TtlDBTest\ org.rocksdb.test.StatisticsCollectorTest\ org.rocksdb.test.WriteBatchHandlerTest\ diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index ac02860e81559d9d365bcd898ecdfc869100a03d..ea38241964d327ca3a06416e38d20b9f3d284d74 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1588,6 +1588,76 @@ public class RocksDB extends RocksObject { columnFamilyHandle.nativeHandle_); } + /** + *

The sequence number of the most recent transaction.

+ * + * @return sequence number of the most + * recent transaction. + */ + public long getLatestSequenceNumber() { + return getLatestSequenceNumber(nativeHandle_); + } + + /** + *

Prevent file deletions. Compactions will continue to occur, + * but no obsolete files will be deleted. Calling this multiple + * times have the same effect as calling it once.

+ * + * @throws RocksDBException thrown if operation was not performed + * successfully. + */ + public void disableFileDeletions() throws RocksDBException { + disableFileDeletions(nativeHandle_); + } + + /** + *

Allow compactions to delete obsolete files. + * If force == true, the call to EnableFileDeletions() + * will guarantee that file deletions are enabled after + * the call, even if DisableFileDeletions() was called + * multiple times before.

+ * + *

If force == false, EnableFileDeletions will only + * enable file deletion after it's been called at least + * as many times as DisableFileDeletions(), enabling + * the two methods to be called by two threads + * concurrently without synchronization + * -- i.e., file deletions will be enabled only after both + * threads call EnableFileDeletions()

+ * + * @param force boolean value described above. + * + * @throws RocksDBException thrown if operation was not performed + * successfully. + */ + public void enableFileDeletions(boolean force) + throws RocksDBException { + enableFileDeletions(nativeHandle_, force); + } + + /** + *

Returns an iterator that is positioned at a write-batch containing + * seq_number. If the sequence number is non existent, it returns an iterator + * at the first available seq_no after the requested seq_no.

+ * + *

Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to + * use this api, else the WAL files will get + * cleared aggressively and the iterator might keep getting invalid before + * an update is read.

+ * + * @param sequenceNumber sequence number offset + * + * @return {@link org.rocksdb.TransactionLogIterator} instance. + * + * @throws org.rocksdb.RocksDBException if iterator cannot be retrieved + * from native-side. + */ + public TransactionLogIterator getUpdatesSince(long sequenceNumber) + throws RocksDBException { + return new TransactionLogIterator( + getUpdatesSince(nativeHandle_, sequenceNumber)); + } + /** * Private constructor. */ @@ -1730,6 +1800,13 @@ public class RocksDB extends RocksObject { private native void compactRange(long handle, byte[] begin, int beginLen, byte[] end, int endLen, boolean reduce_level, int target_level, int target_path_id, long cfHandle) throws RocksDBException; + private native long getLatestSequenceNumber(long handle); + private native void disableFileDeletions(long handle) + throws RocksDBException; + private native void enableFileDeletions(long handle, + boolean force) throws RocksDBException; + private native long getUpdatesSince(long handle, long sequenceNumber) + throws RocksDBException; protected DBOptionsInterface options_; } diff --git a/java/org/rocksdb/TransactionLogIterator.java b/java/org/rocksdb/TransactionLogIterator.java new file mode 100644 index 0000000000000000000000000000000000000000..d82cde3eaccb5e13a3541301509a017b0eabfa6d --- /dev/null +++ b/java/org/rocksdb/TransactionLogIterator.java @@ -0,0 +1,115 @@ +package org.rocksdb; + +/** + *

A TransactionLogIterator is used to iterate over the transactions in a db. + * One run of the iterator is continuous, i.e. the iterator will stop at the + * beginning of any gap in sequences.

+ */ +public class TransactionLogIterator extends RocksObject { + + /** + *

An iterator is either positioned at a WriteBatch + * or not valid. This method returns true if the iterator + * is valid. Can read data from a valid iterator.

+ * + * @return true if iterator position is valid. + */ + public boolean isValid() { + return isValid(nativeHandle_); + } + + /** + *

Moves the iterator to the next WriteBatch. + * REQUIRES: Valid() to be true.

+ */ + public void next() { + next(nativeHandle_); + } + + /** + *

Throws RocksDBException if something went wrong.

+ * + * @throws org.rocksdb.RocksDBException if something went + * wrong in the underlying C++ code. + */ + public void status() throws RocksDBException { + status(nativeHandle_); + } + + /** + *

If iterator position is valid, return the current + * write_batch and the sequence number of the earliest + * transaction contained in the batch.

+ * + *

ONLY use if Valid() is true and status() is OK.

+ * + * @return {@link org.rocksdb.TransactionLogIterator.BatchResult} + * instance. + */ + public BatchResult getBatch() { + assert(isValid()); + return getBatch(nativeHandle_); + } + + /** + *

TransactionLogIterator constructor.

+ * + * @param nativeHandle address to native address. + */ + TransactionLogIterator(long nativeHandle) { + super(); + nativeHandle_ = nativeHandle; + } + + @Override protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + /** + *

BatchResult represents a data structure returned + * by a TransactionLogIterator containing a sequence + * number and a {@link WriteBatch} instance.

+ */ + public class BatchResult { + /** + *

Constructor of BatchResult class.

+ * + * @param sequenceNumber related to this BatchResult instance. + * @param nativeHandle to {@link org.rocksdb.WriteBatch} + * native instance. + */ + public BatchResult(long sequenceNumber, long nativeHandle) { + sequenceNumber_ = sequenceNumber; + writeBatch_ = new WriteBatch(nativeHandle); + } + + /** + *

Return sequence number related to this BatchResult.

+ * + * @return Sequence number. + */ + public long sequenceNumber() { + return sequenceNumber_; + } + + /** + *

Return contained {@link org.rocksdb.WriteBatch} + * instance

+ * + * @return {@link org.rocksdb.WriteBatch} instance. + */ + public WriteBatch writeBatch() { + return writeBatch_; + } + + private final long sequenceNumber_; + private final WriteBatch writeBatch_; + } + + private native void disposeInternal(long handle); + private native boolean isValid(long handle); + private native void next(long handle); + private native void status(long handle) + throws RocksDBException; + private native BatchResult getBatch(long handle); +} diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java index 24133ec397a4eb0c64d1ad02126a9edbf0b8e100..fd6d9386c032fbd6560df82ac03eafa3627a40c5 100644 --- a/java/org/rocksdb/WriteBatch.java +++ b/java/org/rocksdb/WriteBatch.java @@ -53,6 +53,19 @@ public class WriteBatch extends AbstractWriteBatch { iterate(handler.nativeHandle_); } + /** + *

Private WriteBatch constructor which is used to construct + * WriteBatch instances from C++ side. As the reference to this + * object is also managed from C++ side the handle will be disowned.

+ * + * @param nativeHandle address of native instance. + */ + WriteBatch(long nativeHandle) { + super(); + disOwnNativeHandle(); + nativeHandle_ = nativeHandle; + } + @Override final native void disposeInternal(long handle); @Override final native int count0(); @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen); diff --git a/java/org/rocksdb/test/RocksDBTest.java b/java/org/rocksdb/test/RocksDBTest.java index a6934b31068fdf149636cd3c32b9ba8f9381a891..15dde9856336acbdf1ed62d97e0a5c1a2697e06f 100644 --- a/java/org/rocksdb/test/RocksDBTest.java +++ b/java/org/rocksdb/test/RocksDBTest.java @@ -738,4 +738,25 @@ public class RocksDBTest { } } } + + @Test + public void enableDisableFileDeletions() throws RocksDBException { + RocksDB db = null; + Options options = null; + try { + options = new Options().setCreateIfMissing(true); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + db.disableFileDeletions(); + db.enableFileDeletions(false); + db.disableFileDeletions(); + db.enableFileDeletions(true); + } finally { + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } } diff --git a/java/org/rocksdb/test/TransactionLogIteratorTest.java b/java/org/rocksdb/test/TransactionLogIteratorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6d700dac941dbb3d7191991eb0fcc8ae20e44ff5 --- /dev/null +++ b/java/org/rocksdb/test/TransactionLogIteratorTest.java @@ -0,0 +1,183 @@ +package org.rocksdb.test; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.*; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TransactionLogIteratorTest { + @ClassRule + public static final RocksMemoryResource rocksMemoryResource = + new RocksMemoryResource(); + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + @Test + public void transactionLogIterator() throws RocksDBException { + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + transactionLogIterator = db.getUpdatesSince(0); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } + + @Test + public void getBatch() throws RocksDBException { + final int numberOfPuts = 5; + RocksDB db = null; + Options options = null; + ColumnFamilyHandle cfHandle = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + + for (int i = 0; i < numberOfPuts; i++){ + db.put(String.valueOf(i).getBytes(), + String.valueOf(i).getBytes()); + } + db.flush(new FlushOptions().setWaitForFlush(true)); + + // the latest sequence number is 5 because 5 puts + // were written beforehand + assertThat(db.getLatestSequenceNumber()). + isEqualTo(numberOfPuts); + + // insert 5 writes into a cf + cfHandle = db.createColumnFamily( + new ColumnFamilyDescriptor("new_cf".getBytes())); + + for (int i = 0; i < numberOfPuts; i++){ + db.put(cfHandle, String.valueOf(i).getBytes(), + String.valueOf(i).getBytes()); + } + // the latest sequence number is 10 because + // (5 + 5) puts were written beforehand + assertThat(db.getLatestSequenceNumber()). + isEqualTo(numberOfPuts + numberOfPuts); + + // Get updates since the beginning + transactionLogIterator = db.getUpdatesSince(0); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.status(); + + // The first sequence number is 1 + TransactionLogIterator.BatchResult batchResult = + transactionLogIterator.getBatch(); + assertThat(batchResult.sequenceNumber()).isEqualTo(1); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (cfHandle != null) { + cfHandle.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } + + @Test + public void transactionLogIteratorStallAtLastRecord() throws RocksDBException { + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + db.put("key1".getBytes(), "value1".getBytes()); + // Get updates since the beginning + transactionLogIterator = db.getUpdatesSince(0); + transactionLogIterator.status(); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.next(); + assertThat(transactionLogIterator.isValid()).isFalse(); + transactionLogIterator.status(); + db.put("key2".getBytes(), "value2".getBytes()); + transactionLogIterator.next(); + transactionLogIterator.status(); + assertThat(transactionLogIterator.isValid()).isTrue(); + + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } + + @Test + public void transactionLogIteratorCheckAfterRestart() throws RocksDBException { + final int numberOfKeys = 2; + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + db.put("key1".getBytes(), "value1".getBytes()); + db.put("key2".getBytes(), "value2".getBytes()); + db.flush(new FlushOptions().setWaitForFlush(true)); + // reopen + db.close(); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + assertThat(db.getLatestSequenceNumber()).isEqualTo(numberOfKeys); + + transactionLogIterator = db.getUpdatesSince(0); + for (int i = 0; i < numberOfKeys; i++) { + transactionLogIterator.status(); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.next(); + } + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } +} diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 1055f87fe12cf55445e655ff22d6964061998ceb..eaa5603ead9d54c896584ddb649765f3a67dd143 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -16,6 +17,7 @@ #include "rocksjni/portal.h" #include "rocksdb/db.h" #include "rocksdb/cache.h" +#include "rocksdb/types.h" ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Open @@ -1598,3 +1600,70 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ( rocksdb_compactrange_helper(env, db, cf_handle, jbegin, jbegin_len, jend, jend_len, jreduce_level, jtarget_level, jtarget_path_id); } + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::GetLatestSequenceNumber + +/* + * Class: org_rocksdb_RocksDB + * Method: getLatestSequenceNumber + * Signature: (J)V + */ +jlong Java_org_rocksdb_RocksDB_getLatestSequenceNumber(JNIEnv* env, + jobject jdb, jlong jdb_handle) { + auto* db = reinterpret_cast(jdb_handle); + return db->GetLatestSequenceNumber(); +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB enable/disable file deletions + +/* + * Class: org_rocksdb_RocksDB + * Method: enableFileDeletions + * Signature: (J)V + */ +void Java_org_rocksdb_RocksDB_disableFileDeletions(JNIEnv* env, + jobject jdb, jlong jdb_handle) { + auto* db = reinterpret_cast(jdb_handle); + rocksdb::Status s = db->DisableFileDeletions(); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_RocksDB + * Method: enableFileDeletions + * Signature: (JZ)V + */ +void Java_org_rocksdb_RocksDB_enableFileDeletions(JNIEnv* env, + jobject jdb, jlong jdb_handle, jboolean jforce) { + auto* db = reinterpret_cast(jdb_handle); + rocksdb::Status s = db->EnableFileDeletions(jforce); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::GetUpdatesSince + +/* + * Class: org_rocksdb_RocksDB + * Method: getUpdatesSince + * Signature: (JJ)J + */ +jlong Java_org_rocksdb_RocksDB_getUpdatesSince(JNIEnv* env, + jobject jdb, jlong jdb_handle, jlong jsequence_number) { + auto* db = reinterpret_cast(jdb_handle); + rocksdb::SequenceNumber sequence_number = + static_cast(jsequence_number); + std::unique_ptr iter; + rocksdb::Status s = db->GetUpdatesSince(sequence_number, &iter); + if (s.ok()) { + return reinterpret_cast(iter.release()); + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + return 0; +} diff --git a/java/rocksjni/transaction_log.cc b/java/rocksjni/transaction_log.cc new file mode 100644 index 0000000000000000000000000000000000000000..1d3d7c100aab315568da830f428f8848eb86ee47 --- /dev/null +++ b/java/rocksjni/transaction_log.cc @@ -0,0 +1,78 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::Iterator methods from Java side. + +#include +#include +#include + +#include "include/org_rocksdb_TransactionLogIterator.h" +#include "rocksdb/transaction_log.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_disposeInternal( + JNIEnv* env, jobject jobj, jlong handle) { + delete reinterpret_cast(handle); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: isValid + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_TransactionLogIterator_isValid( + JNIEnv* env, jobject jobj, jlong handle) { + return reinterpret_cast(handle)->Valid(); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: next + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_next( + JNIEnv* env, jobject jobj, jlong handle) { + reinterpret_cast(handle)->Next(); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: status + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_status( + JNIEnv* env, jobject jobj, jlong handle) { + rocksdb::Status s = reinterpret_cast< + rocksdb::TransactionLogIterator*>(handle)->status(); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: getBatch + * Signature: (J)Lorg/rocksdb/TransactionLogIterator$BatchResult + */ +jobject Java_org_rocksdb_TransactionLogIterator_getBatch( + JNIEnv* env, jobject jobj, jlong handle) { + rocksdb::BatchResult batch_result = + reinterpret_cast(handle)->GetBatch(); + jclass jclazz = env->FindClass( + "org/rocksdb/TransactionLogIterator$BatchResult"); + assert(jclazz != nullptr); + jmethodID mid = env->GetMethodID( + jclazz, "", "(Lorg/rocksdb/TransactionLogIterator;JJ)V"); + assert(mid != nullptr); + return env->NewObject(jclazz, mid, jobj, + batch_result.sequence, batch_result.writeBatchPtr.release()); +}