Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Incubator Pegasus
提交
f1d8f989
Incubator Pegasus
项目概览
apache
/
Incubator Pegasus
通知
9
Star
5
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Incubator Pegasus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
f1d8f989
编写于
3月 25, 2020
作者:
Y
Yingchun Lai
提交者:
GitHub
3月 25, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: simplify logging in pegasus_server_impl (#503)
上级
47cea636
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
82 addition
and
112 deletion
+82
-112
src/server/pegasus_server_impl.cpp
src/server/pegasus_server_impl.cpp
+82
-112
未找到文件。
src/server/pegasus_server_impl.cpp
浏览文件 @
f1d8f989
...
...
@@ -1773,57 +1773,51 @@ private:
int64_t
last_durable
=
last_durable_decree
();
int64_t
last_commit
=
last_committed_decree
();
d
assert
(
last_durable
<=
last_commit
,
"%"
PRId64
" VS %"
PRId64
,
last_durable
,
last_commit
);
d
check_le_replica
(
last_durable
,
last_commit
);
// case 1: last_durable == last_commit
// no need to do checkpoint
if
(
last_durable
==
last_commit
)
{
ddebug
(
"%s: no need to checkpoint because "
"last_durable_decree = last_committed_decree = %"
PRId64
,
replica_name
(),
last_durable
);
ddebug_replica
(
"no need to do checkpoint because last_durable_decree = last_committed_decree = {}"
,
last_durable
);
return
::
dsn
::
ERR_OK
;
}
// case 2: last_durable < last_commit
// need to do checkpoint
rocksdb
::
Checkpoint
*
chkpt_raw
=
nullptr
;
auto
status
=
rocksdb
::
Checkpoint
::
Create
(
_db
,
&
chkpt_raw
);
if
(
!
status
.
ok
())
{
derror
(
"%s: create Checkpoint object failed, error = %s"
,
replica_name
(),
status
.
ToString
().
c_str
());
derror_replica
(
"create Checkpoint object failed, error = {}"
,
status
.
ToString
());
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
std
::
unique_ptr
<
rocksdb
::
Checkpoint
>
chkpt
(
chkpt_raw
);
auto
dir
=
chkpt_get_dir_name
(
last_commit
);
auto
chkpt_dir
=
::
dsn
::
utils
::
filesystem
::
path_combine
(
data_dir
(),
dir
);
if
(
::
dsn
::
utils
::
filesystem
::
directory_exists
(
chkpt_dir
))
{
ddebug
(
"%s: checkpoint directory %s already exist, remove it first"
,
replica_name
(),
chkpt_dir
.
c_str
());
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
chkpt_dir
))
{
derror
(
"%s: remove old checkpoint directory %s failed"
,
replica_name
(),
chkpt_dir
.
c_str
());
auto
checkpoint_dir
=
::
dsn
::
utils
::
filesystem
::
path_combine
(
data_dir
(),
dir
);
if
(
::
dsn
::
utils
::
filesystem
::
directory_exists
(
checkpoint_dir
))
{
ddebug_replica
(
"checkpoint directory {} is already existed, remove it first"
,
checkpoint_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
checkpoint_dir
))
{
derror_replica
(
"remove checkpoint directory {} failed"
,
checkpoint_dir
);
return
::
dsn
::
ERR_FILE_OPERATION_FAILED
;
}
}
//
CreateCheckpoint() will always flush memtable firstly.
status
=
chkpt
->
CreateCheckpoint
(
ch
kpt_dir
,
0
);
//
log_size_for_flush = 0 means always flush memtable before recording the live files
status
=
chkpt
->
CreateCheckpoint
(
ch
eckpoint_dir
,
0
/* log_size_for_flush */
);
if
(
!
status
.
ok
())
{
// sometimes checkpoint may fail, and try again will succeed
derror
(
"%s: create checkpoint failed, error = %s, try again"
,
replica_name
(),
status
.
ToString
().
c_str
());
status
=
chkpt
->
CreateCheckpoint
(
chkpt_dir
,
0
);
derror_replica
(
"CreateCheckpoint failed, error = {}, try again"
,
status
.
ToString
());
// TODO(yingchun): fail and return
status
=
chkpt
->
CreateCheckpoint
(
checkpoint_dir
,
0
);
}
if
(
!
status
.
ok
())
{
derror
(
"%s: create checkpoint failed, error = %s"
,
replica_name
(),
status
.
ToString
().
c_str
());
::
dsn
::
utils
::
filesystem
::
remove_path
(
chkpt_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
chkpt_dir
))
{
derror
(
"%s: remove damaged checkpoint directory %s failed"
,
replica_name
(),
chkpt_dir
.
c_str
());
derror_replica
(
"CreateCheckpoint failed, error = {}"
,
status
.
ToString
());
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
checkpoint_dir
))
{
derror_replica
(
"remove checkpoint directory {} failed"
,
checkpoint_dir
);
}
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
...
...
@@ -1840,9 +1834,8 @@ private:
set_last_durable_decree
(
_checkpoints
.
back
());
}
ddebug
(
"%s: sync create checkpoint succeed, last_durable_decree = %"
PRId64
""
,
replica_name
(),
last_durable_decree
());
ddebug_replica
(
"sync create checkpoint succeed, last_durable_decree = {}"
,
last_durable_decree
());
gc_checkpoints
();
...
...
@@ -1860,50 +1853,53 @@ private:
int64_t
last_flushed
=
static_cast
<
int64_t
>
(
_db
->
GetLastFlushedDecree
());
int64_t
last_commit
=
last_committed_decree
();
d
assert
(
last_durable
<=
last_flushed
,
"%"
PRId64
" VS %"
PRId64
,
last_durable
,
last_flushed
);
d
assert
(
last_flushed
<=
last_commit
,
"%"
PRId64
" VS %"
PRId64
,
last_flushed
,
last_commit
);
d
check_le_replica
(
last_durable
,
last_flushed
);
d
check_le_replica
(
last_flushed
,
last_commit
);
// case 1: last_durable == last_flushed == last_commit
// no need to do checkpoint
if
(
last_durable
==
last_commit
)
{
ddebug
(
"%s: no need to checkpoint because "
"last_durable_decree = last_committed_decree = %"
PRId64
,
replica_name
(),
last_durable
);
dcheck_eq_replica
(
last_durable
,
last_flushed
);
dcheck_eq_replica
(
last_flushed
,
last_commit
);
ddebug_replica
(
"no need to checkpoint because last_durable_decree = last_committed_decree = {}"
,
last_durable
);
return
::
dsn
::
ERR_OK
;
}
// case 2: last_durable == last_flushed < last_commit
// no need to do checkpoint, but need to flush memtable if required
if
(
last_durable
==
last_flushed
)
{
if
(
flush_memtable
)
{
// trigger flushing memtable, but not wait
rocksdb
::
FlushOptions
options
;
options
.
wait
=
false
;
auto
status
=
_db
->
Flush
(
options
);
if
(
status
.
ok
())
{
ddebug
(
"%s: trigger flushing memtable succeed"
,
replica_name
());
return
::
dsn
::
ERR_TRY_AGAIN
;
}
else
{
derror
(
"%s: trigger flushing memtable failed, error = %s"
,
replica_name
(),
status
.
ToString
().
c_str
());
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
}
else
{
dcheck_lt_replica
(
last_flushed
,
last_commit
);
if
(
!
flush_memtable
)
{
// no flush required
return
::
dsn
::
ERR_OK
;
}
// flush required, but not wait
rocksdb
::
FlushOptions
options
;
options
.
wait
=
false
;
auto
status
=
_db
->
Flush
(
options
);
if
(
status
.
ok
())
{
ddebug_replica
(
"trigger flushing memtable succeed, status = {}"
,
status
.
ToString
());
return
::
dsn
::
ERR_TRY_AGAIN
;
}
else
{
ddebug_replica
(
"trigger flushing memtable failed, status = {}"
,
status
.
ToString
());
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
}
dassert
(
last_durable
<
last_flushed
,
"%"
PRId64
" VS %"
PRId64
,
last_durable
,
last_flushed
);
// case 3: last_durable < last_flushed <= last_commit
// need to do checkpoint
dcheck_lt_replica
(
last_durable
,
last_flushed
);
char
buf
[
256
];
sprintf
(
buf
,
"checkpoint.tmp.%"
PRIu64
""
,
dsn_now_us
());
std
::
string
tmp_dir
=
::
dsn
::
utils
::
filesystem
::
path_combine
(
data_dir
(),
buf
);
std
::
string
tmp_dir
=
::
dsn
::
utils
::
filesystem
::
path_combine
(
data_dir
(),
std
::
string
(
"checkpoint.tmp."
)
+
std
::
to_string
(
dsn_now_us
()));
if
(
::
dsn
::
utils
::
filesystem
::
directory_exists
(
tmp_dir
))
{
ddebug
(
"%s: temporary checkpoint directory %s already exist, remove it first"
,
replica_name
(),
tmp_dir
.
c_str
());
ddebug_replica
(
"temporary checkpoint directory {} is already existed, remove it first"
,
tmp_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
tmp_dir
))
{
derror
(
"%s: remove temporary checkpoint directory %s failed"
,
replica_name
(),
tmp_dir
.
c_str
());
derror_replica
(
"remove temporary checkpoint directory {} failed"
,
tmp_dir
);
return
::
dsn
::
ERR_FILE_OPERATION_FAILED
;
}
}
...
...
@@ -1911,62 +1907,44 @@ private:
int64_t
checkpoint_decree
=
0
;
::
dsn
::
error_code
err
=
copy_checkpoint_to_dir_unsafe
(
tmp_dir
.
c_str
(),
&
checkpoint_decree
);
if
(
err
!=
::
dsn
::
ERR_OK
)
{
derror
(
"%s: call copy_checkpoint_to_dir_unsafe failed with err = %s"
,
replica_name
(),
err
.
to_string
());
derror_replica
(
"copy_checkpoint_to_dir_unsafe failed with err = {}"
,
err
.
to_string
());
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
auto
ch
kp
t_dir
=
auto
ch
eckpoin
t_dir
=
::
dsn
::
utils
::
filesystem
::
path_combine
(
data_dir
(),
chkpt_get_dir_name
(
checkpoint_decree
));
if
(
::
dsn
::
utils
::
filesystem
::
directory_exists
(
chkpt_dir
))
{
ddebug
(
"%s: checkpoint directory %s already exist, remove it first"
,
replica_name
(),
chkpt_dir
.
c_str
());
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
chkpt_dir
))
{
derror
(
"%s: remove old checkpoint directory %s failed"
,
replica_name
(),
chkpt_dir
.
c_str
());
if
(
::
dsn
::
utils
::
filesystem
::
directory_exists
(
checkpoint_dir
))
{
ddebug_replica
(
"checkpoint directory {} is already existed, remove it first"
,
checkpoint_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
checkpoint_dir
))
{
derror_replica
(
"remove old checkpoint directory {} failed"
,
checkpoint_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
tmp_dir
))
{
derror
(
"%s: remove temporary checkpoint directory %s failed"
,
replica_name
(),
tmp_dir
.
c_str
());
derror_replica
(
"remove temporary checkpoint directory {} failed"
,
tmp_dir
);
}
return
::
dsn
::
ERR_FILE_OPERATION_FAILED
;
}
}
if
(
!::
dsn
::
utils
::
filesystem
::
rename_path
(
tmp_dir
,
chkpt_dir
))
{
derror
(
"%s: rename checkpoint directory from %s to %s failed"
,
replica_name
(),
tmp_dir
.
c_str
(),
chkpt_dir
.
c_str
());
if
(
!::
dsn
::
utils
::
filesystem
::
rename_path
(
tmp_dir
,
checkpoint_dir
))
{
derror_replica
(
"rename checkpoint directory from {} to {} failed"
,
tmp_dir
,
checkpoint_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
tmp_dir
))
{
derror
(
"%s: remove temporary checkpoint directory %s failed"
,
replica_name
(),
tmp_dir
.
c_str
());
derror_replica
(
"remove temporary checkpoint directory {} failed"
,
tmp_dir
);
}
return
::
dsn
::
ERR_FILE_OPERATION_FAILED
;
}
{
::
dsn
::
utils
::
auto_lock
<::
dsn
::
utils
::
ex_lock_nr
>
l
(
_checkpoints_lock
);
dassert
(
checkpoint_decree
>
last_durable_decree
(),
"%"
PRId64
" VS %"
PRId64
""
,
checkpoint_decree
,
last_durable_decree
());
dcheck_gt_replica
(
checkpoint_decree
,
last_durable_decree
());
if
(
!
_checkpoints
.
empty
())
{
dassert
(
checkpoint_decree
>
_checkpoints
.
back
(),
"%"
PRId64
" VS %"
PRId64
""
,
checkpoint_decree
,
_checkpoints
.
back
());
dcheck_gt_replica
(
checkpoint_decree
,
_checkpoints
.
back
());
}
_checkpoints
.
push_back
(
checkpoint_decree
);
set_last_durable_decree
(
_checkpoints
.
back
());
}
ddebug
(
"%s: async create checkpoint succeed, last_durable_decree = %"
PRId64
""
,
replica_name
(),
last_durable_decree
());
ddebug_replica
(
"async create checkpoint succeed, last_durable_decree = {}"
,
last_durable_decree
());
gc_checkpoints
();
...
...
@@ -1990,21 +1968,18 @@ private:
int64_t
*
checkpoint_decree
)
{
rocksdb
::
Checkpoint
*
chkpt_raw
=
nullptr
;
rocksdb
::
Status
status
=
rocksdb
::
Checkpoint
::
Create
(
_db
,
&
chkpt_raw
);
auto
status
=
rocksdb
::
Checkpoint
::
Create
(
_db
,
&
chkpt_raw
);
if
(
!
status
.
ok
())
{
derror
(
"%s: create Checkpoint object failed, error = %s"
,
replica_name
(),
status
.
ToString
().
c_str
());
derror_replica
(
"create Checkpoint object failed, error = {}"
,
status
.
ToString
());
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
std
::
unique_ptr
<
rocksdb
::
Checkpoint
>
chkpt
(
chkpt_raw
);
if
(
::
dsn
::
utils
::
filesystem
::
directory_exists
(
checkpoint_dir
))
{
ddebug
(
"%s: checkpoint directory %s is already exist, remove it first"
,
replica_name
(),
checkpoint_dir
);
ddebug_replica
(
"checkpoint directory {} is already existed, remove it first"
,
checkpoint_dir
);
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
checkpoint_dir
))
{
derror
(
"%s: remove checkpoint directory %s failed"
,
replica_name
()
,
checkpoint_dir
);
derror
_replica
(
"remove checkpoint directory {} failed"
,
checkpoint_dir
);
return
::
dsn
::
ERR_FILE_OPERATION_FAILED
;
}
}
...
...
@@ -2012,19 +1987,14 @@ private:
uint64_t
ci
=
0
;
status
=
chkpt
->
CreateCheckpointQuick
(
checkpoint_dir
,
&
ci
);
if
(
!
status
.
ok
())
{
derror
(
"%s: async create checkpoint failed, error = %s"
,
replica_name
(),
status
.
ToString
().
c_str
());
derror_replica
(
"CreateCheckpoint failed, error = {}"
,
status
.
ToString
());
if
(
!::
dsn
::
utils
::
filesystem
::
remove_path
(
checkpoint_dir
))
{
derror
(
"%s: remove checkpoint directory %s failed"
,
replica_name
()
,
checkpoint_dir
);
derror
_replica
(
"remove checkpoint directory {} failed"
,
checkpoint_dir
);
}
return
::
dsn
::
ERR_LOCAL_APP_FAILURE
;
}
ddebug_replica
(
"copy checkpoint to dir({}) succeed, last_decree = {}"
,
checkpoint_dir
,
ci
);
ddebug
(
"%s: copy checkpoint to dir(%s) succeed, last_decree = %"
PRId64
""
,
replica_name
(),
checkpoint_dir
,
ci
);
if
(
checkpoint_decree
!=
nullptr
)
{
*
checkpoint_decree
=
static_cast
<
int64_t
>
(
ci
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录