diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index e202ff11cc3998b6e55299e8fef0dc0473301a57..ce01937e468b5244eeddba31cf7e51e86e4db5a3 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -182,59 +182,61 @@ public: uint32_t new_expire_ts = 0; db_get_context get_ctx; int err = db_get(raw_key, &get_ctx); - if (err == 0) { - if (!get_ctx.found) { - // old value is not found, set to 0 before increment - new_value = update.increment; - new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; - } else if (get_ctx.expired) { - // ttl timeout, set to 0 before increment - _pfc_recent_expire_count->increment(); + if (err != 0) { + resp.error = err; + return err; + } + if (!get_ctx.found) { + // old value is not found, set to 0 before increment + new_value = update.increment; + new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; + } else if (get_ctx.expired) { + // ttl timeout, set to 0 before increment + _pfc_recent_expire_count->increment(); + new_value = update.increment; + new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; + } else { + ::dsn::blob old_value; + pegasus_extract_user_data( + _pegasus_data_version, std::move(get_ctx.raw_value), old_value); + if (old_value.length() == 0) { + // empty old value, set to 0 before increment new_value = update.increment; - new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; } else { - ::dsn::blob old_value; - pegasus_extract_user_data( - _pegasus_data_version, std::move(get_ctx.raw_value), old_value); - if (old_value.length() == 0) { - // empty old value, set to 0 before increment - new_value = update.increment; - } else { - int64_t old_value_int; - if (!dsn::buf2int64(old_value, old_value_int)) { - // invalid old value - derror_replica("incr failed: decree = {}, error = " - "old value \"{}\" is not an integer or out of range", - decree, - utils::c_escape_string(old_value)); - resp.error = rocksdb::Status::kInvalidArgument; - // we should write empty record to update rocksdb's last flushed decree - return empty_put(decree); - } - new_value = old_value_int + update.increment; - if ((update.increment > 0 && new_value < old_value_int) || - (update.increment < 0 && new_value > old_value_int)) { - // new value is out of range, return old value by 'new_value' - derror_replica("incr failed: decree = {}, error = " - "new value is out of range, old_value = {}, increment = {}", - decree, - old_value_int, - update.increment); - resp.error = rocksdb::Status::kInvalidArgument; - resp.new_value = old_value_int; - // we should write empty record to update rocksdb's last flushed decree - return empty_put(decree); - } + int64_t old_value_int; + if (!dsn::buf2int64(old_value, old_value_int)) { + // invalid old value + derror_replica("incr failed: decree = {}, error = " + "old value \"{}\" is not an integer or out of range", + decree, + utils::c_escape_string(old_value)); + resp.error = rocksdb::Status::kInvalidArgument; + // we should write empty record to update rocksdb's last flushed decree + return empty_put(decree); } - // set new ttl - if (update.expire_ts_seconds == 0) { - new_expire_ts = get_ctx.expire_ts; - } else if (update.expire_ts_seconds < 0) { - new_expire_ts = 0; - } else { // update.expire_ts_seconds > 0 - new_expire_ts = update.expire_ts_seconds; + new_value = old_value_int + update.increment; + if ((update.increment > 0 && new_value < old_value_int) || + (update.increment < 0 && new_value > old_value_int)) { + // new value is out of range, return old value by 'new_value' + derror_replica("incr failed: decree = {}, error = " + "new value is out of range, old_value = {}, increment = {}", + decree, + old_value_int, + update.increment); + resp.error = rocksdb::Status::kInvalidArgument; + resp.new_value = old_value_int; + // we should write empty record to update rocksdb's last flushed decree + return empty_put(decree); } } + // set new ttl + if (update.expire_ts_seconds == 0) { + new_expire_ts = get_ctx.expire_ts; + } else if (update.expire_ts_seconds < 0) { + new_expire_ts = 0; + } else { // update.expire_ts_seconds > 0 + new_expire_ts = update.expire_ts_seconds; + } } resp.error = diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp index 7f1aec09fa1e8506a3c6ab516ffb1bb87e69402c..f3ee0a89b6f99cdf004fb08a169bca7d8300c1f6 100644 --- a/src/server/test/pegasus_write_service_impl_test.cpp +++ b/src/server/test/pegasus_write_service_impl_test.cpp @@ -217,5 +217,31 @@ TEST_F(incr_test, invalid_incr) ASSERT_EQ(resp.new_value, 100); } +TEST_F(incr_test, fail_on_get) +{ + dsn::fail::setup(); + dsn::fail::cfg("db_get", "100%1*return()"); + // when db_get failed, incr should return an error. + + req.increment = 10; + _write_impl->incr(1, req, resp); + ASSERT_EQ(resp.error, FAIL_DB_GET); + + dsn::fail::teardown(); +} + +TEST_F(incr_test, fail_on_put) +{ + dsn::fail::setup(); + dsn::fail::cfg("db_write_batch_put", "100%1*return()"); + // when rocksdb put failed, incr should return an error. + + req.increment = 10; + _write_impl->incr(1, req, resp); + ASSERT_EQ(resp.error, FAIL_DB_WRITE_BATCH_PUT); + + dsn::fail::teardown(); +} + } // namespace server } // namespace pegasus