From 03ff66996e48fff48118dfea6d431e59773e03a0 Mon Sep 17 00:00:00 2001 From: SanmuWangZJU Date: Fri, 3 Mar 2023 09:40:57 +0000 Subject: [PATCH] [OBCDC] ignore rollbacked schema_version after tenant start service --- src/logservice/libobcdc/src/ob_log_instance.cpp | 12 ++++++++---- src/logservice/libobcdc/src/ob_log_part_mgr.cpp | 12 +++++++++--- src/logservice/libobcdc/src/ob_log_part_mgr.h | 1 + .../libobcdc/src/ob_log_resource_collector.cpp | 5 +++-- .../libobcdc/src/ob_log_resource_collector.h | 4 ++-- 5 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index 2d2db2df1..7aa0078e8 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -2236,7 +2236,9 @@ void ObLogInstance::global_flow_control_() int64_t dml_parser_part_trans_task_count = 0; int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count(); int64_t out_part_trans_task_count = get_out_part_trans_task_count_(); - int64_t resource_collector_part_trans_task_count = resource_collector_->get_part_trans_task_count(); + int64_t resource_collector_part_trans_task_count = 0; + int64_t resource_collector_br_count = 0; + resource_collector_->get_task_count(resource_collector_part_trans_task_count, resource_collector_br_count); int64_t committer_ddl_part_trans_task_count = 0; int64_t committer_dml_part_trans_task_count = 0; committer_->get_part_trans_task_count(committer_ddl_part_trans_task_count, @@ -2521,7 +2523,9 @@ int ObLogInstance::get_task_count_(int64_t &ready_to_seq_task_count, int64_t sys_ls_handle_part_trans_task_count = sys_ls_handler_->get_part_trans_task_count(); int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count(); int64_t out_part_trans_task_count = get_out_part_trans_task_count_(); - int64_t resource_collector_part_trans_task_count = resource_collector_->get_part_trans_task_count(); + int64_t resource_collector_part_trans_task_count = 0; + int64_t resource_collector_br_count = 0; + resource_collector_->get_task_count(resource_collector_part_trans_task_count, resource_collector_br_count); int64_t dml_br_count_in_user_queue = br_queue_.get_dml_br_count(); int64_t dml_br_count_output = output_dml_br_count_; @@ -2558,8 +2562,8 @@ int ObLogInstance::get_task_count_(int64_t &ready_to_seq_task_count, dml_br_count_in_user_queue); _LOG_INFO("[TASK_COUNT_STAT] [OUT] [PART_TRANS_TASK=%ld] [DDL_BR=%ld] [DML_BR=%ld]", out_part_trans_task_count, ddl_br_count_output, dml_br_count_output); - _LOG_INFO("[TASK_COUNT_STAT] [RESOURCE_COLLECTOR] [PART_TRANS_TASK=%ld]", - resource_collector_part_trans_task_count); + _LOG_INFO("[TASK_COUNT_STAT] [RESOURCE_COLLECTOR] [PART_TRANS_TASK=%ld] [BR=%ld]", + resource_collector_part_trans_task_count, resource_collector_br_count); } } } diff --git a/src/logservice/libobcdc/src/ob_log_part_mgr.cpp b/src/logservice/libobcdc/src/ob_log_part_mgr.cpp index 94aff4982..9412d0862 100644 --- a/src/logservice/libobcdc/src/ob_log_part_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_mgr.cpp @@ -38,10 +38,14 @@ #define CHECK_SCHEMA_VERSION(check_schema_version, fmt, arg...) \ do { \ if (OB_UNLIKELY(check_schema_version < ATOMIC_LOAD(&cur_schema_version_))) { \ - LOG_ERROR(fmt, K(tenant_id_), K(cur_schema_version_), K(check_schema_version), ##arg); \ - if (!TCONF.skip_reversed_schema_verison) { \ - ret = OB_INVALID_ARGUMENT; \ + if (ATOMIC_LOAD(&enable_check_schema_version_)) { \ + LOG_ERROR(fmt, K(tenant_id_), K(cur_schema_version_), K(check_schema_version), ##arg); \ + if (!TCONF.skip_reversed_schema_verison) { \ + ret = OB_INVALID_ARGUMENT; \ + } \ } \ + } else if (OB_UNLIKELY(! ATOMIC_LOAD(&enable_check_schema_version_))) { \ + ATOMIC_SET(&enable_check_schema_version_, true); \ } \ } while (0) @@ -94,6 +98,7 @@ int ObLogPartMgr::init(const uint64_t tenant_id, table_id_cache_ = &table_id_cache; cur_schema_version_ = start_schema_version; enable_oracle_mode_match_case_sensitive_ = enable_oracle_mode_match_case_sensitive; + enable_check_schema_version_ = false; inited_ = true; LOG_INFO("init PartMgr succ", K(tenant_id), K(start_schema_version)); @@ -111,6 +116,7 @@ void ObLogPartMgr::reset() tablet_to_table_info_.destroy(); cur_schema_version_ = OB_INVALID_VERSION; enable_oracle_mode_match_case_sensitive_ = false; + enable_check_schema_version_ = false; schema_cond_.destroy(); } diff --git a/src/logservice/libobcdc/src/ob_log_part_mgr.h b/src/logservice/libobcdc/src/ob_log_part_mgr.h index 863d5449b..d18a38b1a 100644 --- a/src/logservice/libobcdc/src/ob_log_part_mgr.h +++ b/src/logservice/libobcdc/src/ob_log_part_mgr.h @@ -485,6 +485,7 @@ private: // Default whitelist match insensitive bool enable_oracle_mode_match_case_sensitive_; + bool enable_check_schema_version_; // Conditional common::ObThreadCond schema_cond_; diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp index e03e894c7..10dfe1b68 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.cpp +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.cpp @@ -785,9 +785,10 @@ int ObLogResourceCollector::revert_single_binlog_record_(ObLogBR *br) return ret; } -int64_t ObLogResourceCollector::get_part_trans_task_count() const +void ObLogResourceCollector::get_task_count(int64_t &part_trans_task_count, int64_t &br_count) const { - return ATOMIC_LOAD(&total_part_trans_task_count_); + part_trans_task_count = ATOMIC_LOAD(&total_part_trans_task_count_); + br_count = ATOMIC_LOAD(&br_count_); } int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t thread_idx, PartTransTask &task) diff --git a/src/logservice/libobcdc/src/ob_log_resource_collector.h b/src/logservice/libobcdc/src/ob_log_resource_collector.h index 922de11a1..2a72b885d 100644 --- a/src/logservice/libobcdc/src/ob_log_resource_collector.h +++ b/src/logservice/libobcdc/src/ob_log_resource_collector.h @@ -77,7 +77,7 @@ public: virtual int start() = 0; virtual void stop() = 0; virtual void mark_stop_flag() = 0; - virtual int64_t get_part_trans_task_count() const = 0; + virtual void get_task_count(int64_t &part_trans_task_count, int64_t &br_count) const = 0; virtual void print_stat_info() const = 0; }; @@ -120,7 +120,7 @@ public: void stop(); void mark_stop_flag(); int handle(void *data, const int64_t thread_index, volatile bool &stop_flag); - int64_t get_part_trans_task_count() const; + void get_task_count(int64_t &part_trans_task_count, int64_t &br_count) const; void print_stat_info() const; private: -- GitLab