提交 fa1b62ca 编写于 作者: A Ankit Gupta

Merge remote-tracking branch 'upstream/master'

......@@ -25,11 +25,14 @@ public class BackupableDB extends RocksDB {
public static BackupableDB open(
Options opt, BackupableDBOptions bopt, String db_path)
throws RocksDBException {
// since BackupableDB c++ will handle the life cycle of
// the returned RocksDB of RocksDB.open(), here we store
// it as a BackupableDB member variable to avoid GC.
BackupableDB bdb = new BackupableDB(RocksDB.open(opt, db_path));
bdb.open(bdb.db_.nativeHandle_, bopt.nativeHandle_);
RocksDB db = RocksDB.open(opt, db_path);
BackupableDB bdb = new BackupableDB();
bdb.open(db.nativeHandle_, bopt.nativeHandle_);
// Prevent the RocksDB object from attempting to delete
// the underly C++ DB object.
db.disOwnNativeHandle();
return bdb;
}
......@@ -64,9 +67,8 @@ public class BackupableDB extends RocksDB {
* A protected construction that will be used in the static factory
* method BackupableDB.open().
*/
protected BackupableDB(RocksDB db) {
protected BackupableDB() {
super();
db_ = db;
}
@Override protected void finalize() {
......@@ -75,6 +77,4 @@ public class BackupableDB extends RocksDB {
protected native void open(long rocksDBHandle, long backupDBOptionsHandle);
protected native void createNewBackup(long handle, boolean flag);
private final RocksDB db_;
}
......@@ -59,15 +59,14 @@ public class BackupableDBOptions extends RocksObject {
* Release the memory allocated for the current instance
* in the c++ side.
*/
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose(nativeHandle_);
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void newBackupableDBOptions(String path,
boolean shareTableFiles, boolean sync, boolean destroyOldData,
boolean backupLogFiles, long backupRateLimit, long restoreRateLimit);
private native String backupDir(long handle);
private native void dispose(long handle);
private native void disposeInternal(long handle);
}
......@@ -22,11 +22,10 @@ public abstract class Filter extends RocksObject {
* RocksDB instances referencing the filter are closed.
* Otherwise an undefined behavior will occur.
*/
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose0(nativeHandle_);
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void dispose0(long handle);
private native void disposeInternal(long handle);
}
......@@ -2311,10 +2311,9 @@ public class Options extends RocksObject {
* Release the memory allocated for the current instance
* in the c++ side.
*/
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose0();
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
static final int DEFAULT_PLAIN_TABLE_BLOOM_BITS_PER_KEY = 10;
......@@ -2322,7 +2321,7 @@ public class Options extends RocksObject {
static final int DEFAULT_PLAIN_TABLE_INDEX_SPARSENESS = 16;
private native void newOptions();
private native void dispose0();
private native void disposeInternal(long handle);
private native void setCreateIfMissing(long handle, boolean flag);
private native boolean createIfMissing(long handle);
private native void setWriteBufferSize(long handle, long writeBufferSize);
......
......@@ -18,19 +18,6 @@ public class ReadOptions extends RocksObject {
}
private native void newReadOptions();
/**
* Release the memory allocated for the current instance
* in the c++ side.
*
* Calling other methods after dispose() leads to undefined behavior.
*/
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose(nativeHandle_);
}
}
private native void dispose(long handle);
/**
* If true, all data read from underlying storage will be
* verified against corresponding checksums.
......@@ -127,4 +114,12 @@ public class ReadOptions extends RocksObject {
}
private native void setTailing(
long handle, boolean tailing);
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void disposeInternal(long handle);
}
......@@ -114,11 +114,9 @@ public class RocksDB extends RocksObject {
return db;
}
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose(nativeHandle_);
nativeHandle_ = 0;
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
/**
......@@ -315,10 +313,6 @@ public class RocksDB extends RocksObject {
return new RocksIterator(iterator0(nativeHandle_));
}
@Override protected void finalize() {
close();
}
/**
* Private constructor.
*/
......@@ -370,7 +364,7 @@ public class RocksDB extends RocksObject {
long handle, long writeOptHandle,
byte[] key, int keyLen) throws RocksDBException;
protected native long iterator0(long optHandle);
protected native void dispose(long handle);
private native void disposeInternal(long handle);
protected Filter filter_;
}
......@@ -118,15 +118,13 @@ public class RocksIterator extends RocksObject {
/**
* Deletes underlying C++ iterator pointer.
*/
@Override public synchronized void dispose() {
if(isInitialized()) {
dispose(nativeHandle_);
nativeHandle_ = 0;
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native boolean isValid0(long handle);
private native void dispose(long handle);
private native void disposeInternal(long handle);
private native void seekToFirst0(long handle);
private native void seekToLast0(long handle);
private native void next0(long handle);
......
......@@ -16,12 +16,48 @@ package org.rocksdb;
public abstract class RocksObject {
protected RocksObject() {
nativeHandle_ = 0;
owningHandle_ = true;
}
/**
* Release the c++ object pointed by the native handle.
*
* Note that once an instance of RocksObject has been disposed,
* calling its function will lead undefined behavior.
*/
public abstract void dispose();
public final synchronized void dispose() {
if (isOwningNativeHandle() && isInitialized()) {
disposeInternal();
}
nativeHandle_ = 0;
disOwnNativeHandle();
}
/**
* The helper function of dispose() which all subclasses of RocksObject
* must implement to release their associated C++ resource.
*/
protected abstract void disposeInternal();
/**
* Revoke ownership of the native object.
*
* This will prevent the object from attempting to delete the underlying
* native object in its finalizer. This must be used when another object
* takes over ownership of the native object or both will attempt to delete
* the underlying object when garbage collected.
*
* When disOwnNativeHandle is called, dispose() will simply set nativeHandle_
* to 0 without releasing its associated C++ resource. As a result,
* incorrectly use this function may cause memory leak.
*/
protected void disOwnNativeHandle() {
owningHandle_ = false;
}
protected boolean isOwningNativeHandle() {
return owningHandle_;
}
protected boolean isInitialized() {
return (nativeHandle_ != 0);
......@@ -32,4 +68,5 @@ public abstract class RocksObject {
}
protected long nativeHandle_;
private boolean owningHandle_;
}
......@@ -86,10 +86,9 @@ public class WriteBatch extends RocksObject {
/**
* Delete the c++ side pointer.
*/
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose0();
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void newWriteBatch(int reserved_bytes);
......@@ -99,7 +98,7 @@ public class WriteBatch extends RocksObject {
byte[] value, int valueLen);
private native void remove(byte[] key, int keyLen);
private native void putLogData(byte[] blob, int blobLen);
private native void dispose0();
private native void disposeInternal(long handle);
}
/**
......
......@@ -17,10 +17,9 @@ public class WriteOptions extends RocksObject {
newWriteOptions();
}
@Override public synchronized void dispose() {
if (isInitialized()) {
dispose0(nativeHandle_);
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
/**
......@@ -96,5 +95,5 @@ public class WriteOptions extends RocksObject {
private native boolean sync(long handle);
private native void setDisableWAL(long handle, boolean flag);
private native boolean disableWAL(long handle);
private native void dispose0(long handle);
private native void disposeInternal(long handle);
}
......@@ -534,8 +534,6 @@ public class DbBenchmark {
(Long)flags_.get(Flag.block_size));
options.setMaxOpenFiles(
(Integer)flags_.get(Flag.open_files));
options.setCreateIfMissing(
!(Boolean)flags_.get(Flag.use_existing_db));
options.setTableCacheRemoveScanCountLimit(
(Integer)flags_.get(Flag.cache_remove_scan_count_limit));
options.setDisableDataSync(
......@@ -939,7 +937,7 @@ public class DbBenchmark {
"\tflag and also specify a benchmark that wants a fresh database,\n" +
"\tthat benchmark will fail.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
num(1000000,
......@@ -1028,7 +1026,7 @@ public class DbBenchmark {
use_plain_table(false,
"Use plain-table sst format.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
cache_size(-1L,
......@@ -1085,7 +1083,7 @@ public class DbBenchmark {
},
histogram(false,"Print histogram of operation timings.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
min_write_buffer_number_to_merge(
......@@ -1203,12 +1201,12 @@ public class DbBenchmark {
verify_checksum(false,"Verify checksum for every block read\n" +
"\tfrom storage.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
statistics(false,"Database statistics.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
writes(-1,"Number of write operations to do. If negative, do\n" +
......@@ -1219,23 +1217,23 @@ public class DbBenchmark {
},
sync(false,"Sync all writes to disk.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
disable_data_sync(false,"If true, do not wait until data is\n" +
"\tsynced to disk.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
use_fsync(false,"If true, issue fsync instead of fdatasync.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
disable_wal(false,"If true, do not write WAL for write.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
wal_dir("", "If not empty, use the given dir for WAL.") {
......@@ -1312,7 +1310,7 @@ public class DbBenchmark {
disable_seek_compaction(false,"Option to disable compaction\n" +
"\ttriggered by read.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
delete_obsolete_files_period_micros(0,"Option to delete\n" +
......@@ -1393,12 +1391,12 @@ public class DbBenchmark {
},
readonly(false,"Run read only benchmarks.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
disable_auto_compactions(false,"Do not auto trigger compactions.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
source_compaction_factor(1,"Cap the size of data in level-K for\n" +
......@@ -1423,26 +1421,26 @@ public class DbBenchmark {
bufferedio(rocksdb::EnvOptions().use_os_buffer,
"Allow buffered io using OS buffers.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
*/
mmap_read(false,
"Allow reads to occur via mmap-ing files.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
mmap_write(false,
"Allow writes to occur via mmap-ing files.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
advise_random_on_open(defaultOptions_.adviseRandomOnOpen(),
"Advise random access on table file open.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
compaction_fadvice("NORMAL",
......@@ -1454,13 +1452,13 @@ public class DbBenchmark {
use_tailing_iterator(false,
"Use tailing iterator to access a series of keys instead of get.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
use_adaptive_mutex(defaultOptions_.useAdaptiveMutex(),
"Use adaptive mutex.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
bytes_per_sync(defaultOptions_.bytesPerSync(),
......@@ -1474,7 +1472,7 @@ public class DbBenchmark {
filter_deletes(false," On true, deletes use bloom-filter and drop\n" +
"\tthe delete if key not present.") {
@Override public Object parseValue(String value) {
return Boolean.parseBoolean(value);
return parseBoolean(value);
}
},
max_successive_merges(0,"Maximum number of successive merge\n" +
......@@ -1495,8 +1493,6 @@ public class DbBenchmark {
desc_ = desc;
}
protected abstract Object parseValue(String value);
public Object getDefaultValue() {
return defaultValue_;
}
......@@ -1505,6 +1501,17 @@ public class DbBenchmark {
return desc_;
}
public boolean parseBoolean(String value) {
if (value.equals("1")) {
return true;
} else if (value.equals("0")) {
return false;
}
return Boolean.parseBoolean(value);
}
protected abstract Object parseValue(String value);
private final Object defaultValue_;
private final String desc_;
}
......
......@@ -82,10 +82,10 @@ jstring Java_org_rocksdb_BackupableDBOptions_backupDir(
/*
* Class: org_rocksdb_BackupableDBOptions
* Method: dispose
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_BackupableDBOptions_dispose(
void Java_org_rocksdb_BackupableDBOptions_disposeInternal(
JNIEnv* env, jobject jopt, jlong jhandle) {
auto bopt = reinterpret_cast<rocksdb::BackupableDBOptions*>(jhandle);
assert(bopt);
......
......@@ -29,13 +29,10 @@ void Java_org_rocksdb_BloomFilter_createNewFilter0(
/*
* Class: org_rocksdb_Filter
* Method: dispose0
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_Filter_dispose0(
void Java_org_rocksdb_Filter_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
auto fp = reinterpret_cast<rocksdb::FilterPolicy*>(handle);
delete fp;
rocksdb::FilterJni::setHandle(env, jobj, nullptr);
delete reinterpret_cast<rocksdb::FilterPolicy*>(handle);
}
......@@ -135,10 +135,10 @@ void Java_org_rocksdb_RocksIterator_status0(
/*
* Class: org_rocksdb_RocksIterator
* Method: dispose
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_RocksIterator_dispose(
void Java_org_rocksdb_RocksIterator_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
auto it = reinterpret_cast<rocksdb::Iterator*>(handle);
delete it;
......
......@@ -35,14 +35,12 @@ void Java_org_rocksdb_Options_newOptions(JNIEnv* env, jobject jobj) {
/*
* Class: org_rocksdb_Options
* Method: dispose0
* Signature: ()V
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_Options_dispose0(JNIEnv* env, jobject jobj) {
rocksdb::Options* op = rocksdb::OptionsJni::getHandle(env, jobj);
delete op;
rocksdb::OptionsJni::setHandle(env, jobj, nullptr);
void Java_org_rocksdb_Options_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
delete reinterpret_cast<rocksdb::Options*>(handle);
}
/*
......@@ -1665,10 +1663,10 @@ void Java_org_rocksdb_WriteOptions_newWriteOptions(
/*
* Class: org_rocksdb_WriteOptions
* Method: dispose0
* Method: disposeInternal
* Signature: ()V
*/
void Java_org_rocksdb_WriteOptions_dispose0(
void Java_org_rocksdb_WriteOptions_disposeInternal(
JNIEnv* env, jobject jwrite_options, jlong jhandle) {
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(jhandle);
delete write_options;
......@@ -1732,10 +1730,10 @@ void Java_org_rocksdb_ReadOptions_newReadOptions(
/*
* Class: org_rocksdb_ReadOptions
* Method: dispose
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_ReadOptions_dispose(
void Java_org_rocksdb_ReadOptions_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) {
delete reinterpret_cast<rocksdb::ReadOptions*>(jhandle);
rocksdb::ReadOptionsJni::setHandle(env, jobj, nullptr);
......
......@@ -419,14 +419,12 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BI(
/*
* Class: org_rocksdb_RocksDB
* Method: dispose
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_RocksDB_dispose(
void Java_org_rocksdb_RocksDB_disposeInternal(
JNIEnv* env, jobject java_db, jlong jhandle) {
auto db = reinterpret_cast<rocksdb::DB*>(jhandle);
assert(db != nullptr);
delete db;
delete reinterpret_cast<rocksdb::DB*>(jhandle);
}
/*
......
......@@ -134,15 +134,12 @@ void Java_org_rocksdb_WriteBatch_putLogData(
/*
* Class: org_rocksdb_WriteBatch
* Method: dispose0
* Signature: ()V
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_dispose0(JNIEnv* env, jobject jobj) {
rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
assert(wb != nullptr);
delete wb;
rocksdb::WriteBatchJni::setHandle(env, jobj, nullptr);
void Java_org_rocksdb_WriteBatch_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
delete reinterpret_cast<rocksdb::WriteBatch*>(handle);
}
/*
......
......@@ -94,17 +94,6 @@ static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number));
}
// TODO(sdong): temp logging. Need to help debugging. Remove it when
// the feature is proved to be stable.
inline void PrintThreadInfo(size_t thread_id, pthread_t id) {
unsigned char* ptc = (unsigned char*)(void*)(&id);
fprintf(stdout, "Bg thread %zu terminates 0x", thread_id);
for (size_t i = 0; i < sizeof(id); i++) {
fprintf(stdout, "%02x", (unsigned)(ptc[i]));
}
fprintf(stdout, "\n");
}
#ifdef NDEBUG
// empty in release build
#define TEST_KILL_RANDOM(rocksdb_kill_odds)
......@@ -1293,13 +1282,17 @@ class PosixEnv : public Env {
return Status::OK();
}
static uint64_t gettid() {
pthread_t tid = pthread_self();
static uint64_t gettid(pthread_t tid) {
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}
static uint64_t gettid() {
pthread_t tid = pthread_self();
return gettid(tid);
}
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) {
FILE* f = fopen(fname.c_str(), "w");
......@@ -1525,7 +1518,8 @@ class PosixEnv : public Env {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
// TODO(sdong): temp logging. Need to help debugging. Remove it when
// the feature is proved to be stable.
PrintThreadInfo(thread_id, terminating_thread);
fprintf(stdout, "Bg thread %zu terminates %llx\n", thread_id,
static_cast<long long unsigned int>(gettid()));
break;
}
void (*function)(void*) = queue_.front().function;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册