未验证 提交 8ecb8ce7 编写于 作者: Q QinZuoyan 提交者: GitHub

sync_checkpoint/async_checkpoint do not return ERR_NO_NEED_OPERATE (#30)

上级 6ff8f5ec
Subproject commit 5a4c4b0f81e5b81212583ae5aab49742374d9e3b
Subproject commit 3849eeabc828ac3b6133e5032b1c57b6256626b3
Subproject commit 85770cb6caf5c4fd14bed25aea5f4fe5574774b1
Subproject commit 5372db0bb719fa00dbe4a4941e8cf52fe8d29eb9
......@@ -1744,6 +1744,7 @@ DEFINE_TASK_CODE(UPDATING_ROCKSDB_SSTSIZE, TASK_PRIORITY_COMMON, THREAD_POOL_REP
auto status = rocksdb::DB::Open(opts, path, &_db);
if (status.ok()) {
_last_committed_decree = _db->GetLastFlushedDecree();
_value_schema_version = _db->GetValueSchemaVersion();
if (_value_schema_version > PEGASUS_VALUE_SCHEMA_MAX_VERSION) {
derror("%s: open app failed, unsupported value schema version %" PRIu32,
......@@ -1762,31 +1763,25 @@ DEFINE_TASK_CODE(UPDATING_ROCKSDB_SSTSIZE, TASK_PRIORITY_COMMON, THREAD_POOL_REP
parse_checkpoints();
int64_t ci = _db->GetLastFlushedDecree();
if (ci != last_durable_decree()) {
// checkpoint if necessary to make last_durable_decree() fresh.
// only need async checkpoint because we are sure that memtable is empty now.
int64_t last_flushed = _db->GetLastFlushedDecree();
if (last_flushed != last_durable_decree()) {
ddebug("%s: start to do async checkpoint, last_durable_decree = %" PRId64
", last_flushed_decree = %" PRId64,
replica_name(),
last_durable_decree(),
ci);
last_flushed);
auto err = async_checkpoint(false);
if (err != ::dsn::ERR_OK) {
dwarn("%s: create checkpoint failed, error = %s, retry again",
replica_name(),
err.to_string());
err = async_checkpoint(false);
if (err != ::dsn::ERR_OK) {
derror("%s: create checkpoint failed, error = %s",
replica_name(),
err.to_string());
delete _db;
_db = nullptr;
return err;
}
derror("%s: create checkpoint failed, error = %s", replica_name(), err.to_string());
delete _db;
_db = nullptr;
return err;
}
dassert(ci == last_durable_decree(),
dassert(last_flushed == last_durable_decree(),
"last durable decree mismatch after checkpoint: %" PRId64 " vs %" PRId64,
ci,
last_flushed,
last_durable_decree());
}
......@@ -1824,10 +1819,8 @@ DEFINE_TASK_CODE(UPDATING_ROCKSDB_SSTSIZE, TASK_PRIORITY_COMMON, THREAD_POOL_REP
}
if (!clear_state) {
rocksdb::FlushOptions options;
options.wait = true;
auto status = _db->Flush(options);
if (!status.ok() && !status.IsNoNeedOperate()) {
auto status = _db->Flush(rocksdb::FlushOptions());
if (!status.ok()) {
derror("%s: flush memtable on close failed: %s",
replica_name(),
status.ToString().c_str());
......@@ -1890,9 +1883,17 @@ private:
if (!token_helper.token_got())
return ::dsn::ERR_WRONG_TIMING;
int64_t last_durable = last_durable_decree();
int64_t last_commit = last_committed_decree();
if (last_durable_decree() == last_commit)
return ::dsn::ERR_NO_NEED_OPERATE;
dassert(last_durable <= last_commit, "%" PRId64 " VS %" PRId64, last_durable, last_commit);
if (last_durable == last_commit) {
ddebug("%s: no need to checkpoint because "
"last_durable_decree = last_committed_decree = %" PRId64,
replica_name(),
last_durable);
return ::dsn::ERR_OK;
}
rocksdb::Checkpoint *chkpt = nullptr;
auto status = rocksdb::Checkpoint::Create(_db, &chkpt);
......@@ -1918,13 +1919,14 @@ private:
}
}
status = chkpt->CreateCheckpoint(chkpt_dir);
// CreateCheckpoint() will always flush the memtable first.
status = chkpt->CreateCheckpoint(chkpt_dir, 0);
if (!status.ok()) {
// sometimes checkpoint may fail, and trying again will succeed
derror("%s: create checkpoint failed, error = %s, try again",
replica_name(),
status.ToString().c_str());
status = chkpt->CreateCheckpoint(chkpt_dir);
status = chkpt->CreateCheckpoint(chkpt_dir, 0);
}
// destroy Checkpoint object
......@@ -1949,6 +1951,10 @@ private:
"%" PRId64 " VS %" PRId64 "",
last_commit,
last_durable_decree());
dassert(last_commit == _db->GetLastFlushedDecree(),
"%" PRId64 " VS %" PRId64 "",
last_commit,
_db->GetLastFlushedDecree());
if (!_checkpoints.empty()) {
dassert(last_commit > _checkpoints.back(),
"%" PRId64 " VS %" PRId64 "",
......@@ -1969,17 +1975,29 @@ private:
}
// Must be thread safe.
::dsn::error_code pegasus_server_impl::async_checkpoint(bool is_emergency)
::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)
{
CheckpointingTokenHelper token_helper(_is_checkpointing);
if (!token_helper.token_got())
return ::dsn::ERR_WRONG_TIMING;
// last_durable_decree must be less than or equal to _db->GetLastFlushedDecree()
// if last_durable_decree == _db->GetLastFlushedDecree(), we should flush it first
int64_t last_flushed_decree = static_cast<int64_t>(_db->GetLastFlushedDecree());
if (last_durable_decree() == last_flushed_decree) {
if (is_emergency) {
int64_t last_durable = last_durable_decree();
int64_t last_flushed = static_cast<int64_t>(_db->GetLastFlushedDecree());
int64_t last_commit = last_committed_decree();
dassert(last_durable <= last_flushed, "%" PRId64 " VS %" PRId64, last_durable, last_flushed);
dassert(last_flushed <= last_commit, "%" PRId64 " VS %" PRId64, last_flushed, last_commit);
if (last_durable == last_commit) {
ddebug("%s: no need to checkpoint because "
"last_durable_decree = last_committed_decree = %" PRId64,
replica_name(),
last_durable);
return ::dsn::ERR_OK;
}
if (last_durable == last_flushed) {
if (flush_memtable) {
// trigger flushing memtable, but not wait
rocksdb::FlushOptions options;
options.wait = false;
......@@ -1987,9 +2005,6 @@ private:
if (status.ok()) {
ddebug("%s: trigger flushing memtable succeed", replica_name());
return ::dsn::ERR_TRY_AGAIN;
} else if (status.IsNoNeedOperate()) {
dwarn("%s: trigger flushing memtable failed, no memtable to flush", replica_name());
return ::dsn::ERR_NO_NEED_OPERATE;
} else {
derror("%s: trigger flushing memtable failed, error = %s",
replica_name(),
......@@ -1997,14 +2012,11 @@ private:
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
} else {
return ::dsn::ERR_NO_NEED_OPERATE;
return ::dsn::ERR_OK;
}
}
dassert(last_durable_decree() < last_flushed_decree,
"%" PRId64 " VS %" PRId64 "",
last_durable_decree(),
last_flushed_decree);
dassert(last_durable < last_flushed, "%" PRId64 " VS %" PRId64, last_durable, last_flushed);
char buf[256];
sprintf(buf, "checkpoint.tmp.%" PRIu64 "", dsn_now_us());
......
......@@ -75,21 +75,19 @@ public:
return ::dsn::ERR_OK;
}
// returns:
// - ERR_OK
// - ERR_WRONG_TIMING
// - ERR_NO_NEED_OPERATE
// - ERR_LOCAL_APP_FAILURE
// - ERR_FILE_OPERATION_FAILED
// - ERR_OK: checkpoint succeeded
// - ERR_WRONG_TIMING: another checkpoint is running now
// - ERR_LOCAL_APP_FAILURE: some internal failure
// - ERR_FILE_OPERATION_FAILED: some file failure
virtual ::dsn::error_code sync_checkpoint() override;
// returns:
// - ERR_OK
// - ERR_WRONG_TIMING: is checkpointing now
// - ERR_NO_NEED_OPERATE: the checkpoint is fresh enough, no need to checkpoint
// - ERR_OK: checkpoint succeeded
// - ERR_WRONG_TIMING: another checkpoint is running now
// - ERR_LOCAL_APP_FAILURE: some internal failure
// - ERR_FILE_OPERATION_FAILED: some file failure
// - ERR_TRY_AGAIN: need try again later
virtual ::dsn::error_code async_checkpoint(bool is_emergency) override;
// - ERR_TRY_AGAIN: flush memtable triggered, need to try again later
virtual ::dsn::error_code async_checkpoint(bool flush_memtable) override;
//
// copy the latest checkpoint to checkpoint_dir, and the decree of the checkpoint
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册