From 4bca2395943b3879890b4b1d7299209e0f72cf28 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 6 Feb 2023 14:38:54 +0800 Subject: [PATCH] fix shared hj hang because one of workers is already exited --- src/sql/engine/join/ob_hash_join_op.cpp | 22 ++++++++++++++++++++++ src/sql/engine/join/ob_hash_join_op.h | 10 ++++++++++ 2 files changed, 32 insertions(+) diff --git a/src/sql/engine/join/ob_hash_join_op.cpp b/src/sql/engine/join/ob_hash_join_op.cpp index d6bdee1d60..325c9d3855 100644 --- a/src/sql/engine/join/ob_hash_join_op.cpp +++ b/src/sql/engine/join/ob_hash_join_op.cpp @@ -390,6 +390,9 @@ int ObHashJoinOp::inner_open() ObSQLSessionInfo *session = NULL; if (OB_FAIL(set_shared_info())) { LOG_WARN("failed to set shared info", K(ret)); + } else if (is_shared_ && OB_FAIL(sync_wait_open())) { + is_shared_ = false; + LOG_WARN("failed to sync open for shared hj", K(ret)); } else if ((OB_UNLIKELY(MY_SPEC.all_join_keys_.count() <= 0 || MY_SPEC.all_join_keys_.count() != MY_SPEC.all_hash_funcs_.count() || OB_ISNULL(left_)))) { @@ -2392,6 +2395,25 @@ int ObHashJoinOp::sync_wait_close() return ret; } +int ObHashJoinOp::sync_wait_open() +{ + int ret = OB_SUCCESS; + ObHashJoinInput *hj_input = static_cast(input_); + if (OB_ISNULL(hj_input)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected status: shared hash join info is null", K(ret)); + } else if (OB_FAIL(hj_input->sync_wait( + ctx_, hj_input->get_open_cnt(), + [&](int64_t n_times) { + UNUSED(n_times); + }))) { + LOG_WARN("failed to sync open", K(ret), K(spec_.id_)); + } else { + LOG_TRACE("debug sync sync open", K(ret), K(spec_.id_)); + } + return ret; +} + // dump partition that has only little data int ObHashJoinOp::dump_remain_partition() { diff --git a/src/sql/engine/join/ob_hash_join_op.h b/src/sql/engine/join/ob_hash_join_op.h index 6c639e9742..03d73ba4a2 100644 --- a/src/sql/engine/join/ob_hash_join_op.h +++ b/src/sql/engine/join/ob_hash_join_op.h @@ -45,6 +45,7 @@ struct ObHashTableSharedTableInfo int64_t total_memory_row_count_; int64_t total_memory_size_; + int64_t open_cnt_; }; class ObHashJoinInput : public ObOpInput @@ -144,6 +145,13 @@ public: ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast(shared_hj_info_); return shared_hj_info->close_cnt_; } + + int64_t &get_open_cnt() + { + ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast(shared_hj_info_); + return shared_hj_info->open_cnt_; + } + ObHashTableSharedTableInfo *get_shared_hj_info() { return reinterpret_cast(shared_hj_info_); @@ -175,6 +183,7 @@ public: shared_hj_info->process_cnt_ = 0; shared_hj_info->close_cnt_ = 0; + shared_hj_info->open_cnt_ = 0; shared_hj_info->ret_ = OB_SUCCESS; shared_hj_info->read_null_in_naaj_ = false; new (&shared_hj_info->cond_)common::SimpleCond(common::ObWaitEventIds::SQL_SHARED_HJ_COND_WAIT); @@ -946,6 +955,7 @@ private: int sync_set_early_exit(); int do_sync_wait_all(); int sync_wait_close(); + int sync_wait_open(); /********** end for shared hash table hash join *******/ private: using PredFunc = std::function; -- GitLab