From bb38cd03a97718f053bcfd9dbdadd3c419e977e6 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 2 Oct 2017 16:08:53 -0700 Subject: [PATCH] Limit number of merge operands in Cassandra merge operator Summary: Now that RocksDB supports conditional merging during point lookups (introduced in #2923), Cassandra value merge operator can be updated to pass in a limit. The limit needs to be passed in from the Cassandra code. Closes https://github.com/facebook/rocksdb/pull/2947 Differential Revision: D5938454 Pulled By: sagar0 fbshipit-source-id: d64a72d53170d8cf202b53bd648475c3952f7d7f --- java/rocksjni/cassandra_value_operator.cc | 7 ++++--- .../java/org/rocksdb/CassandraValueMergeOperator.java | 9 +++++++-- utilities/cassandra/merge_operator.h | 11 +++++++++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/java/rocksjni/cassandra_value_operator.cc b/java/rocksjni/cassandra_value_operator.cc index 5ba13cfe5..034f33cc3 100644 --- a/java/rocksjni/cassandra_value_operator.cc +++ b/java/rocksjni/cassandra_value_operator.cc @@ -23,13 +23,14 @@ /* * Class: org_rocksdb_CassandraValueMergeOperator * Method: newSharedCassandraValueMergeOperator - * Signature: (I)J + * Signature: (II)J */ jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator( - JNIEnv* env, jclass jclazz, jint gcGracePeriodInSeconds) { + JNIEnv* env, jclass jclazz, jint gcGracePeriodInSeconds, + jint operands_limit) { auto* op = new std::shared_ptr( new rocksdb::cassandra::CassandraValueMergeOperator( - gcGracePeriodInSeconds)); + gcGracePeriodInSeconds, operands_limit)); return reinterpret_cast(op); } diff --git a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java index 310a6e8ff..4b0c71ba5 100644 --- a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java +++ b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java @@ -11,10 +11,15 @@ package org.rocksdb; */ public class CassandraValueMergeOperator extends MergeOperator { public CassandraValueMergeOperator(int gcGracePeriodInSeconds) { - super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds)); + super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, 0)); } - private native static long newSharedCassandraValueMergeOperator(int gcGracePeriodInSeconds); + public CassandraValueMergeOperator(int gcGracePeriodInSeconds, int operandsLimit) { + super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds, operandsLimit)); + } + + private native static long newSharedCassandraValueMergeOperator( + int gcGracePeriodInSeconds, int limit); @Override protected final native void disposeInternal(final long handle); } diff --git a/utilities/cassandra/merge_operator.h b/utilities/cassandra/merge_operator.h index 272bfc21e..4d02c26de 100644 --- a/utilities/cassandra/merge_operator.h +++ b/utilities/cassandra/merge_operator.h @@ -15,8 +15,10 @@ namespace cassandra { */ class CassandraValueMergeOperator : public MergeOperator { public: - explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds) - : gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} + explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds, + size_t operands_limit = 0) + : gc_grace_period_in_seconds_(gc_grace_period_in_seconds), + operands_limit_(operands_limit) {} virtual bool FullMergeV2(const MergeOperationInput& merge_in, MergeOperationOutput* merge_out) const override; @@ -30,8 +32,13 @@ public: virtual bool AllowSingleOperand() const override { return true; } + virtual bool ShouldMerge(const std::vector& operands) const override { + return operands_limit_ > 0 && operands.size() >= operands_limit_; + } + private: int32_t gc_grace_period_in_seconds_; + size_t operands_limit_; }; } // namespace cassandra } // namespace rocksdb -- GitLab