From 131c2cf6580d68c82cf3ed4d38d54df36f173e09 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 14 Jul 2021 10:24:13 +0800 Subject: [PATCH] remote execute hang because of plan diff with ctrl ser and runner ser --- deps/oblib/src/lib/utility/ob_tracepoint.h | 4 ++ src/observer/mysql/ob_query_retry_ctrl.cpp | 5 ++- src/observer/ob_inner_sql_connection.cpp | 3 ++ .../code_generator/ob_code_generator_impl.cpp | 5 --- .../executor/ob_executor_rpc_processor.cpp | 8 ++-- .../executor/ob_remote_executor_processor.cpp | 6 ++- src/sql/executor/ob_remote_scheduler.cpp | 11 +++++- src/sql/executor/ob_task_executor_ctx.cpp | 2 +- src/sql/ob_sql.cpp | 12 +++++- src/sql/optimizer/ob_logical_operator.cpp | 39 +++++++++++++------ .../test_optimizer_default_stat.result | 10 ++--- .../optimizer/test_optimizer_select.result | 10 ++--- 12 files changed, 78 insertions(+), 37 deletions(-) diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index 0aff5e4530..16419c927e 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -469,6 +469,10 @@ public: EN_ENABLE_HASH_JOIN_CACHE_AWARE = 251, EN_SET_DISABLE_HASH_JOIN_BATCH = 252, + // only work for remote execute + EN_DISABLE_REMOTE_EXEC_WITH_PLAN = 255, + EN_REMOTE_EXEC_ERR = 256, + EN_XA_PREPARE_ERROR = 260, EN_XA_UPDATE_COORD_FAILED = 261, EN_XA_PREPARE_RESP_LOST = 262, diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index 903aaf65c5..e403c4ba6e 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -450,7 +450,10 @@ void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext& gctx, co } if (RETRY_TYPE_NONE != retry_type_) { - session->get_retry_info_for_update().set_last_query_retry_err(err); + // STATIC_ENG_NOT_IMPLEMENT myabe cover the orign err code, so not save STATIC_ENG_NOT_IMPLEMENT + if (STATIC_ENG_NOT_IMPLEMENT != err) { + session->get_retry_info_for_update().set_last_query_retry_err(err); + } if (OB_UNLIKELY(err != client_ret)) { LOG_ERROR("when need retry, client_ret must be equal to err", K(client_ret), K(err), K(retry_type_)); } diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index a078ebea77..4883780906 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -794,9 +794,11 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor& executor, ObInnerSQLResu local_sys_schema_version); } + int ret_code = OB_SUCCESS; if (OB_FAIL(ret)) { // do nothing } else if (OB_FAIL(SMART_CALL(do_query(executor, res)))) { + ret_code = ret; LOG_WARN("execute failed", K(ret), K(executor), K(retry_cnt)); int tmp_ret = process_retry(res, ret, abs_timeout_us, need_retry, retry_cnt, is_from_pl); if (OB_SUCCESS != tmp_ret) { @@ -809,6 +811,7 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor& executor, ObInnerSQLResu LOG_WARN("failed to close result", K(close_ret), K(ret)); } } + get_session().set_session_in_retry(need_retry, ret_code); execute_start_timestamp_ = res.get_execute_start_ts(); execute_end_timestamp_ = res.get_execute_end_ts(); diff --git a/src/sql/code_generator/ob_code_generator_impl.cpp b/src/sql/code_generator/ob_code_generator_impl.cpp index 831e4d3c1b..b3a8e5b99f 100644 --- a/src/sql/code_generator/ob_code_generator_impl.cpp +++ b/src/sql/code_generator/ob_code_generator_impl.cpp @@ -3272,11 +3272,6 @@ int ObCodeGeneratorImpl::convert_exchange( RowDesc* out_row_desc = NULL; if (OB_FAIL(create_phy_op_desc(type, receive, out_row_desc, out_ops, op.get_op_id()))) { LOG_WARN("failed to create phy op and desc", K(ret)); - } else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2250) { - // after 2.2.50, remote plan will send SQL, instead of plan to the remote server - if (OB_FAIL(construct_basic_row_desc(op.get_output_exprs(), *out_row_desc))) { - LOG_WARN("construct basic row desc failed", K(ret), K(op.get_output_exprs())); - } } else if (child_ops.at(0).first->get_output_count() > 0 && child_ops.at(0).second->get_column_num() > 0 && OB_FAIL(copy_row_desc_by_projector(*child_ops.at(0).second, child_ops.at(0).first->get_projector(), diff --git a/src/sql/executor/ob_executor_rpc_processor.cpp b/src/sql/executor/ob_executor_rpc_processor.cpp index b58aa78945..ea92754c5e 100644 --- a/src/sql/executor/ob_executor_rpc_processor.cpp +++ b/src/sql/executor/ob_executor_rpc_processor.cpp @@ -81,8 +81,8 @@ int ObDistExecuteBaseP::param_preprocess(ObTask& task) ObTaskExecutorCtx& executor_ctx = exec_ctx_.get_task_exec_ctx(); int64_t tenant_local_version = -1; int64_t sys_local_version = -1; - int64_t sys_schema_version = executor_ctx.get_query_tenant_begin_schema_version(); - int64_t tenant_schema_version = executor_ctx.get_query_sys_begin_schema_version(); + int64_t tenant_schema_version = executor_ctx.get_query_tenant_begin_schema_version(); + int64_t sys_schema_version = executor_ctx.get_query_sys_begin_schema_version(); LOG_DEBUG("task submit", K(task), "ob_task_id", task.get_ob_task_id()); if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(gctx_.sql_engine_)) { @@ -1011,8 +1011,8 @@ int ObMiniTaskBaseP::prepare_task_env(ObMiniTask& task) ObTaskExecutorCtx& executor_ctx = exec_ctx_.get_task_exec_ctx(); int64_t tenant_local_version = -1; int64_t sys_local_version = -1; - int64_t sys_schema_version = executor_ctx.get_query_tenant_begin_schema_version(); - int64_t tenant_schema_version = executor_ctx.get_query_sys_begin_schema_version(); + int64_t tenant_schema_version = executor_ctx.get_query_tenant_begin_schema_version(); + int64_t sys_schema_version = executor_ctx.get_query_sys_begin_schema_version(); process_timestamp_ = ObTimeUtility::current_time(); if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(gctx_.sql_engine_)) { diff --git a/src/sql/executor/ob_remote_executor_processor.cpp b/src/sql/executor/ob_remote_executor_processor.cpp index b46a4e18ba..522703e536 100644 --- a/src/sql/executor/ob_remote_executor_processor.cpp +++ b/src/sql/executor/ob_remote_executor_processor.cpp @@ -686,7 +686,11 @@ int ObRemoteBaseExecuteP::execute_with_sql(ObRemoteTask& task) ObPhysicalPlan* plan = nullptr; ObPhysicalPlanCtx* plan_ctx = nullptr; CacheRefHandleID cache_handle_id = MAX_HANDLE; - if (OB_ISNULL(session = exec_ctx_.get_my_session())) { + int inject_err_no = EVENT_CALL(EventTable::EN_REMOTE_EXEC_ERR); + if (0 != inject_err_no) { + ret = inject_err_no; + LOG_WARN("Injection OB_LOCATION_NOT_EXIST error", K(ret)); + } else if (OB_ISNULL(session = exec_ctx_.get_my_session())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session is NULL", K(ret), K(task)); } else if (OB_ISNULL(plan_ctx = GET_PHY_PLAN_CTX(exec_ctx_))) { diff --git a/src/sql/executor/ob_remote_scheduler.cpp b/src/sql/executor/ob_remote_scheduler.cpp index e7d374b378..997d04ef5e 100644 --- a/src/sql/executor/ob_remote_scheduler.cpp +++ b/src/sql/executor/ob_remote_scheduler.cpp @@ -56,8 +56,15 @@ int ObRemoteScheduler::schedule(ObExecContext& ctx, ObPhysicalPlan* phy_plan) LOG_WARN("execute with sql failed", K(ret)); } } - } else if (OB_FAIL(execute_with_plan(ctx, phy_plan))) { - LOG_WARN("execute with plan failed", K(ret)); + } else { + if (NULL != phy_plan->get_root_op_spec()) { + // Remote execution under the static engine does not support sending plans + // so here is a fallback to the old engine + ret = STATIC_ENG_NOT_IMPLEMENT; + LOG_WARN("static engine not support remote execute with send plan, will retry", K(ret)); + } else if (OB_FAIL(execute_with_plan(ctx, phy_plan))) { + LOG_WARN("execute with plan failed", K(ret)); + } } return ret; } diff --git a/src/sql/executor/ob_task_executor_ctx.cpp b/src/sql/executor/ob_task_executor_ctx.cpp index 5e939283c1..ad07c58dba 100644 --- a/src/sql/executor/ob_task_executor_ctx.cpp +++ b/src/sql/executor/ob_task_executor_ctx.cpp @@ -46,7 +46,7 @@ int ObTaskExecutorCtx::CalcVirtualPartitionIdParams::init(uint64_t ref_table_id) } OB_SERIALIZE_MEMBER(ObTaskExecutorCtx, table_locations_, retry_times_, min_cluster_version_, expected_worker_cnt_, - allocated_worker_cnt_); + allocated_worker_cnt_, query_tenant_begin_schema_version_, query_sys_begin_schema_version_); ObTaskExecutorCtx::ObTaskExecutorCtx(ObExecContext& exec_context) : task_resp_handler_(NULL), diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index b831c1988d..104ff17c07 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -2997,6 +2997,15 @@ int ObSql::after_get_plan(ObPlanCacheCtx& pc_ctx, ObSQLSessionInfo& session, ObP { int ret = OB_SUCCESS; ObPhysicalPlanCtx* pctx = pc_ctx.exec_ctx_.get_physical_plan_ctx(); + bool enable_send_plan_event = EVENT_CALL(EventTable::EN_DISABLE_REMOTE_EXEC_WITH_PLAN) == 0; + bool enable_send_plan = session.get_is_in_retry() && enable_send_plan_event; + int last_query_retry_err = session.get_retry_info().get_last_query_retry_err(); + if (OB_TRANSACTION_SET_VIOLATION == last_query_retry_err + || OB_TRY_LOCK_ROW_CONFLICT == last_query_retry_err) { + enable_send_plan = false; + } + LOG_DEBUG("before after_get_plan", K(enable_send_plan), K(enable_send_plan_event), + "is_retry",session.get_is_in_retry()); // LOG_INFO("after get paln", K(pctx), K(phy_plan)); if (NULL != pctx) { if (NULL != phy_plan) { @@ -3020,7 +3029,8 @@ int ObSql::after_get_plan(ObPlanCacheCtx& pc_ctx, ObSQLSessionInfo& session, ObP } // end for } if (OB_SUCC(ret) && phy_plan->is_remote_plan() && !phy_plan->contains_temp_table() && - GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2250) { + GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2250 && + !enable_send_plan) { ParamStore& param_store = pctx->get_param_store_for_update(); if (OB_NOT_NULL(ps_params)) { int64_t initial_param_count = ps_params->count(); diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index e48bf5f7ee..e31d665fe6 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -5203,26 +5203,41 @@ int ObLogicalOperator::allocate_dummy_output_access() } else if (type_ == log_op_def::LOG_GRANULE_ITERATOR && get_plan()->get_optimizer_context().is_batched_multi_stmt()) { ret = append_array_no_dup(output_exprs_, get_child(first_child)->get_output_exprs()); - } else { - ObRawExpr* partition_id_expr = NULL; - if (log_op_def::LOG_EXCHANGE == type_) { + } else if (log_op_def::LOG_EXCHANGE == type_) { + ObRawExpr *partition_id_expr = NULL; + ObLogExchange *exchange_op = NULL; + exchange_op = static_cast(this); + if (exchange_op->get_is_remote() && exchange_op->is_producer()) { + // https://work.aone.alibaba-inc.com/issue/33487009 + // 0. EXCHANGE IN REMOTE + // 1. EXCHANGE OUT REMOTE + // 2. TABLE SCAN / OTHERS + // exchange_out_remote must copy output from exchange_in_remote + // operators 0 and 1 must have the same output + if (OB_ISNULL(get_parent())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("parent is null", K(ret), K(get_plan()), K(type_), K(id_)); + } else if (OB_FAIL(output_exprs_.assign(get_parent()->get_output_exprs()))) { + LOG_WARN("copy_exprs fails", K(ret)); + } + } else { for (int64_t i = 0; i < output_exprs_.count() && OB_ISNULL(partition_id_expr); i++) { ObRawExpr* expr = output_exprs_.at(i); if (expr->get_expr_type() == T_PDML_PARTITION_ID) { partition_id_expr = expr; } } - } - if (OB_FAIL(output_exprs_.assign(get_child(first_child)->get_output_exprs()))) { - LOG_WARN("copy_exprs fails", K(ret)); - } else if (OB_NOT_NULL(partition_id_expr)) { - LOG_DEBUG("append the partition id expr to output exprs", K(*partition_id_expr)); - if (OB_FAIL(add_var_to_array_no_dup(output_exprs_, partition_id_expr))) { - LOG_WARN("failed to add partition id expr to output exprs", K(ret)); + if (OB_FAIL(output_exprs_.assign(get_child(first_child)->get_output_exprs()))) { + LOG_WARN("copy_exprs fails", K(ret)); + } else if (OB_NOT_NULL(partition_id_expr)) { + LOG_DEBUG("append the partition id expr to output exprs", K(*partition_id_expr)); + if (OB_FAIL(add_var_to_array_no_dup(output_exprs_, partition_id_expr))) { + LOG_WARN("failed to add partition id expr to output exprs", K(ret)); + } } - } else { - LOG_DEBUG("succ to assign output_exprs_", K(output_exprs_)); } + } else if (OB_FAIL(output_exprs_.assign(get_child(first_child)->get_output_exprs()))) { + LOG_WARN("copy_exprs fails", K(ret)); } } else { /* Do nothing */ } diff --git a/unittest/sql/optimizer/test_optimizer_default_stat.result b/unittest/sql/optimizer/test_optimizer_default_stat.result index 2cf3969470..c80910fb5c 100644 --- a/unittest/sql/optimizer/test_optimizer_default_stat.result +++ b/unittest/sql/optimizer/test_optimizer_default_stat.result @@ -3546,7 +3546,7 @@ SQL: select * from t5 where c2 = 5 and c3 = 7; Outputs & filters: ------------------------------------- 0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) - 1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil) + 1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) 2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil), access([t5.c2], [t5.c3], [t5.c1]), partitions(p2), is_index_back=false, @@ -3570,7 +3570,7 @@ SQL: select * from t5 where c2 = 6 and c3 = 7; Outputs & filters: ------------------------------------- 0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) - 1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil) + 1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) 2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil), access([t5.c2], [t5.c3], [t5.c1]), partitions(p2), is_index_back=false, @@ -7427,7 +7427,7 @@ SQL: select * from t1 X, t1 Y where X.c1 = Y.c1 and X.c1 = 1 and Y.c1 = 1; Outputs & filters: ------------------------------------- 0 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil) - 1 - output([X.c1], [X.c2]), filter(nil) + 1 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil) 2 - output([X.c1], [X.c2]), filter(nil), access([X.c1], [X.c2]), partitions(p1), is_index_back=false, @@ -8714,7 +8714,7 @@ SQL: select * from t14 partition(p1) where (c1, c2) =(1, 2) or (c1, c2) = (2, 3) Outputs & filters: ------------------------------------- 0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) - 1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil) + 1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) 2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil), access([t14.c3], [t14.c1], [t14.c2]), partitions(p1), is_index_back=false, @@ -8738,7 +8738,7 @@ SQL: select/*+index(t14 primary)*/* from t14 partition(p1) where (c1, c2) =(1, Outputs & filters: ------------------------------------- 0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) - 1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil) + 1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) 2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil), access([t14.c3], [t14.c1], [t14.c2]), partitions(p1), is_index_back=false, diff --git a/unittest/sql/optimizer/test_optimizer_select.result b/unittest/sql/optimizer/test_optimizer_select.result index 8346d9e83d..12c62cfcc9 100644 --- a/unittest/sql/optimizer/test_optimizer_select.result +++ b/unittest/sql/optimizer/test_optimizer_select.result @@ -4464,7 +4464,7 @@ SQL: select * from t5 where c2 = 5 and c3 = 7; Outputs & filters: ------------------------------------- 0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) - 1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil) + 1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) 2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil), access([t5.c2], [t5.c3], [t5.c1]), partitions(p2), is_index_back=false, @@ -4488,7 +4488,7 @@ SQL: select * from t5 where c2 = 6 and c3 = 7; Outputs & filters: ------------------------------------- 0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) - 1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil) + 1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil) 2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil), access([t5.c2], [t5.c3], [t5.c1]), partitions(p2), is_index_back=false, @@ -8500,7 +8500,7 @@ SQL: select * from t1 X, t1 Y where X.c1 = Y.c1 and X.c1 = 1 and Y.c1 = 1; Outputs & filters: ------------------------------------- 0 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil) - 1 - output([X.c1], [X.c2]), filter(nil) + 1 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil) 2 - output([X.c1], [X.c2]), filter(nil), access([X.c1], [X.c2]), partitions(p1), is_index_back=false, @@ -9823,7 +9823,7 @@ SQL: select * from t14 partition(p1) where (c1, c2) =(1, 2) or (c1, c2) = (2, 3) Outputs & filters: ------------------------------------- 0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) - 1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil) + 1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) 2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil), access([t14.c3], [t14.c1], [t14.c2]), partitions(p1), is_index_back=false, @@ -9847,7 +9847,7 @@ SQL: select/*+index(t14 primary)*/* from t14 partition(p1) where (c1, c2) =(1, Outputs & filters: ------------------------------------- 0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) - 1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil) + 1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil) 2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil), access([t14.c3], [t14.c1], [t14.c2]), partitions(p1), is_index_back=false, -- GitLab