From ad7be2769266f9b0d9425121aa9b3230909bfb0b Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 15 Jul 2021 13:48:14 +0800 Subject: [PATCH] cherry-pick to opensource. --- .../ob_all_virtual_proxy_schema.cpp | 3 +++ src/sql/executor/ob_distributed_scheduler.cpp | 7 ++++--- .../executor/ob_multiscan_task_spliter.cpp | 9 +++++---- src/sql/executor/ob_multiscan_task_spliter.h | 4 ++++ .../executor/ob_trans_result_collector.cpp | 19 ++++++++++++++----- src/sql/executor/ob_trans_result_collector.h | 7 +++---- src/sql/ob_result_set.cpp | 2 +- 7 files changed, 34 insertions(+), 17 deletions(-) diff --git a/src/observer/virtual_table/ob_all_virtual_proxy_schema.cpp b/src/observer/virtual_table/ob_all_virtual_proxy_schema.cpp index 3d7e318096..a951337e4d 100644 --- a/src/observer/virtual_table/ob_all_virtual_proxy_schema.cpp +++ b/src/observer/virtual_table/ob_all_virtual_proxy_schema.cpp @@ -194,6 +194,9 @@ int ObAllVirtualProxySchema::inner_open() LOG_WARN("ob_write_string failed", K(object_table_name), K(ret)); } else if (OB_FAIL(full_schema_guard_.get_database_schema(object_database_id, database_schema))) { LOG_WARN("get_database_schema failed", K(tenant_id), K(object_database_id), K(ret)); + } else if (OB_ISNULL(database_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("database schema is null", K(ret), K(object_database_id)); } else if (OB_FAIL(ob_write_string( *allocator_, database_schema->get_database_name_str(), level1_decoded_db_name_))) { LOG_WARN("ob_write_string failed", K(database_schema->get_database_name_str()), K(ret)); diff --git a/src/sql/executor/ob_distributed_scheduler.cpp b/src/sql/executor/ob_distributed_scheduler.cpp index b273f511b1..7adf37dd79 100644 --- a/src/sql/executor/ob_distributed_scheduler.cpp +++ b/src/sql/executor/ob_distributed_scheduler.cpp @@ -661,9 +661,10 @@ int ObDistributedScheduler::schedule(ObExecContext& ctx, ObPhysicalPlan* phy_pla } else { LOG_WARN("fail to schedule, print jobs' status", K(ret), K(fail_ret), "jobs_status", 
jc_status_buf); } - if (OB_SUCCESS != (fail_ret = signal_schedule_error(ret))) { - LOG_WARN("fail to signal schedule error", K(fail_ret)); - } + // Moved into do_schedule, just before wait_root_use_up_data is executed +// if (OB_SUCCESS != (fail_ret = signal_schedule_error(ret))) { +// LOG_WARN("fail to signal schedule error", K(fail_ret)); +// } } NG_TRACE(distributed_schedule_end); return ret; diff --git a/src/sql/executor/ob_multiscan_task_spliter.cpp b/src/sql/executor/ob_multiscan_task_spliter.cpp index dabfb3bb9c..4841c8d47f 100644 --- a/src/sql/executor/ob_multiscan_task_spliter.cpp +++ b/src/sql/executor/ob_multiscan_task_spliter.cpp @@ -251,7 +251,8 @@ bool ObDistributedTaskSpliter::ObSliceComparer::operator()(const ObSliceEvent* s } ObDistributedTaskSpliter::ObDistributedTaskSpliter() - : table_locations_(ObModIds::OB_SQL_EXECUTOR_TASK_SPLITER, OB_MALLOC_NORMAL_BLOCK_SIZE), + : schema_guard_(), + table_locations_(ObModIds::OB_SQL_EXECUTOR_TASK_SPLITER, OB_MALLOC_NORMAL_BLOCK_SIZE), part_shuffle_keys_(ObModIds::OB_SQL_EXECUTOR_TASK_SPLITER, OB_MALLOC_NORMAL_BLOCK_SIZE), part_idxs_(ObModIds::OB_SQL_EXECUTOR_TASK_SPLITER, OB_MALLOC_NORMAL_BLOCK_SIZE), child_slices_(ObModIds::OB_SQL_EXECUTOR_TASK_SPLITER, OB_MALLOC_NORMAL_BLOCK_SIZE), @@ -569,10 +570,10 @@ int ObDistributedTaskSpliter::check_table_locations() int ObDistributedTaskSpliter::init_part_shuffle_keys() { int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; const ObTableSchema* table_schema = NULL; const ObPhyTableLocation* table_loc = NULL; const ObPartitionReplicaLocationIArray* part_locs = NULL; + schema_guard_.reset(); part_shuffle_keys_.reset(); part_idxs_.reset(); if (table_locations_.count() < 1) { @@ -583,9 +584,9 @@ int ObDistributedTaskSpliter::init_part_shuffle_keys() } else if (FALSE_IT(part_locs = &table_loc->get_partition_location_list())) { // nothing. 
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard( - exec_ctx_->get_my_session()->get_effective_tenant_id(), schema_guard))) { + exec_ctx_->get_my_session()->get_effective_tenant_id(), schema_guard_))) { LOG_WARN("faile to get schema guard", K(ret)); - } else if (OB_FAIL(schema_guard.get_table_schema(table_loc->get_ref_table_id(), table_schema))) { + } else if (OB_FAIL(schema_guard_.get_table_schema(table_loc->get_ref_table_id(), table_schema))) { LOG_WARN("faile to get table schema", K(ret), K(table_loc->get_ref_table_id())); } else if (OB_ISNULL(table_schema)) { ret = OB_SCHEMA_ERROR; diff --git a/src/sql/executor/ob_multiscan_task_spliter.h b/src/sql/executor/ob_multiscan_task_spliter.h index f2995c1b4b..68339f4dff 100644 --- a/src/sql/executor/ob_multiscan_task_spliter.h +++ b/src/sql/executor/ob_multiscan_task_spliter.h @@ -176,6 +176,10 @@ private: int need_split_task_by_partition(bool& by_partition) const; private: + // we need define schema_guard_ together with part_shuffle_keys_, + // because part_shuffle_keys_ may refer to the memory of schema_guard_. + // see https://work.aone.alibaba-inc.com/issue/33570337 + share::schema::ObSchemaGetterGuard schema_guard_; // table informations. 
common::ObSEArray table_locations_; common::ObSEArray part_shuffle_keys_; diff --git a/src/sql/executor/ob_trans_result_collector.cpp b/src/sql/executor/ob_trans_result_collector.cpp index f1c3832ecb..42bbca95a1 100644 --- a/src/sql/executor/ob_trans_result_collector.cpp +++ b/src/sql/executor/ob_trans_result_collector.cpp @@ -104,8 +104,11 @@ int ObTransResultCollector::recv_result(const ObTaskID& task_id, const TransResu { int ret = OB_SUCCESS; LOG_TRACE("TRC_recv_result", K(task_id)); - OV(OB_NOT_NULL(trans_result_)); - OZ(trans_result_->merge_result(trans_result)); + { + ObLockGuard lock_guard(lock_); + OV(OB_NOT_NULL(trans_result_)); + OZ(trans_result_->merge_result(trans_result)); + } if (OB_SUCCESS != ret) { err_code_ = ret; } @@ -199,6 +202,7 @@ int ObTransResultCollector::init(ObSQLSessionInfo& session, ObExecutorRpcImpl* e { int ret = OB_SUCCESS; OX(reset()); + ObLockGuard lock_guard(lock_); OV(OB_NOT_NULL(exec_rpc)); OX(rpc_tenant_id_ = session.get_rpc_tenant_id()); OX(trans_id_ = session.get_trans_desc().get_trans_id()); @@ -216,7 +220,7 @@ int ObTransResultCollector::wait_all_task(int64_t query_timeout, const bool is_b int ret = OB_SUCCESS; int64_t cur_time = ObTimeUtility::current_time(); int64_t next_ping_time = cur_time; - int64_t max_wait_time = MAX(cur_time + TTL_THRESHOLD * WAIT_ONCE_TIME, query_timeout); + int64_t max_wait_time = MIN(cur_time + TTL_THRESHOLD * WAIT_ONCE_TIME, query_timeout); bool need_wait = true; bool need_ping = true; while (/*OB_SUCC(ret)*/ need_wait && cur_time < max_wait_time) { @@ -245,8 +249,11 @@ int ObTransResultCollector::wait_all_task(int64_t query_timeout, const bool is_b if (need_wait || OB_SUCCESS != err_code_) { // now we may get many errors, we must log every one, but return any one is OK. 
LOG_WARN("need set incomplete", K(need_wait), K(err_code_)); - OV(OB_NOT_NULL(trans_result_)); - OX(trans_result_->set_incomplete()); + { + ObLockGuard lock_guard(lock_); + OV (OB_NOT_NULL(trans_result_)); + OX (trans_result_->set_incomplete()); + } if (need_wait) { ret = OB_TIMEOUT; for (int64_t i = 0; i < reporters_.count(); i++) { @@ -256,12 +263,14 @@ int ObTransResultCollector::wait_all_task(int64_t query_timeout, const bool is_b } } } + ObLockGuard lock_guard(lock_); trans_result_ = NULL; return ret; } void ObTransResultCollector::reset() { + ObLockGuard lock_guard(lock_); trans_result_ = NULL; err_code_ = OB_SUCCESS; trans_id_.reset(); diff --git a/src/sql/executor/ob_trans_result_collector.h b/src/sql/executor/ob_trans_result_collector.h index ac7c0037c7..657ee1343b 100644 --- a/src/sql/executor/ob_trans_result_collector.h +++ b/src/sql/executor/ob_trans_result_collector.h @@ -85,9 +85,6 @@ private: // other attributes. ObTaskStatus status_; common::ObSpinLock lock_; - -private: - static const int64_t TTL_THRESHOLD = 5; }; typedef common::Ob2DArray ObReporterArray; @@ -132,6 +129,8 @@ private: void wait_reporter_event(int64_t wait_timeout); private: + // trans_result will be inited and reset by main thread, accessed by rpc thread, + // so we need protect it with lock. TransResult* trans_result_; int err_code_; common::ObSpinLock lock_; @@ -146,7 +145,7 @@ private: obrpc::SingleWaitCond reporter_cond_; private: - static const int64_t TTL_THRESHOLD = 5; + static const int64_t TTL_THRESHOLD = 10; static const int64_t WAIT_ONCE_TIME = 500000; // 500ms. }; diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 9b5af87d94..f5da7d7312 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -10,7 +10,7 @@ * See the Mulan PubL v2 for more details. */ -#define USING_LOG_PREFIX SQL_SESSION +#define USING_LOG_PREFIX SQL #include "sql/ob_result_set.h" #include "lib/oblog/ob_trace_log.h" #include "lib/container/ob_id_set.h" -- GitLab