提交 131c2cf6 编写于 作者: O obdev 提交者: wangzelin.wzl

Fix remote execution hang caused by a plan difference between the control server and the runner server

上级 42a6e126
......@@ -469,6 +469,10 @@ public:
EN_ENABLE_HASH_JOIN_CACHE_AWARE = 251,
EN_SET_DISABLE_HASH_JOIN_BATCH = 252,
// only works for remote execution
EN_DISABLE_REMOTE_EXEC_WITH_PLAN = 255,
EN_REMOTE_EXEC_ERR = 256,
EN_XA_PREPARE_ERROR = 260,
EN_XA_UPDATE_COORD_FAILED = 261,
EN_XA_PREPARE_RESP_LOST = 262,
......
......@@ -450,7 +450,10 @@ void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext& gctx, co
}
if (RETRY_TYPE_NONE != retry_type_) {
session->get_retry_info_for_update().set_last_query_retry_err(err);
// STATIC_ENG_NOT_IMPLEMENT may overwrite the original error code, so do not save STATIC_ENG_NOT_IMPLEMENT
if (STATIC_ENG_NOT_IMPLEMENT != err) {
session->get_retry_info_for_update().set_last_query_retry_err(err);
}
if (OB_UNLIKELY(err != client_ret)) {
LOG_ERROR("when need retry, client_ret must be equal to err", K(client_ret), K(err), K(retry_type_));
}
......
......@@ -794,9 +794,11 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor& executor, ObInnerSQLResu
local_sys_schema_version);
}
int ret_code = OB_SUCCESS;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(SMART_CALL(do_query(executor, res)))) {
ret_code = ret;
LOG_WARN("execute failed", K(ret), K(executor), K(retry_cnt));
int tmp_ret = process_retry(res, ret, abs_timeout_us, need_retry, retry_cnt, is_from_pl);
if (OB_SUCCESS != tmp_ret) {
......@@ -809,6 +811,7 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor& executor, ObInnerSQLResu
LOG_WARN("failed to close result", K(close_ret), K(ret));
}
}
get_session().set_session_in_retry(need_retry, ret_code);
execute_start_timestamp_ = res.get_execute_start_ts();
execute_end_timestamp_ = res.get_execute_end_ts();
......
......@@ -3272,11 +3272,6 @@ int ObCodeGeneratorImpl::convert_exchange(
RowDesc* out_row_desc = NULL;
if (OB_FAIL(create_phy_op_desc(type, receive, out_row_desc, out_ops, op.get_op_id()))) {
LOG_WARN("failed to create phy op and desc", K(ret));
} else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2250) {
// after 2.2.50, remote plan will send SQL, instead of plan to the remote server
if (OB_FAIL(construct_basic_row_desc(op.get_output_exprs(), *out_row_desc))) {
LOG_WARN("construct basic row desc failed", K(ret), K(op.get_output_exprs()));
}
} else if (child_ops.at(0).first->get_output_count() > 0 && child_ops.at(0).second->get_column_num() > 0 &&
OB_FAIL(copy_row_desc_by_projector(*child_ops.at(0).second,
child_ops.at(0).first->get_projector(),
......
......@@ -81,8 +81,8 @@ int ObDistExecuteBaseP::param_preprocess(ObTask& task)
ObTaskExecutorCtx& executor_ctx = exec_ctx_.get_task_exec_ctx();
int64_t tenant_local_version = -1;
int64_t sys_local_version = -1;
int64_t sys_schema_version = executor_ctx.get_query_tenant_begin_schema_version();
int64_t tenant_schema_version = executor_ctx.get_query_sys_begin_schema_version();
int64_t tenant_schema_version = executor_ctx.get_query_tenant_begin_schema_version();
int64_t sys_schema_version = executor_ctx.get_query_sys_begin_schema_version();
LOG_DEBUG("task submit", K(task), "ob_task_id", task.get_ob_task_id());
if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(gctx_.sql_engine_)) {
......@@ -1011,8 +1011,8 @@ int ObMiniTaskBaseP::prepare_task_env(ObMiniTask& task)
ObTaskExecutorCtx& executor_ctx = exec_ctx_.get_task_exec_ctx();
int64_t tenant_local_version = -1;
int64_t sys_local_version = -1;
int64_t sys_schema_version = executor_ctx.get_query_tenant_begin_schema_version();
int64_t tenant_schema_version = executor_ctx.get_query_sys_begin_schema_version();
int64_t tenant_schema_version = executor_ctx.get_query_tenant_begin_schema_version();
int64_t sys_schema_version = executor_ctx.get_query_sys_begin_schema_version();
process_timestamp_ = ObTimeUtility::current_time();
if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(gctx_.sql_engine_)) {
......
......@@ -686,7 +686,11 @@ int ObRemoteBaseExecuteP<T>::execute_with_sql(ObRemoteTask& task)
ObPhysicalPlan* plan = nullptr;
ObPhysicalPlanCtx* plan_ctx = nullptr;
CacheRefHandleID cache_handle_id = MAX_HANDLE;
if (OB_ISNULL(session = exec_ctx_.get_my_session())) {
int inject_err_no = EVENT_CALL(EventTable::EN_REMOTE_EXEC_ERR);
if (0 != inject_err_no) {
ret = inject_err_no;
LOG_WARN("Injection OB_LOCATION_NOT_EXIST error", K(ret));
} else if (OB_ISNULL(session = exec_ctx_.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session is NULL", K(ret), K(task));
} else if (OB_ISNULL(plan_ctx = GET_PHY_PLAN_CTX(exec_ctx_))) {
......
......@@ -56,8 +56,15 @@ int ObRemoteScheduler::schedule(ObExecContext& ctx, ObPhysicalPlan* phy_plan)
LOG_WARN("execute with sql failed", K(ret));
}
}
} else if (OB_FAIL(execute_with_plan(ctx, phy_plan))) {
LOG_WARN("execute with plan failed", K(ret));
} else {
if (NULL != phy_plan->get_root_op_spec()) {
// Remote execution under the static engine does not support sending plans
// so here is a fallback to the old engine
ret = STATIC_ENG_NOT_IMPLEMENT;
LOG_WARN("static engine not support remote execute with send plan, will retry", K(ret));
} else if (OB_FAIL(execute_with_plan(ctx, phy_plan))) {
LOG_WARN("execute with plan failed", K(ret));
}
}
return ret;
}
......
......@@ -46,7 +46,7 @@ int ObTaskExecutorCtx::CalcVirtualPartitionIdParams::init(uint64_t ref_table_id)
}
OB_SERIALIZE_MEMBER(ObTaskExecutorCtx, table_locations_, retry_times_, min_cluster_version_, expected_worker_cnt_,
allocated_worker_cnt_);
allocated_worker_cnt_, query_tenant_begin_schema_version_, query_sys_begin_schema_version_);
ObTaskExecutorCtx::ObTaskExecutorCtx(ObExecContext& exec_context)
: task_resp_handler_(NULL),
......
......@@ -2997,6 +2997,15 @@ int ObSql::after_get_plan(ObPlanCacheCtx& pc_ctx, ObSQLSessionInfo& session, ObP
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx* pctx = pc_ctx.exec_ctx_.get_physical_plan_ctx();
bool enable_send_plan_event = EVENT_CALL(EventTable::EN_DISABLE_REMOTE_EXEC_WITH_PLAN) == 0;
bool enable_send_plan = session.get_is_in_retry() && enable_send_plan_event;
int last_query_retry_err = session.get_retry_info().get_last_query_retry_err();
if (OB_TRANSACTION_SET_VIOLATION == last_query_retry_err
|| OB_TRY_LOCK_ROW_CONFLICT == last_query_retry_err) {
enable_send_plan = false;
}
LOG_DEBUG("before after_get_plan", K(enable_send_plan), K(enable_send_plan_event),
"is_retry",session.get_is_in_retry());
// LOG_INFO("after get paln", K(pctx), K(phy_plan));
if (NULL != pctx) {
if (NULL != phy_plan) {
......@@ -3020,7 +3029,8 @@ int ObSql::after_get_plan(ObPlanCacheCtx& pc_ctx, ObSQLSessionInfo& session, ObP
} // end for
}
if (OB_SUCC(ret) && phy_plan->is_remote_plan() && !phy_plan->contains_temp_table() &&
GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2250) {
GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2250 &&
!enable_send_plan) {
ParamStore& param_store = pctx->get_param_store_for_update();
if (OB_NOT_NULL(ps_params)) {
int64_t initial_param_count = ps_params->count();
......
......@@ -5203,26 +5203,41 @@ int ObLogicalOperator::allocate_dummy_output_access()
} else if (type_ == log_op_def::LOG_GRANULE_ITERATOR &&
get_plan()->get_optimizer_context().is_batched_multi_stmt()) {
ret = append_array_no_dup(output_exprs_, get_child(first_child)->get_output_exprs());
} else {
ObRawExpr* partition_id_expr = NULL;
if (log_op_def::LOG_EXCHANGE == type_) {
} else if (log_op_def::LOG_EXCHANGE == type_) {
ObRawExpr *partition_id_expr = NULL;
ObLogExchange *exchange_op = NULL;
exchange_op = static_cast<ObLogExchange*>(this);
if (exchange_op->get_is_remote() && exchange_op->is_producer()) {
// https://work.aone.alibaba-inc.com/issue/33487009
// 0. EXCHANGE IN REMOTE
// 1. EXCHANGE OUT REMOTE
// 2. TABLE SCAN / OTHERS
// exchange_out_remote must copy output from exchange_in_remote
// operators 0 and 1 must have the same output
if (OB_ISNULL(get_parent())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parent is null", K(ret), K(get_plan()), K(type_), K(id_));
} else if (OB_FAIL(output_exprs_.assign(get_parent()->get_output_exprs()))) {
LOG_WARN("copy_exprs fails", K(ret));
}
} else {
for (int64_t i = 0; i < output_exprs_.count() && OB_ISNULL(partition_id_expr); i++) {
ObRawExpr* expr = output_exprs_.at(i);
if (expr->get_expr_type() == T_PDML_PARTITION_ID) {
partition_id_expr = expr;
}
}
}
if (OB_FAIL(output_exprs_.assign(get_child(first_child)->get_output_exprs()))) {
LOG_WARN("copy_exprs fails", K(ret));
} else if (OB_NOT_NULL(partition_id_expr)) {
LOG_DEBUG("append the partition id expr to output exprs", K(*partition_id_expr));
if (OB_FAIL(add_var_to_array_no_dup(output_exprs_, partition_id_expr))) {
LOG_WARN("failed to add partition id expr to output exprs", K(ret));
if (OB_FAIL(output_exprs_.assign(get_child(first_child)->get_output_exprs()))) {
LOG_WARN("copy_exprs fails", K(ret));
} else if (OB_NOT_NULL(partition_id_expr)) {
LOG_DEBUG("append the partition id expr to output exprs", K(*partition_id_expr));
if (OB_FAIL(add_var_to_array_no_dup(output_exprs_, partition_id_expr))) {
LOG_WARN("failed to add partition id expr to output exprs", K(ret));
}
}
} else {
LOG_DEBUG("succ to assign output_exprs_", K(output_exprs_));
}
} else if (OB_FAIL(output_exprs_.assign(get_child(first_child)->get_output_exprs()))) {
LOG_WARN("copy_exprs fails", K(ret));
}
} else { /* Do nothing */
}
......
......@@ -3546,7 +3546,7 @@ SQL: select * from t5 where c2 = 5 and c3 = 7;
Outputs & filters:
-------------------------------------
0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil)
1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil),
access([t5.c2], [t5.c3], [t5.c1]), partitions(p2),
is_index_back=false,
......@@ -3570,7 +3570,7 @@ SQL: select * from t5 where c2 = 6 and c3 = 7;
Outputs & filters:
-------------------------------------
0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil)
1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil),
access([t5.c2], [t5.c3], [t5.c1]), partitions(p2),
is_index_back=false,
......@@ -7427,7 +7427,7 @@ SQL: select * from t1 X, t1 Y where X.c1 = Y.c1 and X.c1 = 1 and Y.c1 = 1;
Outputs & filters:
-------------------------------------
0 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil)
1 - output([X.c1], [X.c2]), filter(nil)
1 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil)
2 - output([X.c1], [X.c2]), filter(nil),
access([X.c1], [X.c2]), partitions(p1),
is_index_back=false,
......@@ -8714,7 +8714,7 @@ SQL: select * from t14 partition(p1) where (c1, c2) =(1, 2) or (c1, c2) = (2, 3)
Outputs & filters:
-------------------------------------
0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil)
1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil),
access([t14.c3], [t14.c1], [t14.c2]), partitions(p1),
is_index_back=false,
......@@ -8738,7 +8738,7 @@ SQL: select/*+index(t14 primary)*/* from t14 partition(p1) where (c1, c2) =(1,
Outputs & filters:
-------------------------------------
0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil)
1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil),
access([t14.c3], [t14.c1], [t14.c2]), partitions(p1),
is_index_back=false,
......
......@@ -4464,7 +4464,7 @@ SQL: select * from t5 where c2 = 5 and c3 = 7;
Outputs & filters:
-------------------------------------
0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil)
1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil),
access([t5.c2], [t5.c3], [t5.c1]), partitions(p2),
is_index_back=false,
......@@ -4488,7 +4488,7 @@ SQL: select * from t5 where c2 = 6 and c3 = 7;
Outputs & filters:
-------------------------------------
0 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
1 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil)
1 - output([t5.c1], [t5.c2], [t5.c3]), filter(nil)
2 - output([t5.c2], [t5.c3], [t5.c1]), filter(nil),
access([t5.c2], [t5.c3], [t5.c1]), partitions(p2),
is_index_back=false,
......@@ -8500,7 +8500,7 @@ SQL: select * from t1 X, t1 Y where X.c1 = Y.c1 and X.c1 = 1 and Y.c1 = 1;
Outputs & filters:
-------------------------------------
0 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil)
1 - output([X.c1], [X.c2]), filter(nil)
1 - output([X.c1], [X.c2], [X.c1], [X.c2]), filter(nil)
2 - output([X.c1], [X.c2]), filter(nil),
access([X.c1], [X.c2]), partitions(p1),
is_index_back=false,
......@@ -9823,7 +9823,7 @@ SQL: select * from t14 partition(p1) where (c1, c2) =(1, 2) or (c1, c2) = (2, 3)
Outputs & filters:
-------------------------------------
0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil)
1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil),
access([t14.c3], [t14.c1], [t14.c2]), partitions(p1),
is_index_back=false,
......@@ -9847,7 +9847,7 @@ SQL: select/*+index(t14 primary)*/* from t14 partition(p1) where (c1, c2) =(1,
Outputs & filters:
-------------------------------------
0 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
1 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil)
1 - output([t14.c1], [t14.c2], [t14.c3]), filter(nil)
2 - output([t14.c3], [t14.c1], [t14.c2]), filter(nil),
access([t14.c3], [t14.c1], [t14.c2]), partitions(p1),
is_index_back=false,
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册