提交 50b4027b 编写于 作者: S simonjoylet 提交者: ob-robot

shrink memory usage of ddl kv mgr

上级 5217ca4c
......@@ -1245,7 +1245,7 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
} else if (OB_FAIL(param_->get_hidden_table_key(hidden_table_key))) {
LOG_WARN("fail to get hidden table key", K(ret), K(hidden_table_key));
} else if (OB_FAIL(context_->data_sstable_redo_writer_.end_ddl_redo_and_create_ddl_sstable(
ls_handle, hidden_table_key, param_->dest_table_id_, param_->execution_id_, param_->task_id_))) {
ls_handle, context_->ddl_kv_mgr_handle_, hidden_table_key, param_->dest_table_id_, param_->execution_id_, param_->task_id_))) {
LOG_WARN("failed to end ddl redo", K(ret));
}
return ret;
......
......@@ -312,7 +312,7 @@ int ObDDLTableMergeTask::init(const ObDDLTableMergeDagParam &ddl_dag_param)
int ObDDLTableMergeTask::process()
{
int ret = OB_SUCCESS;
int64_t MAX_DDL_SSTABLE = 128;
int64_t MAX_DDL_SSTABLE = ObTabletDDLKvMgr::MAX_DDL_KV_CNT_IN_STORAGE * 0.5;
#ifdef ERRSIM
if (0 != GCONF.errsim_max_ddl_sstable_count) {
MAX_DDL_SSTABLE = GCONF.errsim_max_ddl_sstable_count;
......@@ -426,6 +426,10 @@ int ObDDLTableMergeTask::process()
}
} else {
LOG_INFO("commit ddl sstable succ", K(ddl_param), K(merge_param_));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ddl_kv_mgr_handle.get_obj()->unregister_from_tablet(merge_param_.start_scn_, ddl_kv_mgr_handle))) {
LOG_WARN("unregister ddl kv mgr from tablet failed", K(tmp_ret), K(merge_param_));
}
}
}
}
......
......@@ -689,17 +689,27 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObTabletHandle &tablet_handle,
ret = OB_TASK_EXPIRED;
LOG_INFO("receive a old execution id, don't do ddl start", K(ret), K(log));
} else if (ddl_kv_mgr_handle.get_obj()->get_commit_scn_nolock(tablet_handle.get_obj()->get_tablet_meta()).is_valid_and_not_min()) {
start_scn = ddl_kv_mgr_handle.get_obj()->get_start_scn();
if (!start_scn.is_valid_and_not_min()) {
start_scn = tablet_handle.get_obj()->get_tablet_meta().ddl_start_scn_;
}
if (!start_scn.is_valid_and_not_min()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("start scn must be valid after commit", K(ret), K(start_scn));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_execution_id_nolock(log.get_execution_id()))) {
LOG_WARN("failed to set execution id", K(ret));
// ddl commit log already written
if (ddl_kv_mgr_handle.get_obj()->get_start_scn().is_valid_and_not_min()) {
start_scn = ddl_kv_mgr_handle.get_obj()->get_start_scn();
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_execution_id_nolock(log.get_execution_id()))) {
LOG_WARN("failed to set execution id", K(ret));
}
LOG_INFO("already committed, use new execution id", K(ret), K(start_scn),
K(log.get_execution_id()), "tablet_meta", tablet_handle.get_obj()->get_tablet_meta());
} else {
LOG_INFO("already committed, use previous start scn", K(ret), K(tablet_handle.get_obj()->get_tablet_meta()));
// ddl kv mgr is freed when ddl merge task finished, this is a new one, so recover it
start_scn = tablet_handle.get_obj()->get_tablet_meta().ddl_start_scn_;
SCN commit_scn = tablet_handle.get_obj()->get_tablet_meta().ddl_commit_scn_;
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_recover_nolock(log.get_table_key(),
start_scn,
log.get_data_format_version(),
log.get_execution_id(),
commit_scn))) {
LOG_WARN("start ddl log failed", K(ret), K(start_scn), K(log));
}
LOG_INFO("already committed, recover to success status", K(ret), K(start_scn), K(commit_scn),
K(log.get_execution_id()), "tablet_meta", tablet_handle.get_obj()->get_tablet_meta());
}
} else if (OB_ISNULL(cb = op_alloc(ObDDLStartClogCb))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
......@@ -1079,6 +1089,7 @@ int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key,
int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable(
ObLSHandle &ls_handle,
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
const ObITable::TableKey &table_key,
const uint64_t table_id,
const int64_t execution_id,
......@@ -1086,20 +1097,17 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable(
{
int ret = OB_SUCCESS;
ObTabletHandle tablet_handle;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
const ObTabletID &tablet_id = table_key.tablet_id_;
ObLS *ls = nullptr;
ObLSID ls_id;
SCN ddl_start_scn = get_start_scn();
SCN commit_scn = SCN::min_scn();
if (OB_ISNULL(ls = ls_handle.get_ls()) || OB_UNLIKELY(!table_key.is_valid())) {
if (OB_ISNULL(ls = ls_handle.get_ls()) || !ddl_kv_mgr_handle.is_valid() || OB_UNLIKELY(!table_key.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid ls", K(ret), K(table_key));
LOG_WARN("invalid ls", K(ret), KP(ls), K(ddl_kv_mgr_handle), K(table_key));
} else if (OB_FALSE_IT(ls_id = ls->get_ls_id())) {
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, tablet_id, tablet_handle))) {
LOG_WARN("get tablet failed", K(ret));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FALSE_IT(ddl_kv_mgr_handle.get_obj()->prepare_info_for_checksum_report(table_id, ddl_task_id))) {
} else if (OB_FAIL(write_commit_log(tablet_handle, ddl_kv_mgr_handle, table_key, table_id, execution_id, ddl_task_id, commit_scn))) {
if (OB_TASK_EXPIRED == ret) {
......
......@@ -269,6 +269,7 @@ public:
const int64_t data_format_version,
ObDDLKvMgrHandle &ddl_kv_mgr_handle);
int end_ddl_redo_and_create_ddl_sstable(ObLSHandle &ls_handle,
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
const ObITable::TableKey &table_key,
const uint64_t table_id,
const int64_t execution_id,
......
......@@ -864,7 +864,7 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
} else {
DEBUG_SYNC(AFTER_REMOTE_WRITE_DDL_PREPARE_LOG);
if (OB_FAIL(data_sstable_redo_writer_.end_ddl_redo_and_create_ddl_sstable(
ls_handle_, table_key, table_id, build_param_.execution_id_, build_param_.ddl_task_id_))) {
ls_handle_, ddl_kv_mgr_handle_, table_key, table_id, build_param_.execution_id_, build_param_.ddl_task_id_))) {
LOG_WARN("fail create ddl sstable", K(ret), K(table_key));
}
}
......
......@@ -170,6 +170,39 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
return ret;
}
int ObTabletDDLKvMgr::ddl_recover_nolock(const ObITable::TableKey &table_key,
const SCN &start_scn,
const int64_t data_format_version,
const int64_t execution_id,
const SCN &commit_scn)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (OB_UNLIKELY(!table_key.is_valid() || !start_scn.is_valid_and_not_min() || execution_id < 0 || data_format_version < 0
|| (commit_scn.is_valid_and_not_min() && commit_scn < start_scn))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn), K(execution_id), K(data_format_version), K(commit_scn));
} else if (table_key.get_tablet_id() != tablet_id_) {
ret = OB_ERR_SYS;
LOG_WARN("tablet id not same", K(ret), K(table_key), K(tablet_id_));
} else if (start_scn_.is_valid_and_not_min()) {
ret = OB_ERR_SYS;
LOG_WARN("start scn has set", K(ret), K(start_scn_));
} else {
table_key_ = table_key;
data_format_version_ = data_format_version;
execution_id_ = execution_id;
start_scn_ = start_scn;
max_freeze_scn_ = commit_scn;
commit_scn_ = commit_scn;
success_start_scn_ = start_scn;
}
FLOG_INFO("ddl recover finished", K(ret), K(*this));
return ret;
}
int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn,
const SCN &commit_scn,
const uint64_t table_id,
......@@ -319,6 +352,12 @@ int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &co
"wait_elpased_s", (ObTimeUtility::fast_current_time() - wait_start_ts) / 1000000L);
}
}
if (OB_TASK_EXPIRED == ret) { // maybe ddl merge task has finished
if (is_commit_success()) {
ret = OB_SUCCESS;
FLOG_INFO("ddl commit already succeed", K(start_scn), K(commit_scn), K(*this));
}
}
}
return ret;
}
......
......@@ -80,6 +80,7 @@ public:
int get_ddl_major_merge_param(const ObTabletMeta &tablet_meta, ObDDLTableMergeDagParam &merge_param);
int get_rec_scn(share::SCN &rec_scn);
void prepare_info_for_checksum_report(const uint64_t table_id, const int64_t ddl_task_id) { table_id_ = table_id; ddl_task_id_ = ddl_task_id; }
int ddl_recover_nolock(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &commit_scn);
TO_STRING_KV(K_(is_inited), K_(success_start_scn), K_(ls_id), K_(tablet_id), K_(table_key),
K_(data_format_version), K_(start_scn), K_(commit_scn), K_(max_freeze_scn),
K_(table_id), K_(execution_id), K_(ddl_task_id), K_(head), K_(tail), K_(ref_cnt));
......@@ -97,8 +98,9 @@ private:
void cleanup_unlock();
void destroy();
bool is_commit_success_unlock() const;
private:
public:
static const int64_t MAX_DDL_KV_CNT_IN_STORAGE = 64;
private:
bool is_inited_;
share::SCN success_start_scn_;
share::ObLSID ls_id_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册