...
 
Commits (5)
    https://gitcode.net/oceanbase/oceanbase/-/commit/a681bc425d803a1b2772a35c1050541497f3733d [CP] [to #55866822] add _enable_dbms_job_package parameter 2024-04-28T03:28:22+00:00 obdev obdev@oceanbase.com https://gitcode.net/oceanbase/oceanbase/-/commit/d670d7e3988a591bd14b6951f621c4dc530e35c8 [CP] fix round checkpoint scn is not consistent with piece checkpoint scn 2024-04-28T03:44:31+00:00 wxhwang wxhwang@126.com https://gitcode.net/oceanbase/oceanbase/-/commit/baac2e38685722b964908dbdf12dbdb405e260f6 Retry to collect trx state for all participants using the older snapshot 2024-04-28T09:40:12+00:00 KyrielightWei weixx1203@outlook.com https://gitcode.net/oceanbase/oceanbase/-/commit/ee5b22b1527c339acacc82ad6e1fe00f6ac18fa4 [CP] update obd version 2024-04-28T09:45:21+00:00 gys-git 1243106322@qq.com https://gitcode.net/oceanbase/oceanbase/-/commit/af07b95a44b9687b1b8360e5a5bf09d895ce0731 Revert Fix tablet table store check sstable continue bug. 2024-04-28T12:14:24+00:00 oceanoverflow oceanoverflow@gmail.com
...@@ -47,6 +47,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el7.aarch64.rpm ...@@ -47,6 +47,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el7.aarch64.rpm
obshell-4.2.3.0-102024031414.el7.aarch64.rpm target=community obshell-4.2.3.0-102024031414.el7.aarch64.rpm target=community
[test-utils] [test-utils]
ob-deploy-2.6.2-2.el7.aarch64.rpm target=community ob-deploy-2.8.0-4.el7.aarch64.rpm target=community
obclient-2.2.2-1.el7.aarch64.rpm target=community obclient-2.2.2-1.el7.aarch64.rpm target=community
libobclient-2.2.2-3.el7.aarch64.rpm target=community libobclient-2.2.2-3.el7.aarch64.rpm target=community
...@@ -49,6 +49,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el7.x86_64.rpm ...@@ -49,6 +49,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el7.x86_64.rpm
obshell-4.2.3.0-102024031414.el7.x86_64.rpm target=community obshell-4.2.3.0-102024031414.el7.x86_64.rpm target=community
[test-utils] [test-utils]
ob-deploy-2.6.2-2.el7.x86_64.rpm target=community ob-deploy-2.8.0-4.el7.x86_64.rpm target=community
obclient-2.2.2-1.el7.x86_64.rpm target=community obclient-2.2.2-1.el7.x86_64.rpm target=community
libobclient-2.2.2-3.el7.x86_64.rpm target=community libobclient-2.2.2-3.el7.x86_64.rpm target=community
...@@ -47,6 +47,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el8.aarch64.rpm ...@@ -47,6 +47,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el8.aarch64.rpm
obshell-4.2.3.0-102024031414.el8.aarch64.rpm target=community obshell-4.2.3.0-102024031414.el8.aarch64.rpm target=community
[test-utils] [test-utils]
ob-deploy-2.6.2-2.el8.aarch64.rpm target=community ob-deploy-2.8.0-4.el8.aarch64.rpm target=community
obclient-2.2.2-1.el8.aarch64.rpm target=community obclient-2.2.2-1.el8.aarch64.rpm target=community
libobclient-2.2.2-3.el8.aarch64.rpm target=community libobclient-2.2.2-3.el8.aarch64.rpm target=community
...@@ -48,6 +48,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el8.x86_64.rpm ...@@ -48,6 +48,6 @@ devdeps-rocksdb-6.22.1.1-52022100420.el8.x86_64.rpm
obshell-4.2.3.0-102024031414.el8.x86_64.rpm target=community obshell-4.2.3.0-102024031414.el8.x86_64.rpm target=community
[test-utils] [test-utils]
ob-deploy-2.6.2-2.el8.x86_64.rpm target=community ob-deploy-2.8.0-4.el8.x86_64.rpm target=community
obclient-2.2.2-1.el8.x86_64.rpm target=community obclient-2.2.2-1.el8.x86_64.rpm target=community
libobclient-2.2.2-3.el8.x86_64.rpm target=community libobclient-2.2.2-3.el8.x86_64.rpm target=community
...@@ -2011,6 +2011,8 @@ int ObRootService::execute_bootstrap(const obrpc::ObBootstrapArg &arg) ...@@ -2011,6 +2011,8 @@ int ObRootService::execute_bootstrap(const obrpc::ObBootstrapArg &arg)
LOG_WARN("failed to update cpu_quota_concurrency", K(ret)); LOG_WARN("failed to update cpu_quota_concurrency", K(ret));
} else if (OB_FAIL(set_enable_trace_log_())) { } else if (OB_FAIL(set_enable_trace_log_())) {
LOG_WARN("fail to set one phase commit config", K(ret)); LOG_WARN("fail to set one phase commit config", K(ret));
} else if (OB_FAIL(disable_dbms_job())) {
LOG_WARN("failed to update _enable_dbms_job_package", K(ret));
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
...@@ -11062,6 +11064,18 @@ int ObRootService::set_cpu_quota_concurrency_config_() ...@@ -11062,6 +11064,18 @@ int ObRootService::set_cpu_quota_concurrency_config_()
return ret; return ret;
} }
int ObRootService::disable_dbms_job()
{
int64_t affected_rows = 0;
int ret = OB_SUCCESS;
if (OB_FAIL(sql_proxy_.write("ALTER SYSTEM SET _enable_dbms_job_package = false;", affected_rows))) {
LOG_WARN("update _enable_dbms_job_package to false failed", K(ret));
} else if (OB_FAIL(check_config_result("_enable_dbms_job_package", "false"))) {
LOG_WARN("failed to check config same", K(ret));
}
return ret;
}
int ObRootService::handle_recover_table(const obrpc::ObRecoverTableArg &arg) int ObRootService::handle_recover_table(const obrpc::ObRecoverTableArg &arg)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
......
...@@ -913,6 +913,7 @@ private: ...@@ -913,6 +913,7 @@ private:
void update_cpu_quota_concurrency_in_memory_(); void update_cpu_quota_concurrency_in_memory_();
int set_cpu_quota_concurrency_config_(); int set_cpu_quota_concurrency_config_();
int set_enable_trace_log_(); int set_enable_trace_log_();
int disable_dbms_job();
int try_notify_switch_leader(const obrpc::ObNotifySwitchLeaderArg::SwitchLeaderComment &comment); int try_notify_switch_leader(const obrpc::ObNotifySwitchLeaderArg::SwitchLeaderComment &comment);
int precheck_interval_part(const obrpc::ObAlterTableArg &arg); int precheck_interval_part(const obrpc::ObAlterTableArg &arg);
......
...@@ -100,7 +100,7 @@ int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_in ...@@ -100,7 +100,7 @@ int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_in
LOG_WARN("backwards, checkpoint scn is bigger than limit scn", K(ret), K(round_info), K_(max_checkpoint_scn)); LOG_WARN("backwards, checkpoint scn is bigger than limit scn", K(ret), K(round_info), K_(max_checkpoint_scn));
} else if (OB_FAIL(count_(round_info, summary, counter))) { } else if (OB_FAIL(count_(round_info, summary, counter))) {
LOG_WARN("failed to to count", K(ret), K(round_info), K(summary)); LOG_WARN("failed to to count", K(ret), K(round_info), K(summary));
} else if (OB_FAIL(gen_new_round_info_(round_info, counter, result.new_round_info_, need_checkpoint))) { } else if (OB_FAIL(gen_new_round_info_(round_info, summary, counter, result.new_round_info_, need_checkpoint))) {
LOG_WARN("failed to decide next state", K(ret), K(round_info), K(counter), K(summary)); LOG_WARN("failed to decide next state", K(ret), K(round_info), K(counter), K(summary));
} else if (!need_checkpoint) { } else if (!need_checkpoint) {
} else if (OB_FAIL(checkpoint_(round_info, summary, result))) { } else if (OB_FAIL(checkpoint_(round_info, summary, result))) {
...@@ -177,8 +177,81 @@ int ObDestRoundCheckpointer::count_( ...@@ -177,8 +177,81 @@ int ObDestRoundCheckpointer::count_(
return ret; return ret;
} }
int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundCheckpointer::Counter &counter, int ObDestRoundCheckpointer::calc_next_checkpoint_scn_(
ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const Counter &counter,
SCN &next_checkpoint_scn) const
{
int ret = OB_SUCCESS;
SCN max_avail_piece_start_scn;
SCN max_avail_piece_checkpoint_scn;
int64_t max_avail_piece_id = 0;
// The next checkpoint scn can not exceed the max_checkpoint_scn_ which takes the GTS.
next_checkpoint_scn = MIN(max_checkpoint_scn_, counter.checkpoint_scn_);
if (OB_FAIL(ObTenantArchiveMgr::decide_piece_id(
old_round_info.start_scn_,
old_round_info.base_piece_id_,
old_round_info.piece_switch_interval_,
next_checkpoint_scn,
max_avail_piece_id))) {
LOG_WARN("failed to calc max available piece id", K(ret), K(old_round_info), K(next_checkpoint_scn));
} else if (OB_FAIL(ObTenantArchiveMgr::decide_piece_start_scn(
old_round_info.start_scn_,
old_round_info.base_piece_id_,
old_round_info.piece_switch_interval_,
max_avail_piece_id,
max_avail_piece_start_scn))) {
LOG_WARN("failed to calc max available piece start scn", K(ret), K(old_round_info), K(max_avail_piece_id));
}
// Consider 2 log streams, the log groups info are as following :
// 1001: [500, 600], [700, 1200]
// 1002: [500, 900]
// Then the reasonable next round checkpoint scn is 900. However, suppose the piece switch end scn is 1000,
// if we specify to restore until 800, the result is that it will return and cannot be recovered. As the
// log with range [700, 800] is in next piece, but the file status is BACKUP_FILE_INCOMPLETE which we will
// ignore during restore. In this case, the next round checkpoint scn will be adjust to 600, instead of 900.
max_avail_piece_checkpoint_scn = SCN::max_scn();
const ObArray<ObLSDestRoundSummary> &ls_round_list = summary.ls_round_list_;
for (int64_t i = 0; OB_SUCC(ret) && i < ls_round_list.count(); i++) {
const ObLSDestRoundSummary &ls_round = ls_round_list.at(i);
// search the piece
int64_t idx = ls_round.get_piece_idx(max_avail_piece_id);
if (-1 == idx) {
LOG_INFO("ls piece not found", K(ret), K(max_avail_piece_id), K(ls_round));
} else {
bool last_piece = false;
const ObLSDestRoundSummary::OnePiece &ls_piece = ls_round.piece_list_.at(idx);
if (OB_FAIL(ls_round.check_is_last_piece_for_deleted_ls(max_avail_piece_id, last_piece))) {
LOG_WARN("failed to check is last piece for deleted ls", K(ret));
} else if (last_piece) {
// If the ls is deleted, and this is the last piece. It should not
// affect the checkpoint_scn.
// Mark the last piece deleted for deleted ls. For example, piece 10 and 11 is found of
// a deleted ls for current checkpoint, piece 10 is not marked with deleted, but piece 11
// is marked with deleted.
// do nothing.
} else {
// checkpoint scn may be smaller than start scn for empty piece.
max_avail_piece_checkpoint_scn = MAX(max_avail_piece_start_scn, MIN(max_avail_piece_checkpoint_scn, ls_piece.checkpoint_scn_));
}
}
}
if (OB_SUCC(ret)) {
next_checkpoint_scn = MIN(next_checkpoint_scn, max_avail_piece_checkpoint_scn);
}
return ret;
}
int ObDestRoundCheckpointer::gen_new_round_info_(
const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const ObDestRoundCheckpointer::Counter &counter,
ObTenantArchiveRoundAttr &new_round_info,
bool &need_checkpoint) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
// Current existing log stream count. // Current existing log stream count.
...@@ -193,11 +266,11 @@ int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr ...@@ -193,11 +266,11 @@ int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr
old_round_info.piece_switch_interval_, counter.max_scn_, new_round_info.used_piece_id_))) { old_round_info.piece_switch_interval_, counter.max_scn_, new_round_info.used_piece_id_))) {
LOG_WARN("failed to calc MAX piece id", K(ret), K(old_round_info), K(counter)); LOG_WARN("failed to calc MAX piece id", K(ret), K(old_round_info), K(counter));
} else if (OB_FALSE_IT(new_round_info.max_scn_ = counter.max_scn_)) { } else if (OB_FALSE_IT(new_round_info.max_scn_ = counter.max_scn_)) {
} else if (OB_FALSE_IT(next_checkpoint_scn = MIN(max_checkpoint_scn_, counter.checkpoint_scn_))) { } else if (OB_FAIL(calc_next_checkpoint_scn_(old_round_info, summary, counter, next_checkpoint_scn))) {
// Checkpoint can not over limit ts. However, if old round goes into STOPPING, then we will not LOG_WARN("failed to calc next checkpoint scn", K(ret), K(old_round_info), K(summary), K(counter));
// move checkpoint_scn on.
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (old_round_info.state_.is_beginning()) { } else if (old_round_info.state_.is_beginning()) {
if (counter.not_start_cnt_ > 0) { if (counter.not_start_cnt_ > 0) {
......
...@@ -106,8 +106,17 @@ private: ...@@ -106,8 +106,17 @@ private:
bool can_do_checkpoint_(const ObTenantArchiveRoundAttr &round_info) const; bool can_do_checkpoint_(const ObTenantArchiveRoundAttr &round_info) const;
int count_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundSummary &summary, Counter &counter) const; int count_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundSummary &summary, Counter &counter) const;
int gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const Counter &counter, int calc_next_checkpoint_scn_(
ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const; const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const Counter &counter,
SCN &next_checkpoint_scn) const;
int gen_new_round_info_(
const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const Counter &counter,
ObTenantArchiveRoundAttr &new_round_info,
bool &need_checkpoint) const;
// do checkpoint to checkpoint_ts. // do checkpoint to checkpoint_ts.
int checkpoint_(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary, int checkpoint_(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary,
......
...@@ -1918,3 +1918,6 @@ DEF_BOOL(_allow_skip_replay_redo_after_detete_tablet, OB_TENANT_PARAMETER, "FALS ...@@ -1918,3 +1918,6 @@ DEF_BOOL(_allow_skip_replay_redo_after_detete_tablet, OB_TENANT_PARAMETER, "FALS
"allow skip replay invalid redo log after tablet delete transaction is committed." "allow skip replay invalid redo log after tablet delete transaction is committed."
"The default value is FALSE. Value: TRUE means we allow skip replaying this invalid redo log, False means we do not alow such behavior.", "The default value is FALSE. Value: TRUE means we allow skip replaying this invalid redo log, False means we do not alow such behavior.",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_dbms_job_package, OB_CLUSTER_PARAMETER, "True",
"Control whether can use DBMS_JOB package.",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
...@@ -640,7 +640,7 @@ int ObTXTransferUtils::build_empty_minor_sstable_param_( ...@@ -640,7 +640,7 @@ int ObTXTransferUtils::build_empty_minor_sstable_param_(
param.data_checksum_ = 0; param.data_checksum_ = 0;
param.occupy_size_ = 0; param.occupy_size_ = 0;
param.ddl_scn_.set_min(); param.ddl_scn_.set_min();
param.filled_tx_scn_ = end_scn; param.filled_tx_scn_.set_min();
param.original_size_ = 0; param.original_size_ = 0;
param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR; param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
......
...@@ -1829,10 +1829,9 @@ int ObTabletTableStore::check_minor_tables_continue_(T &minor_tables) const ...@@ -1829,10 +1829,9 @@ int ObTabletTableStore::check_minor_tables_continue_(T &minor_tables) const
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObITable *prev_table = nullptr; ObITable *prev_table = nullptr;
prev_table = nullptr; prev_table = nullptr;
SCN filled_tx_scn(SCN::min_scn());
for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) {
ObITable *table = minor_tables.at(i); ObITable *table = minor_tables.at(i);
if (OB_FAIL(check_minor_table_continue_(table, prev_table, filled_tx_scn))) { if (OB_FAIL(check_minor_table_continue_(table, prev_table))) {
LOG_WARN("failed to check minor table continue", K(ret), KPC(table)); LOG_WARN("failed to check minor table continue", K(ret), KPC(table));
} }
} }
...@@ -2201,41 +2200,30 @@ int ObTabletTableStore::cut_ha_sstable_scn_range_( ...@@ -2201,41 +2200,30 @@ int ObTabletTableStore::cut_ha_sstable_scn_range_(
int ObTabletTableStore::check_minor_table_continue_( int ObTabletTableStore::check_minor_table_continue_(
ObITable *table, ObITable *table,
ObITable *&prev_table, ObITable *prev_table) const
share::SCN &filled_tx_scn) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObSSTable *curr_sstable = nullptr; ObSSTable *curr_sstable = nullptr;
ObSSTable *prev_sstable = nullptr;
if (OB_UNLIKELY(OB_ISNULL(table) || !table->is_multi_version_minor_sstable())) { if (OB_UNLIKELY(OB_ISNULL(table) || !table->is_multi_version_minor_sstable())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("table must be multi version minor table", K(ret), KPC(table)); LOG_WARN("table must be multi version minor table", K(ret), KPC(table));
} else if (FALSE_IT(curr_sstable = static_cast<ObSSTable *>(table))) {
} else if (OB_ISNULL(prev_table)) { } else if (OB_ISNULL(prev_table)) {
//do nothing // do nothing
} else if (table->get_start_scn() > prev_table->get_end_scn() } else if (table->get_start_scn() > prev_table->get_end_scn()
|| table->get_end_scn() <= prev_table->get_end_scn()) { || table->get_end_scn() <= prev_table->get_end_scn()) {
ret = OB_ERR_SYS; ret = OB_ERR_SYS;
LOG_ERROR("table scn range not continuous or overlap", K(ret), KPC(table), KPC(prev_table)); LOG_ERROR("table scn range not continuous or overlap", K(ret), KPC(table), KPC(prev_table));
} else if (table->get_key().tablet_id_.is_ls_inner_tablet()) { } else if (FALSE_IT(curr_sstable = static_cast<ObSSTable *>(table))) {
//do nothing } else if (FALSE_IT(prev_sstable = static_cast<ObSSTable *>(prev_table))) {
} else if (curr_sstable->is_empty() || curr_sstable->get_filled_tx_scn().is_max()) { } else if (table->get_key().tablet_id_.is_ls_inner_tablet() || prev_sstable->get_filled_tx_scn().is_max()) {
//skip empty sstable pr max filled tx scn for compatible // do nothing
LOG_INFO("current sstable is empty or filled tx scn is max, skip check this tablet filled tx scn", KPC(curr_sstable)); } else if (curr_sstable->get_filled_tx_scn() < prev_sstable->get_filled_tx_scn()) {
} else if (curr_sstable->get_filled_tx_scn() < filled_tx_scn) {
ret = OB_ERR_SYS; ret = OB_ERR_SYS;
LOG_WARN("sstable's filled_tx_scn is out of order", K(ret), KPC(table), KP(prev_table), LOG_WARN("sstable's filled_tx_scn is out of order", K(ret), KPC(table), KP(prev_table),
"curr_filled_tx_scn", curr_sstable->get_filled_tx_scn(), "prev_filled_tx_scn", filled_tx_scn); "curr_filled_tx_scn", curr_sstable->get_filled_tx_scn(), "prev_filled_tx_scn", prev_sstable->get_filled_tx_scn());
}
if (OB_SUCC(ret)) {
prev_table = table;
if (!curr_sstable->get_key().tablet_id_.is_ls_inner_tablet()
&& !curr_sstable->is_empty()
&& !curr_sstable->get_filled_tx_scn().is_max()) {
filled_tx_scn = curr_sstable->get_filled_tx_scn();
}
} }
prev_table = table;
return ret; return ret;
} }
...@@ -2246,7 +2234,6 @@ int ObTabletTableStore::check_minor_tables_continue_( ...@@ -2246,7 +2234,6 @@ int ObTabletTableStore::check_minor_tables_continue_(
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObITable *prev_table = nullptr; ObITable *prev_table = nullptr;
prev_table = nullptr; prev_table = nullptr;
SCN filled_tx_scn(SCN::min_scn());
if (count > 0 && OB_ISNULL(minor_sstables)) { if (count > 0 && OB_ISNULL(minor_sstables)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("check minor tables continue minor sstables is unexpected", K(ret), KP(minor_sstables), K(count)); LOG_WARN("check minor tables continue minor sstables is unexpected", K(ret), KP(minor_sstables), K(count));
...@@ -2254,7 +2241,7 @@ int ObTabletTableStore::check_minor_tables_continue_( ...@@ -2254,7 +2241,7 @@ int ObTabletTableStore::check_minor_tables_continue_(
for (int64_t i = 0; OB_SUCC(ret) && i < count; ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < count; ++i) {
ObITable *table = minor_sstables[i]; ObITable *table = minor_sstables[i];
if (OB_FAIL(check_minor_table_continue_(table, prev_table, filled_tx_scn))) { if (OB_FAIL(check_minor_table_continue_(table, prev_table))) {
LOG_WARN("failed to check minor table continue", K(ret), KPC(table)); LOG_WARN("failed to check minor table continue", K(ret), KPC(table));
} }
} }
......
...@@ -260,8 +260,7 @@ private: ...@@ -260,8 +260,7 @@ private:
ObITable **minor_sstables) const; ObITable **minor_sstables) const;
int check_minor_table_continue_( int check_minor_table_continue_(
ObITable *table, ObITable *table,
ObITable *&prev_table, ObITable *prev_table) const;
share::SCN &filled_tx_scn) const;
int combine_ha_minor_sstables_( int combine_ha_minor_sstables_(
const ObTablet &tablet, const ObTablet &tablet,
common::ObIArray<ObITable *> &old_store_minor_sstables, common::ObIArray<ObITable *> &old_store_minor_sstables,
......
...@@ -8970,7 +8970,7 @@ void ObPartTransCtx::handle_trans_ask_state_(const SCN &snapshot) ...@@ -8970,7 +8970,7 @@ void ObPartTransCtx::handle_trans_ask_state_(const SCN &snapshot)
{ {
if (snapshot > lastest_snapshot_) { if (snapshot > lastest_snapshot_) {
build_and_post_collect_state_msg_(snapshot); build_and_post_collect_state_msg_(snapshot);
} else if (snapshot <= lastest_snapshot_ && standby_part_collected_.num_members() != state_info_array_.count()) { } else if (snapshot <= lastest_snapshot_) {
if (refresh_state_info_interval_.reach()) { if (refresh_state_info_interval_.reach()) {
build_and_post_collect_state_msg_(snapshot); build_and_post_collect_state_msg_(snapshot);
} }
...@@ -9164,7 +9164,7 @@ int ObPartTransCtx::handle_trans_collect_state_resp(const ObCollectStateRespMsg ...@@ -9164,7 +9164,7 @@ int ObPartTransCtx::handle_trans_collect_state_resp(const ObCollectStateRespMsg
} else if (state_info_array_.at(i-1).need_update(msg.state_info_)) { } else if (state_info_array_.at(i-1).need_update(msg.state_info_)) {
state_info_array_.at(i-1) = msg.state_info_; state_info_array_.at(i-1) = msg.state_info_;
} else { } else {
state_info_array_.at(i-1).snapshot_version_ = msg.state_info_.snapshot_version_; state_info_array_.at(i-1).snapshot_version_.inc_update(msg.state_info_.snapshot_version_);
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
standby_part_collected_.add_member(i-1); standby_part_collected_.add_member(i-1);
......
...@@ -290,6 +290,7 @@ _enable_choose_migration_source_policy ...@@ -290,6 +290,7 @@ _enable_choose_migration_source_policy
_enable_compaction_diagnose _enable_compaction_diagnose
_enable_convert_real_to_decimal _enable_convert_real_to_decimal
_enable_dblink_reuse_connection _enable_dblink_reuse_connection
_enable_dbms_job_package
_enable_defensive_check _enable_defensive_check
_enable_easy_keepalive _enable_easy_keepalive
_enable_hash_join_hasher _enable_hash_join_hasher
......
...@@ -2184,6 +2184,260 @@ TEST_F(ArchiveCheckpointerTest, in_doing_06) ...@@ -2184,6 +2184,260 @@ TEST_F(ArchiveCheckpointerTest, in_doing_06)
ASSERT_EQ(g_call_cnt, 3); ASSERT_EQ(g_call_cnt, 3);
} }
TEST_F(ArchiveCheckpointerTest, in_doing_07)
{
// old round's status is DOING.
ObTenantArchiveRoundAttr old_round;
fill_round(
ObArchiveRoundState::doing(),
"2022-01-01 00:00:00", /* start time */
"2022-01-01 00:00:20", /* checkpoint time */
"2022-01-01 00:00:30", /* max time */
1, /* used piece id */
0, /* frozen_input_bytes */
0, /* frozen_output_bytes */
100, /* active_input_bytes */
10, /* active_output_bytes */
0, /* deleted_input_bytes */
0, /* deleted_output_bytes */
old_round);
// 2 log streams are archiving.
ObDestRoundSummary summary;
// log stream 1001 is archiving.
ObLSDestRoundSummary ls_1001;
ObArchiveLSPieceSummary piece_1001_1;
fill_archive_ls_piece(
1001, /* ls id */
false, /* is deleted */
1, /* piece id */
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:00:00", /* start time */
"2022-01-01 00:00:50", /* checkpoint time */
0, /* min_lsn */
2000, /* max_lsn */
200, /* input_bytes */
20, /* output_bytes */
piece_1001_1);
ASSERT_EQ(ls_1001.add_one_piece(piece_1001_1), OB_SUCCESS);
// log stream 1002 is archiving.
ObLSDestRoundSummary ls_1002;
ObArchiveLSPieceSummary piece_1002_1;
fill_archive_ls_piece(
1002, /* ls id */
false, /* is deleted */
1, /* piece id */
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:00:00", /* start time */
"2022-01-01 00:00:40", /* checkpoint time */
0, /* min_lsn */
1000, /* max_lsn */
100, /* input_bytes */
10, /* output_bytes */
piece_1002_1);
ObArchiveLSPieceSummary piece_1002_2;
fill_archive_ls_piece(
1002, /* ls id */
false, /* is deleted */
2, /* piece id */
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:01:00", /* start time */
"2022-01-01 00:01:30", /* checkpoint time */
1000, /* min_lsn */
2000, /* max_lsn */
100, /* input_bytes */
10, /* output_bytes */
piece_1002_2);
ASSERT_EQ(ls_1002.add_one_piece(piece_1002_1), OB_SUCCESS);
ASSERT_EQ(ls_1002.add_one_piece(piece_1002_2), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1001), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1002), OB_SUCCESS);
// All log streams are archiving, the next status is DOING.
class MockRoundHandler final: public ObArchiveRoundHandler
{
public:
int checkpoint_to(
const ObTenantArchiveRoundAttr &old_round,
const ObTenantArchiveRoundAttr &new_round,
const common::ObIArray<ObTenantArchivePieceAttr> &pieces) override
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:00:40", /* checkpoint_time */
"2022-01-01 00:01:30", /* max_time */
2, /* used_piece_id */
0, /* frozen_input_bytes */
0, /* frozen_output_bytes */
400, /* active_input_bytes */
40, /* active_output_bytes */
0, /* deleted_input_bytes */
0, /* deleted_output_bytes */
expect_round);
ObTenantArchivePieceAttr expect_piece_1;
test.fill_piece(
old_round,
1, /* piece id */
"2022-01-01 00:00:40", /* checkpoint time */
"2022-01-01 00:00:50", /* max time */
300, /* input_bytes */
30, /* output_bytes */
ObArchivePieceStatus::active(), /* piece status */
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE, /* file status */
expect_piece_1);
ObTenantArchivePieceAttr expect_piece_2;
test.fill_piece(
old_round,
2, /* piece id */
"2022-01-01 00:01:00", /* checkpoint time */
"2022-01-01 00:01:30", /* max time */
100, /* input_bytes */
10, /* output_bytes */
ObArchivePieceStatus::active(), /* piece status */
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE, /* file status */
expect_piece_2);
ret = test.compare_two_rounds(new_round, expect_round);
if (OB_SUCC(ret)) {
if (pieces.count() != 2) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pieces count", K(ret), K(pieces));
}
}
if (OB_SUCC(ret)) {
const ObTenantArchivePieceAttr &piece_1 = pieces.at(0);
if (OB_FAIL(test.compare_two_pieces(piece_1, expect_piece_1))) {
LOG_WARN("not equal pieces", K(ret), K(piece_1), K(expect_piece_1));
}
}
if (OB_SUCC(ret)) {
const ObTenantArchivePieceAttr &piece_2 = pieces.at(1);
if (OB_FAIL(test.compare_two_pieces(piece_2, expect_piece_2))) {
LOG_WARN("not equal pieces", K(ret), K(piece_2), K(expect_piece_2));
}
}
return ret;
}
};
ObDestRoundCheckpointer::PieceGeneratedCb gen_piece_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObDestRoundCheckpointer::Result &result, const ObDestRoundCheckpointer::GeneratedPiece &piece)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObDestRoundCheckpointer::GeneratedPiece expect_piece;
if (piece.piece_info_.key_.piece_id_ == 1) {
test.fill_piece(
old_round,
1,
"2022-01-01 00:00:40",
"2022-01-01 00:00:50",
300,
30,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE,
expect_piece.piece_info_);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1001 = test.gen_checkpoint_ls_piece(
1001,
"2022-01-01 00:00:00",
"2022-01-01 00:00:50",
0,
2000,
200,
20);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece(
1002,
"2022-01-01 00:00:00",
"2022-01-01 00:00:40",
0,
1000,
100,
10);
expect_piece.ls_piece_list_.push_back(ls_piece_1001);
expect_piece.ls_piece_list_.push_back(ls_piece_1002);
} else {
test.fill_piece(
old_round,
2,
"2022-01-01 00:01:00",
"2022-01-01 00:01:30",
100,
10,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE,
expect_piece.piece_info_);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece(
1002,
"2022-01-01 00:01:00",
"2022-01-01 00:01:30",
1000,
2000,
100,
10);
expect_piece.ls_piece_list_.push_back(ls_piece_1002);
}
ret = test.compare_two_checkpoint_pieces(piece, expect_piece);
return ret;
};
ObDestRoundCheckpointer::RoundCheckpointCb round_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObTenantArchiveRoundAttr &new_round)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::doing(),
"2022-01-01 00:00:40",
"2022-01-01 00:01:30",
2,
0,
0,
400,
40,
0,
0,
expect_round);
ret = test.compare_two_rounds(new_round, expect_round);
return ret;
};
int ret = OB_SUCCESS;
g_call_cnt = 0;
MockRoundHandler mock_handler;
ObDestRoundCheckpointer checkpointer;
share::SCN limit_scn;
(void)limit_scn.convert_for_logservice(convert_timestr_2_scn("2022-01-01 00:00:45"));
ret = checkpointer.init(&mock_handler, gen_piece_cb, round_cb, limit_scn);
ASSERT_EQ(OB_SUCCESS, ret);
ret = checkpointer.checkpoint(old_round, summary);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(g_call_cnt, 4);
}
TEST_F(ArchiveCheckpointerTest, in_stopping_01) TEST_F(ArchiveCheckpointerTest, in_stopping_01)
{ {
......
...@@ -1100,41 +1100,6 @@ TEST_F(TestCompactionPolicy, test_medium_info_serialize) ...@@ -1100,41 +1100,6 @@ TEST_F(TestCompactionPolicy, test_medium_info_serialize)
} }
} }
TEST_F(TestCompactionPolicy, check_sstable_continue_failed)
{
int ret = OB_SUCCESS;
ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *);
ASSERT_TRUE(nullptr != mgr);
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
common::ObArray<share::ObSnapshotInfo> snapshots;
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(1, 1, 0)));
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
ASSERT_EQ(OB_SUCCESS, ret);
const char *key_data =
"table_type start_scn end_scn max_ver upper_ver\n"
"10 0 1 1 1 \n"
"11 1 150 150 150 \n"
"11 150 200 200 200 \n"
"11 200 250 250 250 \n"
"11 250 300 300 300 \n"
"11 900 1000 1000 1000 \n";
ret = prepare_tablet(key_data, 1000, 1000);
ASSERT_EQ(OB_SUCCESS, ret);
ObTablet *tablet = tablet_handle_.get_obj();
ASSERT_TRUE(nullptr != tablet);
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
ret = tablet->fetch_table_store(table_store_wrapper);
ASSERT_EQ(OB_SUCCESS, ret);
ret = table_store_wrapper.get_member()->check_continuous();
ASSERT_EQ(OB_ERR_SYS, ret);
}
} //unittest } //unittest
} //oceanbase } //oceanbase
......