提交 beda1a3f 编写于 作者: 悟世者's avatar 悟世者 提交者: mergify[bot]

The nested loop now enforces multithreading.

    Switches are added to support controlling whether multithreading is used to nested loops

    The reason for adding a parameter to control whether to enable multithreaded nested loop is that

    1. Provides parameters for fast execution of single-threaded versus multithreaded comparisons
    2. At present, the physical layer is placed in the second stage because
    the multi-threaded transformation of Nested loop has not been thoroughly carried out during the development period.
    The argument that controls turning on multithreading is also needed for easy code rollback

    TODO: There is no time to continue to optimize this parameter,
    later need to provide a configurable parameter to
    control whether to enable multithreading
上级 7789a52c
......@@ -100,10 +100,17 @@ class JoinerGeneral : public TwoDimensionalJoiner {
// Instead of the original inner Join function block,
// as the top-level call of inner Join,
// internal split multiple threads to separate different subsets for processing
void ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
void ExecuteInnerJoinLoopMultiThread(MIIterator &mit, Condition &cond, MINewContents &new_mind,
DimensionVector &all_dims,
std::vector<bool> &pack_desc_locked, int64_t &tuples_in_output, int64_t limit,
bool count_only);
// A single thread handles nested loop.
// The purpose of this function is to provide an option that can be handled by a single thread
void ExecuteInnerJoinLoopSingleThread(MIIterator &mit, Condition &cond, MINewContents &new_mind,
DimensionVector &all_dims, std::vector<bool> &pack_desc_locked,
int64_t &tuples_in_output, int64_t limit, bool count_only);
// Handles each row in the Pack that the current iterator points to
// TODO: Keep in mind that internal Pack reads will have cache invalidation during multithread switching,
// leaving the second phase to continue processing the split of the house storage layer
......@@ -63,15 +63,21 @@ void JoinerGeneral::ExecuteJoinConditions(Condition &cond) {
bool loc_result;
bool stop_execution = false; // early stop for LIMIT
rc_control_.lock(m_conn->GetThreadID()) << "Starting joiner loop (" << mit.NumOfTuples() << " rows)." << system::unlock;
// The main loop for checking conditions
int64_t rows_passed = 0;
int64_t rows_omitted = 0;
if (!outer_dims.IsEmpty())
ExecuteOuterJoinLoop(cond, new_mind, all_dims, outer_dims, tuples_in_output, tips.limit);
else {
ExecuteInnerJoinLoop(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit, tips.count_only);
// TODO: There is no time to continue to optimize this parameter,
// later need to provide a configurable parameter to
// control whether to enable multithreading
ExecuteInnerJoinLoopMultiThread(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit,
ExecuteInnerJoinLoopSingleThread(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit,
// Postprocessing and cleanup
if (tips.count_only)
......@@ -171,7 +177,6 @@ void JoinerGeneral::TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Co
Condition cd(*cond);
int task_pack_num = 0;
while (taskIterator->IsValid() && !(*stop_execution)) {
......@@ -188,12 +193,75 @@ void JoinerGeneral::TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Co
// A single thread handles nested loop.
// The purpose of this function is to provide an option that can be handled by a single thread
void JoinerGeneral::ExecuteInnerJoinLoopSingleThread(MIIterator &mit, Condition &cond, MINewContents &new_mind,
DimensionVector& all_dims, std::vector<bool>& pack_desc_locked,
int64_t& tuples_in_output, int64_t limit, bool count_only) {
rc_control_.lock(m_conn->GetThreadID()) << "Starting joiner loop (" << mit.NumOfTuples() << " rows)." << system::unlock;
bool stop_execution = false;
bool loc_result = false;
int no_desc = cond.Size();
int64_t rows_passed = 0;
int64_t rows_omitted = 0;
while (mit.IsValid() && !stop_execution) {
if (mit.PackrowStarted()) {
bool omit_this_packrow = false;
for (int i = 0; (i < no_desc && !omit_this_packrow); i++)
if (cond[i].EvaluateRoughlyPack(mit) == common::RSValue::RS_NONE) omit_this_packrow = true;
for (int i = 0; i < no_desc; i++) pack_desc_locked[i] = false; // delay locking
if (new_mind.NoMoreTuplesPossible())
break; // stop the join if nothing new may be obtained in some
// optimized cases
if (omit_this_packrow) {
rows_omitted += mit.GetPackSizeLeft();
rows_passed += mit.GetPackSizeLeft();
loc_result = true;
for (int i = 0; (i < no_desc && loc_result); i++) {
if (!pack_desc_locked[i]) { // delayed locking - maybe will not be
// locked at all?
pack_desc_locked[i] = true;
if (types::RequiresUTFConversions(cond[i].GetCollation())) {
if (cond[i].CheckCondition_UTF(mit) == false) loc_result = false;
} else {
if (cond[i].CheckCondition(mit) == false) loc_result = false;
if (loc_result) {
if (!count_only) {
for (int i = 0; i < mind->NumOfDimensions(); i++)
if (all_dims[i]) new_mind.SetNewTableValue(i, mit[i]);
if (m_conn->Killed()) throw common::KilledException();
if (limit > -1 && tuples_in_output >= limit) stop_execution = true;
if (rows_passed > 0 && rows_omitted > 0) {
<< "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." << system::unlock;
// Instead of the original inner Join function block,
// as the top-level call of inner Join,
// internal split multiple threads to separate different subsets for processing
void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind,
void JoinerGeneral::ExecuteInnerJoinLoopMultiThread(MIIterator &mit, Condition &cond, MINewContents &new_mind,
DimensionVector &all_dims, std::vector<bool> &pack_desc_locked,
int64_t &tuples_in_output, int64_t limit, bool count_only) {
<< "Starting joiner loop (" << mit.NumOfTuples() << " rows)." << system::unlock;
int packnum = 0;
while (mit.IsValid()) {
......@@ -218,7 +286,7 @@ void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINew
} while ((num <= 1) && (threads_num >= 1));
TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoop packnum: %d threads_num: %d loopcnt: %d num: %d mod: %d",
TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoopMultiThreading packnum: %d threads_num: %d loopcnt: %d num: %d mod: %d",
packnum, threads_num,
loopcnt, num, mod);
......@@ -275,9 +343,10 @@ void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINew
if (rows_passed > 0 && rows_omitted > 0)
if (rows_passed > 0 && rows_omitted > 0) {
<< "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." << system::unlock;
void JoinerGeneral::ExecuteOuterJoinLoop(Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册