diff --git a/rdsn b/rdsn index 5adcf734c9b5a1e55ea684727ee6473a340f2176..23ddba3f470ae546c98b5784da12c5e00677d7b6 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 5adcf734c9b5a1e55ea684727ee6473a340f2176 +Subproject commit 23ddba3f470ae546c98b5784da12c5e00677d7b6 diff --git a/rocksdb/include/rocksdb/utilities/checkpoint.h b/rocksdb/include/rocksdb/utilities/checkpoint.h index 9da254c4fbb266bfb317e50ebda53a9d3d4f4afb..9c40681a699cbecf69f12432118f529c66bc038a 100644 --- a/rocksdb/include/rocksdb/utilities/checkpoint.h +++ b/rocksdb/include/rocksdb/utilities/checkpoint.h @@ -32,11 +32,11 @@ class Checkpoint { // Quickly build an openable snapshot of RocksDB on the same disk, will not // wait flush before generate checkpoint. - // Only generate checkpoint when the last flushed decree are greater - // than *decree, and the new values are returned by decree. + // The decree of the checkpoint generated will be returned through + // *checkpoint_decree, if checkpoint_decree not nullptr // The directory should not already exist and will be created by this API. virtual Status CreateCheckpointQuick(const std::string& checkpoint_dir, - /*inout*/ uint64_t* decree); + /*output*/ uint64_t* checkpoint_decree); virtual ~Checkpoint() {} }; diff --git a/rocksdb/utilities/checkpoint/checkpoint.cc b/rocksdb/utilities/checkpoint/checkpoint.cc index fc1594b2b284838fb9db5400a93d70554009249a..d15c6a95477ceefd13ddd91f416f3db9f5d73ed1 100644 --- a/rocksdb/utilities/checkpoint/checkpoint.cc +++ b/rocksdb/utilities/checkpoint/checkpoint.cc @@ -21,13 +21,13 @@ #include "db/filename.h" #include "db/log_writer.h" #include "db/wal_manager.h" +#include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/transaction_log.h" #include "util/coding.h" -#include "util/file_util.h" #include "util/file_reader_writer.h" -#include "port/port.h" +#include "util/file_util.h" namespace rocksdb { @@ -49,8 +49,9 @@ class CheckpointImpl : public Checkpoint { // Quickly build an openable snapshot of RocksDB. // Useful when there is only one column family in the RocksDB. using Checkpoint::CreateCheckpointQuick; - virtual Status CreateCheckpointQuick(const std::string& checkpoint_dir, - uint64_t* decree) override; + virtual Status CreateCheckpointQuick( + const std::string& checkpoint_dir, + /*output*/ uint64_t* checkpoint_decree) override; private: DB* db_; @@ -66,7 +67,7 @@ Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir) { } Status Checkpoint::CreateCheckpointQuick(const std::string& checkpoint_dir, - uint64_t* decree) { + uint64_t* checkpoint_decree) { return Status::NotSupported(""); } @@ -236,8 +237,7 @@ struct LogReporter : public log::Reader::Reporter { if (this->status->ok()) *this->status = s; } }; -static Status ModifyMenifestFileLastSeq(Env* env, - const DBOptions& db_options, +static Status ModifyMenifestFileLastSeq(Env* env, const DBOptions& db_options, const std::string& file_name, SequenceNumber last_seq) { Status s; @@ -266,14 +266,14 @@ static Status ModifyMenifestFileLastSeq(Env* env, } file->SetPreallocationBlockSize(db_options.manifest_preallocation_size); unique_ptr writer( - new WritableFileWriter(std::move(file), opt_env_opts)); + new WritableFileWriter(std::move(file), opt_env_opts)); file_writer.reset(new log::Writer(std::move(writer))); } { LogReporter reporter; reporter.status = &s; - log::Reader reader(std::move(file_reader), &reporter, - true /*checksum*/, 0 /*initial_offset*/); + log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, + 0 /*initial_offset*/); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -308,7 +308,7 @@ static Status ModifyMenifestFileLastSeq(Env* env, } Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir, - uint64_t* decree) { + uint64_t* checkpoint_decree) { Status s; std::vector live_files; uint64_t manifest_file_size = 0; @@ -327,14 +327,8 @@ Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir, s = db_->DisableFileDeletions(); if (s.ok()) { // this will return live_files prefixed with "/" - s = db_->GetLiveFilesQuick(live_files, &manifest_file_size, - &last_sequence, &last_decree); - } - if (s.ok()) { - // check if need to checkpoint - if (last_sequence == 0 || last_decree == 0 || last_decree <= *decree) { - s = Status::NoNeedOperate(); - } + s = db_->GetLiveFilesQuick(live_files, &manifest_file_size, &last_sequence, + &last_decree); } if (!s.ok()) { db_->EnableFileDeletions(false); @@ -434,7 +428,10 @@ Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir, "Snapshot DONE. All is good. seqno: %" PRIu64 ", decree: %" PRIu64 "", last_sequence, last_decree); - *decree = last_decree; + if (checkpoint_decree != nullptr) { + *checkpoint_decree = last_decree; + } + return s; } } // namespace rocksdb diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 414677e711d7ad7c51a6daeac0e7997ca898ad7a..30a0f64e265ab97825265551db8ab0d73643d359 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1707,7 +1707,10 @@ private: if (!token_helper.token_got()) return ::dsn::ERR_WRONG_TIMING; - if (last_durable_decree() == _db->GetLastFlushedDecree()) { + // last_durable_decree must less than or equal to _db->GetLastFlushedDecree() + // if last_durable_decree == _db->GetLastFlushedDecree(), we should flush it first + int64_t last_flushed_decree = static_cast(_db->GetLastFlushedDecree()); + if (last_durable_decree() == last_flushed_decree) { if (is_emergency) { // trigger flushing memtable, but not wait rocksdb::FlushOptions options; @@ -1731,14 +1734,10 @@ private: } } - rocksdb::Checkpoint *chkpt = nullptr; - auto status = rocksdb::Checkpoint::Create(_db, &chkpt); - if (!status.ok()) { - derror("%s: create Checkpoint object failed, error = %s", - _replica_name.c_str(), - status.ToString().c_str()); - return ::dsn::ERR_LOCAL_APP_FAILURE; - } + dassert(last_durable_decree() < last_flushed_decree, + "%" PRId64 " VS %" PRId64 "", + last_durable_decree(), + last_flushed_decree); char buf[256]; sprintf(buf, "checkpoint.tmp.%" PRIu64 "", dsn_now_us()); @@ -1754,24 +1753,17 @@ private: return ::dsn::ERR_FILE_OPERATION_FAILED; } } - - uint64_t ci = last_durable_decree(); - status = chkpt->CreateCheckpointQuick(tmp_dir, &ci); - delete chkpt; - chkpt = nullptr; - if (!status.ok()) { - derror("%s: async create checkpoint failed, error = %s", + int64_t checkpoint_decree = 0; + ::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree); + if (err != ::dsn::ERR_OK) { + derror("%s: call copy_checkpoint_to_dir_unsafe failed with err = %s", _replica_name.c_str(), - status.ToString().c_str()); - if (!::dsn::utils::filesystem::remove_path(tmp_dir)) { - derror("%s: remove temporary checkpoint directory %s failed", - _replica_name.c_str(), - tmp_dir.c_str()); - } - return status.IsNoNeedOperate() ? ::dsn::ERR_NO_NEED_OPERATE : ::dsn::ERR_LOCAL_APP_FAILURE; + err.to_string()); + return ::dsn::ERR_LOCAL_APP_FAILURE; } - auto chkpt_dir = ::dsn::utils::filesystem::path_combine(_data_dir, chkpt_get_dir_name(ci)); + auto chkpt_dir = + ::dsn::utils::filesystem::path_combine(_data_dir, chkpt_get_dir_name(checkpoint_decree)); if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) { ddebug("%s: checkpoint directory %s already exist, remove it first", _replica_name.c_str(), @@ -1804,13 +1796,17 @@ private: { ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock); - dassert( - ci > last_durable_decree(), "%" PRId64 " VS %" PRId64 "", ci, last_durable_decree()); + dassert(checkpoint_decree > last_durable_decree(), + "%" PRId64 " VS %" PRId64 "", + checkpoint_decree, + last_durable_decree()); if (!_checkpoints.empty()) { - dassert( - ci > _checkpoints.back(), "%" PRId64 " VS %" PRId64 "", ci, _checkpoints.back()); + dassert(checkpoint_decree > _checkpoints.back(), + "%" PRId64 " VS %" PRId64 "", + checkpoint_decree, + _checkpoints.back()); } - _checkpoints.push_back(ci); + _checkpoints.push_back(checkpoint_decree); set_last_durable_decree(_checkpoints.back()); } @@ -1823,6 +1819,70 @@ private: return ::dsn::ERR_OK; } +// Must be thread safe. +::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir, + /*output*/ int64_t *last_decree) +{ + CheckpointingTokenHelper token_helper(_is_checkpointing); + if (!token_helper.token_got()) { + return ::dsn::ERR_WRONG_TIMING; + } + + return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree); +} + +// not thread safe, should be protected by caller +::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir, + int64_t *checkpoint_decree) +{ + rocksdb::Checkpoint *chkpt = nullptr; + rocksdb::Status status = rocksdb::Checkpoint::Create(_db, &chkpt); + if (!status.ok()) { + if (chkpt != nullptr) + delete chkpt, chkpt = nullptr; + derror("%s: create Checkpoint object failed, error = %s", + _replica_name.c_str(), + status.ToString().c_str()); + return ::dsn::ERR_LOCAL_APP_FAILURE; + } + + if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) { + ddebug("%s: checkpoint directory %s is already exist, remove it first", + _replica_name.c_str(), + checkpoint_dir); + if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) { + derror( + "%s: remove checkpoint directory %s failed", _replica_name.c_str(), checkpoint_dir); + return ::dsn::ERR_FILE_OPERATION_FAILED; + } + } + + uint64_t ci = 0; + status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci); + delete chkpt, chkpt = nullptr; + + if (!status.ok()) { + derror("%s: async create checkpoint failed, error = %s", + _replica_name.c_str(), + status.ToString().c_str()); + if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) { + derror( + "%s: remove checkpoint directory %s failed", _replica_name.c_str(), checkpoint_dir); + } + return ::dsn::ERR_LOCAL_APP_FAILURE; + } + + ddebug("%s: copy checkpoint to dir(%s) succeed, last_decree = %" PRId64 "", + _replica_name.c_str(), + checkpoint_dir, + ci); + if (checkpoint_decree != nullptr) { + *checkpoint_decree = static_cast(ci); + } + + return ::dsn::ERR_OK; +} + ::dsn::error_code pegasus_server_impl::get_checkpoint(int64_t learn_start, int64_t local_commit, void *learn_request, diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index f1ea26a7160135bbd8d3adaee868f3cd40b8147c..ea04a95cdc03f768245e1213ab78f5653864e003 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -87,6 +87,21 @@ public: // - ERR_TRY_AGAIN: need try again later virtual ::dsn::error_code async_checkpoint(int64_t last_commit, bool is_emergency) override; + // + // copy the latest checkpoint to checkpoint_dir, and the decree of the checkpoint + // copied will be assigned to checkpoint_decree if checkpoint_decree not null + // + // must be thread safe + // don't need call flush(), just copy even if the app is empty + virtual ::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir, + /*output*/ int64_t *last_decree) override; + + // + // help function, just copy checkpoint to specified dir and ignore _is_checkpointing + // if checkpoint_dir already exist, this function will delete it first + ::dsn::error_code copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir, + /**output*/ int64_t *checkpoint_decree); + virtual int64_t get_last_checkpoint_decree() override { return last_durable_decree(); } // get the last checkpoint