Unverified commit 208f79d8, authored by 悟世者, committed by GitHub

feat(tianmu) Refactor the aggregation and the IN operator to multithread #472 (#422 #465 #466) (#472)

* Refactor the aggregation to multithread
Co-authored-by: hustjieke <gaoriyao1@gmail.com>
Parent e3a99029
......@@ -3052,6 +3052,14 @@ inline bool is_perfschema_db(const char *name)
*/
inline bool belongs_to_p_s(TABLE_LIST *tl)
{
if (!tl->db) {
return false;
}
if (!tl->table_name) {
return false;
}
return (!strcmp("performance_schema", tl->db) &&
strcmp(tl->table_name, "threads") &&
strstr(tl->table_name, "setup_") == NULL);
......
......@@ -194,7 +194,7 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
}
} else {
int64_t local_limit = limit == -1 ? upper_approx_of_groups : limit;
MultiDimensionalGroupByScan(gbw, local_limit, offset, sender, limit_less_than_no_groups);
MultiDimensionalGroupByScan(gbw, local_limit, offset, sender, limit_less_than_no_groups, true);
if (limit != -1) limit = local_limit;
}
t->ClearMultiIndexP(); // cleanup (i.e. regarded as materialized,
......@@ -205,7 +205,8 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
}
void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset,
ResultSender *sender, bool limit_less_than_no_groups) {
ResultSender *sender, bool limit_less_than_no_groups,
bool force_parall) {
MEASURE_FET("TempTable::MultiDimensionalGroupByScan(...)");
bool first_pass = true;
// tuples are numbered according to tuple_left filter (not used, if tuple_left
......@@ -234,9 +235,19 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
}
gbw.SetDistinctTuples(mit.NumOfTuples());
auto get_thd_cnt = []() {
int hardware_concurrency = std::thread::hardware_concurrency();
// TODO: The original code was the number of CPU cores divided by 4, and the reason for that is to be traced further
return hardware_concurrency > 4 ? (hardware_concurrency / 4) : 1;
};
int thd_cnt = 1;
if (ParallelAllowed(gbw) && !limit_less_than_no_groups) {
thd_cnt = std::thread::hardware_concurrency() / 4; // For concurrence reason, don't swallow all cores once.
if (force_parall) {
thd_cnt = get_thd_cnt();
} else {
if (ParallelAllowed(gbw) && !limit_less_than_no_groups) {
thd_cnt = get_thd_cnt(); // For concurrence reason, don't swallow all cores once.
}
}
AggregationWorkerEnt ag_worker(gbw, mind, thd_cnt, this);
......@@ -278,13 +289,13 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
// Grouping on a packrow
int64_t packrow_length = mit.GetPackSizeLeft();
int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple);
AggregaGroupingResult grouping_result = AggregatePackrow(gbw, &mit, cur_tuple);
if (sender) {
sender->SetAffectRows(gbw.NumOfGroups());
}
if (grouping_result == 2) throw common::KilledException();
if (grouping_result != 5) packrows_found++; // for statistics
if (grouping_result == 1) break; // end of the aggregation
if (grouping_result == AggregaGroupingResult::AGR_KILLED) throw common::KilledException();
if (grouping_result != AggregaGroupingResult::AGR_NO_LEFT) packrows_found++; // for statistics
if (grouping_result == AggregaGroupingResult::AGR_FINISH) break; // end of the aggregation
if (!gbw.IsFull() && gbw.MemoryBlocksLeft() == 0) {
gbw.SetAsFull();
}
......@@ -361,7 +372,7 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
displayed_no_groups = t->NumOfObj();
if (t->NumOfObj() >= limit) break;
if (gbw.AnyTuplesLeft()) gbw.ClearUsed(); // prepare for the next pass, if needed
} while (gbw.AnyTuplesLeft()); // do the next pass, if anything left
} while (gbw.AnyTuplesLeft() && (1 == thd_cnt)); // do the next pass, if anything left
} catch (...) {
ag_worker.Commit(false);
throw;
......@@ -468,11 +479,12 @@ void AggregationAlgorithm::MultiDimensionalDistinctScan(GroupByWrapper &gbw, MII
}
}
int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
std::scoped_lock guard(mtx);
int64_t packrow_length = mit->GetPackSizeLeft();
if (!gbw.AnyTuplesLeft(cur_tuple, cur_tuple + packrow_length - 1)) {
mit->NextPackrow();
return 5;
return AggregaGroupingResult::AGR_NO_LEFT;
}
int64_t uniform_pos = common::NULL_VALUE_64;
bool skip_packrow = false;
......@@ -509,7 +521,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs() // just DISTINCT without grouping
|| stop_all) { // or aggregation already done on rough level
gbw.TuplesResetAll(); // no more rows needed, just produce output
return 1; // aggregation finished
return AggregaGroupingResult::AGR_FINISH; // aggregation finished
}
}
if (skip_packrow)
......@@ -522,7 +534,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
if (packrow_done || skip_packrow) {
mit->NextPackrow();
return 0; // success - roughly omitted
return AggregaGroupingResult::AGR_OK; // success - roughly omitted
}
// bool require_locking_ag = true; // a new packrow,
......@@ -530,7 +542,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
// common::NULL_VALUE_64); // do not lock if the grouping row is uniform
while (mit->IsValid()) { // becomes invalid on pack end
if (m_conn->Killed()) return 2; // killed
if (m_conn->Killed()) return AggregaGroupingResult::AGR_KILLED; // killed
if (gbw.TuplesGet(cur_tuple)) {
if (require_locking_gr) {
for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
......@@ -560,7 +572,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
gbw.SetAllGroupsFound();
if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()) { // just DISTINCT without grouping
gbw.TuplesResetAll(); // no more rows needed, just produce output
return 1; // aggregation finished
return AggregaGroupingResult::AGR_FINISH; // aggregation finished
}
}
}
......@@ -577,7 +589,9 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
if (gbw.ColumnNotOmitted(gr_a)) {
bool value_successfully_aggregated = gbw.PutAggregatedValue(gr_a, pos, *mit, factor);
if (!value_successfully_aggregated) gbw.DistinctlyOmitted(gr_a, cur_tuple);
if (!value_successfully_aggregated) {
gbw.DistinctlyOmitted(gr_a, cur_tuple);
}
}
}
}
......@@ -587,7 +601,7 @@ int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit,
if (mit->PackrowStarted()) break;
}
gbw.CommitResets();
return 0; // success
return AggregaGroupingResult::AGR_OK; // success
}
void AggregationAlgorithm::AggregateFillOutput(GroupByWrapper &gbw, int64_t gt_pos, int64_t &omit_by_offset) {
......@@ -813,23 +827,27 @@ void AggregationAlgorithm::TaskFillOutput(GroupByWrapper *gbw, Transaction *ci,
}
}
void AggregationWorkerEnt::TaskAggrePacks(MIUpdatingIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
[[maybe_unused]] MIIterator *mit, [[maybe_unused]] int pstart,
[[maybe_unused]] int pend, int tuple, GroupByWrapper *gbw, Transaction *ci) {
int i = 0;
int64_t cur_tuple = tuple;
common::SetMySQLTHD(ci->Thd());
current_txn_ = ci;
void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
[[maybe_unused]] MIIterator *mit, [[maybe_unused]] CTask *task,
GroupByWrapper *gbw, Transaction *ci) {
taskIterator->Rewind();
int task_pack_num = 0;
while (taskIterator->IsValid()) {
int64_t packrow_length = taskIterator->GetPackSizeLeft();
int grouping_result = aa->AggregatePackrow(*gbw, taskIterator, cur_tuple);
if (grouping_result != 5) i++;
if (grouping_result == 1) break;
if (grouping_result == 2) throw common::KilledException();
if (grouping_result == 3 || grouping_result == 4) throw common::NotImplementedException("Aggregation overflow.");
cur_tuple += packrow_length;
}
TIANMU_LOG(LogCtl_Level::DEBUG, "TaskAggrePacks routine ends. Task id %d", taskIterator->GetTaskNum());
if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {
int cur_tuple = (*task->dwPack2cur)[task_pack_num];
MIInpackIterator mii(*taskIterator);
AggregaGroupingResult grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple);
if (grouping_result == AggregaGroupingResult::AGR_FINISH) break;
if (grouping_result == AggregaGroupingResult::AGR_KILLED) throw common::KilledException();
if (grouping_result == AggregaGroupingResult::AGR_OVERFLOW ||
grouping_result == AggregaGroupingResult::AGR_OTHER_ERROR)
throw common::NotImplementedException("Aggregation overflow.");
}
taskIterator->NextPackrow();
++task_pack_num;
}
}
void AggregationWorkerEnt::PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_sharding,
......@@ -852,83 +870,87 @@ void AggregationWorkerEnt::DistributeAggreTaskAverage(MIIterator &mit) {
std::vector<std::unique_ptr<GroupByWrapper>> vGBW;
vGBW.reserve(m_threads);
vTask.reserve(m_threads);
if (rc_control_.isOn()) rc_control_.lock(conn->GetThreadID()) << "Prepare data for parallel aggreation" << system::unlock;
if (rc_control_.isOn())
rc_control_.lock(conn->GetThreadID()) << "Prepare data for parallel aggreation" << system::unlock;
int packnum = 0;
int curtuple_index = 0;
std::unordered_map<int, int> pack2cur;
while (mit.IsValid()) {
pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));
int64_t packrow_length = mit.GetPackSizeLeft();
curtuple_index += packrow_length;
packnum++;
mit.NextPackrow();
}
int loopcnt = (packnum < m_threads) ? packnum : m_threads;
pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));
int loopcnt = (packnum < m_threads) ? packnum : m_threads;
int mod = packnum % loopcnt;
int num = packnum / loopcnt;
utils::result_set<void> res;
for (int i = 0; i < loopcnt; ++i) {
res.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW));
res.insert(
ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW));
int pack_start = i * num;
int pack_end = 0;
int dwPackNum = 0;
if (i == (loopcnt - 1)) {
pack_end = packnum;
dwPackNum = packnum;
} else {
pack_end = (i + 1) * num - 1;
dwPackNum = pack_end + 1;
}
int cur_start = pack2cur[pack_start];
int cur_end = pack2cur[pack_end] - 1;
CTask tmp;
tmp.dwTaskId = i;
tmp.dwPackNum = num + mod + i * num;
tmp.dwPackNum = dwPackNum;
tmp.dwStartPackno = pack_start;
tmp.dwEndPackno = pack_end;
tmp.dwStartTuple = cur_start;
tmp.dwEndTuple = cur_end;
tmp.dwTuple = cur_start;
tmp.dwPack2cur = &pack2cur;
vTask.push_back(tmp);
}
res.get_all_with_except();
if (rc_control_.isOn())
rc_control_.lock(conn->GetThreadID()) << "Prepare data for parallel aggreation done. Total packnum " << packnum
<< system::unlock;
packnum = 0;
mit.Rewind();
int curtuple = 0;
while (mit.IsValid()) {
int64_t packrow_length = mit.GetPackSizeLeft();
packnum++;
curtuple += packrow_length;
for (auto &it : vTask) {
if (packnum == it.dwPackNum) {
it.dwEndPackno = mit.GetCurPackrow(0);
it.dwTuple = curtuple;
}
}
if (rc_control_.isOn())
rc_control_.lock(conn->GetThreadID()) << " GetCurPackrow: " << mit.GetCurPackrow(0) << " packnum: " << packnum
<< " cur_tuple: " << curtuple << system::unlock;
mit.NextPackrow();
}
std::vector<MultiIndex> mis;
mis.reserve(vTask.size());
std::vector<MIUpdatingIterator> taskIterator;
std::vector<MIIterator> taskIterator;
taskIterator.reserve(vTask.size());
utils::result_set<void> res1;
for (uint i = 0; i < vTask.size(); ++i) {
auto &mi = mis.emplace_back(*mind, true);
if (dims.NoDimsUsed() == 0) dims.SetAll();
auto &mii = taskIterator.emplace_back(&mi, dims);
auto &mii = taskIterator.emplace_back(mit, true);
mii.SetTaskNum(vTask.size());
mii.SetTaskId(i);
mii.SetNoPacksToGo(vTask[i].dwEndPackno);
mii.RewindToPack((i == 0) ? 0 : vTask[i - 1].dwEndPackno + 1);
}
res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[0], &dims,
&mit, 0, vTask[0].dwEndPackno, 0, gb_main, conn));
for (size_t i = 1; i < vTask.size(); ++i) {
res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i], &dims,
&mit, vTask[i - 1].dwEndPackno + 1, vTask[i].dwEndPackno,
vTask[i - 1].dwTuple, vGBW[i].get(), conn));
for (size_t i = 0; i < vTask.size(); ++i) {
GroupByWrapper *gbw = i == 0 ? gb_main : vGBW[i].get();
res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i],
&dims, &mit, &vTask[i], gbw, conn));
}
res1.get_all_with_except();
for (size_t i = 0; i < vTask.size(); ++i) {
// Merge aggreation data together
if (i != 0) gb_main->Merge(*(vGBW[i]));
if (i != 0) {
aa->MultiDimensionalDistinctScan(*(vGBW[i]), mit);
gb_main->Merge(*(vGBW[i]));
}
}
}
} // namespace core
} // namespace Tianmu
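The slicing in DistributeAggreTaskAverage gives each task a contiguous packrow range and records, in pack2cur, the first tuple index of every pack (plus a sentinel entry one past the last pack), so a worker can translate its pack window back into tuple positions. A minimal standalone sketch of that arithmetic, with made-up pack sizes and worker count (in the engine these come from mit.GetPackSizeLeft() and the thread pool):

// Illustrative only: mirrors the task slicing in DistributeAggreTaskAverage
// with hypothetical pack sizes; compiles with any C++17 compiler.
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
  std::vector<int> pack_sizes = {65536, 65536, 40000, 65536, 12345};  // hypothetical
  std::unordered_map<int, int> pack2cur;  // pack number -> index of its first tuple
  int packnum = 0, curtuple_index = 0;
  for (int packrow_length : pack_sizes) {
    pack2cur.emplace(packnum, curtuple_index);
    curtuple_index += packrow_length;
    ++packnum;
  }
  pack2cur.emplace(packnum, curtuple_index);  // sentinel: one past the last pack

  int m_threads = 2;  // hypothetical worker count
  int loopcnt = (packnum < m_threads) ? packnum : m_threads;
  int num = packnum / loopcnt;
  for (int i = 0; i < loopcnt; ++i) {
    int pack_start = i * num;
    // As in the original code, the last task's end marker is packnum itself
    // (one past the last pack index); TaskAggrePacks treats it as an inclusive bound.
    int pack_end = (i == loopcnt - 1) ? packnum : (i + 1) * num - 1;
    std::cout << "task " << i << ": packs [" << pack_start << ", " << pack_end
              << "], first tuple " << pack2cur[pack_start] << "\n";
  }
  return 0;
}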
......@@ -23,9 +23,19 @@
#include "core/mi_iterator.h"
#include "core/query.h"
#include "core/temp_table.h"
#include "core/ctask.h"
namespace Tianmu {
namespace core {
enum class AggregaGroupingResult {
AGR_OK = 0, // success
AGR_FINISH = 1, // finish
AGR_KILLED = 2, // killed
AGR_OVERFLOW = 3, // overflow
AGR_OTHER_ERROR = 4, // other error
AGR_NO_LEFT = 5 // pack already aggregated
};
class AggregationAlgorithm {
public:
AggregationAlgorithm(TempTable *tt)
......@@ -37,13 +47,13 @@ class AggregationAlgorithm {
bool &ag_not_changeabe, bool &stop_all, int64_t &uniform_pos, int64_t rows_in_pack,
int64_t local_factor, int just_one_aggr = -1);
void MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset, ResultSender *sender,
bool limit_less_than_no_groups);
bool limit_less_than_no_groups, bool force_parall);
void MultiDimensionalDistinctScan(GroupByWrapper &gbw, MIIterator &mit);
void AggregateFillOutput(GroupByWrapper &gbw, int64_t gt_pos, int64_t &omit_by_offset);
// Return code for AggregatePackrow: 0 - success, 1 - stop aggregation
// (finished), 5 - pack already aggregated (skip)
int AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple);
AggregaGroupingResult AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple);
// No parallel for subquery/join/distinct cases
bool ParallelAllowed(GroupByWrapper &gbw) {
......@@ -77,18 +87,18 @@ class AggregationWorkerEnt {
// Return code for AggregatePackrow: 0 - success, 1 - stop aggregation
// (finished), 2 - killed, 3
// - overflow, 4 - other error, 5 - pack already aggregated (skip)
int AggregatePackrow(MIUpdatingIterator &lmit, int64_t cur_tuple) {
AggregaGroupingResult AggregatePackrow(MIIterator &lmit, int64_t cur_tuple) {
return aa->AggregatePackrow(*gb_main, &lmit, cur_tuple);
}
int AggregatePackrow(MIInpackIterator &lmit, int64_t cur_tuple) {
AggregaGroupingResult AggregatePackrow(MIInpackIterator &lmit, int64_t cur_tuple) {
return aa->AggregatePackrow(*gb_main, &lmit, cur_tuple);
}
void Commit([[maybe_unused]] bool do_merge = true) { gb_main->CommitResets(); }
void ReevaluateNumberOfThreads([[maybe_unused]] MIIterator &mit) {}
int ThreadsUsed() { return m_threads; }
void Barrier() {}
void TaskAggrePacks(MIUpdatingIterator *taskIterator, DimensionVector *dims, MIIterator *mit, int pstart, int pend,
int tuple, GroupByWrapper *gbw, Transaction *ci);
void TaskAggrePacks(MIIterator *taskIterator, DimensionVector *dims, MIIterator *mit, CTask *task,
GroupByWrapper *gbw, Transaction *ci);
void DistributeAggreTaskAverage(MIIterator &mit);
void PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_sharding,
std::vector<std::unique_ptr<GroupByWrapper>> *vGBW);
......
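Given the AggregaGroupingResult codes declared above, the per-packrow driver loops (MultiDimensionalGroupByScan and TaskAggrePacks) share the same dispatch pattern. A minimal sketch of that pattern, with a stub standing in for AggregationAlgorithm::AggregatePackrow:

// Sketch only: AggregatePackrowStub and its cutoff are hypothetical.
#include <iostream>
#include <stdexcept>

enum class AggregaGroupingResult { AGR_OK, AGR_FINISH, AGR_KILLED, AGR_OVERFLOW, AGR_OTHER_ERROR, AGR_NO_LEFT };

AggregaGroupingResult AggregatePackrowStub(int pack) {
  return pack < 3 ? AggregaGroupingResult::AGR_OK : AggregaGroupingResult::AGR_FINISH;
}

int main() {
  int packrows_found = 0;
  for (int pack = 0;; ++pack) {
    AggregaGroupingResult r = AggregatePackrowStub(pack);
    if (r == AggregaGroupingResult::AGR_KILLED) throw std::runtime_error("killed");
    if (r == AggregaGroupingResult::AGR_OVERFLOW || r == AggregaGroupingResult::AGR_OTHER_ERROR)
      throw std::runtime_error("aggregation overflow");
    if (r != AggregaGroupingResult::AGR_NO_LEFT) ++packrows_found;  // for statistics
    if (r == AggregaGroupingResult::AGR_FINISH) break;              // end of the aggregation
  }
  std::cout << "packrows aggregated: " << packrows_found << "\n";
  return 0;
}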
......@@ -18,6 +18,8 @@
#define TIANMU_CORE_CTASK_H_
#pragma once
#include <unordered_map>
namespace Tianmu {
namespace core {
struct CTask {
......@@ -26,7 +28,19 @@ struct CTask {
int dwEndPackno; // the last packno of this task
int dwTuple; // previous task + this task Actual process tuple
int dwStartPackno;
CTask() : dwTaskId(0), dwPackNum(0), dwEndPackno(0), dwTuple(0), dwStartPackno(0) {}
int dwStartTuple;
int dwEndTuple;
std::unordered_map<int, int>* dwPack2cur;
CTask()
: dwTaskId(0),
dwPackNum(0),
dwEndPackno(0),
dwTuple(0),
dwStartPackno(0),
dwStartTuple(0),
dwEndTuple(0),
dwPack2cur(nullptr)
{}
};
} // namespace core
} // namespace Tianmu
......
......@@ -19,6 +19,7 @@
#include "core/mi_iterator.h"
#include "core/transaction.h"
#include "core/value_matching_hashtable.h"
namespace Tianmu {
namespace core {
......@@ -175,7 +176,8 @@ GDTResult GroupDistinctTable::Add(int64_t group, MIIterator &mit) {
group += 1; // offset; 0 means empty position
std::memmove(input_buffer, (unsigned char *)(&group), group_bytes);
encoder->Encode(input_buffer + group_bytes, mit, NULL, true);
return FindCurrentRow();
// return FindCurrentRow();
return FindCurrentRowByVMTable();
}
GDTResult GroupDistinctTable::Add(int64_t group, int64_t val) // numeric values
......@@ -238,6 +240,18 @@ void GroupDistinctTable::ValueFromInput(types::BString &v) // decode original v
v = encoder->GetValueT(input_buffer + group_bytes, mit);
}
GDTResult GroupDistinctTable::FindCurrentRowByVMTable()
{
int64_t row = 0;
bool existed = vm_tab->FindCurrentRow(input_buffer, row, true, total_width);
if (existed) {
return GDTResult::GDT_EXISTS;
}
return GDTResult::GBIMODE_AS_TEXT;
}
GDTResult GroupDistinctTable::FindCurrentRow(bool find_only) // find / insert the current buffer
{
DEBUG_ASSERT(!filter_implementation);
......
......@@ -23,6 +23,7 @@
#include "core/column_bin_encoder.h"
#include "core/filter.h"
#include "vc/virtual_column.h"
#include "core/value_matching_table.h"
namespace Tianmu {
namespace core {
......@@ -52,6 +53,7 @@ class GroupDistinctTable : public mm::TraceableObject {
// vector of bytes)
void InitializeVC(int64_t max_no_groups, vcolumn::VirtualColumn *vc, int64_t max_no_rows, int64_t max_bytes,
bool decodable);
void CopyFromValueMatchingTable(ValueMatchingTable *vt) { vm_tab.reset(vt->Clone()); };
// Assumption: group >= 0
GDTResult Add(int64_t group,
......@@ -75,6 +77,7 @@ class GroupDistinctTable : public mm::TraceableObject {
private:
GDTResult FindCurrentRow(bool find_only = false); // find / insert the current buffer
GDTResult FindCurrentRowByVMTable();
bool RowEmpty(unsigned char *p) // is this position empty? only if it starts with zeros
{
return (std::memcmp(&zero_const, p, group_bytes) == 0);
......@@ -113,6 +116,7 @@ class GroupDistinctTable : public mm::TraceableObject {
bool filter_implementation;
Filter *f;
int64_t group_factor; // (g, v) -> f( g + group_factor * v )
std::unique_ptr<ValueMatchingTable> vm_tab;
};
} // namespace core
} // namespace Tianmu
......
......@@ -340,7 +340,7 @@ void GroupTable::Initialize(int64_t max_no_groups, bool parallel_allowed) {
vm_tab.reset(ValueMatchingTable::CreateNew_ValueMatchingTable(primary_total_size, declared_max_no_groups,
max_group_code, total_width, grouping_and_UTF_width,
grouping_buf_width, p_power));
grouping_buf_width, p_power, false));
input_buffer.resize(grouping_and_UTF_width);
// pre-allocation of distinct memory
......@@ -362,6 +362,7 @@ void GroupTable::Initialize(int64_t max_no_groups, bool parallel_allowed) {
gdistinct[i]->InitializeVC(vm_tab->RowNumberScope(), vc[i], desc.max_no_values,
distinct_size / no_columns_with_distinct,
(operation[i] != GT_Aggregation::GT_COUNT_NOT_NULL)); // otherwise must be decodable
gdistinct[i]->CopyFromValueMatchingTable(vm_tab.get());
distinct_size -= gdistinct[i]->BytesTaken();
no_columns_with_distinct--;
}
......@@ -537,10 +538,10 @@ bool GroupTable::PutAggregatedValue(int col, int64_t row, MIIterator &mit, int64
GDTResult res = gdistinct[col]->Add(row, mit);
if (res == GDTResult::GDT_EXISTS) return true; // value found, do not aggregate it again
if (res == GDTResult::GDT_FULL) {
if (gdistinct[col]->AlreadyFull())
not_full = false; // disable also the main grouping table (if it is a
//if (gdistinct[col]->AlreadyFull())
//not_full = false; // disable also the main grouping table (if it is a
// persistent rejection)
return false; // value not found in DISTINCT buffer, which is already
//return false; // value not found in DISTINCT buffer, which is already
// full
}
factor = 1; // ignore repetitions for distinct
......
......@@ -228,6 +228,7 @@ class GroupTable : public mm::TraceableObject {
// some memory managing
int64_t max_total_size;
std::mutex mtx;
};
} // namespace core
} // namespace Tianmu
......
......@@ -633,6 +633,10 @@ bool GroupByWrapper::IsMaxOnly() // true, if an attribute is max(column), and
}
void GroupByWrapper::InitTupleLeft(int64_t n) {
if (tuple_left) {
delete tuple_left;
tuple_left = NULL;
}
DEBUG_ASSERT(tuple_left == NULL);
tuple_left = new Filter(n, p_power);
tuple_left->Set();
......
......@@ -21,6 +21,7 @@
#include "core/condition.h"
#include "core/descriptor.h"
#include "core/mi_iterator.h"
#include "core/ctask.h"
namespace Tianmu {
namespace core {
......@@ -95,6 +96,31 @@ class JoinerGeneral : public TwoDimensionalJoiner {
protected:
void ExecuteOuterJoinLoop(Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
DimensionVector &outer_dims, int64_t &tuples_in_output, int64_t output_limit);
// Replaces the original inner-join loop body: this is the top-level inner join
// call, which internally splits the packrows across multiple threads so that
// each thread processes a separate subset
void ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
std::vector<bool> &pack_desc_locked, int64_t &tuples_in_output, int64_t limit,
bool count_only);
// Handles each row in the Pack that the current iterator points to
// TODO: pack reads can suffer cache invalidation when threads switch between packs;
// splitting the work down at the storage layer is deferred to a second phase
void ExecuteInnerJoinPackRow(MIIterator *mii, CTask *task, Condition *cond, MINewContents *new_mind,
DimensionVector *all_dims,
std::vector<bool> *pack_desc_locked, int64_t *tuples_in_output, int64_t limit,
bool count_only,
bool *stop_execution, int64_t *rows_passed, int64_t *rows_omitted);
// The purpose of this function is to process the split task in a separate thread
void TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Condition *cond, MINewContents *new_mind,
DimensionVector *all_dims, std::vector<bool> *pack_desc_locked, int64_t *tuples_in_output,
int64_t limit, bool count_only,
bool *stop_execution, int64_t *rows_passed, int64_t *rows_omitted);
private:
std::mutex mtx;
};
} // namespace core
} // namespace Tianmu
......
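The three new members form a fan-out: ExecuteInnerJoinLoop slices the packrows into CTask ranges, TaskInnerJoinPacks runs one range per worker, and ExecuteInnerJoinPackRow evaluates the rows of a single pack while shared state (the output MultiIndex and counters) is guarded by the new mtx. A generic, self-contained sketch of that pattern, with illustrative names and std::thread standing in for the engine's thread pool:

// Sketch only: shows the pack-range fan-out with shared counters under a mutex.
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  const int packnum = 10, threads_num = 3;  // hypothetical
  std::mutex mtx;
  int64_t tuples_in_output = 0;

  auto task_inner_join_packs = [&](int pack_start, int pack_end) {
    for (int p = pack_start; p <= pack_end; ++p) {
      // ExecuteInnerJoinPackRow would evaluate every row of pack p here.
      std::scoped_lock guard(mtx);
      tuples_in_output += 1;  // pretend one matching tuple per pack
    }
  };

  std::vector<std::thread> workers;
  int num = packnum / threads_num;
  for (int i = 0; i < threads_num; ++i) {
    int pack_start = i * num;
    int pack_end = (i == threads_num - 1) ? packnum - 1 : (i + 1) * num - 1;
    workers.emplace_back(task_inner_join_packs, pack_start, pack_end);
  }
  for (auto &w : workers) w.join();
  std::cout << "tuples_in_output = " << tuples_in_output << "\n";
  return 0;
}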
......@@ -21,6 +21,7 @@
#include "core/mi_updating_iterator.h"
#include "core/transaction.h"
#include "vc/virtual_column.h"
#include "core/ctask.h"
namespace Tianmu {
namespace core {
......@@ -70,52 +71,8 @@ void JoinerGeneral::ExecuteJoinConditions(Condition &cond) {
if (!outer_dims.IsEmpty())
ExecuteOuterJoinLoop(cond, new_mind, all_dims, outer_dims, tuples_in_output, tips.limit);
else {
while (mit.IsValid() && !stop_execution) {
if (mit.PackrowStarted()) {
bool omit_this_packrow = false;
for (int i = 0; (i < no_desc && !omit_this_packrow); i++)
if (cond[i].EvaluateRoughlyPack(mit) == common::RSValue::RS_NONE) omit_this_packrow = true;
for (int i = 0; i < no_desc; i++) pack_desc_locked[i] = false; // delay locking
if (new_mind.NoMoreTuplesPossible())
break; // stop the join if nothing new may be obtained in some
// optimized cases
if (omit_this_packrow) {
rows_omitted += mit.GetPackSizeLeft();
rows_passed += mit.GetPackSizeLeft();
mit.NextPackrow();
continue;
}
}
loc_result = true;
for (int i = 0; (i < no_desc && loc_result); i++) {
if (!pack_desc_locked[i]) { // delayed locking - maybe will not be
// locked at all?
cond[i].LockSourcePacks(mit);
pack_desc_locked[i] = true;
}
if (types::RequiresUTFConversions(cond[i].GetCollation())) {
if (cond[i].CheckCondition_UTF(mit) == false) loc_result = false;
} else {
if (cond[i].CheckCondition(mit) == false) loc_result = false;
}
}
if (loc_result) {
if (!tips.count_only) {
for (int i = 0; i < mind->NumOfDimensions(); i++)
if (all_dims[i]) new_mind.SetNewTableValue(i, mit[i]);
new_mind.CommitNewTableValues();
}
tuples_in_output++;
}
++mit;
rows_passed++;
if (m_conn->Killed()) throw common::KilledException();
if (tips.limit > -1 && tuples_in_output >= tips.limit) stop_execution = true;
}
ExecuteInnerJoinLoop(mit, cond, new_mind, all_dims, pack_desc_locked, tuples_in_output, tips.limit, tips.count_only);
}
if (rows_passed > 0 && rows_omitted > 0)
rc_control_.lock(m_conn->GetThreadID())
<< "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." << system::unlock;
// Postprocessing and cleanup
if (tips.count_only)
new_mind.CommitCountOnly(tuples_in_output);
......@@ -128,6 +85,201 @@ void JoinerGeneral::ExecuteJoinConditions(Condition &cond) {
why_failed = JoinFailure::NOT_FAILED;
}
// Handles each row in the Pack that the current iterator points to
// TODO: pack reads can suffer cache invalidation when threads switch between packs;
// splitting the work down at the storage layer is deferred to a second phase
void JoinerGeneral::ExecuteInnerJoinPackRow(MIIterator *mii, CTask *task, Condition *cond, MINewContents *new_mind,
DimensionVector *all_dims, std::vector<bool> *_pack_desc_locked,
int64_t *tuples_in_output, int64_t limit, bool count_only,
bool *stop_execution,
int64_t *rows_passed, int64_t *rows_omitted)
{
std::scoped_lock guard(mtx);
int no_desc = (*cond).Size();
std::vector<bool> pack_desc_locked;
pack_desc_locked.reserve(no_desc);
for (int i = 0; (i < no_desc); i++) {
pack_desc_locked.emplace_back(false);
}
int index = 0;
while (mii->IsValid() && !*stop_execution) {
if (mii->PackrowStarted()) {
bool omit_this_packrow = false;
for (int i = 0; (i < no_desc && !omit_this_packrow); i++)
if ((*cond)[i].EvaluateRoughlyPack(*mii) == common::RSValue::RS_NONE) omit_this_packrow = true;
for (int i = 0; i < no_desc; i++) pack_desc_locked[i] = false; // delay locking
if (new_mind->NoMoreTuplesPossible())
break; // stop the join if nothing new may be obtained in some optimized cases
if (omit_this_packrow) {
(*rows_omitted) += mii->GetPackSizeLeft();
(*rows_passed) += mii->GetPackSizeLeft();
mii->NextPackrow();
continue;
}
}
{
bool loc_result = true;
for (int i = 0; (i < no_desc && loc_result); i++) {
if (!pack_desc_locked[i]) {
(*cond)[i].LockSourcePacks(*mii);
pack_desc_locked[i] = true;
}
if (types::RequiresUTFConversions((*cond)[i].GetCollation())) {
if ((*cond)[i].CheckCondition_UTF(*mii) == false) loc_result = false;
} else {
if ((*cond)[i].CheckCondition(*mii) == false) loc_result = false;
}
}
if (loc_result) {
if (!count_only) {
for (int i = 0; i < mind->NumOfDimensions(); i++)
if ((*all_dims)[i]) new_mind->SetNewTableValue(i, (*mii)[i]);
new_mind->CommitNewTableValues();
}
(*tuples_in_output)++;
}
(*rows_passed)++;
if (m_conn->Killed()) throw common::KilledException();
if (limit > -1 && *tuples_in_output >= limit) *stop_execution = true;
mii->Increment();
if (mii->PackrowStarted()) break;
}
++index;
}
}
// The purpose of this function is to process the split task in a separate thread
void JoinerGeneral::TaskInnerJoinPacks(MIIterator *taskIterator, CTask *task, Condition *cond, MINewContents *new_mind,
DimensionVector *all_dims, std::vector<bool> *pack_desc_locked_p,
int64_t *tuples_in_output, int64_t limit, bool count_only, bool *stop_execution,
int64_t *rows_passed, int64_t *rows_omitted)
{
int no_desc = (*cond).Size();
std::vector<bool> pack_desc_locked;
pack_desc_locked.reserve(no_desc);
for (int i = 0; i < no_desc; i++) {
pack_desc_locked.push_back(false);
}
Condition cd(*cond);
taskIterator->Rewind();
int task_pack_num = 0;
while (taskIterator->IsValid() && !(*stop_execution)) {
if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {
MIInpackIterator mii(*taskIterator);
ExecuteInnerJoinPackRow(&mii, task, &cd, new_mind, all_dims, &pack_desc_locked, tuples_in_output, limit,
count_only,
stop_execution, rows_passed, rows_omitted);
}
taskIterator->NextPackrow();
++task_pack_num;
}
}
// Replaces the original inner-join loop body: this is the top-level inner join
// call, which internally splits the packrows across multiple threads so that
// each thread processes a separate subset
void JoinerGeneral::ExecuteInnerJoinLoop(MIIterator &mit, Condition &cond, MINewContents &new_mind,
DimensionVector &all_dims, std::vector<bool> &pack_desc_locked,
int64_t &tuples_in_output, int64_t limit, bool count_only) {
int packnum = 0;
while (mit.IsValid()) {
int64_t packrow_length = mit.GetPackSizeLeft();
packnum++;
mit.NextPackrow();
}
int hardware_concurrency = std::thread::hardware_concurrency();
// TODO: The original code was the number of CPU cores divided by 4, and the reason for that is to be traced further
int threads_num = hardware_concurrency > 4 ? (hardware_concurrency / 4) : 1;
int loopcnt = 0;
int mod = 0;
int num = 0;
do {
loopcnt = (packnum < threads_num) ? packnum : threads_num;
mod = packnum % loopcnt;
num = packnum / loopcnt;
--threads_num;
} while ((num <= 1) && (threads_num >= 1));
TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoop packnum: %d threads_num: %d loopcnt: %d num: %d mod: %d",
packnum, threads_num,
loopcnt, num, mod);
std::vector<CTask> vTask;
vTask.reserve(loopcnt);
for (int i = 0; i < loopcnt; ++i) {
int pack_start = i * num;
int pack_end = 0;
int dwPackNum = 0;
if (i == (loopcnt - 1)) {
pack_end = packnum;
dwPackNum = packnum;
} else {
pack_end = (i + 1) * num - 1;
dwPackNum = pack_end + 1;
}
CTask tmp;
tmp.dwTaskId = i;
tmp.dwPackNum = dwPackNum;
tmp.dwStartPackno = pack_start;
tmp.dwEndPackno = pack_end;
TIANMU_LOG(LogCtl_Level::DEBUG, "ExecuteInnerJoinLoop dwTaskId: %d dwStartPackno: %d dwEndPackno: %d", tmp.dwTaskId,
tmp.dwStartPackno, tmp.dwEndPackno);
vTask.push_back(tmp);
}
int64_t rows_passed = 0;
int64_t rows_omitted = 0;
int no_desc = cond.Size();
bool stop_execution = false;
int index = 0;
mit.Rewind();
std::vector<MIIterator> taskIterator;
taskIterator.reserve(vTask.size());
for (uint i = 0; i < vTask.size(); ++i) {
auto &mii = taskIterator.emplace_back(mit, true);
mii.SetTaskNum(vTask.size());
mii.SetTaskId(i);
}
utils::result_set<void> res;
for (size_t i = 0; i < vTask.size(); ++i) {
res.insert(
ha_rcengine_->query_thread_pool.add_task(&JoinerGeneral::TaskInnerJoinPacks, this, &taskIterator[i], &vTask[i], &cond, &new_mind, &all_dims,
&pack_desc_locked, &tuples_in_output, limit, count_only, &stop_execution, &rows_passed, &rows_omitted));
}
res.get_all_with_except();
if (rows_passed > 0 && rows_omitted > 0)
rc_control_.lock(m_conn->GetThreadID())
<< "Roughly omitted " << int(rows_omitted / double(rows_passed) * 1000) / 10.0 << "% rows." << system::unlock;
}
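The sizing logic at the top of ExecuteInnerJoinLoop starts from hardware_concurrency / 4 threads (the heuristic carried over from the aggregation code) and then shrinks the thread count until every task covers more than one packrow, or only one thread remains. A small trace with hypothetical numbers:

// Sketch only: traces the thread/task sizing used by ExecuteInnerJoinLoop.
#include <iostream>

int main() {
  int packnum = 5;                // hypothetical number of packrows in the join
  int hardware_concurrency = 32;  // hypothetical core count
  int threads_num = hardware_concurrency > 4 ? hardware_concurrency / 4 : 1;  // 8
  int loopcnt = 0, mod = 0, num = 0;
  do {
    loopcnt = (packnum < threads_num) ? packnum : threads_num;
    mod = packnum % loopcnt;
    num = packnum / loopcnt;
    --threads_num;
  } while ((num <= 1) && (threads_num >= 1));
  // With packnum = 5 and 8 starting threads, the loop settles on loopcnt = 2,
  // num = 2, mod = 1: the join is split into two tasks of two and three packrows.
  std::cout << "loopcnt=" << loopcnt << " num=" << num << " mod=" << mod << "\n";
  return 0;
}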
void JoinerGeneral::ExecuteOuterJoinLoop(Condition &cond, MINewContents &new_mind, DimensionVector &all_dims,
DimensionVector &outer_dims, int64_t &tuples_in_output, int64_t output_limit) {
MEASURE_FET("JoinerGeneral::ExecuteOuterJoinLoop(...)");
......
......@@ -114,6 +114,48 @@ int64_t ValueMatching_HashTable::ByteSize() {
return res + max_no_rows * total_width;
}
bool ValueMatching_HashTable::FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new,
int match_width) {
unsigned int crc_code = HashValue(input_buffer, match_width);
unsigned int ht_pos = (crc_code & ht_mask);
unsigned int row_no = ht[ht_pos];
if (row_no == 0xFFFFFFFF) { // empty hash position
if (!add_if_new) {
row = common::NULL_VALUE_64;
return false;
}
row_no = no_rows;
ht[ht_pos] = row_no;
}
while (row_no < no_rows) {
unsigned char *cur_row = (unsigned char *)t.GetRow(row_no);
if (std::memcmp(cur_row, input_buffer, match_width) == 0) {
row = row_no;
return true; // position found
}
unsigned int *next_pos = (unsigned int *)(cur_row + next_pos_offset);
if (*next_pos == 0) { // not found and no more conflicted values
if (add_if_new) *next_pos = no_rows;
row_no = no_rows;
} else {
DEBUG_ASSERT(row_no < *next_pos);
row_no = *next_pos;
}
}
// row_no == no_rows in this place
if (!add_if_new) {
row = common::NULL_VALUE_64;
return false;
}
row = row_no;
int64_t new_row = t.AddEmptyRow(); // 0 is set as a "NextPos"
ASSERT(new_row == row, "wrong row number");
std::memcpy(t.GetRow(row), input_buffer, match_width);
no_rows++;
return false;
}
bool ValueMatching_HashTable::FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new) {
unsigned int crc_code = HashValue(input_buffer, matching_width);
unsigned int ht_pos = (crc_code & ht_mask);
......
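The new FindCurrentRow overload is an open hash with chained rows: the first match_width bytes of the input buffer are hashed into the bucket array ht, collisions are walked through a per-row next-position field, and a missing key is appended as a new row when add_if_new is set. A simplified, self-contained model of that structure (std::string keys stand in for the raw byte buffers and memcmp; all names here are illustrative):

// Sketch only: bucket array + per-row "next" chain, append-if-new semantics.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct Row { std::string key; uint32_t next = 0; };  // next == 0 means end of chain

bool FindCurrentRow(std::vector<uint32_t> &ht, std::vector<Row> &rows,
                    const std::string &key, int64_t &row, bool add_if_new) {
  uint32_t ht_pos = std::hash<std::string>{}(key) & (ht.size() - 1);
  uint32_t row_no = ht[ht_pos];
  if (row_no == 0xFFFFFFFF) {                      // empty bucket
    if (!add_if_new) { row = -1; return false; }   // -1 stands in for common::NULL_VALUE_64
    ht[ht_pos] = row_no = rows.size();
  }
  while (row_no < rows.size()) {
    if (rows[row_no].key == key) { row = row_no; return true; }  // position found
    uint32_t &next = rows[row_no].next;
    if (next == 0) {                               // end of chain: append here if allowed
      if (add_if_new) next = rows.size();
      row_no = rows.size();
    } else {
      row_no = next;
    }
  }
  if (!add_if_new) { row = -1; return false; }
  row = rows.size();
  rows.push_back({key, 0});
  return false;                                    // inserted as a new row
}

int main() {
  std::vector<uint32_t> ht(8, 0xFFFFFFFF);
  std::vector<Row> rows;
  int64_t r = 0;
  std::cout << FindCurrentRow(ht, rows, "abc", r, true) << " row=" << r << "\n";  // new
  std::cout << FindCurrentRow(ht, rows, "abc", r, true) << " row=" << r << "\n";  // exists
  return 0;
}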
......@@ -44,6 +44,8 @@ class ValueMatching_HashTable : public mm::TraceableObject, public ValueMatching
bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new = true) override;
bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new, int match_width) override;
void Rewind(bool release = false) override { t.Rewind(release); }
int64_t GetCurrentRow() override { return t.GetCurrent(); }
void NextRow() override { t.NextRow(); }
......
......@@ -43,7 +43,7 @@ void ValueMatchingTable::Clear() { no_rows = 0; }
ValueMatchingTable *ValueMatchingTable::CreateNew_ValueMatchingTable(int64_t mem_available, int64_t max_no_groups,
int64_t max_group_code, int _total_width,
int _input_buf_width, int _match_width,
uint32_t power) {
uint32_t power, bool use_lookup_table) {
// trivial case: one group only
if (_input_buf_width == 0) {
ValueMatching_OnePosition *new_object = new ValueMatching_OnePosition();
......@@ -52,10 +52,12 @@ ValueMatchingTable *ValueMatchingTable::CreateNew_ValueMatchingTable(int64_t mem
}
// easy case: narrow scope of group codes, which may be used
if (max_group_code < common::PLUS_INF_64 && max_group_code < max_no_groups * 1.5) {
ValueMatching_LookupTable *new_object = new ValueMatching_LookupTable();
new_object->Init(max_group_code, _total_width, _input_buf_width, _match_width, power);
return new_object;
if (use_lookup_table) {
if (max_group_code < common::PLUS_INF_64 && max_group_code < max_no_groups * 1.5) {
ValueMatching_LookupTable *new_object = new ValueMatching_LookupTable();
new_object->Init(max_group_code, _total_width, _input_buf_width, _match_width, power);
return new_object;
}
}
// default
......
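The new use_lookup_table flag makes the dense lookup-table matcher opt-in; GroupTable::Initialize now passes false, so the parallel aggregation path always gets the hash-table matcher, whose clone can then be handed to the per-column distinct tables. A stand-in for the factory's selection order, with INT64_MAX replacing common::PLUS_INF_64 and an illustrative function name:

// Sketch only: models which matcher CreateNew_ValueMatchingTable would pick.
#include <cstdint>
#include <iostream>
#include <string>

std::string ChooseMatcher(int input_buf_width, int64_t max_group_code, int64_t max_no_groups,
                          bool use_lookup_table) {
  if (input_buf_width == 0) return "ValueMatching_OnePosition";       // one group only
  if (use_lookup_table && max_group_code < INT64_MAX && max_group_code < max_no_groups * 1.5)
    return "ValueMatching_LookupTable";                               // narrow group codes
  return "ValueMatching_HashTable";                                   // default
}

int main() {
  // The serial path keeps the old behaviour; GroupTable::Initialize now passes false.
  std::cout << ChooseMatcher(8, 1000, 4096, true) << "\n";   // ValueMatching_LookupTable
  std::cout << ChooseMatcher(8, 1000, 4096, false) << "\n";  // ValueMatching_HashTable
  return 0;
}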
......@@ -52,6 +52,10 @@ class ValueMatchingTable { // abstract class: interface for value matching
// already exists, false if put as a new row
virtual bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new = true) = 0;
virtual bool FindCurrentRow(unsigned char *input_buffer, int64_t &row, bool add_if_new, int match_width) {
return FindCurrentRow(input_buffer, row, add_if_new);
};
int64_t NoRows() { return int64_t(no_rows); } // rows stored so far
virtual bool IsOnePass() = 0; // true if the aggregator is capable of storing all groups
// up to max_no_groups declared in Init()
......@@ -83,7 +87,8 @@ class ValueMatchingTable { // abstract class: interface for value matching
*/
static ValueMatchingTable *CreateNew_ValueMatchingTable(int64_t mem_available, int64_t max_no_groups,
int64_t max_group_code, int _total_width,
int _input_buf_width, int _match_width, uint32_t power);
int _input_buf_width, int _match_width, uint32_t power,
bool use_lookup_table = true);
protected:
int total_width; // whole row
......