提交 5e98e532 编写于 作者: Y Yueh-Hsuan Chiang

Merge pull request #458 from fyrz/RocksJava-TTLDB-Support

[RocksJava] TTL DB support
......@@ -29,13 +29,14 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.SkipListMemTableConfig\
org.rocksdb.Slice\
org.rocksdb.Statistics\
org.rocksdb.TtlDB\
org.rocksdb.VectorMemTableConfig\
org.rocksdb.StringAppendOperator\
org.rocksdb.WriteBatch\
org.rocksdb.WriteBatch.Handler\
org.rocksdb.test.WriteBatchInternal\
org.rocksdb.test.WriteBatchTest\
org.rocksdb.WriteOptions\
org.rocksdb.WriteOptions\
org.rocksdb.WriteBatchWithIndex\
org.rocksdb.WBWIRocksIterator
......@@ -79,6 +80,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\
org.rocksdb.test.SizeUnitTest\
org.rocksdb.test.SliceTest\
org.rocksdb.test.SnapshotTest\
org.rocksdb.test.TtlDBTest\
org.rocksdb.test.StatisticsCollectorTest\
org.rocksdb.test.WriteBatchHandlerTest\
org.rocksdb.test.WriteBatchTest\
......
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.util.List;
/**
* Database with TTL support.
*
* <p><strong>Use case</strong></p>
* <p>This API should be used to open the db when key-values inserted are
* meant to be removed from the db in a non-strict 'ttl' amount of time
* Therefore, this guarantees that key-values inserted will remain in the
* db for &gt;= ttl amount of time and the db will make efforts to remove the
* key-values as soon as possible after ttl seconds of their insertion.
* </p>
*
* <p><strong>Behaviour</strong></p>
* <p>TTL is accepted in seconds
* (int32_t)Timestamp(creation) is suffixed to values in Put internally
* Expired TTL values deleted in compaction only:(Timestamp+ttl&lt;time_now)
* Get/Iterator may return expired entries(compaction not run on them yet)
* Different TTL may be used during different Opens
* </p>
*
* <p><strong>Example</strong></p>
* <ul>
* <li>Open1 at t=0 with ttl=4 and insert k1,k2, close at t=2</li>
* <li>Open2 at t=3 with ttl=5. Now k1,k2 should be deleted at t&gt;=5</li>
* </ul>
*
* <p>
* read_only=true opens in the usual read-only mode. Compactions will not be
* triggered(neither manual nor automatic), so no expired entries removed
* </p>
*
* <p><strong>Constraints</strong></p>
* <p>Not specifying/passing or non-positive TTL behaves
* like TTL = infinity</p>
*
* <p><strong>!!!WARNING!!!</strong></p>
* <p>Calling DB::Open directly to re-open a db created by this API will get
* corrupt values(timestamp suffixed) and no ttl effect will be there
* during the second Open, so use this API consistently to open the db
* Be careful when passing ttl with a small positive value because the
* whole database may be deleted in a small amount of time.</p>
*/
public class TtlDB extends RocksDB {
/**
* <p>Opens a TtlDB.</p>
*
* <p>Database is opened in read-write mode without default TTL.</p>
*
* @param options {@link org.rocksdb.Options} instance.
* @param db_path path to database.
*
* @return TtlDB instance.
*
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
*/
public static TtlDB open(Options options, String db_path)
throws RocksDBException {
return open(options, db_path, 0, false);
}
/**
* <p>Opens a TtlDB.</p>
*
* @param options {@link org.rocksdb.Options} instance.
* @param db_path path to database.
* @param ttl time to live for new entries.
* @param readOnly boolean value indicating if database if db is
* opened read-only.
*
* @return TtlDB instance.
*
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
*/
public static TtlDB open(Options options, String db_path, int ttl,
boolean readOnly) throws RocksDBException {
TtlDB ttldb = new TtlDB();
ttldb.open(options.nativeHandle_, db_path, ttl, readOnly);
return ttldb;
}
/**
* <p>Opens a TtlDB.</p>
*
* @param options {@link org.rocksdb.Options} instance.
* @param db_path path to database.
* @param columnFamilyDescriptors list of column family descriptors
* @param columnFamilyHandles will be filled with ColumnFamilyHandle instances
* on open.
* @param ttlValues time to live values per column family handle
* @param readOnly boolean value indicating if database if db is
* opened read-only.
*
* @return TtlDB instance.
*
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
* @throws java.lang.IllegalArgumentException when there is not a ttl value
* per given column family handle.
*/
public static TtlDB open(DBOptions options, String db_path,
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
List<ColumnFamilyHandle> columnFamilyHandles,
List<Integer> ttlValues, boolean readOnly) throws RocksDBException {
if (columnFamilyDescriptors.size() != ttlValues.size()) {
throw new IllegalArgumentException("There must be a ttl value per column" +
"family handle.");
}
TtlDB ttlDB = new TtlDB();
List<Long> cfReferences = ttlDB.openCF(options.nativeHandle_, db_path,
columnFamilyDescriptors, columnFamilyDescriptors.size(),
ttlValues, readOnly);
for (int i=0; i<columnFamilyDescriptors.size(); i++) {
columnFamilyHandles.add(new ColumnFamilyHandle(ttlDB, cfReferences.get(i)));
}
return ttlDB;
}
/**
* <p>Creates a new ttl based column family with a name defined
* in given ColumnFamilyDescriptor and allocates a
* ColumnFamilyHandle within an internal structure.</p>
*
* <p>The ColumnFamilyHandle is automatically disposed with DB
* disposal.</p>
*
* @param columnFamilyDescriptor column family to be created.
* @param ttl TTL to set for this column family.
*
* @return {@link org.rocksdb.ColumnFamilyHandle} instance.
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public ColumnFamilyHandle createColumnFamilyWithTtl(
ColumnFamilyDescriptor columnFamilyDescriptor, int ttl)
throws RocksDBException {
assert(isInitialized());
return new ColumnFamilyHandle(this,
createColumnFamilyWithTtl(nativeHandle_,
columnFamilyDescriptor, ttl));
}
/**
* <p>Close the TtlDB instance and release resource.</p>
*
* <p>Internally, TtlDB owns the {@code rocksdb::DB} pointer
* to its associated {@link org.rocksdb.RocksDB}. The release
* of that RocksDB pointer is handled in the destructor of the
* c++ {@code rocksdb::TtlDB} and should be transparent to
* Java developers.</p>
*/
@Override public synchronized void close() {
if (isInitialized()) {
super.close();
}
}
/**
* <p>A protected constructor that will be used in the static
* factory method
* {@link #open(Options, String, int, boolean)}
* and
* {@link #open(DBOptions, String, java.util.List, java.util.List,
* java.util.List, boolean)}.
* </p>
*/
protected TtlDB() {
super();
}
@Override protected void finalize() throws Throwable {
close();
super.finalize();
}
private native void open(long optionsHandle, String db_path, int ttl,
boolean readOnly) throws RocksDBException;
private native List<Long> openCF(long optionsHandle, String db_path,
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
int columnFamilyDescriptorsLength, List<Integer> ttlValues,
boolean readOnly) throws RocksDBException;
private native long createColumnFamilyWithTtl(long handle,
ColumnFamilyDescriptor columnFamilyDescriptor, int ttl)
throws RocksDBException;
}
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
public class TtlDBTest {
@ClassRule
public static final RocksMemoryResource rocksMemoryResource =
new RocksMemoryResource();
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void ttlDBOpen() throws RocksDBException,
InterruptedException {
Options options = null;
TtlDB ttlDB = null;
try {
options = new Options().
setCreateIfMissing(true).
setMaxGrandparentOverlapFactor(0).
setMaxMemCompactionLevel(0);
ttlDB = TtlDB.open(options,
dbFolder.getRoot().getAbsolutePath());
ttlDB.put("key".getBytes(), "value".getBytes());
assertThat(ttlDB.get("key".getBytes())).
isEqualTo("value".getBytes());
assertThat(ttlDB.get("key".getBytes())).isNotNull();
} finally {
if (ttlDB != null) {
ttlDB.close();
}
if (options != null) {
options.dispose();
}
}
}
@Test
public void ttlDBOpenWithTtl() throws RocksDBException,
InterruptedException {
Options options = null;
TtlDB ttlDB = null;
try {
options = new Options().
setCreateIfMissing(true).
setMaxGrandparentOverlapFactor(0).
setMaxMemCompactionLevel(0);
ttlDB = TtlDB.open(options, dbFolder.getRoot().getAbsolutePath(),
1, false);
ttlDB.put("key".getBytes(), "value".getBytes());
assertThat(ttlDB.get("key".getBytes())).
isEqualTo("value".getBytes());
TimeUnit.SECONDS.sleep(2);
ttlDB.compactRange();
assertThat(ttlDB.get("key".getBytes())).isNull();
} finally {
if (ttlDB != null) {
ttlDB.close();
}
if (options != null) {
options.dispose();
}
}
}
@Test
public void ttlDbOpenWithColumnFamilies() throws RocksDBException, InterruptedException {
DBOptions dbOptions = null;
TtlDB ttlDB = null;
List<ColumnFamilyDescriptor> cfNames =
new ArrayList<>();
List<ColumnFamilyHandle> columnFamilyHandleList =
new ArrayList<>();
cfNames.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
cfNames.add(new ColumnFamilyDescriptor("new_cf".getBytes()));
List<Integer> ttlValues = new ArrayList<>();
// Default column family with infinite lifetime
ttlValues.add(0);
// new column family with 1 second ttl
ttlValues.add(1);
try {
dbOptions = new DBOptions().
setCreateMissingColumnFamilies(true).
setCreateIfMissing(true);
ttlDB = TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(),
cfNames, columnFamilyHandleList, ttlValues, false);
ttlDB.put("key".getBytes(), "value".getBytes());
assertThat(ttlDB.get("key".getBytes())).
isEqualTo("value".getBytes());
ttlDB.put(columnFamilyHandleList.get(1), "key".getBytes(),
"value".getBytes());
assertThat(ttlDB.get(columnFamilyHandleList.get(1),
"key".getBytes())).isEqualTo("value".getBytes());
TimeUnit.SECONDS.sleep(2);
ttlDB.compactRange();
ttlDB.compactRange(columnFamilyHandleList.get(1));
assertThat(ttlDB.get("key".getBytes())).isNotNull();
assertThat(ttlDB.get(columnFamilyHandleList.get(1),
"key".getBytes())).isNull();
} finally {
for (ColumnFamilyHandle columnFamilyHandle :
columnFamilyHandleList) {
columnFamilyHandle.dispose();
}
if (ttlDB != null) {
ttlDB.close();
}
if (dbOptions != null) {
dbOptions.dispose();
}
}
}
@Test
public void createTtlColumnFamily() throws RocksDBException,
InterruptedException {
Options options = null;
TtlDB ttlDB = null;
ColumnFamilyHandle columnFamilyHandle = null;
try {
options = new Options().setCreateIfMissing(true);
ttlDB = TtlDB.open(options,
dbFolder.getRoot().getAbsolutePath());
columnFamilyHandle = ttlDB.createColumnFamilyWithTtl(
new ColumnFamilyDescriptor("new_cf".getBytes()), 1);
ttlDB.put(columnFamilyHandle, "key".getBytes(),
"value".getBytes());
assertThat(ttlDB.get(columnFamilyHandle, "key".getBytes())).
isEqualTo("value".getBytes());
TimeUnit.SECONDS.sleep(2);
ttlDB.compactRange(columnFamilyHandle);
assertThat(ttlDB.get(columnFamilyHandle, "key".getBytes())).isNull();
} finally {
if (columnFamilyHandle != null) {
columnFamilyHandle.dispose();
}
if (ttlDB != null) {
ttlDB.close();
}
if (options != null) {
options.dispose();
}
}
}
}
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::TtlDB methods.
// from Java side.
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <vector>
#include "include/org_rocksdb_TtlDB.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksjni/portal.h"
/*
* Class: org_rocksdb_TtlDB
* Method: open
* Signature: (JLjava/lang/String;IZ)V
*/
void Java_org_rocksdb_TtlDB_open(JNIEnv* env,
jobject jttldb, jlong joptions_handle, jstring jdb_path,
jint jttl, jboolean jread_only) {
auto* opt = reinterpret_cast<rocksdb::Options*>(joptions_handle);
rocksdb::DBWithTTL* db = nullptr;
const char* db_path = env->GetStringUTFChars(jdb_path, 0);
rocksdb::Status s = rocksdb::DBWithTTL::Open(*opt, db_path, &db,
jttl, jread_only);
env->ReleaseStringUTFChars(jdb_path, db_path);
// as TTLDB extends RocksDB on the java side, we can reuse
// the RocksDB portal here.
if (s.ok()) {
rocksdb::RocksDBJni::setHandle(env, jttldb, db);
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_TtlDB
* Method: openCF
* Signature: (JLjava/lang/String;Ljava/util/List;
* ILjava/util/List;Z)Ljava/util/List;
*/
jobject
Java_org_rocksdb_TtlDB_openCF(
JNIEnv* env, jobject jdb, jlong jopt_handle, jstring jdb_path,
jobject jcfdesc_list, jint jcfdesc_count, jobject jttl_list,
jboolean jread_only) {
auto* opt = reinterpret_cast<rocksdb::Options*>(jopt_handle);
rocksdb::DBWithTTL* db = nullptr;
const char* db_path = env->GetStringUTFChars(jdb_path, 0);
std::vector<jbyte*> cfnames_to_free;
std::vector<jbyteArray> jcfnames_for_free;
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
std::vector<int32_t> ttl_values;
std::vector<rocksdb::ColumnFamilyHandle* > handles;
// get iterator for ColumnFamilyDescriptors
jobject iteratorObj = env->CallObjectMethod(
jcfdesc_list, rocksdb::ListJni::getIteratorMethod(env));
// iterate over ColumnFamilyDescriptors
while (env->CallBooleanMethod(
iteratorObj, rocksdb::ListJni::getHasNextMethod(env)) == JNI_TRUE) {
// get ColumnFamilyDescriptor
jobject jcf_descriptor = env->CallObjectMethod(iteratorObj,
rocksdb::ListJni::getNextMethod(env));
// get ColumnFamilyName
jbyteArray byteArray = static_cast<jbyteArray>(env->CallObjectMethod(
jcf_descriptor,
rocksdb::ColumnFamilyDescriptorJni::getColumnFamilyNameMethod(
env)));
// get CF Options
jobject jcf_opt_obj = env->CallObjectMethod(jcf_descriptor,
rocksdb::ColumnFamilyDescriptorJni::getColumnFamilyOptionsMethod(
env));
rocksdb::ColumnFamilyOptions* cfOptions =
rocksdb::ColumnFamilyOptionsJni::getHandle(env, jcf_opt_obj);
jbyte* cfname = env->GetByteArrayElements(byteArray, 0);
// free allocated cfnames after call to open
cfnames_to_free.push_back(cfname);
jcfnames_for_free.push_back(byteArray);
column_families.push_back(rocksdb::ColumnFamilyDescriptor(
reinterpret_cast<const char *>(cfname),
*cfOptions));
}
// get iterator for TTL values
iteratorObj = env->CallObjectMethod(
jttl_list, rocksdb::ListJni::getIteratorMethod(env));
// iterate over TTL values
while (env->CallBooleanMethod(
iteratorObj, rocksdb::ListJni::getHasNextMethod(env)) == JNI_TRUE) {
// get TTL object
jobject jttl_object = env->CallObjectMethod(iteratorObj,
rocksdb::ListJni::getNextMethod(env));
// get Integer value
jclass jIntClazz = env->FindClass("java/lang/Integer");
jmethodID getVal = env->GetMethodID(jIntClazz, "intValue", "()I");
ttl_values.push_back(env->CallIntMethod(jttl_object, getVal));
}
rocksdb::Status s = rocksdb::DBWithTTL::Open(*opt, db_path, column_families,
&handles, &db, ttl_values, jread_only);
env->ReleaseStringUTFChars(jdb_path, db_path);
// free jbyte allocations
for (std::vector<jbyte*>::size_type i = 0;
i != cfnames_to_free.size(); i++) {
// free cfnames
env->ReleaseByteArrayElements(jcfnames_for_free[i], cfnames_to_free[i], 0);
}
// check if open operation was successful
if (s.ok()) {
rocksdb::RocksDBJni::setHandle(env, jdb, db);
jclass jListClazz = env->FindClass("java/util/ArrayList");
jmethodID midList = rocksdb::ListJni::getArrayListConstructorMethodId(
env, jListClazz);
jobject jcfhandle_list = env->NewObject(jListClazz,
midList, handles.size());
// insert in java list
for (std::vector<rocksdb::ColumnFamilyHandle*>::size_type i = 0;
i != handles.size(); i++) {
// jlong must be converted to Long due to collections restrictions
jclass jLongClazz = env->FindClass("java/lang/Long");
jmethodID midLong = env->GetMethodID(jLongClazz, "<init>", "(J)V");
jobject obj = env->NewObject(jLongClazz, midLong,
reinterpret_cast<jlong>(handles[i]));
env->CallBooleanMethod(jcfhandle_list,
rocksdb::ListJni::getListAddMethodId(env), obj);
}
return jcfhandle_list;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
return nullptr;
}
/*
* Class: org_rocksdb_TtlDB
* Method: createColumnFamilyWithTtl
* Signature: (JLorg/rocksdb/ColumnFamilyDescriptor;I)J;
*/
jlong Java_org_rocksdb_TtlDB_createColumnFamilyWithTtl(
JNIEnv* env, jobject jobj, jlong jdb_handle,
jobject jcf_descriptor, jint jttl) {
rocksdb::ColumnFamilyHandle* handle;
auto* db_handle = reinterpret_cast<rocksdb::DBWithTTL*>(jdb_handle);
// get ColumnFamilyName
jbyteArray byteArray = static_cast<jbyteArray>(env->CallObjectMethod(
jcf_descriptor,
rocksdb::ColumnFamilyDescriptorJni::getColumnFamilyNameMethod(
env)));
// get CF Options
jobject jcf_opt_obj = env->CallObjectMethod(jcf_descriptor,
rocksdb::ColumnFamilyDescriptorJni::getColumnFamilyOptionsMethod(
env));
rocksdb::ColumnFamilyOptions* cfOptions =
rocksdb::ColumnFamilyOptionsJni::getHandle(env, jcf_opt_obj);
jbyte* cfname = env->GetByteArrayElements(byteArray, 0);
rocksdb::Status s = db_handle->CreateColumnFamilyWithTtl(
*cfOptions, reinterpret_cast<char *>(cfname), &handle, jttl);
env->ReleaseByteArrayElements(byteArray, cfname, 0);
if (s.ok()) {
return reinterpret_cast<jlong>(handle);
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册