提交 ac5edad6 编写于 作者: G godyangfight 提交者: LINGuanRen

Fix backup scheduelr thread start backup do not stop gc snapshot ts push

上级 7076b575
......@@ -136,7 +136,8 @@ int ObRestorePointService::create_restore_point(const uint64_t tenant_id, const
}
int ObRestorePointService::create_backup_point(
const uint64_t tenant_id, const char* name, const int64_t snapshot_version, const int64_t schema_version)
const uint64_t tenant_id, const char *name, const int64_t snapshot_version, const int64_t schema_version,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
int64_t retry_cnt = 0;
......@@ -147,14 +148,12 @@ int ObRestorePointService::create_backup_point(
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("restore point service do not init", K(ret));
} else if (OB_ISNULL(name) || OB_INVALID_ID == tenant_id || snapshot_version <= 0) {
} else if (OB_ISNULL(name) || OB_INVALID_ID == tenant_id || snapshot_version <= 0 || !trans.is_started()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("create recovery point get invalid argument", K(ret), KP(name), K(snapshot_version));
} else if (FALSE_IT(get_snapshot_ret = ddl_service_->get_snapshot_mgr().get_snapshot(ddl_service_->get_sql_proxy(),
tenant_id,
share::ObSnapShotType::SNAPSHOT_FOR_BACKUP_POINT,
snapshot_version,
tmp_info))) {
} else if (FALSE_IT(
get_snapshot_ret = ddl_service_->get_snapshot_mgr().get_snapshot(
trans, tenant_id, share::ObSnapShotType::SNAPSHOT_FOR_BACKUP_POINT, snapshot_version, tmp_info))) {
} else if (OB_SUCCESS == get_snapshot_ret) {
ret = OB_ERR_BACKUP_POINT_EXIST;
char tmp_name[OB_MAX_RESERVED_POINT_NAME_LENGTH + 2];
......@@ -177,24 +176,9 @@ int ObRestorePointService::create_backup_point(
info.table_id_ = 0;
info.comment_ = name;
ObMySQLTransaction trans;
common::ObMySQLProxy& proxy = ddl_service_->get_sql_proxy();
if (OB_FAIL(ret)) {
} else if (OB_FAIL(trans.start(&proxy))) {
LOG_WARN("fail to start trans", K(ret));
} else if (OB_FAIL(ddl_service_->get_snapshot_mgr().acquire_snapshot(trans, info))) {
if (OB_FAIL(ddl_service_->get_snapshot_mgr().acquire_snapshot(trans, info))) {
LOG_WARN("fail to acquire snapshot", K(ret), K(info));
}
if (trans.is_started()) {
bool is_commit = (ret == OB_SUCCESS);
int tmp_ret = trans.end(is_commit);
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("fail to end trans", K(ret), K(is_commit));
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
}
}
}
return ret;
}
......
......@@ -27,8 +27,8 @@ public:
~ObRestorePointService();
int init(rootserver::ObDDLService& ddl_service, rootserver::ObFreezeInfoManager& freeze_info_mgr);
int create_restore_point(const uint64_t tenant_id, const char* name);
int create_backup_point(
const uint64_t tenant_id, const char* name, const int64_t snapshot_version, const int64_t schema_version);
int create_backup_point(const uint64_t tenant_id, const char *name, const int64_t snapshot_version,
const int64_t schema_version, common::ObMySQLTransaction &trans);
int drop_restore_point(const uint64_t tenant_id, const char* name);
int drop_backup_point(const uint64_t tenant_id, const int64_t snapshot_version);
......
......@@ -327,8 +327,10 @@ int ObRootBackup::get_all_tenant_ids(ObIArray<uint64_t> &tenant_ids)
if (OB_FAIL(info_manager.get_backup_info(OB_SYS_TENANT_ID, updater, sys_backup_info))) {
LOG_WARN("failed to get backup info", K(ret));
} else if (0 == sys_backup_info.backup_schema_version_) {
// do nothing
// The schema_version of the backup is 0, indicating that the backup has not yet started, so skip
if (sys_backup_info.backup_status_.is_prepare_status() && OB_FAIL(tenant_ids.push_back(OB_SYS_TENANT_ID))) {
LOG_WARN("failed to push tenant id into array", K(ret));
}
} else if (OB_FAIL(ObBackupUtils::retry_get_tenant_schema_guard(
OB_SYS_TENANT_ID, *schema_service_, sys_backup_info.backup_schema_version_, guard))) {
LOG_WARN("failed to get tenant schema guard", K(ret), K(sys_backup_info));
......@@ -1708,8 +1710,8 @@ void ObRootBackup::cleanup_prepared_infos()
// normal tenants, skip sys tenant
for (int64_t i = 1; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
const uint64_t tenant_id = tenant_ids.at(i);
if (OB_FAIL(cleanup_tenant_prepared_infos(tenant_id, updater.get_trans(), info_manager))) {
LOG_WARN("failed to cleanup stoppped tenant infos", K(ret), K(tenant_id));
if (OB_FAIL(cleanup_tenant_prepared_infos(tenant_id, sys_backup_info, updater.get_trans(), info_manager))) {
LOG_WARN("failed to cleanup stopped tenant infos", K(ret), K(tenant_id));
}
}
......@@ -1750,8 +1752,8 @@ int ObRootBackup::check_need_cleanup_prepared_infos(
return ret;
}
int ObRootBackup::cleanup_tenant_prepared_infos(
const uint64_t tenant_id, ObISQLClient &sys_tenant_trans, ObBackupInfoManager &info_manager)
int ObRootBackup::cleanup_tenant_prepared_infos(const uint64_t tenant_id, const ObBaseBackupInfoStruct &sys_backup_info,
ObISQLClient &sys_tenant_trans, ObBackupInfoManager &info_manager)
{
int ret = OB_SUCCESS;
ObTenantBackupTaskInfo task_info;
......@@ -1768,9 +1770,11 @@ int ObRootBackup::cleanup_tenant_prepared_infos(
} else if (stop_) {
ret = OB_SERVER_IS_STOPPING;
LOG_WARN("observer is stopping", K(ret));
} else if (OB_INVALID_ID == tenant_id) {
} else if (OB_INVALID_ID == tenant_id || !sys_backup_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant is is invalid", K(ret), K(tenant_id));
LOG_WARN("tenant is is invalid", K(ret), K(tenant_id), K(sys_backup_info));
} else if (OB_FAIL(drop_backup_point(tenant_id, sys_backup_info.backup_snapshot_version_))) {
LOG_WARN("failed to drop backup point", K(ret), K(info));
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(stmt_timeout))) {
LOG_WARN("failed to set trx timeout", K(ret), K(stmt_timeout));
} else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) {
......@@ -1787,8 +1791,6 @@ int ObRootBackup::cleanup_tenant_prepared_infos(
// do nothing
} else if (OB_FAIL(ObBackupUtil::check_sys_tenant_trans_alive(info_manager, sys_tenant_trans))) {
LOG_WARN("failed to check sys tenant trans alive", K(ret), K(info));
} else if (OB_FAIL(drop_backup_point(tenant_id, info.backup_snapshot_version_))) {
LOG_WARN("failed to drop backup point", K(ret), K(info));
} else {
dest_info = info;
dest_info.backup_status_.set_backup_status_stop();
......
......@@ -194,7 +194,8 @@ private:
void cleanup_prepared_infos();
int check_need_cleanup_prepared_infos(const share::ObBaseBackupInfoStruct& sys_backup_info, bool& need_clean);
int cleanup_tenant_prepared_infos(
const uint64_t tenant_id, common::ObISQLClient& sys_tenant_trans, share::ObBackupInfoManager& info_manager);
const uint64_t tenant_id, const ObBaseBackupInfoStruct &sys_backup_info, common::ObISQLClient& sys_tenant_trans,
share::ObBackupInfoManager& info_manager);
int check_tenants_backup_task_failed(const share::ObBaseBackupInfoStruct& info,
share::ObBackupInfoManager& info_manager, common::ObISQLClient& sys_tenant_trans);
int update_tenant_backup_task(common::ObISQLClient& trans, const share::ObTenantBackupTaskInfo& src_info,
......
......@@ -118,8 +118,8 @@ int ObSnapshotInfoManager::get_snapshot(common::ObMySQLProxy& proxy, const int64
return ret;
}
int ObSnapshotInfoManager::get_snapshot(common::ObMySQLProxy& proxy, const int64_t tenant_id,
share::ObSnapShotType snapshot_type, const int64_t snapshot_ts, share::ObSnapshotInfo& snapshot_info)
int ObSnapshotInfoManager::get_snapshot(common::ObISQLClient &proxy, const int64_t tenant_id,
share::ObSnapShotType snapshot_type, const int64_t snapshot_ts, share::ObSnapshotInfo &snapshot_info)
{
int ret = OB_SUCCESS;
ObSnapshotTableProxy snapshot_proxy;
......
......@@ -31,11 +31,11 @@ public:
int acquire_snapshot(common::ObMySQLTransaction& trans, const share::ObSnapshotInfo& snapshot);
int release_snapshot(common::ObMySQLTransaction& trans, const share::ObSnapshotInfo& snapshot);
int acquire_snapshot_for_building_index(
common::ObMySQLTransaction& trans, const share::ObSnapshotInfo& snapshot, const int64_t index_table_id);
int get_snapshot(common::ObMySQLProxy& proxy, const int64_t tenant_id, share::ObSnapShotType snapshot_type,
const char* extra_info, share::ObSnapshotInfo& snapshot_info);
int get_snapshot(common::ObMySQLProxy& proxy, const int64_t tenant_id, share::ObSnapShotType snapshot_type,
const int64_t snapshot_ts, share::ObSnapshotInfo& snapshot_info);
common::ObMySQLTransaction &trans, const share::ObSnapshotInfo &snapshot, const int64_t index_table_id);
int get_snapshot(common::ObMySQLProxy &proxy, const int64_t tenant_id, share::ObSnapShotType snapshot_type,
const char *extra_info, share::ObSnapshotInfo &snapshot_info);
int get_snapshot(common::ObISQLClient &proxy, const int64_t tenant_id, share::ObSnapShotType snapshot_type,
const int64_t snapshot_ts, share::ObSnapshotInfo &snapshot_info);
int check_restore_point(common::ObMySQLProxy& proxy, const int64_t tenant_id, const int64_t table_id, bool& is_exist);
int get_snapshot_count(
......
......@@ -1103,8 +1103,8 @@ int ObBackupInfoManager::check_can_update_(
} else {
switch (src_status) {
case ObBackupInfoStatus::PREPARE:
if (ObBackupInfoStatus::SCHEDULE != dest_status && ObBackupInfoStatus::CANCEL != dest_status &&
ObBackupInfoStatus::STOP != dest_status) {
if (ObBackupInfoStatus::PREPARE != dest_status && ObBackupInfoStatus::SCHEDULE != dest_status &&
ObBackupInfoStatus::CANCEL != dest_status && ObBackupInfoStatus::STOP != dest_status) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("can not update backup info", K(ret), K(src_status), K(dest_status));
}
......
......@@ -26,6 +26,7 @@
#include "share/backup/ob_log_archive_backup_info_mgr.h"
#include "storage/transaction/ob_ts_mgr.h"
#include "rootserver/ob_rs_event_history_table_operator.h"
#include "share/ob_global_stat_proxy.h"
namespace oceanbase {
using namespace common;
......@@ -43,7 +44,6 @@ ObBackupScheduler::ObBackupScheduler()
schema_version_map_(),
backup_snapshot_version_(0),
backup_data_version_(0),
frozen_timestamp_(0),
max_backup_set_id_(0),
root_backup_(NULL),
freeze_info_manager_(nullptr),
......@@ -61,7 +61,7 @@ int ObBackupScheduler::init(const obrpc::ObBackupDatabaseArg& arg, schema::ObMul
int ret = OB_SUCCESS;
ObFreezeInfoProxy freeze_info_proxy;
ObSimpleFrozenStatus frozen_status;
const int64_t backup_snapshot_version = ObTimeUtil::current_time();
const int64_t current_ts = ObTimeUtil::current_time();
if (is_inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("backup scheduler init twice", K(ret), K(is_inited_));
......@@ -70,14 +70,13 @@ int ObBackupScheduler::init(const obrpc::ObBackupDatabaseArg& arg, schema::ObMul
LOG_WARN("backup scheduler get invalid argument", K(ret), K(arg));
} else if (OB_FAIL(schema_version_map_.create(MAX_TENANT_BUCKET, ObModIds::BACKUP))) {
LOG_WARN("failed to create schema version map", K(ret));
} else if (OB_FAIL(freeze_info_proxy.get_frozen_info_less_than(proxy, backup_snapshot_version, frozen_status))) {
LOG_WARN("failed to get frozen info less than backup snapshot version", K(ret), K(backup_snapshot_version));
} else if (OB_FAIL(freeze_info_proxy.get_frozen_info_less_than(proxy, current_ts, frozen_status))) {
LOG_WARN("failed to get frozen info less than backup snapshot version", K(ret), K(current_ts));
} else if (OB_FAIL(init_frozen_schema_versions_(freeze_info_manager, frozen_status.frozen_version_))) {
LOG_WARN("failed to init frozen schema versions", K(ret), K(frozen_status));
} else {
backup_snapshot_version_ = backup_snapshot_version;
backup_snapshot_version_ = 0;
backup_data_version_ = frozen_status.frozen_version_;
frozen_timestamp_ = frozen_status.frozen_timestamp_;
arg_ = arg;
schema_service_ = &schema_service;
proxy_ = &proxy;
......@@ -86,7 +85,7 @@ int ObBackupScheduler::init(const obrpc::ObBackupDatabaseArg& arg, schema::ObMul
freeze_info_manager_ = &freeze_info_manager;
restore_point_service_ = &restore_point_service;
is_inited_ = true;
LOG_INFO("success init backup scheduler", K(arg), K(backup_snapshot_version), K(frozen_status));
LOG_INFO("success init backup scheduler", K(arg), K(current_ts), K(frozen_status));
}
return ret;
}
......@@ -288,7 +287,6 @@ int ObBackupScheduler::check_can_backup(const ObIArray<ObBaseBackupInfoStruct>&
int ObBackupScheduler::schedule_backup(const ObIArray<uint64_t>& tenant_ids, ObBackupInfoManager& info_manager)
{
int ret = OB_SUCCESS;
const int64_t backup_snapshot_version = backup_snapshot_version_;
if (!is_inited_) {
ret = OB_NOT_INIT;
......@@ -302,7 +300,7 @@ int ObBackupScheduler::schedule_backup(const ObIArray<uint64_t>& tenant_ids, ObB
ret = OB_ERR_UNEXPECTED;
LOG_WARN("first info should be sys tenant backup info", K(ret), K(tenant_id));
} else if (is_cluster_backup_) {
if (OB_FAIL(schedule_sys_tenant_backup(backup_snapshot_version, tenant_id, info_manager))) {
if (OB_FAIL(schedule_sys_tenant_backup(tenant_id, info_manager))) {
LOG_WARN("failed to schedule sys tenant backup", K(ret));
} else {
ROOTSERVICE_EVENT_ADD("backup", "start backup cluster");
......@@ -312,13 +310,20 @@ int ObBackupScheduler::schedule_backup(const ObIArray<uint64_t>& tenant_ids, ObB
DEBUG_SYNC(BACKUP_INFO_PREPARE);
if (OB_SUCC(ret)) {
if (OB_FAIL(schedule_tenants_backup(backup_snapshot_version, tenant_ids, info_manager))) {
if (OB_FAIL(prepare_backup_point_(tenant_ids, info_manager))) {
LOG_WARN("failed to prepare backup point", K(ret), K(tenant_ids));
} else {
DEBUG_SYNC(BEFORE_BACKUP_INFO_SCHEDULER);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(schedule_tenants_backup(tenant_ids, info_manager))) {
LOG_WARN("failed to schedule tenants backup", K(ret));
} else {
DEBUG_SYNC(BACKUP_INFO_SCHEDULER);
}
}
DEBUG_SYNC(BACKUP_INFO_SCHEDULER);
if (OB_SUCC(ret)) {
if (OB_FAIL(start_backup(info_manager))) {
LOG_WARN("failed to start backup", K(ret));
......@@ -337,24 +342,21 @@ int ObBackupScheduler::schedule_backup(const ObIArray<uint64_t>& tenant_ids, ObB
return ret;
}
int ObBackupScheduler::schedule_sys_tenant_backup(
const int64_t backup_snapshot_version, const uint64_t tenant_id, ObBackupInfoManager& info_manager)
int ObBackupScheduler::schedule_sys_tenant_backup(const uint64_t tenant_id, ObBackupInfoManager &info_manager)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard guard;
int64_t backup_schema_version = 0;
ObBaseBackupInfoStruct info;
ObBaseBackupInfoStruct dest_info;
ObBackupItemTransUpdater updater;
const int64_t FAKE_BACKUP_SNAPSHOT_VERSION = 1;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("backup scheduler do not init", K(ret));
} else if (OB_SYS_TENANT_ID != tenant_id || backup_snapshot_version <= 0) {
} else if (OB_SYS_TENANT_ID != tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant id should be sys tenant id", K(ret), K(tenant_id), K(backup_snapshot_version));
} else if (OB_FAIL(get_tenant_schema_version(tenant_id, backup_schema_version))) {
LOG_WARN("failed to get tenant schema version", K(ret), K(tenant_id));
LOG_WARN("tenant id should be sys tenant id", K(ret), K(tenant_id));
} else if (OB_FAIL(updater.start(*proxy_))) {
LOG_WARN("failed to start trans", K(ret));
} else {
......@@ -362,9 +364,9 @@ int ObBackupScheduler::schedule_sys_tenant_backup(
LOG_WARN("failed to get backup info", K(ret), K(tenant_id));
} else {
dest_info = info;
// dest_info.backup_dest_ added by log archive
dest_info.backup_snapshot_version_ = backup_snapshot_version;
dest_info.backup_schema_version_ = backup_schema_version;
// backup_snapshot_version_ should not be ZERO, here using FAKE VALUE
dest_info.backup_snapshot_version_ = FAKE_BACKUP_SNAPSHOT_VERSION;
dest_info.backup_schema_version_ = 0;
dest_info.backup_type_.type_ =
arg_.is_incremental_ ? ObBackupType::INCREMENTAL_BACKUP : ObBackupType::FULL_BACKUP;
dest_info.backup_data_version_ = backup_data_version_;
......@@ -391,8 +393,8 @@ int ObBackupScheduler::schedule_sys_tenant_backup(
return ret;
}
int ObBackupScheduler::schedule_tenant_backup(const int64_t backup_snapshot_version, const uint64_t tenant_id,
const ObBaseBackupInfoStruct::BackupDest& backup_dest, ObISQLClient& sys_tenant_trans,
int ObBackupScheduler::schedule_tenant_backup(const int64_t backup_snapshot_version,
const uint64_t tenant_id, const ObBaseBackupInfoStruct::BackupDest& backup_dest, ObISQLClient& sys_tenant_trans,
ObBackupInfoManager& info_manager)
{
int ret = OB_SUCCESS;
......@@ -451,18 +453,12 @@ int ObBackupScheduler::schedule_tenant_backup(const int64_t backup_snapshot_vers
LOG_WARN("end transaction failed", K(tmp_ret), K(ret));
ret = OB_SUCCESS == ret ? tmp_ret : ret;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(create_backup_point(tenant_id))) {
LOG_WARN("failed to create backup point", K(ret), K(tenant_id));
}
}
}
return ret;
}
int ObBackupScheduler::schedule_tenants_backup(const int64_t backup_snapshot_version,
const common::ObIArray<uint64_t>& tenant_ids, ObBackupInfoManager& info_manager)
int ObBackupScheduler::schedule_tenants_backup(
const common::ObIArray<uint64_t> &tenant_ids, ObBackupInfoManager &info_manager)
{
int ret = OB_SUCCESS;
ObBackupItemTransUpdater updater;
......@@ -472,9 +468,9 @@ int ObBackupScheduler::schedule_tenants_backup(const int64_t backup_snapshot_ver
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("backup scheduler do not init", K(ret));
} else if (tenant_ids.empty()) {
} else if (tenant_ids.empty() || backup_snapshot_version_ <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("schedule get tenants backup get invalid argument", K(ret), K(tenant_ids));
LOG_WARN("schedule get tenants backup get invalid argument", K(ret), K(tenant_ids), K(backup_snapshot_version_));
} else if (OB_FAIL(updater.start(*proxy_))) {
LOG_WARN("failed to start trans", K(ret));
} else {
......@@ -498,7 +494,7 @@ int ObBackupScheduler::schedule_tenants_backup(const int64_t backup_snapshot_ver
if (OB_SYS_TENANT_ID == tenant_id) {
// do nothing
} else if (OB_FAIL(schedule_tenant_backup(
backup_snapshot_version, tenant_id, backup_dest, updater.get_trans(), info_manager))) {
backup_snapshot_version_, tenant_id, backup_dest, updater.get_trans(), info_manager))) {
LOG_WARN("failed to schedule tenant backup", K(ret), K(tenant_id));
}
}
......@@ -760,7 +756,7 @@ int ObBackupScheduler::check_tenant_can_backup(
return ret;
}
int ObBackupScheduler::create_backup_point(const uint64_t tenant_id)
int ObBackupScheduler::create_backup_point(const uint64_t tenant_id, common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
char name[OB_MAX_RESERVED_POINT_NAME_LENGTH] = {0};
......@@ -769,13 +765,13 @@ int ObBackupScheduler::create_backup_point(const uint64_t tenant_id)
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("backup scheduler do not init", K(ret));
} else if (OB_INVALID_ID == tenant_id) {
} else if (OB_INVALID_ID == tenant_id || !trans.is_started()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("create backup point get invalid argument", K(ret), K(tenant_id));
} else if (OB_FAIL(get_tenant_schema_version(tenant_id, backup_schema_version))) {
LOG_WARN("failed to get tenant schema version", K(ret), K(tenant_id));
} else if (OB_FAIL(restore_point_service_->create_backup_point(
tenant_id, name, backup_snapshot_version_, backup_schema_version))) {
tenant_id, name, backup_snapshot_version_, backup_schema_version, trans))) {
if (OB_ERR_BACKUP_POINT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
......@@ -812,8 +808,7 @@ int ObBackupScheduler::check_log_archive_status()
LOG_WARN("failed to get log archive backup info", K(ret));
} else if (ObLogArchiveStatus::DOING != sys_log_archive_info_.status_.status_) {
ret = OB_BACKUP_CAN_NOT_START;
LOG_WARN(
"failed to start backup", K(ret), K(sys_log_archive_info_), K(backup_snapshot_version_), K(frozen_timestamp_));
LOG_WARN("failed to start backup", K(ret), K(sys_log_archive_info_));
if (OB_SUCCESS != (tmp_ret = databuff_printf(error_msg,
ERROR_MSG_LENGTH,
pos,
......@@ -823,23 +818,6 @@ int ObBackupScheduler::check_log_archive_status()
} else {
LOG_USER_ERROR(OB_BACKUP_CAN_NOT_START, error_msg);
}
} else if (sys_log_archive_info_.status_.start_ts_ > frozen_timestamp_) {
ret = OB_BACKUP_CAN_NOT_START;
LOG_WARN(
"failed to start backup", K(ret), K(sys_log_archive_info_), K(backup_snapshot_version_), K(frozen_timestamp_));
if (OB_SUCCESS != (tmp_ret = databuff_printf(error_msg,
ERROR_MSG_LENGTH,
pos,
"log archive start timestamp is bigger than frozen timestamp, need major freeze first. "
"start timestamp : %ld, "
"frozen timestamp : %ld .",
sys_log_archive_info_.status_.start_ts_,
frozen_timestamp_))) {
LOG_WARN("failed to set error msg", K(tmp_ret), K(error_msg), K(pos));
} else {
LOG_USER_ERROR(OB_BACKUP_CAN_NOT_START, error_msg);
}
}
return ret;
}
......@@ -866,5 +844,103 @@ int ObBackupScheduler::check_tenant_backup_data_version(
return ret;
}
int ObBackupScheduler::prepare_backup_point_(
const common::ObIArray<uint64_t> &tenant_ids, ObBackupInfoManager &info_manager)
{
int ret = OB_SUCCESS;
int64_t gc_timestamp = 0;
ObBaseBackupInfoStruct info;
ObBaseBackupInfoStruct dest_info;
ObBackupItemTransUpdater updater;
ObTimeoutCtx timeout_ctx;
const int64_t MAX_EXECUTE_TIMEOUT_US = 600L * 1000 * 1000; // 600s
const int64_t stmt_timeout = MAX_EXECUTE_TIMEOUT_US;
const uint64_t sys_tenant_id = OB_SYS_TENANT_ID;
ObFreezeInfoProxy freeze_info_proxy;
ObSimpleFrozenStatus frozen_status;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("backup scheduler do not init", K(ret));
} else if (tenant_ids.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("prepare backup point get invalid argument", K(ret), K(tenant_ids));
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(stmt_timeout))) {
LOG_WARN("fail to set trx timeout", K(ret), K(stmt_timeout));
} else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) {
LOG_WARN("set timeout context failed", K(ret));
} else if (OB_FAIL(updater.start(*proxy_))) {
LOG_WARN("failed to start trans", K(ret));
} else {
if (OB_FAIL(info_manager.get_backup_info(sys_tenant_id, updater, info))) {
LOG_WARN("failed to get backup info", K(ret), K(sys_tenant_id));
} else if (OB_FAIL(ObGlobalStatProxy::select_gc_timestamp_for_update(updater.get_trans(), gc_timestamp))) {
LOG_WARN("fail to select gc timstamp for update", K(ret));
} else {
backup_snapshot_version_ = ObTimeUtil::current_time();
// because snapshot gc ts is :
// int64_t new_snapshot_gc_ts = ObTimeUtility::current_time() -
// transaction::ObWeakReadUtil::default_max_stale_time_for_weak_consistency();
if (OB_FAIL(freeze_info_proxy.get_frozen_info_less_than(
updater.get_trans(), backup_snapshot_version_, frozen_status))) {
LOG_WARN("failed to get frozen info less than backup snapshot version", K(ret), K(backup_snapshot_version_));
} else if (frozen_status.frozen_version_ != backup_data_version_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("frozen version is not equal to backup data version. It may has multi RS",
K(ret),
K(frozen_status),
K(backup_data_version_));
} else if (backup_snapshot_version_ <= gc_timestamp) {
ret = OB_BACKUP_CAN_NOT_START;
LOG_WARN("failed to start backup", K(ret), K(backup_snapshot_version_), K(gc_timestamp));
const int64_t ERROR_MSG_LENGTH = 1024;
char error_msg[ERROR_MSG_LENGTH] = "";
int tmp_ret = OB_SUCCESS;
int64_t pos = 0;
if (OB_SUCCESS !=
(tmp_ret = databuff_printf(
error_msg, ERROR_MSG_LENGTH, pos, "snapshot gc ts is bigger than backup snapshot version."))) {
LOG_WARN("failed to set error msg", K(tmp_ret), K(error_msg), K(pos));
} else {
LOG_USER_ERROR(OB_BACKUP_CAN_NOT_START, error_msg);
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
const uint64_t tenant_id = tenant_ids.at(i);
if (OB_SYS_TENANT_ID == tenant_id) {
// do nothing
} else if (OB_FAIL(create_backup_point(tenant_id, updater.get_trans()))) {
LOG_WARN("failed to create backup point", K(ret), K(tenant_id));
}
}
}
}
if (OB_SUCC(ret)) {
int64_t backup_schema_version = 0;
if (OB_FAIL(get_tenant_schema_version(sys_tenant_id, backup_schema_version))) {
LOG_WARN("failed to get tenant schema version", K(ret), K(sys_tenant_id));
} else {
dest_info = info;
dest_info.backup_snapshot_version_ = backup_snapshot_version_;
dest_info.backup_schema_version_ = backup_schema_version;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(info_manager.check_can_update(info, dest_info))) {
LOG_WARN("failed to check can update", K(ret));
} else if (OB_FAIL(info_manager.update_backup_info(sys_tenant_id, dest_info, updater))) {
LOG_WARN("failed to update backup info", K(ret), K(sys_tenant_id), K(dest_info));
}
}
int tmp_ret = updater.end(OB_SUCC(ret));
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("end transaction failed", K(tmp_ret), K(ret));
ret = OB_SUCCESS == ret ? tmp_ret : ret;
}
}
return ret;
}
} // namespace share
} // namespace oceanbase
......@@ -37,13 +37,11 @@ private:
int get_tenant_ids(common::ObIArray<uint64_t>& tenant_ids);
int check_can_backup(const common::ObIArray<ObBaseBackupInfoStruct>& infos);
int schedule_backup(const common::ObIArray<uint64_t>& tenant_ids, ObBackupInfoManager& info_manager);
int schedule_sys_tenant_backup(
const int64_t backup_snapshot_version, const uint64_t tenant_id, ObBackupInfoManager& info_manager);
int schedule_tenant_backup(const int64_t backup_snapshot_version, const uint64_t tenant_id,
const ObBaseBackupInfoStruct::BackupDest& backup_dest, common::ObISQLClient& sys_tenant_trans,
ObBackupInfoManager& info_manager);
int schedule_tenants_backup(const int64_t backup_snapshot_version, const common::ObIArray<uint64_t>& tenant_ids,
ObBackupInfoManager& info_manager);
int schedule_sys_tenant_backup(const uint64_t tenant_id, ObBackupInfoManager& info_manager);
int schedule_tenant_backup(const int64_t backup_snapshot_version,
const uint64_t tenant_id, const ObBaseBackupInfoStruct::BackupDest& backup_dest,
common::ObISQLClient& sys_tenant_trans, ObBackupInfoManager& info_manager);
int schedule_tenants_backup(const common::ObIArray<uint64_t>& tenant_ids, ObBackupInfoManager& info_manager);
int get_tenant_schema_version(const uint64_t tenant_id, int64_t& schema_version);
int fetch_schema_version(const uint64_t tenant_id, int64_t& schema_version);
......@@ -56,9 +54,10 @@ private:
int check_gts_(const common::ObIArray<uint64_t>& tenant_ids);
int init_frozen_schema_versions_(rootserver::ObFreezeInfoManager& freeze_info_manager, const int64_t frozen_version);
int check_backup_schema_version_(const uint64_t tenant_id, const int64_t backup_schema_version);
int create_backup_point(const uint64_t tenant_id);
int create_backup_point(const uint64_t tenant_id, common::ObMySQLTransaction &trans);
int check_log_archive_status();
int check_tenant_backup_data_version(const uint64_t tenant_id, ObBackupInfoManager& info_manager, bool& can_backup);
int prepare_backup_point_(const common::ObIArray<uint64_t> &tenant_ids, ObBackupInfoManager &info_manager);
private:
static const int64_t MAX_TENANT_BUCKET = 1024;
......@@ -71,7 +70,6 @@ private:
hash::ObHashMap<uint64_t, int64_t> frozen_schema_version_map_;
int64_t backup_snapshot_version_;
int64_t backup_data_version_;
int64_t frozen_timestamp_;
int64_t max_backup_set_id_;
rootserver::ObRootBackup* root_backup_;
rootserver::ObFreezeInfoManager* freeze_info_manager_;
......
......@@ -294,6 +294,7 @@ class ObString;
ACT(FOLLOWER_BEFORE_UPDATE_RESTORE_FLAG_RESTORE_LOG, ) \
ACT(BEFORE_GLOBAL_INDEX_BUILDER_MOVE_TASK, ) \
ACT(BEFORE_SEND_RESTORE_PARTITIONS_RPC, ) \
ACT(BEFORE_BACKUP_INFO_SCHEDULER, ) \
ACT(MAX_DEBUG_SYNC_POINT, )
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册