提交 a7b3c02d 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] add trigger for ckpt

上级 d1aa3c5d
......@@ -59,7 +59,11 @@ public:
virtual int abort();
bool is_disk_warning() const;
// Thread safe
virtual int get_active_cursor(common::ObLogCursor& log_cursor) override;
virtual int get_active_cursor(common::ObLogCursor &log_cursor) override;
// Forwards the value reported by log_writer_.get_file_size() without modification.
// NOTE(review): the name suggests this is the per-file size limit of the log
// writer -- confirm against the writer's definition.
int64_t get_max_file_size() const
{
  const int64_t writer_file_size = log_writer_.get_file_size();
  return writer_file_size;
}
void remove_useless_log_file(const int64_t end_file_id);
// parse log to stream
......
......@@ -95,6 +95,11 @@ public:
int delete_log_file(int64_t file_id);
int get_using_disk_space(int64_t &using_space) const;
// Read-only accessor: hands back the current value of the file_size_ member.
int64_t get_file_size() const
{
  const int64_t current_size = file_size_;
  return current_size;
}
bool is_disk_warning() const;
private:
......
......@@ -1478,13 +1478,11 @@ int ObWriteCheckpointTask::process()
static int64_t checkpoint_version_ = 0;
static common::ObLogCursor slog_cursor_;
const int64_t now = ObTimeUtility::current_time();
int64_t alert_interval = FAIL_WRITE_CHECKPOINT_ALERT_INTERVAL;
const int64_t now = ObTimeUtility::fast_current_time();
int64_t min_interval = RETRY_WRITE_CHECKPOINT_MIN_INTERVAL;
#ifdef ERRSIM
alert_interval = GCONF.fail_write_checkpoint_alert_interval;
min_interval = GCONF.retry_write_checkpoint_min_interval;
LOG_INFO("use errsim checkpoint config", K(alert_interval), K(min_interval));
LOG_INFO("use errsim checkpoint config", K(min_interval));
#endif
DEBUG_SYNC(DELAY_WRITE_CHECKPOINT);
......@@ -1509,70 +1507,68 @@ int ObWriteCheckpointTask::process()
// ignore tmp_ret, cur_cursor will be invalid if previous step failed
// write checkpoint if 1) just finish major merge
// 2) log id increased over MIN_WRITE_CHECKPOINT_LOG_CNT
if (cur_cursor.is_valid() && ((checkpoint_version_ < frozen_version_) ||
((cur_cursor.newer_than(slog_cursor_) &&
(cur_cursor.log_id_ - slog_cursor_.log_id_ >= MIN_WRITE_CHECKPOINT_LOG_CNT))))) {
SERVER_EVENT_ADD("storage",
"write checkpoint start",
"tenant_id",
0,
"checkpoint_snapshot",
frozen_version_,
"checkpoint_type",
"META_CKPT",
"checkpoint_cluster_version",
GET_MIN_CLUSTER_VERSION());
if (OB_SUCCESS != (tmp_ret = ObServerCheckpointWriter::get_instance().write_checkpoint(cur_cursor))) {
ObTaskController::get().allow_next_syslog();
if (0 != last_write_time_ && now > last_write_time_ + alert_interval && OB_EAGAIN != tmp_ret &&
cur_cursor.log_id_ - slog_cursor_.log_id_ >= MIN_WRITE_CHECKPOINT_LOG_CNT * 2) {
LOG_ERROR("Fail to write checkpoint in long time",
K(tmp_ret),
K(frozen_version_),
K(last_write_time_),
K(alert_interval),
"task",
*this);
// 2) log size increased over MIN_WRITE_CHECKPOINT_OFFSET_INTERVAL
if (cur_cursor.is_valid()) {
bool need_ckpt = false;
int64_t offset_interval = 0;
if (OB_FAIL(need_write_ckpt(
checkpoint_version_, frozen_version_, cur_cursor, slog_cursor_, offset_interval, need_ckpt))) {
LOG_WARN("failed to judge whether to make checkpoint or not", K(ret));
} else if (need_ckpt) {
if (OB_FAIL(ObServerCheckpointWriter::get_instance().write_checkpoint(cur_cursor))) {
ObTaskController::get().allow_next_syslog();
LOG_WARN("Fail to write checkpoint", K(tmp_ret), K(frozen_version_), K(last_write_time_), "task", *this);
} else {
LOG_WARN("Fail to write checkpoint in short time",
K(tmp_ret),
K(frozen_version_),
K(last_write_time_),
K(alert_interval),
"task",
*this);
checkpoint_version_ = frozen_version_;
last_write_time_ = now;
slog_cursor_ = cur_cursor;
ObTaskController::get().allow_next_syslog();
LOG_INFO(
"Success to write checkpoint", K(frozen_version_), K(last_write_time_), K(slog_cursor_), "task", *this);
}
} else {
SERVER_EVENT_ADD("storage",
"write checkpoint finish",
"tenant_id",
0,
"checkpoint_snapshot",
frozen_version_,
"checkpoint_type",
"META_CKPT",
"checkpoint_cluster_version",
GET_MIN_CLUSTER_VERSION());
checkpoint_version_ = frozen_version_;
last_write_time_ = now;
slog_cursor_ = cur_cursor;
ObTaskController::get().allow_next_syslog();
LOG_INFO("Success to write checkpoint", K(frozen_version_), K(last_write_time_), K(slog_cursor_), "task", *this);
const int64_t time_after = ObTimeUtility::fast_current_time();
SERVER_EVENT_ADD("storage", "write checkpoint", "ret", ret, "cursor", cur_cursor, "cost_time", time_after - now);
}
}
return ret;
}
int ObSSTableMergeContext::new_block_write_ctx(blocksstable::ObMacroBlocksWriteCtx*& ctx)
// Decides whether a checkpoint should be written now.
//
// A checkpoint is requested (need_ckpt = true) when either
//   1) checkpoint_version is older than frozen_version, or
//   2) the distance between slog_cursor and cur_cursor, measured in offset units,
//      has reached MIN_WRITE_CHECKPOINT_OFFSET_INTERVAL.
//
// @param checkpoint_version  version to compare against frozen_version
// @param frozen_version      current frozen version
// @param cur_cursor          current log cursor
// @param slog_cursor         log cursor to measure the distance from
// @param offset_interval     [out] computed distance from slog_cursor to cur_cursor;
//                            0 when the distance could not be computed
// @param need_ckpt           [out] true when a checkpoint should be written, false otherwise
// @return OB_SUCCESS, or OB_ERR_UNEXPECTED when cur_cursor is in an older file than
//         slog_cursor (in that case need_ckpt is false).
int ObWriteCheckpointTask::need_write_ckpt(const int64_t checkpoint_version, const int64_t frozen_version,
    const oceanbase::common::ObLogCursor &cur_cursor, const oceanbase::common::ObLogCursor &slog_cursor,
    int64_t &offset_interval, bool &need_ckpt)
{
  int ret = OB_SUCCESS;
  // Give both outputs a defined value on every path, so the result does not depend on
  // how the caller happened to initialize them.
  offset_interval = 0;
  need_ckpt = false;

  if (cur_cursor.file_id_ == slog_cursor.file_id_) {
    offset_interval = cur_cursor.offset_ - slog_cursor.offset_;
    // NOTE(review): a cursor that moved backwards inside the same file yields a negative
    // interval here and is not reported, unlike the cross-file case below -- confirm intended.
  } else if (cur_cursor.file_id_ < slog_cursor.file_id_) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("current file_id_ is smaller than last ckpt's file_id_", K(ret), K(cur_cursor), K(slog_cursor));
  } else {
    // Cursors are in different files: every file boundary crossed is counted as
    // SLOGGER.get_max_file_size() offset units.
    offset_interval = cur_cursor.offset_ + (cur_cursor.file_id_ - slog_cursor.file_id_) * SLOGGER.get_max_file_size() -
                      slog_cursor.offset_;
  }

  if (OB_FAIL(ret)) {
    // do nothing
  } else if (checkpoint_version < frozen_version) {
    LOG_INFO("Make checkpoint because of frozen version", K(checkpoint_version), K(frozen_version));
    need_ckpt = true;
  } else if (offset_interval >= MIN_WRITE_CHECKPOINT_OFFSET_INTERVAL) {
    // Log the computed interval as well, since it is the value that triggered the checkpoint.
    LOG_INFO("Make checkpoint because of slog size", K(cur_cursor), K(slog_cursor), K(offset_interval));
    need_ckpt = true;
  }
  return ret;
}
int ObSSTableMergeContext::new_block_write_ctx(blocksstable::ObMacroBlocksWriteCtx *&ctx)
{
int ret = OB_SUCCESS;
void* buf = NULL;
if (OB_NOT_NULL(ctx)) {
if (OB_UNLIKELY(nullptr != ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ctx must not null", K(ret), K(ctx));
LOG_WARN("ctx should be null", K(ret), K(ctx));
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(blocksstable::ObMacroBlocksWriteCtx)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc ObMacroBlocksWriteCtx", K(ret));
......
......@@ -364,11 +364,13 @@ public:
virtual int process() override;
private:
static const int64_t FAIL_WRITE_CHECKPOINT_ALERT_INTERVAL = 1000L * 1000L * 3600LL; // 1 hour
static const int64_t RETRY_WRITE_CHECKPOINT_MIN_INTERVAL = 1000L * 1000L * 300L; // 5 minutes
// The average replay time of one slog is 100us, and the total replay time should be less than 1 minute.
// So once the log count exceeds 50% * (60000000 / 100) = 300000, try to write a checkpoint.
static const int64_t MIN_WRITE_CHECKPOINT_LOG_CNT = 300000;
// Decides whether a checkpoint should be written: sets need_ckpt to true when
// checkpoint_version < frozen_version, or when the offset distance from slog_cursor to
// cur_cursor (returned through offset_interval) reaches MIN_WRITE_CHECKPOINT_OFFSET_INTERVAL.
// Returns OB_ERR_UNEXPECTED when cur_cursor's file_id_ is smaller than slog_cursor's.
// The function never writes false to need_ckpt, so callers must pass it in as false.
int need_write_ckpt(const int64_t checkpoint_version, const int64_t frozen_version,
const oceanbase::common::ObLogCursor &cur_cursor, const oceanbase::common::ObLogCursor &slog_cursor,
int64_t &offset_interval, bool &need_ckpt);
private:
static const int64_t RETRY_WRITE_CHECKPOINT_MIN_INTERVAL = 1000L * 1000L * 300L; // 5 minutes
// Threshold that need_write_ckpt() compares the cursor offset interval against:
// 32 << 20 = 33554432 (32 MiB if cursor offsets are byte counts -- TODO confirm).
static const int64_t MIN_WRITE_CHECKPOINT_OFFSET_INTERVAL = 32 << 20;
bool is_inited_;
int64_t frozen_version_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册