提交 92fffa73 编写于 作者: S simonjoylet 提交者: wangzelin.wzl

Tolerate multiple DDL executions on a single tablet

上级 6677ad6f
......@@ -309,7 +309,7 @@ int ObComplementDataContext::write_start_log(const ObComplementDataParam &param)
} else if (OB_UNLIKELY(!hidden_table_key.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table key", K(ret), K(hidden_table_key));
} else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key))) {
} else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(hidden_table_key, ddl_kv_mgr_handle_))) {
LOG_WARN("fail write start log", K(ret), K(hidden_table_key), K(param));
} else {
LOG_INFO("complement task start ddl redo success", K(hidden_table_key));
......@@ -327,6 +327,7 @@ void ObComplementDataContext::destroy()
allocator_.free(index_builder_);
index_builder_ = nullptr;
}
ddl_kv_mgr_handle_.reset();
allocator_.reset();
}
......@@ -1176,7 +1177,11 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
1/*execution_id*/,
param_->task_id_,
prepare_log_ts))) {
LOG_WARN("fail write ddl prepare log", K(ret), K(hidden_table_key));
if (OB_TASK_EXPIRED == ret) {
  // An expired prepare log is tolerated: the message below promises success,
  // and the later commit / wait branches of this function reset ret the same
  // way, so the error code must be cleared here as well.
  LOG_INFO("ddl task expired, but return success", K(ret), K(hidden_table_key), KPC(param_));
  ret = OB_SUCCESS;
} else {
  LOG_WARN("fail write ddl prepare log", K(ret), K(hidden_table_key));
}
} else {
ObTabletHandle new_tablet_handle; // no use here
ObDDLKvMgrHandle ddl_kv_mgr_handle;
......@@ -1190,9 +1195,17 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
param_->hidden_table_schema_->get_table_id(),
1/*execution_id*/,
param_->task_id_))) {
LOG_WARN("commit ddl log failed", K(ret), K(ddl_start_log_ts), K(prepare_log_ts), K(hidden_table_key));
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
K(ddl_start_log_ts), "new_ddl_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts());
ret = OB_SUCCESS;
} else {
LOG_WARN("commit ddl log failed", K(ret), K(ddl_start_log_ts), K(prepare_log_ts), K(hidden_table_key));
}
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_log_ts, prepare_log_ts))) {
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
K(ddl_start_log_ts), "new_ddl_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts());
ret = OB_SUCCESS;
} else {
LOG_WARN("wait ddl commit failed", K(ret), K(ddl_start_log_ts), K(hidden_table_key));
......
......@@ -121,6 +121,7 @@ public:
int64_t concurrent_cnt_;
ObDDLSSTableRedoWriter data_sstable_redo_writer_;
blocksstable::ObSSTableIndexBuilder *index_builder_;
ObDDLKvMgrHandle ddl_kv_mgr_handle_; // for keeping ddl kv mgr alive
};
class ObComplementPrepareTask;
......
......@@ -106,6 +106,9 @@ int ObDDLTableMergeDag::create_first_task()
LOG_WARN("get tablet failed", K(ret), K(ddl_param_));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv mgr failed", K(ret), K(ddl_param_));
} else if (ddl_param_.start_log_ts_ < ddl_kv_mgr_handle.get_obj()->get_start_log_ts()) {
ret = OB_TASK_EXPIRED;
LOG_WARN("ddl task expired, skip it", K(ret), K(ddl_param_), "new_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts());
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->freeze_ddl_kv())) {
LOG_WARN("ddl kv manager try freeze failed", K(ret), K(ddl_param_));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_ddl_kvs(true/*frozen_only*/, ddl_kvs_handle))) {
......@@ -340,6 +343,8 @@ int ObDDLTableMergeTask::process()
LOG_INFO("tablet me says with major but no major, meaning its a migrated deleted tablet, skip");
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_ddl_param(ddl_param))) {
LOG_WARN("get tablet ddl param failed", K(ret));
} else if (merge_param_.start_log_ts_ > 0 && merge_param_.start_log_ts_ < ddl_param.start_log_ts_) {
LOG_INFO("ddl merge task expired, do nothing", K(merge_param_), "new_start_log_ts", ddl_param.start_log_ts_);
} else if (merge_param_.is_commit_ && OB_FAIL(check_data_integrity(ddl_sstable_handles,
ddl_param.start_log_ts_,
merge_param_.rec_log_ts_,
......
......@@ -42,21 +42,23 @@ public:
tablet_id_(),
rec_log_ts_(0),
is_commit_(false),
start_log_ts_(0),
table_id_(0),
execution_id_(0),
ddl_task_id_(0)
{}
bool is_valid() const
{
return ls_id_.is_valid() && tablet_id_.is_valid();
return ls_id_.is_valid() && tablet_id_.is_valid() && start_log_ts_ > 0;
}
virtual ~ObDDLTableMergeDagParam() = default;
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_log_ts), K_(is_commit), K_(table_id), K_(execution_id), K_(ddl_task_id));
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_log_ts), K_(is_commit), K_(start_log_ts), K_(table_id), K_(execution_id), K_(ddl_task_id));
public:
share::ObLSID ls_id_;
ObTabletID tablet_id_;
int64_t rec_log_ts_;
bool is_commit_;
int64_t start_log_ts_; // start log ts at schedule, for skipping expired task
uint64_t table_id_; // used for report ddl checksum
int64_t execution_id_; // used for report ddl checksum
int64_t ddl_task_id_; // used for report ddl checksum
......
......@@ -922,12 +922,12 @@ int ObDDLSSTableRedoWriter::init(const ObLSID &ls_id, const ObTabletID &tablet_i
return ret;
}
int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key)
int ObDDLSSTableRedoWriter::start_ddl_redo(const ObITable::TableKey &table_key, ObDDLKvMgrHandle &ddl_kv_mgr_handle)
{
int ret = OB_SUCCESS;
ObLS *ls = nullptr;
ObDDLStartLog log;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
ddl_kv_mgr_handle.reset();
int64_t tmp_log_ts = 0;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
......@@ -1067,9 +1067,7 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
ret = OB_ERR_SYS;
LOG_WARN("srv rpc proxy or location service is null", K(ret), KP(srv_rpc_proxy));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr_).remote_write_ddl_prepare_log(arg, log_ts))) {
if (OB_TASK_EXPIRED == ret) {
ret = OB_SUCCESS;
} else {
if (OB_TASK_EXPIRED != ret) {
LOG_WARN("fail to remote write ddl redo log", K(ret), K(arg));
}
} else {
......
......@@ -235,7 +235,7 @@ public:
ObDDLSSTableRedoWriter();
~ObDDLSSTableRedoWriter();
int init(const share::ObLSID &ls_id, const ObTabletID &tablet_id);
int start_ddl_redo(const ObITable::TableKey &table_key);
int start_ddl_redo(const ObITable::TableKey &table_key, ObDDLKvMgrHandle &ddl_kv_mgr_handle);
int write_redo_log(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
const blocksstable::MacroBlockId &macro_block_id);
int wait_redo_log_finish(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
......
......@@ -242,6 +242,7 @@ int ObDDLKV::set_macro_block(const ObDDLMacroBlock &macro_block)
ObDDLTableMergeDagParam param;
param.ls_id_ = ls_id_;
param.tablet_id_ = tablet_id_;
param.start_log_ts_ = ddl_start_log_ts_;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
LOG_WARN("try schedule ddl merge dag failed when ddl kv is full ",
......
......@@ -204,6 +204,7 @@ ObSSTableInsertTabletContext::~ObSSTableInsertTabletContext()
allocator_.free(index_builder_);
index_builder_ = nullptr;
}
ddl_kv_mgr_handle_.reset();
allocator_.reset();
}
......@@ -262,7 +263,7 @@ int ObSSTableInsertTabletContext::update(const int64_t snapshot_version)
LOG_WARN("invalid argument", K(ret), K(table_key));
} else if (data_sstable_redo_writer_.get_start_log_ts() > 0) {
// ddl start log is already written, do nothing
} else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(table_key))) {
} else if (OB_FAIL(data_sstable_redo_writer_.start_ddl_redo(table_key, ddl_kv_mgr_handle_))) {
LOG_WARN("fail write start log", K(ret), K(table_key), K(build_param_));
}
}
......@@ -678,7 +679,12 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
build_param_.execution_id_,
build_param_.ddl_task_id_,
prepare_log_ts))) {
LOG_WARN("fail write ddl prepare log", K(ret), K(table_key));
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired, but return success", K(ret), K(table_key), K(build_param_));
ret = OB_SUCCESS;
} else {
LOG_WARN("fail write ddl prepare log", K(ret), K(table_key));
}
} else {
DEBUG_SYNC(AFTER_REMOTE_WRITE_DDL_PREPARE_LOG);
ObTabletHandle tablet_handle;
......@@ -690,15 +696,23 @@ int ObSSTableInsertTabletContext::create_sstable_with_clog(
if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle))) {
LOG_WARN("get tablet failed", K(ret));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv manager failed", K(ret));
LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(ddl_start_log_ts,
prepare_log_ts,
table_schema->get_table_id(),
build_param_.execution_id_,
build_param_.ddl_task_id_))) {
LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_log_ts), K(prepare_log_ts), K(build_param_));
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
K(ddl_start_log_ts), "new_ddl_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts());
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to do ddl kv prepare", K(ret), K(ddl_start_log_ts), K(prepare_log_ts), K(build_param_));
}
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_commit(ddl_start_log_ts, prepare_log_ts))) {
if (OB_TASK_EXPIRED == ret) {
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
K(ddl_start_log_ts), "new_ddl_start_log_ts", ddl_kv_mgr_handle.get_obj()->get_start_log_ts());
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to wait ddl kv commit", K(ret), K(ddl_start_log_ts), K(build_param_));
......
......@@ -137,6 +137,7 @@ private:
int64_t task_finish_count_;
blocksstable::ObSSTableIndexBuilder *index_builder_;
int64_t task_id_;
ObDDLKvMgrHandle ddl_kv_mgr_handle_; // for keeping ddl kv mgr alive
};
struct ObSSTableInsertTableParam final
......
......@@ -145,6 +145,7 @@ int ObTabletDDLKvMgr::ddl_prepare(const int64_t start_log_ts,
ret = OB_STATE_NOT_MATCH;
LOG_WARN("ddl not started", K(ret));
} else if (start_log_ts < start_log_ts_) {
ret = OB_TASK_EXPIRED;
LOG_INFO("skip ddl prepare log", K(start_log_ts), K(start_log_ts_), K(ls_id_), K(tablet_id_));
} else if (OB_FAIL(freeze_ddl_kv(prepare_log_ts))) {
LOG_WARN("freeze ddl kv failed", K(ret), K(prepare_log_ts));
......@@ -158,6 +159,7 @@ int ObTabletDDLKvMgr::ddl_prepare(const int64_t start_log_ts,
param.tablet_id_ = tablet_id_;
param.rec_log_ts_ = prepare_log_ts;
param.is_commit_ = true;
param.start_log_ts_ = start_log_ts;
param.table_id_ = table_id;
param.execution_id_ = execution_id_;
param.ddl_task_id_ = ddl_task_id_;
......@@ -193,6 +195,7 @@ int ObTabletDDLKvMgr::ddl_commit(const int64_t start_log_ts, const int64_t prepa
} else if (is_commit_success_) {
LOG_INFO("ddl commit already succeed", K(ls_id_), K(tablet_id_), K(table_id_));
} else if (start_log_ts < start_log_ts_) {
ret = OB_TASK_EXPIRED;
LOG_INFO("skip ddl commit log", K(start_log_ts), K(start_log_ts_), K(ls_id_), K(tablet_id_));
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
......@@ -202,6 +205,7 @@ int ObTabletDDLKvMgr::ddl_commit(const int64_t start_log_ts, const int64_t prepa
param.tablet_id_ = tablet_id_;
param.rec_log_ts_ = prepare_log_ts;
param.is_commit_ = true;
param.start_log_ts_ = start_log_ts;
param.table_id_ = table_id_;
param.execution_id_ = execution_id_;
param.ddl_task_id_ = ddl_task_id_;
......@@ -215,15 +219,15 @@ int ObTabletDDLKvMgr::ddl_commit(const int64_t start_log_ts, const int64_t prepa
} else {
ret = OB_EAGAIN; // until major sstable is ready
}
if (OB_FAIL(ret) && is_replay) {
if (OB_TABLET_NOT_EXIST == ret) {
ret = OB_SUCCESS; // think as succcess for replay
} else {
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
LOG_INFO("replay ddl commit", K(ret), K(ls_id_), K(tablet_id_), K(start_log_ts_), K(prepare_log_ts), K(max_freeze_log_ts_));
}
ret = OB_EAGAIN; // retry by replay service
}
if (OB_FAIL(ret) && is_replay) {
if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) {
ret = OB_SUCCESS; // think as succcess for replay
} else {
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
LOG_INFO("replay ddl commit", K(ret), K(ls_id_), K(tablet_id_), K(start_log_ts_), K(start_log_ts), K(prepare_log_ts), K(max_freeze_log_ts_));
}
ret = OB_EAGAIN; // retry by replay service
}
}
return ret;
......@@ -241,9 +245,6 @@ int ObTabletDDLKvMgr::wait_ddl_commit(const int64_t start_log_ts, const int64_t
} else if (!is_started()) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("ddl not started", K(ret));
} else if (start_log_ts < start_log_ts_) {
ret = OB_TASK_EXPIRED;
LOG_INFO("task expired, skip ddl commit log", K(start_log_ts), K(start_log_ts_), K(ls_id_), K(tablet_id_));
} else if (start_log_ts > start_log_ts_) {
ret = OB_ERR_SYS;
LOG_WARN("start log ts not match", K(ret), K(start_log_ts), K(start_log_ts_), K(ls_id_), K(tablet_id_));
......
......@@ -235,10 +235,13 @@ int ObLSDDLLogHandler::flush(int64_t rec_log_ts)
ObDDLTableMergeDagParam param;
param.ls_id_ = ls_->get_ls_id();
param.tablet_id_ = ddl_kv_mgr_handle.get_obj()->get_tablet_id();
param.start_log_ts_ = ddl_kv_mgr_handle.get_obj()->get_start_log_ts();
param.rec_log_ts_ = rec_log_ts;
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to schedule ddl kv merge dag", K(ret));
} else {
ret = OB_SUCCESS;
}
}
}
......
......@@ -2138,64 +2138,6 @@ int ObTablet::update_tablet_autoinc_seq(const uint64_t autoinc_seq, const int64_
return ret;
}
// Stub: this operation is not supported on ObTablet. The handle argument is
// never touched and OB_NOT_SUPPORTED is returned unconditionally.
int ObTablet::get_active_ddl_kv(ObDDLKVHandle &ddl_kvs_handle)
{
  UNUSED(ddl_kvs_handle);  // parameter exists only to satisfy the signature
  return OB_NOT_SUPPORTED;
}
// Stub: this operation is not supported on ObTablet. The handle argument is
// never touched and OB_NOT_SUPPORTED is returned unconditionally.
int ObTablet::get_or_create_active_ddl_kv(ObDDLKVHandle &ddl_kvs_handle)
{
  UNUSED(ddl_kvs_handle);  // parameter exists only to satisfy the signature
  return OB_NOT_SUPPORTED;
}
int ObTablet::check_has_effective_ddl_kv(bool &has_ddl_kv)
{
int ret = OB_SUCCESS;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to get ddl kv mgr", K(ret));
} else {
has_ddl_kv = false;
ret = OB_SUCCESS;
}
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_effective_ddl_kv(has_ddl_kv))) {
LOG_WARN("fail to check has effective ddl kv", K(ret));
}
return ret;
}
// Fetches the minimum ddl kv log ts from the tablet's ddl kv manager and
// stores it in min_log_ts.
//
// @param min_log_ts  [out] written by the manager's query, or set to
//                    INT64_MAX when get_ddl_kv_mgr() reports
//                    OB_ENTRY_NOT_EXIST.
// @return OB_NOT_INIT if the tablet is not initialized, OB_SUCCESS when a
//         value was produced (including the OB_ENTRY_NOT_EXIST case), or the
//         error code of the failing call otherwise.
int ObTablet::get_ddl_kv_min_log_ts(int64_t &min_log_ts)
{
  int ret = OB_SUCCESS;
  ObDDLKvMgrHandle kv_mgr_handle;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not inited", K(ret));
  } else {
    ret = get_ddl_kv_mgr(kv_mgr_handle);
    if (OB_ENTRY_NOT_EXIST == ret) {
      // A missing entry is not an error here: hand back INT64_MAX and succeed.
      min_log_ts = INT64_MAX;
      ret = OB_SUCCESS;
    } else if (OB_SUCCESS != ret) {
      LOG_WARN("failed to get ddl kv mgr", K(ret));
    } else {
      ret = kv_mgr_handle.get_obj()->get_ddl_kv_min_log_ts(min_log_ts);
      if (OB_SUCCESS != ret) {
        LOG_WARN("fail to get ddl kv min log ts", K(ret));
      }
    }
  }
  return ret;
}
int ObTablet::start_ddl_if_need()
{
int ret = OB_SUCCESS;
......
......@@ -286,10 +286,6 @@ public:
int check_has_sstable(bool &has_sstable) const;
// ddl kv
int get_active_ddl_kv(ObDDLKVHandle &ddl_kvs_handle);
int get_or_create_active_ddl_kv(ObDDLKVHandle &ddl_kvs_handle);
int check_has_effective_ddl_kv(bool &has_ddl_kv);
int get_ddl_kv_min_log_ts(int64_t &min_log_ts);
int get_ddl_kv_mgr(ObDDLKvMgrHandle &ddl_kv_mgr_handle, bool try_create = false);
void remove_ddl_kv_mgr();
int start_ddl_if_need();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册