提交 9879255b 编写于 作者: C cailiuyang 提交者: WeijieSun

add generate_checkpoint for pegasus_server_impl

Summary: Ref T9978

Test Plan: N/A

Reviewers: qinzuoyan, sunweijie, wutao1, heyuchen, laiyingchun

Reviewed By: qinzuoyan

Subscribers: #pegasus

Maniphest Tasks: T9978

Differential Revision: https://phabricator.d.xiaomi.net/D78821

Conflicts:
	rdsn
上级 c55f7179
Subproject commit 5adcf734c9b5a1e55ea684727ee6473a340f2176
Subproject commit 23ddba3f470ae546c98b5784da12c5e00677d7b6
......@@ -32,11 +32,11 @@ class Checkpoint {
// Quickly build an openable snapshot of RocksDB on the same disk, will not
// wait flush before generate checkpoint.
// Only generate checkpoint when the last flushed decree are greater
// than *decree, and the new values are returned by decree.
// The decree of the generated checkpoint is returned through
// *checkpoint_decree if checkpoint_decree is not nullptr.
// The directory should not already exist and will be created by this API.
virtual Status CreateCheckpointQuick(const std::string& checkpoint_dir,
/*inout*/ uint64_t* decree);
/*output*/ uint64_t* checkpoint_decree);
virtual ~Checkpoint() {}
};
......
......@@ -21,13 +21,13 @@
#include "db/filename.h"
#include "db/log_writer.h"
#include "db/wal_manager.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/transaction_log.h"
#include "util/coding.h"
#include "util/file_util.h"
#include "util/file_reader_writer.h"
#include "port/port.h"
#include "util/file_util.h"
namespace rocksdb {
......@@ -49,8 +49,9 @@ class CheckpointImpl : public Checkpoint {
// Quickly build an openable snapshot of RocksDB.
// Useful when there is only one column family in the RocksDB.
using Checkpoint::CreateCheckpointQuick;
virtual Status CreateCheckpointQuick(const std::string& checkpoint_dir,
uint64_t* decree) override;
virtual Status CreateCheckpointQuick(
const std::string& checkpoint_dir,
/*output*/ uint64_t* checkpoint_decree) override;
private:
DB* db_;
......@@ -66,7 +67,7 @@ Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir) {
}
Status Checkpoint::CreateCheckpointQuick(const std::string& checkpoint_dir,
uint64_t* decree) {
uint64_t* checkpoint_decree) {
return Status::NotSupported("");
}
......@@ -236,8 +237,7 @@ struct LogReporter : public log::Reader::Reporter {
if (this->status->ok()) *this->status = s;
}
};
static Status ModifyMenifestFileLastSeq(Env* env,
const DBOptions& db_options,
static Status ModifyMenifestFileLastSeq(Env* env, const DBOptions& db_options,
const std::string& file_name,
SequenceNumber last_seq) {
Status s;
......@@ -266,14 +266,14 @@ static Status ModifyMenifestFileLastSeq(Env* env,
}
file->SetPreallocationBlockSize(db_options.manifest_preallocation_size);
unique_ptr<WritableFileWriter> writer(
new WritableFileWriter(std::move(file), opt_env_opts));
new WritableFileWriter(std::move(file), opt_env_opts));
file_writer.reset(new log::Writer(std::move(writer)));
}
{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(std::move(file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/);
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
0 /*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
......@@ -308,7 +308,7 @@ static Status ModifyMenifestFileLastSeq(Env* env,
}
Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir,
uint64_t* decree) {
uint64_t* checkpoint_decree) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
......@@ -327,14 +327,8 @@ Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir,
s = db_->DisableFileDeletions();
if (s.ok()) {
// this will return live_files prefixed with "/"
s = db_->GetLiveFilesQuick(live_files, &manifest_file_size,
&last_sequence, &last_decree);
}
if (s.ok()) {
// check if need to checkpoint
if (last_sequence == 0 || last_decree == 0 || last_decree <= *decree) {
s = Status::NoNeedOperate();
}
s = db_->GetLiveFilesQuick(live_files, &manifest_file_size, &last_sequence,
&last_decree);
}
if (!s.ok()) {
db_->EnableFileDeletions(false);
......@@ -434,7 +428,10 @@ Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir,
"Snapshot DONE. All is good. seqno: %" PRIu64 ", decree: %" PRIu64 "",
last_sequence, last_decree);
*decree = last_decree;
if (checkpoint_decree != nullptr) {
*checkpoint_decree = last_decree;
}
return s;
}
} // namespace rocksdb
......
......@@ -1707,7 +1707,10 @@ private:
if (!token_helper.token_got())
return ::dsn::ERR_WRONG_TIMING;
if (last_durable_decree() == _db->GetLastFlushedDecree()) {
// last_durable_decree must be less than or equal to _db->GetLastFlushedDecree();
// if last_durable_decree == _db->GetLastFlushedDecree(), we should flush first
int64_t last_flushed_decree = static_cast<int64_t>(_db->GetLastFlushedDecree());
if (last_durable_decree() == last_flushed_decree) {
if (is_emergency) {
// trigger flushing memtable, but not wait
rocksdb::FlushOptions options;
......@@ -1731,14 +1734,10 @@ private:
}
}
rocksdb::Checkpoint *chkpt = nullptr;
auto status = rocksdb::Checkpoint::Create(_db, &chkpt);
if (!status.ok()) {
derror("%s: create Checkpoint object failed, error = %s",
_replica_name.c_str(),
status.ToString().c_str());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
dassert(last_durable_decree() < last_flushed_decree,
"%" PRId64 " VS %" PRId64 "",
last_durable_decree(),
last_flushed_decree);
char buf[256];
sprintf(buf, "checkpoint.tmp.%" PRIu64 "", dsn_now_us());
......@@ -1754,24 +1753,17 @@ private:
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}
uint64_t ci = last_durable_decree();
status = chkpt->CreateCheckpointQuick(tmp_dir, &ci);
delete chkpt;
chkpt = nullptr;
if (!status.ok()) {
derror("%s: async create checkpoint failed, error = %s",
int64_t checkpoint_decree = 0;
::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree);
if (err != ::dsn::ERR_OK) {
derror("%s: call copy_checkpoint_to_dir_unsafe failed with err = %s",
_replica_name.c_str(),
status.ToString().c_str());
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror("%s: remove temporary checkpoint directory %s failed",
_replica_name.c_str(),
tmp_dir.c_str());
}
return status.IsNoNeedOperate() ? ::dsn::ERR_NO_NEED_OPERATE : ::dsn::ERR_LOCAL_APP_FAILURE;
err.to_string());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
auto chkpt_dir = ::dsn::utils::filesystem::path_combine(_data_dir, chkpt_get_dir_name(ci));
auto chkpt_dir =
::dsn::utils::filesystem::path_combine(_data_dir, chkpt_get_dir_name(checkpoint_decree));
if (::dsn::utils::filesystem::directory_exists(chkpt_dir)) {
ddebug("%s: checkpoint directory %s already exist, remove it first",
_replica_name.c_str(),
......@@ -1804,13 +1796,17 @@ private:
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dassert(
ci > last_durable_decree(), "%" PRId64 " VS %" PRId64 "", ci, last_durable_decree());
dassert(checkpoint_decree > last_durable_decree(),
"%" PRId64 " VS %" PRId64 "",
checkpoint_decree,
last_durable_decree());
if (!_checkpoints.empty()) {
dassert(
ci > _checkpoints.back(), "%" PRId64 " VS %" PRId64 "", ci, _checkpoints.back());
dassert(checkpoint_decree > _checkpoints.back(),
"%" PRId64 " VS %" PRId64 "",
checkpoint_decree,
_checkpoints.back());
}
_checkpoints.push_back(ci);
_checkpoints.push_back(checkpoint_decree);
set_last_durable_decree(_checkpoints.back());
}
......@@ -1823,6 +1819,70 @@ private:
return ::dsn::ERR_OK;
}
// Thread-safe entry point for copying a checkpoint into checkpoint_dir.
//
// A CheckpointingTokenHelper is constructed on _is_checkpointing; when the token
// is not obtained the call is rejected with ERR_WRONG_TIMING. Otherwise the work
// is delegated to copy_checkpoint_to_dir_unsafe(), whose result is returned as-is.
//
// checkpoint_dir: destination directory of the copied checkpoint.
// last_decree:    output pointer forwarded unchanged to copy_checkpoint_to_dir_unsafe().
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir,
                                                              /*output*/ int64_t *last_decree)
{
    CheckpointingTokenHelper checkpointing_guard(_is_checkpointing);
    if (checkpointing_guard.token_got()) {
        // token obtained: do the real copy while the guard is alive
        return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree);
    }
    return ::dsn::ERR_WRONG_TIMING;
}
// Not thread safe; the caller is responsible for protecting this call.
//
// Creates a rocksdb::Checkpoint object on _db and uses CreateCheckpointQuick() to
// build a checkpoint under checkpoint_dir. If checkpoint_dir already exists it is
// removed first.
//
// checkpoint_dir:    destination directory for the checkpoint.
// checkpoint_decree: if not nullptr, receives the decree reported by
//                    CreateCheckpointQuick() on success; untouched on failure.
//
// Returns:
//   ERR_OK                    - the checkpoint was created.
//   ERR_LOCAL_APP_FAILURE     - the Checkpoint object could not be created, or
//                               CreateCheckpointQuick() failed (a best-effort removal
//                               of checkpoint_dir is attempted in the latter case).
//   ERR_FILE_OPERATION_FAILED - an existing checkpoint_dir could not be removed.
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
                                                                     int64_t *checkpoint_decree)
{
    rocksdb::Checkpoint *chkpt = nullptr;
    rocksdb::Status status = rocksdb::Checkpoint::Create(_db, &chkpt);
    if (!status.ok()) {
        // Create() may have allocated the object even though it reported an error.
        if (chkpt != nullptr) {
            delete chkpt;
            chkpt = nullptr;
        }
        derror("%s: create Checkpoint object failed, error = %s",
               _replica_name.c_str(),
               status.ToString().c_str());
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
        ddebug("%s: checkpoint directory %s is already exist, remove it first",
               _replica_name.c_str(),
               checkpoint_dir);
        if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
            derror(
                "%s: remove checkpoint directory %s failed", _replica_name.c_str(), checkpoint_dir);
            // Release the Checkpoint object on this early exit as well; it was
            // previously leaked here.
            delete chkpt;
            chkpt = nullptr;
            return ::dsn::ERR_FILE_OPERATION_FAILED;
        }
    }

    uint64_t ci = 0;
    status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci);
    // The Checkpoint object is no longer needed whatever the outcome.
    delete chkpt;
    chkpt = nullptr;
    if (!status.ok()) {
        derror("%s: async create checkpoint failed, error = %s",
               _replica_name.c_str(),
               status.ToString().c_str());
        // Best-effort cleanup of whatever was left in the target directory.
        if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
            derror(
                "%s: remove checkpoint directory %s failed", _replica_name.c_str(), checkpoint_dir);
        }
        return ::dsn::ERR_LOCAL_APP_FAILURE;
    }

    // Convert once so the value handed to the "%" PRId64 conversion really is an
    // int64_t (ci itself is uint64_t).
    const int64_t decree = static_cast<int64_t>(ci);
    ddebug("%s: copy checkpoint to dir(%s) succeed, last_decree = %" PRId64 "",
           _replica_name.c_str(),
           checkpoint_dir,
           decree);
    if (checkpoint_decree != nullptr) {
        *checkpoint_decree = decree;
    }
    return ::dsn::ERR_OK;
}
::dsn::error_code pegasus_server_impl::get_checkpoint(int64_t learn_start,
int64_t local_commit,
void *learn_request,
......
......@@ -87,6 +87,21 @@ public:
// - ERR_TRY_AGAIN: need try again later
virtual ::dsn::error_code async_checkpoint(int64_t last_commit, bool is_emergency) override;
//
// Copy the latest checkpoint to checkpoint_dir; the decree of the copied checkpoint
// is assigned to *last_decree if last_decree is not null.
//
// Must be thread safe.
// There is no need to call flush() first; just copy, even if the app is empty.
virtual ::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override;
//
// Helper function: just copies the checkpoint to the specified dir and ignores
// _is_checkpointing. If checkpoint_dir already exists, this function deletes it first.
::dsn::error_code copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
/**output*/ int64_t *checkpoint_decree);
virtual int64_t get_last_checkpoint_decree() override { return last_durable_decree(); }
// get the last checkpoint
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册