From beda1a3f147b236b0833c2605411a3d1cc1bca56 Mon Sep 17 00:00:00 2001 From: adofsauron Date: Tue, 27 Sep 2022 19:55:57 +0800 Subject: [PATCH] The nested loop currently always runs multithreaded. Add a switch to control whether the nested loop uses multithreading. The reasons for adding a switch that controls whether the multithreaded nested loop is enabled are: 1. It allows a quick comparison of single-threaded and multithreaded execution. 2. The multithreaded transformation of the nested loop was not completed during this development period, and the work on the physical layer is deferred to the second stage. The switch that turns multithreading on is also needed so that the code can be rolled back easily. TODO: There was no time to refine this switch further; a configurable parameter that controls whether multithreading is enabled needs to be provided later. --- storage/tianmu/core/joiner.h | 9 ++- storage/tianmu/core/joiner_general.cpp | 85 +++++++++++++++++++++++--- 2 files changed, 85 insertions(+), 9 deletions(-) diff --git a/storage/tianmu/core/joiner.h b/storage/tianmu/core/joiner.h index e03a44480..c73992093 100644 --- a/storage/tianmu/core/joiner.h +++ b/storage/tianmu/core/joiner.h @@ -100,10 +100,17 @@ class JoinerGeneral : public TwoDimensionalJoiner { // Instead of the original inner Join function block, // as the top-level call of inner Join, // internal split multiple threads to separate different subsets for processing - void ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind, DimensionVector &all_dims, + void ExecuteInnerJoinLoopMultiThread(MIIterator &mit, Condition &cond, MINewContents &new_mind, + DimensionVector &all_dims, std::vector &pack_desc_locked, int64_t &tuples_in_output, int64_t limit, bool count_only); + // A single thread handles nested loop. 
+ // The purpose of this function is to provide an option that can be handled by a single thread + void ExecuteInnerJoinLoopSingleThread(MIIterator &mit, Condition &cond, MINewContents &new_mind, + DimensionVector &all_dims, std::vector &pack_desc_locked, + int64_t &tuples_in_output, int64_t limit, bool count_only); + // Handles each row in the Pack that the current iterator points to // TODO: Keep in mind that internal Pack reads will have cache invalidation during multithread switching, // leaving the second phase to continue processing the split of the house storage layer diff --git a/storage/tianmu/core/joiner_general.cpp b/storage/tianmu/core/joiner_general.cpp index d4a852d64..e3100197b 100644 --- a/storage/tianmu/core/joiner_general.cpp +++ b/storage/tianmu/core/joiner_general.cpp @@ -63,15 +63,21 @@ void JoinerGeneral::ExecuteJoinConditions(Condition &cond) { bool loc_result; bool stop_execution = false; // early stop for LIMIT - rc_control_.lock(m_conn->GetThreadID()) << "Starting joiner loop (" << mit.NumOfTuples() << " rows)." 
<< system::unlock; // The main loop for checking conditions - int64_t rows_passed = 0; - int64_t rows_omitted = 0; if (!outer_dims.IsEmpty()) ExecuteOuterJoinLoop(cond, new_mind, all_dims, outer_dims, tuples_in_output, tips.limit); else { - ExecuteInnerJoinLoop(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit, tips.count_only); + // TODO: There is no time to continue to optimize this parameter, + // later need to provide a configurable parameter to + // control whether to enable multithreading +#ifdef INNER_JOIN_LOOP_MULTI_THREAD + ExecuteInnerJoinLoopMultiThread(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit, + tips.count_only); +#else + ExecuteInnerJoinLoopSingleThread(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit, + tips.count_only); +#endif } // Postprocessing and cleanup if (tips.count_only) @@ -171,7 +177,6 @@ void JoinerGeneral::TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Co } Condition cd(*cond); - taskIterator->Rewind(); int task_pack_num = 0; while (taskIterator->IsValid() && !(*stop_execution)) { @@ -188,12 +193,75 @@ void JoinerGeneral::TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Co } } + // A single thread handles nested loop. +// The purpose of this function is to provide an option that can be handled by a single thread +void JoinerGeneral::ExecuteInnerJoinLoopSingleThread(MIIterator &mit, Condition &cond, MINewContents &new_mind, + DimensionVector& all_dims, std::vector& pack_desc_locked, + int64_t& tuples_in_output, int64_t limit, bool count_only) { + rc_control_.lock(m_conn->GetThreadID()) << "Starting joiner loop (" << mit.NumOfTuples() << " rows)." 
<< system::unlock; + bool stop_execution = false; + bool loc_result = false; + int no_desc = cond.Size(); + int64_t rows_passed = 0; + int64_t rows_omitted = 0; + + while (mit.IsValid() && !stop_execution) { + if (mit.PackrowStarted()) { + bool omit_this_packrow = false; + for (int i = 0; (i < no_desc && !omit_this_packrow); i++) + if (cond[i].EvaluateRoughlyPack(mit) == common::RSValue::RS_NONE) omit_this_packrow = true; + for (int i = 0; i < no_desc; i++) pack_desc_locked[i] = false; // delay locking + if (new_mind.NoMoreTuplesPossible()) + break; // stop the join if nothing new may be obtained in some + // optimized cases + if (omit_this_packrow) { + rows_omitted += mit.GetPackSizeLeft(); + rows_passed += mit.GetPackSizeLeft(); + mit.NextPackrow(); + continue; + } + } + loc_result = true; + for (int i = 0; (i < no_desc && loc_result); i++) { + if (!pack_desc_locked[i]) { // delayed locking - maybe will not be + // locked at all? + cond[i].LockSourcePacks(mit); + pack_desc_locked[i] = true; + } + if (types::RequiresUTFConversions(cond[i].GetCollation())) { + if (cond[i].CheckCondition_UTF(mit) == false) loc_result = false; + } else { + if (cond[i].CheckCondition(mit) == false) loc_result = false; + } + } + if (loc_result) { + if (!count_only) { + for (int i = 0; i < mind->NumOfDimensions(); i++) + if (all_dims[i]) new_mind.SetNewTableValue(i, mit[i]); + new_mind.CommitNewTableValues(); + } + tuples_in_output++; + } + ++mit; + rows_passed++; + if (m_conn->Killed()) throw common::KilledException(); + if (limit > -1 && tuples_in_output >= limit) stop_execution = true; + } + + if (rows_passed > 0 && rows_omitted > 0) { + rc_control_.lock(m_conn->GetThreadID()) + << "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." 
<< system::unlock; + } +} + // Instead of the original inner Join function block, // as the top-level call of inner Join, // internal split multiple threads to separate different subsets for processing -void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind, +void JoinerGeneral::ExecuteInnerJoinLoopMultiThread(MIIterator &mit, Condition &cond, MINewContents &new_mind, DimensionVector &all_dims, std::vector &pack_desc_locked, int64_t &tuples_in_output, int64_t limit, bool count_only) { + rc_control_.lock(m_conn->GetThreadID()) + << "Starting joiner loop (" << mit.NumOfTuples() << " rows)." << system::unlock; int packnum = 0; while (mit.IsValid()) { @@ -218,7 +286,7 @@ void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINew --threads_num; } while ((num <= 1) && (threads_num >= 1)); - TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoop packnum: %d threads_num: %d loopcnt: %d num: %d mod: %d", + TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoopMultiThreading packnum: %d threads_num: %d loopcnt: %d num: %d mod: %d", packnum, threads_num, loopcnt, num, mod); @@ -275,9 +343,10 @@ void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINew res.get_all_with_except(); - if (rows_passed > 0 && rows_omitted > 0) + if (rows_passed > 0 && rows_omitted > 0) { rc_control_.lock(m_conn->GetThreadID()) << "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." << system::unlock; + } } void JoinerGeneral::ExecuteOuterJoinLoop(Condition &cond, MINewContents &new_mind, DimensionVector &all_dims, -- GitLab