From d367555027a1d5858bb508eacb6662bfcc7ce9bd Mon Sep 17 00:00:00 2001 From: Adam Retter Date: Tue, 14 Jun 2016 16:52:25 +0100 Subject: [PATCH] Added further Java API options for controlling concurrent writes --- java/rocksjni/options.cc | 176 ++++++++++++++++++ java/src/main/java/org/rocksdb/DBOptions.java | 56 ++++++ .../java/org/rocksdb/DBOptionsInterface.java | 104 +++++++++++ java/src/main/java/org/rocksdb/Options.java | 56 ++++++ .../test/java/org/rocksdb/DBOptionsTest.java | 36 ++++ .../test/java/org/rocksdb/OptionsTest.java | 36 ++++ 6 files changed, 464 insertions(+) diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 76dc1d145..ba66127a7 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -1034,6 +1034,94 @@ void Java_org_rocksdb_Options_setBytesPerSync( static_cast(bytes_per_sync); } +/* + * Class: org_rocksdb_Options + * Method: setAllowConcurrentMemtableWrite + * Signature: (JZ)V + */ +void Java_org_rocksdb_Options_setAllowConcurrentMemtableWrite( + JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow) { + reinterpret_cast(jhandle)-> + allow_concurrent_memtable_write = static_cast(allow); +} + +/* + * Class: org_rocksdb_Options + * Method: allowConcurrentMemtableWrite + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_Options_allowConcurrentMemtableWrite( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + allow_concurrent_memtable_write; +} + +/* + * Class: org_rocksdb_Options + * Method: setEnableWriteThreadAdaptiveYield + * Signature: (JZ)V + */ +void Java_org_rocksdb_Options_setEnableWriteThreadAdaptiveYield( + JNIEnv* env, jobject jobj, jlong jhandle, jboolean yield) { + reinterpret_cast(jhandle)-> + enable_write_thread_adaptive_yield = static_cast(yield); +} + +/* + * Class: org_rocksdb_Options + * Method: enableWriteThreadAdaptiveYield + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_Options_enableWriteThreadAdaptiveYield( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + enable_write_thread_adaptive_yield; +} + +/* + * Class: org_rocksdb_Options + * Method: setWriteThreadMaxYieldUsec + * Signature: (JJ)V + */ +void Java_org_rocksdb_Options_setWriteThreadMaxYieldUsec( + JNIEnv* env, jobject jobject, jlong jhandle, jlong max) { + reinterpret_cast(jhandle)-> + write_thread_max_yield_usec = static_cast(max); +} + +/* + * Class: org_rocksdb_Options + * Method: writeThreadMaxYieldUsec + * Signature: (J)J + */ +jlong Java_org_rocksdb_Options_writeThreadMaxYieldUsec( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + write_thread_max_yield_usec; +} + +/* + * Class: org_rocksdb_Options + * Method: setWriteThreadSlowYieldUsec + * Signature: (JJ)V + */ +void Java_org_rocksdb_Options_setWriteThreadSlowYieldUsec( + JNIEnv* env, jobject jobj, jlong jhandle, jlong slow) { + reinterpret_cast(jhandle)-> + write_thread_slow_yield_usec = static_cast(slow); +} + +/* + * Class: org_rocksdb_Options + * Method: writeThreadSlowYieldUsec + * Signature: (J)J + */ +jlong Java_org_rocksdb_Options_writeThreadSlowYieldUsec( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + write_thread_slow_yield_usec; +} + /* * Method: tableFactoryName * Signature: (J)Ljava/lang/String @@ -4287,6 +4375,94 @@ jlong Java_org_rocksdb_DBOptions_bytesPerSync( return reinterpret_cast(jhandle)->bytes_per_sync; } +/* + * Class: org_rocksdb_DBOptions + * Method: setAllowConcurrentMemtableWrite + * Signature: (JZ)V + */ +void Java_org_rocksdb_DBOptions_setAllowConcurrentMemtableWrite( + JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow) { + reinterpret_cast(jhandle)-> + allow_concurrent_memtable_write = static_cast(allow); +} + +/* + * Class: org_rocksdb_DBOptions + * Method: allowConcurrentMemtableWrite + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_DBOptions_allowConcurrentMemtableWrite( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + allow_concurrent_memtable_write; +} + +/* + * Class: org_rocksdb_DBOptions + * Method: setEnableWriteThreadAdaptiveYield + * Signature: (JZ)V + */ +void Java_org_rocksdb_DBOptions_setEnableWriteThreadAdaptiveYield( + JNIEnv* env, jobject jobj, jlong jhandle, jboolean yield) { + reinterpret_cast(jhandle)-> + enable_write_thread_adaptive_yield = static_cast(yield); +} + +/* + * Class: org_rocksdb_DBOptions + * Method: enableWriteThreadAdaptiveYield + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_DBOptions_enableWriteThreadAdaptiveYield( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + enable_write_thread_adaptive_yield; +} + +/* + * Class: org_rocksdb_DBOptions + * Method: setWriteThreadMaxYieldUsec + * Signature: (JJ)V + */ +void Java_org_rocksdb_DBOptions_setWriteThreadMaxYieldUsec( + JNIEnv* env, jobject jobject, jlong jhandle, jlong max) { + reinterpret_cast(jhandle)-> + write_thread_max_yield_usec = static_cast(max); +} + +/* + * Class: org_rocksdb_DBOptions + * Method: writeThreadMaxYieldUsec + * Signature: (J)J + */ +jlong Java_org_rocksdb_DBOptions_writeThreadMaxYieldUsec( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + write_thread_max_yield_usec; +} + +/* + * Class: org_rocksdb_DBOptions + * Method: setWriteThreadSlowYieldUsec + * Signature: (JJ)V + */ +void Java_org_rocksdb_DBOptions_setWriteThreadSlowYieldUsec( + JNIEnv* env, jobject jobj, jlong jhandle, jlong slow) { + reinterpret_cast(jhandle)-> + write_thread_slow_yield_usec = static_cast(slow); +} + +/* + * Class: org_rocksdb_DBOptions + * Method: writeThreadSlowYieldUsec + * Signature: (J)J + */ +jlong Java_org_rocksdb_DBOptions_writeThreadSlowYieldUsec( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)-> + write_thread_slow_yield_usec; +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::WriteOptions diff --git a/java/src/main/java/org/rocksdb/DBOptions.java b/java/src/main/java/org/rocksdb/DBOptions.java index 967673d8b..9cc2d2bfd 100644 --- a/java/src/main/java/org/rocksdb/DBOptions.java +++ b/java/src/main/java/org/rocksdb/DBOptions.java @@ -558,6 +558,50 @@ public class DBOptions extends RocksObject implements DBOptionsInterface { return bytesPerSync(nativeHandle_); } + @Override + public void setAllowConcurrentMemtableWrite( + final boolean allowConcurrentMemtableWrite) { + setAllowConcurrentMemtableWrite(nativeHandle_, + allowConcurrentMemtableWrite); + } + + @Override + public boolean allowConcurrentMemtableWrite() { + return allowConcurrentMemtableWrite(nativeHandle_); + } + + @Override + public void setEnableWriteThreadAdaptiveYield( + final boolean enableWriteThreadAdaptiveYield) { + setEnableWriteThreadAdaptiveYield(nativeHandle_, + enableWriteThreadAdaptiveYield); + } + + @Override + public boolean enableWriteThreadAdaptiveYield() { + return enableWriteThreadAdaptiveYield(nativeHandle_); + } + + @Override + public void setWriteThreadMaxYieldUsec(final long writeThreadMaxYieldUsec) { + setWriteThreadMaxYieldUsec(nativeHandle_, writeThreadMaxYieldUsec); + } + + @Override + public long writeThreadMaxYieldUsec() { + return writeThreadMaxYieldUsec(nativeHandle_); + } + + @Override + public void setWriteThreadSlowYieldUsec(final long writeThreadSlowYieldUsec) { + setWriteThreadSlowYieldUsec(nativeHandle_, writeThreadSlowYieldUsec); + } + + @Override + public long writeThreadSlowYieldUsec() { + return writeThreadSlowYieldUsec(nativeHandle_); + } + static final int DEFAULT_NUM_SHARD_BITS = -1; /** @@ -668,6 +712,18 @@ public class DBOptions extends RocksObject implements DBOptionsInterface { private native void setBytesPerSync( long handle, long bytesPerSync); private native long bytesPerSync(long handle); + private native void setAllowConcurrentMemtableWrite(long handle, + boolean allowConcurrentMemtableWrite); + private native boolean allowConcurrentMemtableWrite(long handle); + private native void setEnableWriteThreadAdaptiveYield(long handle, + boolean enableWriteThreadAdaptiveYield); + private native boolean enableWriteThreadAdaptiveYield(long handle); + private native void setWriteThreadMaxYieldUsec(long handle, + long writeThreadMaxYieldUsec); + private native long writeThreadMaxYieldUsec(long handle); + private native void setWriteThreadSlowYieldUsec(long handle, + long writeThreadSlowYieldUsec); + private native long writeThreadSlowYieldUsec(long handle); int numShardBits_; RateLimiterConfig rateLimiterConfig_; diff --git a/java/src/main/java/org/rocksdb/DBOptionsInterface.java b/java/src/main/java/org/rocksdb/DBOptionsInterface.java index 3fa34dae5..734ccc155 100644 --- a/java/src/main/java/org/rocksdb/DBOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/DBOptionsInterface.java @@ -802,4 +802,108 @@ public interface DBOptionsInterface { * @return size in bytes */ long bytesPerSync(); + + /** + * If true, allow multi-writers to update mem tables in parallel. + * Only some memtable factorys support concurrent writes; currently it + * is implemented only for SkipListFactory. Concurrent memtable writes + * are not compatible with inplace_update_support or filter_deletes. + * It is strongly recommended to set + * {@link #setEnableWriteThreadAdaptiveYield(boolean)} if you are going to use + * this feature. + * Default: false + * + * @param allowConcurrentMemtableWrite true to enable concurrent writes + * for the memtable + */ + void setAllowConcurrentMemtableWrite(boolean allowConcurrentMemtableWrite); + + /** + * If true, allow multi-writers to update mem tables in parallel. + * Only some memtable factorys support concurrent writes; currently it + * is implemented only for SkipListFactory. Concurrent memtable writes + * are not compatible with inplace_update_support or filter_deletes. + * It is strongly recommended to set + * {@link #setEnableWriteThreadAdaptiveYield(boolean)} if you are going to use + * this feature. + * Default: false + * + * @return true if concurrent writes are enabled for the memtable + */ + boolean allowConcurrentMemtableWrite(); + + /** + * If true, threads synchronizing with the write batch group leader will + * wait for up to {@link #writeThreadMaxYieldUsec()} before blocking on a + * mutex. This can substantially improve throughput for concurrent workloads, + * regardless of whether {@link #allowConcurrentMemtableWrite()} is enabled. + * Default: false + * + * @param enableWriteThreadAdaptiveYield true to enable adaptive yield for the + * write threads + */ + void setEnableWriteThreadAdaptiveYield( + boolean enableWriteThreadAdaptiveYield); + + /** + * If true, threads synchronizing with the write batch group leader will + * wait for up to {@link #writeThreadMaxYieldUsec()} before blocking on a + * mutex. This can substantially improve throughput for concurrent workloads, + * regardless of whether {@link #allowConcurrentMemtableWrite()} is enabled. + * Default: false + * + * @return true if adaptive yield is enabled + * for the writing threads + */ + boolean enableWriteThreadAdaptiveYield(); + + /** + * The maximum number of microseconds that a write operation will use + * a yielding spin loop to coordinate with other write threads before + * blocking on a mutex. (Assuming {@link #writeThreadSlowYieldUsec()} is + * set properly) increasing this value is likely to increase RocksDB + * throughput at the expense of increased CPU usage. + * Default: 100 + * + * @param writeThreadMaxYieldUsec maximum number of microseconds + */ + void setWriteThreadMaxYieldUsec(long writeThreadMaxYieldUsec); + + /** + * The maximum number of microseconds that a write operation will use + * a yielding spin loop to coordinate with other write threads before + * blocking on a mutex. (Assuming {@link #writeThreadSlowYieldUsec()} is + * set properly) increasing this value is likely to increase RocksDB + * throughput at the expense of increased CPU usage. + * Default: 100 + * + * @return the maximum number of microseconds + */ + long writeThreadMaxYieldUsec(); + + /** + * The latency in microseconds after which a std::this_thread::yield + * call (sched_yield on Linux) is considered to be a signal that + * other processes or threads would like to use the current core. + * Increasing this makes writer threads more likely to take CPU + * by spinning, which will show up as an increase in the number of + * involuntary context switches. + * Default: 3 + * + * @param writeThreadSlowYieldUsec the latency in microseconds + */ + void setWriteThreadSlowYieldUsec(long writeThreadSlowYieldUsec); + + /** + * The latency in microseconds after which a std::this_thread::yield + * call (sched_yield on Linux) is considered to be a signal that + * other processes or threads would like to use the current core. + * Increasing this makes writer threads more likely to take CPU + * by spinning, which will show up as an increase in the number of + * involuntary context switches. + * Default: 3 + * + * @return writeThreadSlowYieldUsec the latency in microseconds + */ + long writeThreadSlowYieldUsec(); } diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 39c48df06..bf94fa6bd 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -631,6 +631,50 @@ public class Options extends RocksObject return this; } + @Override + public void setAllowConcurrentMemtableWrite( + final boolean allowConcurrentMemtableWrite) { + setAllowConcurrentMemtableWrite(nativeHandle_, + allowConcurrentMemtableWrite); + } + + @Override + public boolean allowConcurrentMemtableWrite() { + return allowConcurrentMemtableWrite(nativeHandle_); + } + + @Override + public void setEnableWriteThreadAdaptiveYield( + final boolean enableWriteThreadAdaptiveYield) { + setEnableWriteThreadAdaptiveYield(nativeHandle_, + enableWriteThreadAdaptiveYield); + } + + @Override + public boolean enableWriteThreadAdaptiveYield() { + return enableWriteThreadAdaptiveYield(nativeHandle_); + } + + @Override + public void setWriteThreadMaxYieldUsec(final long writeThreadMaxYieldUsec) { + setWriteThreadMaxYieldUsec(nativeHandle_, writeThreadMaxYieldUsec); + } + + @Override + public long writeThreadMaxYieldUsec() { + return writeThreadMaxYieldUsec(nativeHandle_); + } + + @Override + public void setWriteThreadSlowYieldUsec(final long writeThreadSlowYieldUsec) { + setWriteThreadSlowYieldUsec(nativeHandle_, writeThreadSlowYieldUsec); + } + + @Override + public long writeThreadSlowYieldUsec() { + return writeThreadSlowYieldUsec(nativeHandle_); + } + @Override public Options setMemTableConfig(final MemTableConfig config) { memTableConfig_ = config; @@ -1282,6 +1326,18 @@ public class Options extends RocksObject private native void setBytesPerSync( long handle, long bytesPerSync); private native long bytesPerSync(long handle); + private native void setAllowConcurrentMemtableWrite(long handle, + boolean allowConcurrentMemtableWrite); + private native boolean allowConcurrentMemtableWrite(long handle); + private native void setEnableWriteThreadAdaptiveYield(long handle, + boolean enableWriteThreadAdaptiveYield); + private native boolean enableWriteThreadAdaptiveYield(long handle); + private native void setWriteThreadMaxYieldUsec(long handle, + long writeThreadMaxYieldUsec); + private native long writeThreadMaxYieldUsec(long handle); + private native void setWriteThreadSlowYieldUsec(long handle, + long writeThreadSlowYieldUsec); + private native long writeThreadSlowYieldUsec(long handle); // CF native handles private native void optimizeForPointLookup(long handle, long blockCacheSizeMb); diff --git a/java/src/test/java/org/rocksdb/DBOptionsTest.java b/java/src/test/java/org/rocksdb/DBOptionsTest.java index 2527ce949..66372f6cb 100644 --- a/java/src/test/java/org/rocksdb/DBOptionsTest.java +++ b/java/src/test/java/org/rocksdb/DBOptionsTest.java @@ -352,6 +352,42 @@ public class DBOptionsTest { } } + @Test + public void allowConcurrentMemtableWrite() { + try (final DBOptions opt = new DBOptions()) { + final boolean boolValue = rand.nextBoolean(); + opt.setAllowConcurrentMemtableWrite(boolValue); + assertThat(opt.allowConcurrentMemtableWrite()).isEqualTo(boolValue); + } + } + + @Test + public void enableWriteThreadAdaptiveYield() { + try (final DBOptions opt = new DBOptions()) { + final boolean boolValue = rand.nextBoolean(); + opt.setEnableWriteThreadAdaptiveYield(boolValue); + assertThat(opt.enableWriteThreadAdaptiveYield()).isEqualTo(boolValue); + } + } + + @Test + public void writeThreadMaxYieldUsec() { + try (final DBOptions opt = new DBOptions()) { + final long longValue = rand.nextLong(); + opt.setWriteThreadMaxYieldUsec(longValue); + assertThat(opt.writeThreadMaxYieldUsec()).isEqualTo(longValue); + } + } + + @Test + public void writeThreadSlowYieldUsec() { + try (final DBOptions opt = new DBOptions()) { + final long longValue = rand.nextLong(); + opt.setWriteThreadSlowYieldUsec(longValue); + assertThat(opt.writeThreadSlowYieldUsec()).isEqualTo(longValue); + } + } + @Test public void rateLimiterConfig() { try(final DBOptions options = new DBOptions(); diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index 13d84bd58..644768524 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -661,6 +661,42 @@ public class OptionsTest { } } + @Test + public void allowConcurrentMemtableWrite() { + try (final Options opt = new Options()) { + final boolean boolValue = rand.nextBoolean(); + opt.setAllowConcurrentMemtableWrite(boolValue); + assertThat(opt.allowConcurrentMemtableWrite()).isEqualTo(boolValue); + } + } + + @Test + public void enableWriteThreadAdaptiveYield() { + try (final Options opt = new Options()) { + final boolean boolValue = rand.nextBoolean(); + opt.setEnableWriteThreadAdaptiveYield(boolValue); + assertThat(opt.enableWriteThreadAdaptiveYield()).isEqualTo(boolValue); + } + } + + @Test + public void writeThreadMaxYieldUsec() { + try (final Options opt = new Options()) { + final long longValue = rand.nextLong(); + opt.setWriteThreadMaxYieldUsec(longValue); + assertThat(opt.writeThreadMaxYieldUsec()).isEqualTo(longValue); + } + } + + @Test + public void writeThreadSlowYieldUsec() { + try (final Options opt = new Options()) { + final long longValue = rand.nextLong(); + opt.setWriteThreadSlowYieldUsec(longValue); + assertThat(opt.writeThreadSlowYieldUsec()).isEqualTo(longValue); + } + } + @Test public void env() { try (final Options options = new Options(); -- GitLab