From 208f79d80bae1667d147e0713950317c9ca6c6b9 Mon Sep 17 00:00:00 2001
From: kevin
Date: Sat, 17 Sep 2022 16:19:42 +0800
Subject: [PATCH] feat(tianmu): Refactor the aggregation and the IN operator to
 multithread #472 (#422 #465 #466) (#472)

* Refactor the aggregation to multithread

Co-authored-by: hustjieke
---
 sql/table.h                                      |   8 +
 storage/tianmu/core/aggregation_algorithm.cpp    | 172 +++++++------
 storage/tianmu/core/aggregation_algorithm.h      |  22 +-
 storage/tianmu/core/ctask.h                      |  16 +-
 storage/tianmu/core/group_distinct_table.cpp     |  16 +-
 storage/tianmu/core/group_distinct_table.h       |   4 +
 storage/tianmu/core/group_table.cpp              |   9 +-
 storage/tianmu/core/group_table.h                |   1 +
 storage/tianmu/core/groupby_wrapper.cpp          |   4 +
 storage/tianmu/core/joiner.h                     |  26 ++
 storage/tianmu/core/joiner_general.cpp           | 242 ++++++++++++++----
 .../tianmu/core/value_matching_hashtable.cpp     |  42 +++
 .../tianmu/core/value_matching_hashtable.h       |   2 +
 storage/tianmu/core/value_matching_table.cpp     |  12 +-
 storage/tianmu/core/value_matching_table.h       |   7 +-
 15 files changed, 445 insertions(+), 138 deletions(-)

diff --git a/sql/table.h b/sql/table.h
index ddb5324cc..15331160d 100644
--- a/sql/table.h
+++ b/sql/table.h
@@ -3052,6 +3052,14 @@ inline bool is_perfschema_db(const char *name)
 */
 inline bool belongs_to_p_s(TABLE_LIST *tl)
 {
+  if (!tl->db) {
+    return false;
+  }
+
+  if (!tl->table_name) {
+    return false;
+  }
+
   return (!strcmp("performance_schema", tl->db) &&
           strcmp(tl->table_name, "threads") &&
           strstr(tl->table_name, "setup_") == NULL);
diff --git a/storage/tianmu/core/aggregation_algorithm.cpp b/storage/tianmu/core/aggregation_algorithm.cpp
index 46e25bd71..356fab6f9 100644
--- a/storage/tianmu/core/aggregation_algorithm.cpp
+++ b/storage/tianmu/core/aggregation_algorithm.cpp
@@ -194,7 +194,7 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
     }
   } else {
     int64_t local_limit = limit == -1 ? upper_approx_of_groups : limit;
-    MultiDimensionalGroupByScan(gbw, local_limit, offset, sender, limit_less_than_no_groups);
+    MultiDimensionalGroupByScan(gbw, local_limit, offset, sender, limit_less_than_no_groups, true);
     if (limit != -1) limit = local_limit;
   }
   t->ClearMultiIndexP();  // cleanup (i.e. regarded as materialized,
@@ -205,7 +205,8 @@
 }
 
 void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset,
-                                                       ResultSender *sender, bool limit_less_than_no_groups) {
+                                                       ResultSender *sender, bool limit_less_than_no_groups,
+                                                       bool force_parall) {
   MEASURE_FET("TempTable::MultiDimensionalGroupByScan(...)");
   bool first_pass = true;
   // tuples are numbered according to tuple_left filter (not used, if tuple_left
@@ -234,9 +235,19 @@
   }
   gbw.SetDistinctTuples(mit.NumOfTuples());
 
+  auto get_thd_cnt = []() {
+    int hardware_concurrency = std::thread::hardware_concurrency();
+    // TODO: the original code divided the CPU core count by 4; the reason for that is still to be traced.
+    return hardware_concurrency > 4 ? (hardware_concurrency / 4) : 1;
+  };
+
   int thd_cnt = 1;
-  if (ParallelAllowed(gbw) && !limit_less_than_no_groups) {
-    thd_cnt = std::thread::hardware_concurrency() / 4;  // For concurrence reason, don't swallow all cores once.
+  if (force_parall) {
+    thd_cnt = get_thd_cnt();
+  } else {
+    if (ParallelAllowed(gbw) && !limit_less_than_no_groups) {
+      thd_cnt = get_thd_cnt();  // For concurrency reasons, don't grab all cores at once.
+    }
   }
 
   AggregationWorkerEnt ag_worker(gbw, mind, thd_cnt, this);
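
  [Editor's illustration - not part of the patch] The get_thd_cnt() heuristic above caps the number of
  aggregation workers at one quarter of the reported hardware threads, and falls back to a single worker on
  small machines or when the core count is unknown (hardware_concurrency() may return 0). A self-contained
  sketch of the same policy:

    #include <cstdio>
    #include <thread>

    // Mirrors the patch's get_thd_cnt lambda: hw/4 workers, but at least one.
    static int worker_count() {
      int hw = static_cast<int>(std::thread::hardware_concurrency());
      return hw > 4 ? hw / 4 : 1;
    }

    int main() {
      std::printf("workers: %d\n", worker_count());  // e.g. 4 on a 16-thread CPU
      return 0;
    }
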
@@ -278,13 +289,13 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
         // Grouping on a packrow
         int64_t packrow_length = mit.GetPackSizeLeft();
-        int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple);
+        AggregaGroupingResult grouping_result = AggregatePackrow(gbw, &mit, cur_tuple);
         if (sender) {
           sender->SetAffectRows(gbw.NumOfGroups());
         }
-        if (grouping_result == 2) throw common::KilledException();
-        if (grouping_result != 5) packrows_found++;  // for statistics
-        if (grouping_result == 1) break;             // end of the aggregation
+        if (grouping_result == AggregaGroupingResult::AGR_KILLED) throw common::KilledException();
+        if (grouping_result != AggregaGroupingResult::AGR_NO_LEFT) packrows_found++;  // for statistics
+        if (grouping_result == AggregaGroupingResult::AGR_FINISH) break;              // end of the aggregation
         if (!gbw.IsFull() && gbw.MemoryBlocksLeft() == 0) {
           gbw.SetAsFull();
         }
@@ -361,7 +372,7 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
       displayed_no_groups = t->NumOfObj();
       if (t->NumOfObj() >= limit) break;
       if (gbw.AnyTuplesLeft()) gbw.ClearUsed();  // prepare for the next pass, if needed
-    } while (gbw.AnyTuplesLeft());  // do the next pass, if anything left
+    } while (gbw.AnyTuplesLeft() && (1 == thd_cnt));  // do the next pass, if anything left
   } catch (...) {
     ag_worker.Commit(false);
     throw;
@@ -468,11 +479,12 @@ void AggregationAlgorithm::MultiDimensionalDistinctScan(GroupByWrapper &gbw, MII
   }
 }
 
-int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
+AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
+  std::scoped_lock guard(mtx);
   int64_t packrow_length = mit->GetPackSizeLeft();
   if (!gbw.AnyTuplesLeft(cur_tuple, cur_tuple + packrow_length - 1)) {
     mit->NextPackrow();
-    return 5;
+    return AggregaGroupingResult::AGR_NO_LEFT;
   }
   int64_t uniform_pos = common::NULL_VALUE_64;
   bool skip_packrow = false;
@@ -509,7 +521,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
       if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()  // just DISTINCT without grouping
          || stop_all) {        // or aggregation already done on rough level
        gbw.TuplesResetAll();   // no more rows needed, just produce output
-        return 1;              // aggregation finished
+        return AggregaGroupingResult::AGR_FINISH;  // aggregation finished
      }
     }
     if (skip_packrow)
@@ -522,7 +534,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
 
   if (packrow_done || skip_packrow) {
     mit->NextPackrow();
-    return 0;  // success - roughly omitted
+    return AggregaGroupingResult::AGR_OK;  // success - roughly omitted
   }
   //  bool require_locking_ag = true;   // a new packrow,
   //  common::NULL_VALUE_64);           // do not lock if the grouping row is uniform
 
   while (mit->IsValid()) {  // becomes invalid on pack end
-    if (m_conn->Killed()) return 2;  // killed
+    if (m_conn->Killed()) return AggregaGroupingResult::AGR_KILLED;  // killed
     if (gbw.TuplesGet(cur_tuple)) {
       if (require_locking_gr) {
         for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
@@ -560,7 +572,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
         gbw.SetAllGroupsFound();
         if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()) {  // just DISTINCT without grouping
           gbw.TuplesResetAll();  // no more rows needed, just produce output
-          return 1;              // aggregation finished
+          return AggregaGroupingResult::AGR_FINISH;  // aggregation finished
         }
       }
     }
@@ -577,7 +589,9 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
         for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
           if (gbw.ColumnNotOmitted(gr_a)) {
             bool value_successfully_aggregated = gbw.PutAggregatedValue(gr_a, pos, *mit, factor);
-            if (!value_successfully_aggregated) gbw.DistinctlyOmitted(gr_a, cur_tuple);
+            if (!value_successfully_aggregated) {
+              gbw.DistinctlyOmitted(gr_a, cur_tuple);
+            }
           }
       }
     }
@@ -587,7 +601,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
     if (mit->PackrowStarted()) break;
   }
   gbw.CommitResets();
-  return 0;  // success
+  return AggregaGroupingResult::AGR_OK;  // success
 }
 
 void AggregationAlgorithm::AggregateFillOutput(GroupByWrapper &gbw, int64_t gt_pos, int64_t &omit_by_offset) {
@@ -813,23 +827,27 @@ void AggregationAlgorithm::TaskFillOutput(GroupByWrapper *gbw, Transaction *ci,
   }
 }
 
-void AggregationWorkerEnt::TaskAggrePacks(MIUpdatingIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
-                                          [[maybe_unused]] MIIterator *mit, [[maybe_unused]] int pstart,
-                                          [[maybe_unused]] int pend, int tuple, GroupByWrapper *gbw, Transaction *ci) {
-  int i = 0;
-  int64_t cur_tuple = tuple;
-  common::SetMySQLTHD(ci->Thd());
-  current_txn_ = ci;
+void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
+                                          [[maybe_unused]] MIIterator *mit, [[maybe_unused]] CTask *task,
+                                          GroupByWrapper *gbw, Transaction *ci) {
+  taskIterator->Rewind();
+  int task_pack_num = 0;
   while (taskIterator->IsValid()) {
-    int64_t packrow_length = taskIterator->GetPackSizeLeft();
-    int grouping_result = aa->AggregatePackrow(*gbw, taskIterator, cur_tuple);
-    if (grouping_result != 5) i++;
-    if (grouping_result == 1) break;
-    if (grouping_result == 2) throw common::KilledException();
-    if (grouping_result == 3 || grouping_result == 4) throw common::NotImplementedException("Aggregation overflow.");
-    cur_tuple += packrow_length;
-  }
-  TIANMU_LOG(LogCtl_Level::DEBUG, "TaskAggrePacks routine ends. Task id %d", taskIterator->GetTaskNum());
Task id %d", taskIterator->GetTaskNum()); + if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) { + + int cur_tuple = (*task->dwPack2cur)[task_pack_num]; + MIInpackIterator mii(*taskIterator); + AggregaGroupingResult grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple); + if (grouping_result == AggregaGroupingResult::AGR_FINISH) break; + if (grouping_result == AggregaGroupingResult::AGR_KILLED) throw common::KilledException(); + if (grouping_result == AggregaGroupingResult::AGR_OVERFLOW || + grouping_result == AggregaGroupingResult::AGR_OTHER_ERROR) + throw common::NotImplementedException("Aggregation overflow."); + } + + taskIterator->NextPackrow(); + ++task_pack_num; + } } void AggregationWorkerEnt::PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_sharding, @@ -852,83 +870,87 @@ void AggregationWorkerEnt::DistributeAggreTaskAverage(MIIterator &mit) { std::vector> vGBW; vGBW.reserve(m_threads); vTask.reserve(m_threads); - if (rc_control_.isOn()) rc_control_.lock(conn->GetThreadID()) << "Prepare data for parallel aggreation" << system::unlock; + if (rc_control_.isOn()) + rc_control_.lock(conn->GetThreadID()) << "Prepare data for parallel aggreation" << system::unlock; int packnum = 0; + int curtuple_index = 0; + std::unordered_map pack2cur; while (mit.IsValid()) { + pack2cur.emplace(std::pair(packnum, curtuple_index)); + + int64_t packrow_length = mit.GetPackSizeLeft(); + curtuple_index += packrow_length; packnum++; mit.NextPackrow(); } - int loopcnt = (packnum < m_threads) ? packnum : m_threads; + pack2cur.emplace(std::pair(packnum, curtuple_index)); + int loopcnt = (packnum < m_threads) ? packnum : m_threads; int mod = packnum % loopcnt; int num = packnum / loopcnt; + utils::result_set res; for (int i = 0; i < loopcnt; ++i) { - res.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW)); + res.insert( + ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW)); + + int pack_start = i * num; + int pack_end = 0; + int dwPackNum = 0; + if (i == (loopcnt - 1)) { + pack_end = packnum; + dwPackNum = packnum; + } else { + pack_end = (i + 1) * num - 1; + dwPackNum = pack_end + 1; + } + + int cur_start = pack2cur[pack_start]; + int cur_end = pack2cur[pack_end] - 1; + CTask tmp; tmp.dwTaskId = i; - tmp.dwPackNum = num + mod + i * num; + tmp.dwPackNum = dwPackNum; + tmp.dwStartPackno = pack_start; + tmp.dwEndPackno = pack_end; + tmp.dwStartTuple = cur_start; + tmp.dwEndTuple = cur_end; + tmp.dwTuple = cur_start; + tmp.dwPack2cur = &pack2cur; + vTask.push_back(tmp); } res.get_all_with_except(); - if (rc_control_.isOn()) - rc_control_.lock(conn->GetThreadID()) << "Prepare data for parallel aggreation done. 
Total packnum " << packnum - << system::unlock; - packnum = 0; mit.Rewind(); - int curtuple = 0; - while (mit.IsValid()) { - int64_t packrow_length = mit.GetPackSizeLeft(); - packnum++; - curtuple += packrow_length; - for (auto &it : vTask) { - if (packnum == it.dwPackNum) { - it.dwEndPackno = mit.GetCurPackrow(0); - it.dwTuple = curtuple; - } - } - - if (rc_control_.isOn()) - rc_control_.lock(conn->GetThreadID()) << " GetCurPackrow: " << mit.GetCurPackrow(0) << " packnum: " << packnum - << " cur_tuple: " << curtuple << system::unlock; - - mit.NextPackrow(); - } - std::vector mis; - mis.reserve(vTask.size()); - - std::vector taskIterator; + std::vector taskIterator; taskIterator.reserve(vTask.size()); utils::result_set res1; for (uint i = 0; i < vTask.size(); ++i) { - auto &mi = mis.emplace_back(*mind, true); if (dims.NoDimsUsed() == 0) dims.SetAll(); - auto &mii = taskIterator.emplace_back(&mi, dims); + auto &mii = taskIterator.emplace_back(mit, true); mii.SetTaskNum(vTask.size()); mii.SetTaskId(i); - mii.SetNoPacksToGo(vTask[i].dwEndPackno); - mii.RewindToPack((i == 0) ? 0 : vTask[i - 1].dwEndPackno + 1); } - res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[0], &dims, - &mit, 0, vTask[0].dwEndPackno, 0, gb_main, conn)); - for (size_t i = 1; i < vTask.size(); ++i) { - res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i], &dims, - &mit, vTask[i - 1].dwEndPackno + 1, vTask[i].dwEndPackno, - vTask[i - 1].dwTuple, vGBW[i].get(), conn)); + for (size_t i = 0; i < vTask.size(); ++i) { + GroupByWrapper *gbw = i == 0 ? gb_main : vGBW[i].get(); + res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i], + &dims, &mit, &vTask[i], gbw, conn)); } res1.get_all_with_except(); for (size_t i = 0; i < vTask.size(); ++i) { // Merge aggreation data together - if (i != 0) gb_main->Merge(*(vGBW[i])); + if (i != 0) { + aa->MultiDimensionalDistinctScan(*(vGBW[i]), mit); + gb_main->Merge(*(vGBW[i])); + } } } - } // namespace core } // namespace Tianmu diff --git a/storage/tianmu/core/aggregation_algorithm.h b/storage/tianmu/core/aggregation_algorithm.h index 4b4d1092d..2bf2b07e5 100644 --- a/storage/tianmu/core/aggregation_algorithm.h +++ b/storage/tianmu/core/aggregation_algorithm.h @@ -23,9 +23,19 @@ #include "core/mi_iterator.h" #include "core/query.h" #include "core/temp_table.h" +#include "core/ctask.h" namespace Tianmu { namespace core { +enum class AggregaGroupingResult { + AGR_OK = 0, // success + AGR_FINISH = 1, // finish + AGR_KILLED = 2, // killed + AGR_OVERFLOW = 3, // overflow + AGR_OTHER_ERROR = 4, // other error + AGR_NO_LEFT = 5 // pack already aggregated +}; + class AggregationAlgorithm { public: AggregationAlgorithm(TempTable *tt) @@ -37,13 +47,13 @@ class AggregationAlgorithm { bool &ag_not_changeabe, bool &stop_all, int64_t &uniform_pos, int64_t rows_in_pack, int64_t local_factor, int just_one_aggr = -1); void MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset, ResultSender *sender, - bool limit_less_than_no_groups); + bool limit_less_than_no_groups, bool force_parall); void MultiDimensionalDistinctScan(GroupByWrapper &gbw, MIIterator &mit); void AggregateFillOutput(GroupByWrapper &gbw, int64_t gt_pos, int64_t &omit_by_offset); // Return code for AggregatePackrow: 0 - success, 1 - stop aggregation // (finished), 5 - pack already aggregated (skip) - int AggregatePackrow(GroupByWrapper 
diff --git a/storage/tianmu/core/aggregation_algorithm.h b/storage/tianmu/core/aggregation_algorithm.h
index 4b4d1092d..2bf2b07e5 100644
--- a/storage/tianmu/core/aggregation_algorithm.h
+++ b/storage/tianmu/core/aggregation_algorithm.h
@@ -23,9 +23,19 @@
 #include "core/mi_iterator.h"
 #include "core/query.h"
 #include "core/temp_table.h"
+#include "core/ctask.h"
 
 namespace Tianmu {
 namespace core {
+enum class AggregaGroupingResult {
+  AGR_OK = 0,           // success
+  AGR_FINISH = 1,       // finish
+  AGR_KILLED = 2,       // killed
+  AGR_OVERFLOW = 3,     // overflow
+  AGR_OTHER_ERROR = 4,  // other error
+  AGR_NO_LEFT = 5       // pack already aggregated
+};
+
 class AggregationAlgorithm {
  public:
   AggregationAlgorithm(TempTable *tt)
@@ -37,13 +47,13 @@ class AggregationAlgorithm {
                      bool &ag_not_changeabe, bool &stop_all, int64_t &uniform_pos, int64_t rows_in_pack,
                      int64_t local_factor, int just_one_aggr = -1);
   void MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset, ResultSender *sender,
-                                   bool limit_less_than_no_groups);
+                                   bool limit_less_than_no_groups, bool force_parall);
   void MultiDimensionalDistinctScan(GroupByWrapper &gbw, MIIterator &mit);
   void AggregateFillOutput(GroupByWrapper &gbw, int64_t gt_pos, int64_t &omit_by_offset);
 
   // Return code for AggregatePackrow: 0 - success, 1 - stop aggregation
   // (finished), 5 - pack already aggregated (skip)
-  int AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple);
+  AggregaGroupingResult AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple);
 
   // No parallel for subquery/join/distinct cases
   bool ParallelAllowed(GroupByWrapper &gbw) {
@@ -77,18 +87,18 @@ class AggregationWorkerEnt {
   // Return code for AggregatePackrow: 0 - success, 1 - stop aggregation
   // (finished), 2 - killed, 3
   // - overflow, 4 - other error, 5 - pack already aggregated (skip)
-  int AggregatePackrow(MIUpdatingIterator &lmit, int64_t cur_tuple) {
+  AggregaGroupingResult AggregatePackrow(MIIterator &lmit, int64_t cur_tuple) {
     return aa->AggregatePackrow(*gb_main, &lmit, cur_tuple);
   }
-  int AggregatePackrow(MIInpackIterator &lmit, int64_t cur_tuple) {
+  AggregaGroupingResult AggregatePackrow(MIInpackIterator &lmit, int64_t cur_tuple) {
     return aa->AggregatePackrow(*gb_main, &lmit, cur_tuple);
   }
   void Commit([[maybe_unused]] bool do_merge = true) { gb_main->CommitResets(); }
   void ReevaluateNumberOfThreads([[maybe_unused]] MIIterator &mit) {}
   int ThreadsUsed() { return m_threads; }
   void Barrier() {}
-  void TaskAggrePacks(MIUpdatingIterator *taskIterator, DimensionVector *dims, MIIterator *mit, int pstart, int pend,
-                      int tuple, GroupByWrapper *gbw, Transaction *ci);
+  void TaskAggrePacks(MIIterator *taskIterator, DimensionVector *dims, MIIterator *mit, CTask *task,
+                      GroupByWrapper *gbw, Transaction *ci);
   void DistributeAggreTaskAverage(MIIterator &mit);
   void PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_sharding,
                         std::vector<std::unique_ptr<GroupByWrapper>> *vGBW);
diff --git a/storage/tianmu/core/ctask.h b/storage/tianmu/core/ctask.h
index 9a4b2d1a2..e168c92f7 100644
--- a/storage/tianmu/core/ctask.h
+++ b/storage/tianmu/core/ctask.h
@@ -18,6 +18,8 @@
 #define TIANMU_CORE_CTASK_H_
 #pragma once
 
+#include <unordered_map>
+
 namespace Tianmu {
 namespace core {
 struct CTask {
@@ -26,7 +28,19 @@ struct CTask {
   int dwEndPackno;  // the last packno of this task
   int dwTuple;      // previous task + this task Actual process tuple
   int dwStartPackno;
-  CTask() : dwTaskId(0), dwPackNum(0), dwEndPackno(0), dwTuple(0), dwStartPackno(0) {}
+  int dwStartTuple;
+  int dwEndTuple;
+  std::unordered_map<int, int> *dwPack2cur;
+  CTask()
+      : dwTaskId(0),
+        dwPackNum(0),
+        dwEndPackno(0),
+        dwTuple(0),
+        dwStartPackno(0),
+        dwStartTuple(0),
+        dwEndTuple(0),
+        dwPack2cur(nullptr)
+  {}
 };
 }  // namespace core
 }  // namespace Tianmu
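
  [Editor's illustration - not part of the patch] A CTask describes a contiguous window of packs plus the
  tuple index where that window starts; dwPack2cur is the shared pack-number to first-tuple map built in
  DistributeAggreTaskAverage. A self-contained sketch of how those fields are filled for 10 packs of 4 rows
  split over 3 tasks (the last task's end is set to the total pack count, acting as a "to the end" sentinel,
  exactly as the patch does):

    #include <cstdio>
    #include <unordered_map>
    #include <vector>

    struct Task { int start_pack, end_pack, start_tuple; };

    int main() {
      const int packnum = 10, rows_per_pack = 4, tasks = 3;
      std::unordered_map<int, int> pack2cur;  // pack no -> first tuple index
      for (int p = 0; p <= packnum; ++p) pack2cur[p] = p * rows_per_pack;

      const int num = packnum / tasks;  // packs per task; last one absorbs the rest
      std::vector<Task> v;
      for (int i = 0; i < tasks; ++i) {
        int start = i * num;
        int end = (i == tasks - 1) ? packnum : (i + 1) * num - 1;
        v.push_back({start, end, pack2cur[start]});
      }
      for (const auto &t : v)  // -> [0,2]@0  [3,5]@12  [6,10]@24
        std::printf("packs [%d,%d] first tuple %d\n", t.start_pack, t.end_pack, t.start_tuple);
    }
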
diff --git a/storage/tianmu/core/group_distinct_table.cpp b/storage/tianmu/core/group_distinct_table.cpp
index 689913484..029011c07 100644
--- a/storage/tianmu/core/group_distinct_table.cpp
+++ b/storage/tianmu/core/group_distinct_table.cpp
@@ -19,6 +19,7 @@
 #include "core/mi_iterator.h"
 #include "core/transaction.h"
+#include "core/value_matching_hashtable.h"
 
 namespace Tianmu {
 namespace core {
@@ -175,7 +176,8 @@ GDTResult GroupDistinctTable::Add(int64_t group, MIIterator &mit) {
   group += 1;  // offset; 0 means empty position
   std::memmove(input_buffer, (unsigned char *)(&group), group_bytes);
   encoder->Encode(input_buffer + group_bytes, mit, NULL, true);
-  return FindCurrentRow();
+  // return FindCurrentRow();
+  return FindCurrentRowByVMTable();
 }
 
 GDTResult GroupDistinctTable::Add(int64_t group, int64_t val)  // numeric values
@@ -238,6 +240,18 @@ void GroupDistinctTable::ValueFromInput(types::BString &v)  // decode original v
   v = encoder->GetValueT(input_buffer + group_bytes, mit);
 }
 
+GDTResult GroupDistinctTable::FindCurrentRowByVMTable()
+{
+  int64_t row = 0;
+  bool existed = vm_tab->FindCurrentRow(input_buffer, row, true, total_width);
+
+  if (existed) {
+    return GDTResult::GDT_EXISTS;
+  }
+
+  return GDTResult::GDT_ADDED;
+}
+
 GDTResult GroupDistinctTable::FindCurrentRow(bool find_only)  // find / insert the current buffer
 {
   DEBUG_ASSERT(!filter_implementation);
diff --git a/storage/tianmu/core/group_distinct_table.h b/storage/tianmu/core/group_distinct_table.h
index d22045b7e..4cc9ba540 100644
--- a/storage/tianmu/core/group_distinct_table.h
+++ b/storage/tianmu/core/group_distinct_table.h
@@ -23,6 +23,7 @@
 #include "core/column_bin_encoder.h"
 #include "core/filter.h"
 #include "vc/virtual_column.h"
+#include "core/value_matching_table.h"
 
 namespace Tianmu {
 namespace core {
@@ -52,6 +53,7 @@ class GroupDistinctTable : public mm::TraceableObject {
   //   vector of bytes)
   void InitializeVC(int64_t max_no_groups, vcolumn::VirtualColumn *vc, int64_t max_no_rows, int64_t max_bytes,
                     bool decodable);
+  void CopyFromValueMatchingTable(ValueMatchingTable *vt) { vm_tab.reset(vt->Clone()); };
 
   // Assumption: group >= 0
   GDTResult Add(int64_t group,
@@ -75,6 +77,7 @@ class GroupDistinctTable : public mm::TraceableObject {
  private:
   GDTResult FindCurrentRow(bool find_only = false);  // find / insert the current buffer
+  GDTResult FindCurrentRowByVMTable();
   bool RowEmpty(unsigned char *p)  // is this position empty? only if it starts with zeros
   {
     return (std::memcmp(&zero_const, p, group_bytes) == 0);
@@ -113,6 +116,7 @@ class GroupDistinctTable : public mm::TraceableObject {
   bool filter_implementation;
   Filter *f;
   int64_t group_factor;  // (g, v) -> f( g + group_factor * v )
+  std::unique_ptr<ValueMatchingTable> vm_tab;
 };
 }  // namespace core
 }  // namespace Tianmu
diff --git a/storage/tianmu/core/group_table.cpp b/storage/tianmu/core/group_table.cpp
index a1f09147b..2e95cdbd9 100644
--- a/storage/tianmu/core/group_table.cpp
+++ b/storage/tianmu/core/group_table.cpp
@@ -340,7 +340,7 @@ void GroupTable::Initialize(int64_t max_no_groups, bool parallel_allowed) {
 
   vm_tab.reset(ValueMatchingTable::CreateNew_ValueMatchingTable(primary_total_size, declared_max_no_groups,
                                                                 max_group_code, total_width, grouping_and_UTF_width,
-                                                                grouping_buf_width, p_power));
+                                                                grouping_buf_width, p_power, false));
   input_buffer.resize(grouping_and_UTF_width);
 
   // pre-allocation of distinct memory
@@ -362,6 +362,7 @@ void GroupTable::Initialize(int64_t max_no_groups, bool parallel_allowed) {
       gdistinct[i]->InitializeVC(vm_tab->RowNumberScope(), vc[i], desc.max_no_values,
                                  distinct_size / no_columns_with_distinct,
                                  (operation[i] != GT_Aggregation::GT_COUNT_NOT_NULL));  // otherwise must be decodable
+      gdistinct[i]->CopyFromValueMatchingTable(vm_tab.get());
       distinct_size -= gdistinct[i]->BytesTaken();
       no_columns_with_distinct--;
     }
@@ -537,10 +538,10 @@ bool GroupTable::PutAggregatedValue(int col, int64_t row, MIIterator &mit, int64
     GDTResult res = gdistinct[col]->Add(row, mit);
     if (res == GDTResult::GDT_EXISTS) return true;  // value found, do not aggregate it again
     if (res == GDTResult::GDT_FULL) {
-      if (gdistinct[col]->AlreadyFull())
-        not_full = false;  // disable also the main grouping table (if it is a
+      //if (gdistinct[col]->AlreadyFull())
+      //not_full = false;  // disable also the main grouping table (if it is a
                            // persistent rejection)
-      return false;  // value not found in DISTINCT buffer, which is already
+      //return false;  // value not found in DISTINCT buffer, which is already
                      // full
     }
     factor = 1;  // ignore repetitions for distinct
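
  [Editor's illustration - not part of the patch] With the ValueMatchingTable-backed path above,
  GroupDistinctTable::Add() reduces to "was this (group, value) pair seen before?": GDT_EXISTS on a repeat,
  GDT_ADDED on first sight, so DISTINCT aggregation can skip duplicates. A self-contained analogy using a
  set of pairs:

    #include <cstdint>
    #include <cstdio>
    #include <set>
    #include <utility>

    enum class GDTResult { GDT_ADDED, GDT_EXISTS };

    // First insertion of a (group, value) pair reports GDT_ADDED; any
    // repetition reports GDT_EXISTS, so the caller can drop the duplicate.
    static GDTResult Add(std::set<std::pair<int64_t, int64_t>> &seen, int64_t group, int64_t value) {
      return seen.insert({group, value}).second ? GDTResult::GDT_ADDED : GDTResult::GDT_EXISTS;
    }

    int main() {
      std::set<std::pair<int64_t, int64_t>> seen;
      std::printf("%s\n", Add(seen, 1, 42) == GDTResult::GDT_ADDED ? "added" : "exists");  // added
      std::printf("%s\n", Add(seen, 1, 42) == GDTResult::GDT_ADDED ? "added" : "exists");  // exists
    }
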
diff --git a/storage/tianmu/core/group_table.h b/storage/tianmu/core/group_table.h
index 92432a622..6c6d5a323 100644
--- a/storage/tianmu/core/group_table.h
+++ b/storage/tianmu/core/group_table.h
@@ -228,6 +228,7 @@ class GroupTable : public mm::TraceableObject {
   // some memory managing
   int64_t max_total_size;
+  std::mutex mtx;
 };
 }  // namespace core
 }  // namespace Tianmu
diff --git a/storage/tianmu/core/groupby_wrapper.cpp b/storage/tianmu/core/groupby_wrapper.cpp
index c4bfaff99..87d9cedd7 100644
--- a/storage/tianmu/core/groupby_wrapper.cpp
+++ b/storage/tianmu/core/groupby_wrapper.cpp
@@ -633,6 +633,10 @@ bool GroupByWrapper::IsMaxOnly()  // true, if an attribute is max(column), and
 }
 
 void GroupByWrapper::InitTupleLeft(int64_t n) {
+  if (tuple_left) {
+    delete tuple_left;
+    tuple_left = NULL;
+  }
   DEBUG_ASSERT(tuple_left == NULL);
   tuple_left = new Filter(n, p_power);
   tuple_left->Set();
diff --git a/storage/tianmu/core/joiner.h b/storage/tianmu/core/joiner.h
index 05510d625..e03a44480 100644
--- a/storage/tianmu/core/joiner.h
+++ b/storage/tianmu/core/joiner.h
@@ -21,6 +21,7 @@
 #include "core/condition.h"
 #include "core/descriptor.h"
 #include "core/mi_iterator.h"
+#include "core/ctask.h"
 
 namespace Tianmu {
 namespace core {
@@ -95,6 +96,31 @@ class JoinerGeneral : public TwoDimensionalJoiner {
  protected:
   void ExecuteOuterJoinLoop(Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
                             DimensionVector &outer_dims, int64_t &tuples_in_output, int64_t output_limit);
+
+  // Replaces the body of the original inner-join loop: it is the top-level
+  // entry point of the inner join, and internally it splits the work across
+  // multiple threads, each handling a separate subset of the packs.
+  void ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
+                            std::vector<bool> &pack_desc_locked, int64_t &tuples_in_output, int64_t limit,
+                            bool count_only);
+
+  // Handles each row of the pack the current iterator points to.
+  // TODO: pack reads may hit cache invalidation when threads are switched;
+  // splitting the underlying storage layer is deferred to a second phase.
+  void ExecuteInnerJoinPackRow(MIIterator *mii, CTask *task, Condition *cond, MINewContents *new_mind,
+                               DimensionVector *all_dims,
+                               std::vector<bool> *pack_desc_locked, int64_t *tuples_in_output, int64_t limit,
+                               bool count_only,
+                               bool *stop_execution, int64_t *rows_passed, int64_t *rows_omitted);
+
+  // Processes one split task in a separate thread.
+  void TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Condition *cond, MINewContents *new_mind,
+                          DimensionVector *all_dims, std::vector<bool> *pack_desc_locked, int64_t *tuples_in_output,
+                          int64_t limit, bool count_only,
+                          bool *stop_execution, int64_t *rows_passed, int64_t *rows_omitted);
+
+ private:
+  std::mutex mtx;
 };
 }  // namespace core
 }  // namespace Tianmu
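
  [Editor's illustration - not part of the patch] Both DistributeAggreTaskAverage and the new
  ExecuteInnerJoinLoop follow the same fan-out/join shape: queue one task per pack range on the engine's
  thread pool, then wait for all results (res.get_all_with_except() also rethrows a worker's exception). A
  rough analogy with std::async standing in for the Tianmu thread pool:

    #include <future>
    #include <vector>

    // One future per task; get() joins the worker and rethrows any exception
    // it raised, like utils::result_set::get_all_with_except().
    static long process_range(int first, int last) {
      long rows = 0;
      for (int p = first; p <= last; ++p) rows += p;  // pretend per-pack work
      return rows;
    }

    int main() {
      std::vector<std::future<long>> futures;
      futures.push_back(std::async(std::launch::async, process_range, 0, 4));
      futures.push_back(std::async(std::launch::async, process_range, 5, 9));
      long total = 0;
      for (auto &f : futures) total += f.get();
      return total == 45 ? 0 : 1;  // 0+1+...+9
    }
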
diff --git a/storage/tianmu/core/joiner_general.cpp b/storage/tianmu/core/joiner_general.cpp
index 66d4e3f8a..d4a852d64 100644
--- a/storage/tianmu/core/joiner_general.cpp
+++ b/storage/tianmu/core/joiner_general.cpp
@@ -21,6 +21,7 @@
 #include "core/mi_updating_iterator.h"
 #include "core/transaction.h"
 #include "vc/virtual_column.h"
+#include "core/ctask.h"
 
 namespace Tianmu {
 namespace core {
@@ -70,52 +71,8 @@ void JoinerGeneral::ExecuteJoinConditions(Condition &cond) {
   if (!outer_dims.IsEmpty())
     ExecuteOuterJoinLoop(cond, new_mind, all_dims, outer_dims, tuples_in_output, tips.limit);
   else {
-    while (mit.IsValid() && !stop_execution) {
-      if (mit.PackrowStarted()) {
-        bool omit_this_packrow = false;
-        for (int i = 0; (i < no_desc && !omit_this_packrow); i++)
-          if (cond[i].EvaluateRoughlyPack(mit) == common::RSValue::RS_NONE) omit_this_packrow = true;
-        for (int i = 0; i < no_desc; i++) pack_desc_locked[i] = false;  // delay locking
-        if (new_mind.NoMoreTuplesPossible())
-          break;  // stop the join if nothing new may be obtained in some
-                  // optimized cases
-        if (omit_this_packrow) {
-          rows_omitted += mit.GetPackSizeLeft();
-          rows_passed += mit.GetPackSizeLeft();
-          mit.NextPackrow();
-          continue;
-        }
-      }
-      loc_result = true;
-      for (int i = 0; (i < no_desc && loc_result); i++) {
-        if (!pack_desc_locked[i]) {  // delayed locking - maybe will not be
-                                     // locked at all?
-          cond[i].LockSourcePacks(mit);
-          pack_desc_locked[i] = true;
-        }
-        if (types::RequiresUTFConversions(cond[i].GetCollation())) {
-          if (cond[i].CheckCondition_UTF(mit) == false) loc_result = false;
-        } else {
-          if (cond[i].CheckCondition(mit) == false) loc_result = false;
-        }
-      }
-      if (loc_result) {
-        if (!tips.count_only) {
-          for (int i = 0; i < mind->NumOfDimensions(); i++)
-            if (all_dims[i]) new_mind.SetNewTableValue(i, mit[i]);
-          new_mind.CommitNewTableValues();
-        }
-        tuples_in_output++;
-      }
-      ++mit;
-      rows_passed++;
-      if (m_conn->Killed()) throw common::KilledException();
-      if (tips.limit > -1 && tuples_in_output >= tips.limit) stop_execution = true;
-    }
+    ExecuteInnerJoinLoop(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit, tips.count_only);
   }
-  if (rows_passed > 0 && rows_omitted > 0)
-    rc_control_.lock(m_conn->GetThreadID())
-        << "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." << system::unlock;
 
   // Postprocessing and cleanup
   if (tips.count_only) new_mind.CommitCountOnly(tuples_in_output);
@@ -128,6 +85,201 @@ void JoinerGeneral::ExecuteJoinConditions(Condition &cond) {
   why_failed = JoinFailure::NOT_FAILED;
 }
 
+// Handles each row of the pack the current iterator points to.
+// TODO: pack reads may hit cache invalidation when threads are switched;
+// splitting the underlying storage layer is deferred to a second phase.
+void JoinerGeneral::ExecuteInnerJoinPackRow(MIIterator *mii, CTask *task, Condition *cond, MINewContents *new_mind,
+                                            DimensionVector *all_dims, std::vector<bool> *_pack_desc_locked,
+                                            int64_t *tuples_in_output, int64_t limit, bool count_only,
+                                            bool *stop_execution,
+                                            int64_t *rows_passed, int64_t *rows_omitted)
+{
+  std::scoped_lock guard(mtx);
+  int no_desc = (*cond).Size();
+
+  std::vector<bool> pack_desc_locked;
+  pack_desc_locked.reserve(no_desc);
+  for (int i = 0; (i < no_desc); i++) {
+    pack_desc_locked.emplace_back(false);
+  }
+
+  int index = 0;
+  while (mii->IsValid() && !*stop_execution) {
+    if (mii->PackrowStarted()) {
+      bool omit_this_packrow = false;
+      for (int i = 0; (i < no_desc && !omit_this_packrow); i++)
+        if ((*cond)[i].EvaluateRoughlyPack(*mii) == common::RSValue::RS_NONE) omit_this_packrow = true;
+      for (int i = 0; i < no_desc; i++) pack_desc_locked[i] = false;  // delay locking
+      if (new_mind->NoMoreTuplesPossible())
+        break;  // stop the join if nothing new may be obtained in some optimized cases
+      if (omit_this_packrow) {
+        (*rows_omitted) += mii->GetPackSizeLeft();
+        (*rows_passed) += mii->GetPackSizeLeft();
+        mii->NextPackrow();
+        continue;
+      }
+    }
+
+    {
+      bool loc_result = true;
+      for (int i = 0; (i < no_desc && loc_result); i++) {
+        if (!pack_desc_locked[i]) {
+          (*cond)[i].LockSourcePacks(*mii);
+          pack_desc_locked[i] = true;
+        }
+
+        if (types::RequiresUTFConversions((*cond)[i].GetCollation())) {
+          if ((*cond)[i].CheckCondition_UTF(*mii) == false) loc_result = false;
+        } else {
+          if ((*cond)[i].CheckCondition(*mii) == false) loc_result = false;
+        }
+      }
+
+      if (loc_result) {
+        if (!count_only) {
+          for (int i = 0; i < mind->NumOfDimensions(); i++)
+            if ((*all_dims)[i]) new_mind->SetNewTableValue(i, (*mii)[i]);
+          new_mind->CommitNewTableValues();
+        }
+        (*tuples_in_output)++;
+      }
+
+      (*rows_passed)++;
+      if (m_conn->Killed()) throw common::KilledException();
+      if (limit > -1 && *tuples_in_output >= limit) *stop_execution = true;
+
+      mii->Increment();
+      if (mii->PackrowStarted()) break;
+    }
+
+    ++index;
+  }
+}
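
  [Editor's note and illustration - not part of the patch] ExecuteInnerJoinPackRow holds a scoped_lock on
  JoinerGeneral::mtx for its entire body, so the per-pack row work of concurrent TaskInnerJoinPacks workers
  is serialized; the lock is what keeps the shared MINewContents and the rows_passed/rows_omitted counters
  consistent. A minimal sketch of that idiom, with invented stand-ins:

    #include <cstdio>
    #include <mutex>
    #include <vector>

    std::mutex mtx;         // plays JoinerGeneral::mtx
    std::vector<long> out;  // plays the shared MINewContents

    // The whole pack is emitted under one lock, mirroring the function-scope
    // scoped_lock above: workers overlap on iteration, not on output.
    void emit_pack(const std::vector<long> &pack) {
      std::scoped_lock guard(mtx);
      for (long row : pack) out.push_back(row);
    }

    int main() {
      emit_pack({1, 2, 3});
      std::printf("%zu rows\n", out.size());  // 3 rows
    }
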
+
+// Processes one split task in a separate thread.
+void JoinerGeneral::TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Condition *cond, MINewContents *new_mind,
+                                       DimensionVector *all_dims, std::vector<bool> *pack_desc_locked_p,
+                                       int64_t *tuples_in_output, int64_t limit, bool count_only, bool *stop_execution,
+                                       int64_t *rows_passed, int64_t *rows_omitted)
+{
+  int no_desc = (*cond).Size();
+
+  std::vector<bool> pack_desc_locked;
+  pack_desc_locked.reserve(no_desc);
+  for (int i = 0; i < no_desc; i++) {
+    pack_desc_locked.push_back(false);
+  }
+
+  Condition cd(*cond);
+
+  taskIterator->Rewind();
+  int task_pack_num = 0;
+  while (taskIterator->IsValid() && !(*stop_execution)) {
+    if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {
+      MIInpackIterator mii(*taskIterator);
+
+      ExecuteInnerJoinPackRow(&mii, task, &cd, new_mind, all_dims, &pack_desc_locked, tuples_in_output, limit,
+                              count_only,
+                              stop_execution, rows_passed, rows_omitted);
+    }
+
+    taskIterator->NextPackrow();
+    ++task_pack_num;
+  }
+}
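
  [Editor's illustration - not part of the patch] The do-while in ExecuteInnerJoinLoop below keeps shrinking
  the candidate thread count until each task would own more than one pack, or until only one thread is left.
  Tracing that loop for 5 packs and 4 candidate threads:

    #include <cstdio>

    int main() {
      int packnum = 5, threads_num = 4;
      int loopcnt = 0, mod = 0, num = 0;
      do {  // same arithmetic as the patch
        loopcnt = (packnum < threads_num) ? packnum : threads_num;
        mod = packnum % loopcnt;
        num = packnum / loopcnt;
        --threads_num;
      } while ((num <= 1) && (threads_num >= 1));
      // 4 threads -> num=1, retry with 3 -> num=1, retry with 2 -> num=2: accepted
      std::printf("loopcnt=%d num=%d mod=%d\n", loopcnt, num, mod);  // loopcnt=2 num=2 mod=1
    }
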
+
+// Replaces the body of the original inner-join loop: it is the top-level
+// entry point of the inner join, and internally it splits the work across
+// multiple threads, each handling a separate subset of the packs.
+void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind,
+                                         DimensionVector &all_dims, std::vector<bool> &pack_desc_locked,
+                                         int64_t &tuples_in_output, int64_t limit, bool count_only) {
+
+  int packnum = 0;
+  while (mit.IsValid()) {
+    int64_t packrow_length = mit.GetPackSizeLeft();
+    packnum++;
+    mit.NextPackrow();
+  }
+
+  int hardware_concurrency = std::thread::hardware_concurrency();
+  // TODO: the original code divided the CPU core count by 4; the reason for that is still to be traced.
+  int threads_num = hardware_concurrency > 4 ? (hardware_concurrency / 4) : 1;
+
+  int loopcnt = 0;
+  int mod = 0;
+  int num = 0;
+
+  do {
+    loopcnt = (packnum < threads_num) ? packnum : threads_num;
+    mod = packnum % loopcnt;
+    num = packnum / loopcnt;
+
+    --threads_num;
+  } while ((num <= 1) && (threads_num >= 1));
+
+  TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoop packnum: %d threads_num: %d loopcnt: %d num: %d mod: %d",
+             packnum, threads_num,
+             loopcnt, num, mod);
+
+  std::vector<CTask> vTask;
+  vTask.reserve(loopcnt);
+
+  for (int i = 0; i < loopcnt; ++i) {
+    int pack_start = i * num;
+    int pack_end = 0;
+    int dwPackNum = 0;
+    if (i == (loopcnt - 1)) {
+      pack_end = packnum;
+      dwPackNum = packnum;
+    } else {
+      pack_end = (i + 1) * num - 1;
+      dwPackNum = pack_end + 1;
+    }
+
+    CTask tmp;
+    tmp.dwTaskId = i;
+    tmp.dwPackNum = dwPackNum;
+    tmp.dwStartPackno = pack_start;
+    tmp.dwEndPackno = pack_end;
+
+    TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoop dwTaskId: %d dwStartPackno: %d dwEndPackno: %d", tmp.dwTaskId,
+               tmp.dwStartPackno, tmp.dwEndPackno);
+
+    vTask.push_back(tmp);
+  }
+
+  int64_t rows_passed = 0;
+  int64_t rows_omitted = 0;
+  int no_desc = cond.Size();
+  bool stop_execution = false;
+  int index = 0;
+
+  mit.Rewind();
+
+  std::vector<MIIterator> taskIterator;
+  taskIterator.reserve(vTask.size());
+
+  for (uint i = 0; i < vTask.size(); ++i) {
+    auto &mii = taskIterator.emplace_back(mit, true);
+    mii.SetTaskNum(vTask.size());
+    mii.SetTaskId(i);
+  }
+
+  utils::result_set<void> res;
+  for (size_t i = 0; i < vTask.size(); ++i) {
+    res.insert(
+        ha_rcengine_->query_thread_pool.add_task(&JoinerGeneral::TaskInnerJoinPacks, this, &taskIterator[i], &vTask[i], &cond, &new_mind, &all_dims,
+                                                 &pack_desc_locked, &tuples_in_output, limit, count_only, &stop_execution, &rows_passed, &rows_omitted));
+  }
+
+  res.get_all_with_except();
+
+  if (rows_passed > 0 && rows_omitted > 0)
+    rc_control_.lock(m_conn->GetThreadID())
+        << "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows."
+        << system::unlock;
+}
+
 void JoinerGeneral::ExecuteOuterJoinLoop(Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
                                          DimensionVector &outer_dims, int64_t &tuples_in_output, int64_t output_limit) {
   MEASURE_FET("JoinerGeneral::ExecuteOuterJoinLoop(...)");
diff --git a/storage/tianmu/core/value_matching_hashtable.cpp b/storage/tianmu/core/value_matching_hashtable.cpp
index 13880dc07..671855776 100644
--- a/storage/tianmu/core/value_matching_hashtable.cpp
+++ b/storage/tianmu/core/value_matching_hashtable.cpp
@@ -114,6 +114,48 @@ int64_t ValueMatching_HashTable::ByteSize() {
   return res + max_no_rows * total_width;
 }
 
+bool ValueMatching_HashTable::FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new,
+                                             int match_width) {
+  unsigned int crc_code = HashValue(input_buffer, match_width);
+  unsigned int ht_pos = (crc_code & ht_mask);
+  unsigned int row_no = ht[ht_pos];
+  if (row_no == 0xFFFFFFFF) {  // empty hash position
+    if (!add_if_new) {
+      row = common::NULL_VALUE_64;
+      return false;
+    }
+    row_no = no_rows;
+    ht[ht_pos] = row_no;
+  }
+  while (row_no < no_rows) {
+    unsigned char *cur_row = (unsigned char *)t.GetRow(row_no);
+    if (std::memcmp(cur_row, input_buffer, match_width) == 0) {
+      row = row_no;
+      return true;  // position found
+    }
+
+    unsigned int *next_pos = (unsigned int *)(cur_row + next_pos_offset);
+    if (*next_pos == 0) {  // not found and no more conflicted values
+      if (add_if_new) *next_pos = no_rows;
+      row_no = no_rows;
+    } else {
+      DEBUG_ASSERT(row_no < *next_pos);
+      row_no = *next_pos;
+    }
+  }
+  // row_no == no_rows in this place
+  if (!add_if_new) {
+    row = common::NULL_VALUE_64;
+    return false;
+  }
+  row = row_no;
+  int64_t new_row = t.AddEmptyRow();  // 0 is set as a "NextPos"
+  ASSERT(new_row == row, "wrong row number");
+  std::memcpy(t.GetRow(row), input_buffer, match_width);
+  no_rows++;
+  return false;
+}
+
 bool ValueMatching_HashTable::FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new) {
   unsigned int crc_code = HashValue(input_buffer, matching_width);
   unsigned int ht_pos = (crc_code & ht_mask);
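
  [Editor's illustration - not part of the patch] The match_width overload above uses the same open-hashing
  scheme as the original FindCurrentRow: hash the first match_width bytes, jump into the bucket array, then
  follow per-row "next" links through collisions, appending a new row when the chain runs out. A
  self-contained miniature of that chained find-or-insert (all names invented):

    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <vector>

    // Buckets hold row numbers; each row keeps a `next` link for collisions.
    struct Table {
      static constexpr uint32_t kBuckets = 8;  // bucket mask analogue: kBuckets - 1
      std::vector<int> ht = std::vector<int>(kBuckets, -1);
      std::vector<std::pair<std::string, int>> rows;  // (key, next)

      bool find_or_insert(const std::string &key, int &row) {
        uint32_t pos = std::hash<std::string>{}(key) & (kBuckets - 1);
        int cur = ht[pos], prev = -1;
        while (cur != -1) {
          if (rows[cur].first == key) { row = cur; return true; }  // found
          prev = cur;
          cur = rows[cur].second;  // follow the collision chain
        }
        row = (int)rows.size();
        rows.push_back({key, -1});
        if (prev == -1) ht[pos] = row;  // first row in this bucket
        else rows[prev].second = row;   // append to the chain tail
        return false;
      }
    };

    int main() {
      Table t;
      int r;
      std::printf("%d\n", t.find_or_insert("abc", r));  // 0: inserted
      std::printf("%d\n", t.find_or_insert("abc", r));  // 1: found
    }
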
diff --git a/storage/tianmu/core/value_matching_hashtable.h b/storage/tianmu/core/value_matching_hashtable.h
index 7a14b69f1..a2dbd79d8 100644
--- a/storage/tianmu/core/value_matching_hashtable.h
+++ b/storage/tianmu/core/value_matching_hashtable.h
@@ -44,6 +44,8 @@ class ValueMatching_HashTable : public mm::TraceableObject, public ValueMatching
 
   bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new = true) override;
 
+  bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new, int match_width) override;
+
   void Rewind(bool release = false) override { t.Rewind(release); }
   int64_t GetCurrentRow() override { return t.GetCurrent(); }
   void NextRow() override { t.NextRow(); }
diff --git a/storage/tianmu/core/value_matching_table.cpp b/storage/tianmu/core/value_matching_table.cpp
index fc74d4fe7..12ff10c30 100644
--- a/storage/tianmu/core/value_matching_table.cpp
+++ b/storage/tianmu/core/value_matching_table.cpp
@@ -43,7 +43,7 @@ void ValueMatchingTable::Clear() { no_rows = 0; }
 ValueMatchingTable *ValueMatchingTable::CreateNew_ValueMatchingTable(int64_t mem_available, int64_t max_no_groups,
                                                                      int64_t max_group_code, int _total_width,
                                                                      int _input_buf_width, int _match_width,
-                                                                     uint32_t power) {
+                                                                     uint32_t power, bool use_lookup_table) {
   // trivial case: one group only
   if (_input_buf_width == 0) {
     ValueMatching_OnePosition *new_object = new ValueMatching_OnePosition();
@@ -52,10 +52,12 @@ ValueMatchingTable *ValueMatchingTable::CreateNew_ValueMatchingTable(int64_t mem
   }
 
   // easy case: narrow scope of group codes, which may be used
-  if (max_group_code < common::PLUS_INF_64 && max_group_code < max_no_groups * 1.5) {
-    ValueMatching_LookupTable *new_object = new ValueMatching_LookupTable();
-    new_object->Init(max_group_code, _total_width, _input_buf_width, _match_width, power);
-    return new_object;
+  if (use_lookup_table) {
+    if (max_group_code < common::PLUS_INF_64 && max_group_code < max_no_groups * 1.5) {
+      ValueMatching_LookupTable *new_object = new ValueMatching_LookupTable();
+      new_object->Init(max_group_code, _total_width, _input_buf_width, _match_width, power);
+      return new_object;
+    }
   }
 
   // default
diff --git a/storage/tianmu/core/value_matching_table.h b/storage/tianmu/core/value_matching_table.h
index b336b6c0a..39c845c47 100644
--- a/storage/tianmu/core/value_matching_table.h
+++ b/storage/tianmu/core/value_matching_table.h
@@ -52,6 +52,10 @@ class ValueMatchingTable {  // abstract class: interface for value matching
   // already exists, false if put as a new row
   virtual bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new = true) = 0;
 
+  virtual bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new, int match_width) {
+    return FindCurrentRow(input_buffer, row, add_if_new);
+  };
+
   int64_t NoRows() { return int64_t(no_rows); }  // rows stored so far
   virtual bool IsOnePass() = 0;  // true if the aggregator is capable of storing all groups
                                  // up to max_no_groups declared in Init()
@@ -83,7 +87,8 @@ class ValueMatchingTable {  // abstract class: interface for value matching
   */
   static ValueMatchingTable *CreateNew_ValueMatchingTable(int64_t mem_available, int64_t max_no_groups,
                                                           int64_t max_group_code, int _total_width,
-                                                          int _input_buf_width, int _match_width, uint32_t power);
+                                                          int _input_buf_width, int _match_width, uint32_t power,
+                                                          bool use_lookup_table = true);
 
 protected:
   int total_width;  // whole row
-- 
GitLab