From f96a7e401e2605b13cc64edc06dec56575321bb8 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Fri, 5 Apr 2019 18:59:48 +0100 Subject: [PATCH] remove the list lookup and prevent the global lock --- dbms/src/Common/SortedLookupPODArray.h | 52 --------------- dbms/src/Interpreters/Join.cpp | 12 +--- dbms/src/Interpreters/Join.h | 3 - dbms/src/Interpreters/RowRefs.cpp | 44 +++++++------ dbms/src/Interpreters/RowRefs.h | 90 +++++++++++++++++++++----- 5 files changed, 100 insertions(+), 101 deletions(-) delete mode 100644 dbms/src/Common/SortedLookupPODArray.h diff --git a/dbms/src/Common/SortedLookupPODArray.h b/dbms/src/Common/SortedLookupPODArray.h deleted file mode 100644 index d9b03f5704..0000000000 --- a/dbms/src/Common/SortedLookupPODArray.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include -//#include - -namespace DB -{ - -/** - * This class is intended to push sortable data into. - * When looking up values the container ensures that it is sorted for log(N) lookup - * - * Note, this is only efficient when the insertions happen in one stage, followed by all retrievals - * This way the data only gets sorted once. - */ - -template -class SortedLookupPODArray -{ -public: - using Base = std::vector; - //using Base = PaddedPODArray; - - template - void insert(U && x, TAllocatorParams &&... allocator_params) - { - array.push_back(std::forward(x), std::forward(allocator_params)...); - sorted = false; - } - - typename Base::const_iterator upper_bound(const T & k) - { - if (!sorted) - sort(); - return std::upper_bound(array.cbegin(), array.cend(), k); - } - - typename Base::const_iterator cbegin() const { return array.cbegin(); } - typename Base::const_iterator cend() const { return array.cend(); } - -private: - Base array; - bool sorted = false; - - void sort() - { - std::sort(array.begin(), array.end()); - sorted = true; - } -}; - -} diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 7faaac5f60..08d4233179 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -422,8 +422,8 @@ namespace typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); if (emplace_result.isInserted()) - time_series_map = new (time_series_map) typename Map::mapped_type(); - time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i); + time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); + time_series_map->insert(join.getAsofType(), asof_column, stored_block, i); } }; @@ -511,10 +511,7 @@ void Join::prepareBlockListStructure(Block & stored_block) for (const auto & name : key_names_right) { if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back()) - { - LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name); break; // this is the last column so break is OK - } if (!erased.count(name)) stored_block.erase(stored_block.getPositionByName(name)); @@ -556,8 +553,6 @@ bool Join::insertFromBlock(const Block & block) prepareBlockListStructure(*stored_block); - LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure()); - size_t size = stored_block->columns(); /// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them. @@ -720,7 +715,7 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { - if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i)) + if (const RowRef * found = mapped.findAsof(join.getAsofType(), asof_column, i)) { filter[i] = 1; mapped.setUsed(); @@ -1096,7 +1091,6 @@ void Join::joinGet(Block & block, const String & column_name) const void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const { std::shared_lock lock(rwlock); - LOG_DEBUG(log, "joinBlock: " << block.dumpStructure()); checkTypesOfKeys(block, key_names_left, sample_block_with_keys); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 85255aaaaa..7a223f46b3 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -132,8 +132,6 @@ public: ASTTableJoin::Kind getKind() const { return kind; } AsofRowRefs::Type getAsofType() const { return *asof_type; } - AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; } - const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; } /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again @@ -369,7 +367,6 @@ private: private: Type type = Type::EMPTY; std::optional asof_type; - AsofRowRefs::LookupLists asof_lookup_lists; static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes); diff --git a/dbms/src/Interpreters/RowRefs.cpp b/dbms/src/Interpreters/RowRefs.cpp index 9fea981913..46e665ab42 100644 --- a/dbms/src/Interpreters/RowRefs.cpp +++ b/dbms/src/Interpreters/RowRefs.cpp @@ -30,51 +30,53 @@ void callWithType(AsofRowRefs::Type which, F && f) } // namespace -void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num) +AsofRowRefs::AsofRowRefs(Type type) +{ + auto call = [&](const auto & t) + { + using T = std::decay_t; + using LookupType = typename Entry::LookupType; + lookups = std::make_unique(); + }; + + callWithType(type, call); +} + +void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num) { auto call = [&](const auto & t) { using T = std::decay_t; - using LookupType = typename Entry::LookupType; + using LookupPtr = typename Entry::LookupPtr; auto * column = typeid_cast *>(asof_column); T key = column->getElement(row_num); auto entry = Entry(key, RowRef(block, row_num)); - - std::lock_guard lock(lookup_data.mutex); - - if (!lookups) - { - lookup_data.lookups.push_back(Lookups()); - lookup_data.lookups.back() = LookupType(); - lookups = &lookup_data.lookups.back(); - } - std::get(*lookups).insert(entry); + std::get(lookups)->insert(entry); }; callWithType(type, call); } -const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const +const RowRef * AsofRowRefs::findAsof(Type type, const IColumn * asof_column, size_t row_num) const { const RowRef * out = nullptr; auto call = [&](const auto & t) { using T = std::decay_t; - using LookupType = typename Entry::LookupType; + using LookupPtr = typename Entry::LookupPtr; auto * column = typeid_cast *>(asof_column); T key = column->getElement(row_num); + auto & typed_lookup = std::get(lookups); - std::lock_guard lock(lookup_data.mutex); - - if (!lookups) - return; + // The first thread that calls upper_bound ensures that the data is sorted + auto it = typed_lookup->upper_bound(Entry(key)); - auto & typed_lookup = std::get(*lookups); - auto it = typed_lookup.upper_bound(Entry(key)); - if (it != typed_lookup.cbegin()) + // cbegin() is safe to call now because the array is immutable after sorting + // hence the pointer to a entry can be returned + if (it != typed_lookup->cbegin()) out = &((--it)->row_ref); }; diff --git a/dbms/src/Interpreters/RowRefs.h b/dbms/src/Interpreters/RowRefs.h index 227fba965b..1dad479d62 100644 --- a/dbms/src/Interpreters/RowRefs.h +++ b/dbms/src/Interpreters/RowRefs.h @@ -1,12 +1,12 @@ #pragma once #include -#include #include #include #include #include +#include namespace DB { @@ -32,14 +32,68 @@ struct RowRefList : RowRef RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} }; +/** + * This class is intended to push sortable data into. + * When looking up values the container ensures that it is sorted for log(N) lookup + * After calling any of the lookup methods, it is no longer allowed to insert more data as this would invalidate the + * references that can be returned by the lookup methods + */ + +template +class SortedLookupVector +{ +public: + using Base = std::vector; + + // First stage, insertions into the vector + template + void insert(U && x, TAllocatorParams &&... allocator_params) + { + assert(!sorted.load(std::memory_order_acquire)); + array.push_back(std::forward(x), std::forward(allocator_params)...); + } + + // Transition into second stage, ensures that the vector is sorted + typename Base::const_iterator upper_bound(const T & k) + { + sort(); + return std::upper_bound(array.cbegin(), array.cend(), k); + } + + // After ensuring that the vector is sorted by calling a lookup these are safe to call + typename Base::const_iterator cbegin() const { return array.cbegin(); } + typename Base::const_iterator cend() const { return array.cend(); } + +private: + Base array; + std::atomic sorted = false; + mutable std::mutex lock; + + // Double checked locking with SC atomics works in C++ + // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/ + // The first thread that calls one of the lookup methods sorts the data + // After calling the first lookup method it is no longer allowed to insert any data + // the array becomes immutable + void sort() + { + if(!sorted.load(std::memory_order_acquire)) { + std::lock_guard l(lock); + if(!sorted.load(std::memory_order_relaxed)) { + std::sort(array.begin(), array.end()); + sorted.store(true, std::memory_order_release); + } + } + } +}; + class AsofRowRefs { public: template struct Entry { - using LookupType = SortedLookupPODArray>; - + using LookupType = SortedLookupVector>; + using LookupPtr = std::unique_ptr; T asof_value; RowRef row_ref; @@ -53,16 +107,10 @@ public: }; using Lookups = std::variant< - Entry::LookupType, - Entry::LookupType, - Entry::LookupType, - Entry::LookupType>; - - struct LookupLists - { - mutable std::mutex mutex; - std::list lookups; - }; + Entry::LookupPtr, + Entry::LookupPtr, + Entry::LookupPtr, + Entry::LookupPtr>; enum class Type { @@ -72,13 +120,23 @@ public: keyf64, }; + AsofRowRefs() {} + AsofRowRefs(Type t); + static std::optional getTypeSize(const IColumn * asof_column, size_t & type_size); - void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num); - const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const; + // This will be synchronized by the rwlock mutex in Join.h + void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num); + + // This will internally synchronize + const RowRef * findAsof(Type type, const IColumn * asof_column, size_t row_num) const; private: - Lookups * lookups = nullptr; + // Lookups can be stored in a HashTable because it is memmovable + // A std::variant contains a currently active type id (memmovable), together with a union of the types + // The types are all std::unique_ptr, which contains a single pointer, which is memmovable. + // Source: https://github.com/yandex/ClickHouse/issues/4906 + Lookups lookups; }; } -- GitLab