Commit f96a7e40 authored by Martijn Bakker

remove the list lookup and prevent the global lock

Parent 97dd0e2a
#pragma once

#include <vector>
//#include <Common/PODArray.h>

namespace DB
{

/**
 * This class is intended to push sortable data into.
 * When looking up values the container ensures that it is sorted for log(N) lookup
 *
 * Note, this is only efficient when the insertions happen in one stage, followed by all retrievals
 * This way the data only gets sorted once.
 */
template <typename T>
class SortedLookupPODArray
{
public:
    using Base = std::vector<T>;
    //using Base = PaddedPODArray<T>;

    template <typename U, typename ... TAllocatorParams>
    void insert(U && x, TAllocatorParams &&... allocator_params)
    {
        array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
        sorted = false;
    }

    typename Base::const_iterator upper_bound(const T & k)
    {
        if (!sorted)
            sort();
        return std::upper_bound(array.cbegin(), array.cend(), k);
    }

    typename Base::const_iterator cbegin() const { return array.cbegin(); }
    typename Base::const_iterator cend() const { return array.cend(); }

private:
    Base array;
    bool sorted = false;

    void sort()
    {
        std::sort(array.begin(), array.end());
        sorted = true;
    }
};

}
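For reference, a minimal usage sketch of the two-stage pattern the comment above describes: insert everything first, then look values up, paying the sort cost once on the first lookup. The snippet is illustrative only (it is not part of the commit and assumes the header above is on the include path); the final upper_bound-then-decrement step mirrors how the ASOF lookup later uses the returned iterator.

#include <Common/SortedLookupPODArray.h>
#include <iostream>

int main()
{
    DB::SortedLookupPODArray<int> lookup;

    // Stage 1: appends only; the container just marks itself unsorted.
    for (int v : {30, 10, 20})
        lookup.insert(v);

    // Stage 2: the first upper_bound() sorts once, then binary-searches.
    auto it = lookup.upper_bound(25);
    if (it != lookup.cbegin())
        std::cout << *(--it) << "\n"; // prints 20, the greatest value not above 25
}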
......
@@ -422,8 +422,8 @@ namespace
        typename Map::mapped_type * time_series_map = &emplace_result.getMapped();

        if (emplace_result.isInserted())
-            time_series_map = new (time_series_map) typename Map::mapped_type();
-        time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i);
+            time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
+        time_series_map->insert(join.getAsofType(), asof_column, stored_block, i);
    }
};
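The hunk above placement-constructs the mapped AsofRowRefs directly in the slot handed back by the hash table's emplace, now passing the ASOF key type to the constructor. Below is a rough standalone sketch of that construct-in-place pattern; the Mapped type, the raw byte slot, and the inserted flag are stand-ins for ClickHouse's hash table machinery, not its real API.

#include <cstddef>
#include <iostream>
#include <new>

// Toy stand-in for the mapped type: the real code now constructs AsofRowRefs
// with the ASOF key type, so the right lookup vector exists up front.
struct Mapped
{
    explicit Mapped(int type_) : type(type_) {}
    int type;
};

int main()
{
    // ClickHouse's hash tables hand back uninitialized storage for a newly
    // emplaced key; the caller placement-constructs the value into it.
    alignas(Mapped) std::byte slot[sizeof(Mapped)];
    bool inserted = true; // what emplace_result.isInserted() reports for a new key

    Mapped * mapped = reinterpret_cast<Mapped *>(slot);
    if (inserted)
        mapped = new (mapped) Mapped(/*type=*/42); // construct in place, as in the diff

    std::cout << mapped->type << "\n";
    mapped->~Mapped(); // the real table owns destruction; done explicitly here
}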
......
@@ -511,10 +511,7 @@ void Join::prepareBlockListStructure(Block & stored_block)
    for (const auto & name : key_names_right)
    {
        if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
-        {
-            LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name);
            break; // this is the last column so break is OK
-        }

        if (!erased.count(name))
            stored_block.erase(stored_block.getPositionByName(name));
......
@@ -556,8 +553,6 @@ bool Join::insertFromBlock(const Block & block)
    prepareBlockListStructure(*stored_block);

-    LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure());

    size_t size = stored_block->columns();

    /// Rare case, when joined columns are constant. To avoid code bloat, simply materialize them.
......
@@ -720,7 +715,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
        if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
        {
-            if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i))
+            if (const RowRef * found = mapped.findAsof(join.getAsofType(), asof_column, i))
            {
                filter[i] = 1;
                mapped.setUsed();
......
@@ -1096,7 +1091,6 @@ void Join::joinGet(Block & block, const String & column_name) const
void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const
{
    std::shared_lock lock(rwlock);

-    LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());

    checkTypesOfKeys(block, key_names_left, sample_block_with_keys);
......
......
@@ -132,8 +132,6 @@ public:
    ASTTableJoin::Kind getKind() const { return kind; }

    AsofRowRefs::Type getAsofType() const { return *asof_type; }
-    AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; }
-    const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; }

    /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
      * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
......
@@ -369,7 +367,6 @@ private:
private:
    Type type = Type::EMPTY;
    std::optional<AsofRowRefs::Type> asof_type;
-    AsofRowRefs::LookupLists asof_lookup_lists;

    static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
......
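The two Join.h hunks above delete the shared AsofRowRefs::LookupLists member, a mutex plus a std::list that every ASOF insert and lookup had to take. A toy illustration of the ownership change follows, matching the rationale in the RowRefs.h comments further down: each map entry now owns its lookup structure through a unique_ptr, which is a single pointer and therefore cheap to move with the entry, so no global lock is needed to reach it. The types and map below are illustrative only, not ClickHouse code.

#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Before: every hash-map entry pointed into one shared, mutex-guarded std::list,
// so every insert and lookup serialized on the same global lock.
struct SharedLookupLists
{
    std::mutex mutex;
    std::list<std::vector<int>> lookups;
};

struct EntryBefore
{
    std::vector<int> * lookups = nullptr; // borrowed from SharedLookupLists
};

// After: each entry owns its lookup vector through a unique_ptr. The pointer is
// trivially movable together with the entry, and touching it needs no shared mutex.
struct EntryAfter
{
    std::unique_ptr<std::vector<int>> lookups = std::make_unique<std::vector<int>>();
};

int main()
{
    std::unordered_map<std::string, EntryAfter> map;
    map["key"].lookups->push_back(42); // touches only this entry's data, no global lock
}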
......
@@ -30,51 +30,53 @@ void callWithType(AsofRowRefs::Type which, F && f)
} // namespace

-void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num)
+AsofRowRefs::AsofRowRefs(Type type)
+{
+    auto call = [&](const auto & t)
+    {
+        using T = std::decay_t<decltype(t)>;
+        using LookupType = typename Entry<T>::LookupType;
+        lookups = std::make_unique<LookupType>();
+    };
+    callWithType(type, call);
+}
+
+void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num)
{
    auto call = [&](const auto & t)
    {
        using T = std::decay_t<decltype(t)>;
-        using LookupType = typename Entry<T>::LookupType;
+        using LookupPtr = typename Entry<T>::LookupPtr;

        auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
        T key = column->getElement(row_num);
        auto entry = Entry<T>(key, RowRef(block, row_num));

-        std::lock_guard<std::mutex> lock(lookup_data.mutex);
-        if (!lookups)
-        {
-            lookup_data.lookups.push_back(Lookups());
-            lookup_data.lookups.back() = LookupType();
-            lookups = &lookup_data.lookups.back();
-        }
-        std::get<LookupType>(*lookups).insert(entry);
+        std::get<LookupPtr>(lookups)->insert(entry);
    };

    callWithType(type, call);
}

-const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const
+const RowRef * AsofRowRefs::findAsof(Type type, const IColumn * asof_column, size_t row_num) const
{
    const RowRef * out = nullptr;

    auto call = [&](const auto & t)
    {
        using T = std::decay_t<decltype(t)>;
-        using LookupType = typename Entry<T>::LookupType;
+        using LookupPtr = typename Entry<T>::LookupPtr;

        auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
        T key = column->getElement(row_num);

+        auto & typed_lookup = std::get<LookupPtr>(lookups);
-        std::lock_guard<std::mutex> lock(lookup_data.mutex);
-        if (!lookups)
-            return;
+        // The first thread that calls upper_bound ensures that the data is sorted
+        auto it = typed_lookup->upper_bound(Entry<T>(key));
-        auto & typed_lookup = std::get<LookupType>(*lookups);
-        auto it = typed_lookup.upper_bound(Entry<T>(key));
-        if (it != typed_lookup.cbegin())
+        // cbegin() is safe to call now because the array is immutable after sorting
+        // hence the pointer to a entry can be returned
+        if (it != typed_lookup->cbegin())
            out = &((--it)->row_ref);
    };
......
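Both functions in the hunk above funnel through callWithType, which turns the runtime Type tag into a call of one generic lambda so that `using T = std::decay_t<decltype(t)>` picks the matching numeric type. A rough sketch of that dispatch idiom follows; the enum values and types here are illustrative, and the real callWithType is defined just above this hunk in RowRefs.cpp, not shown on this page.

#include <cstdint>
#include <iostream>
#include <type_traits>

enum class Type { keyu32, keyu64, keyf32, keyf64 };

// Switch on the runtime tag and call the same generic lambda with a value
// whose static type selects the desired instantiation.
template <typename F>
void callWithType(Type which, F && f)
{
    switch (which)
    {
        case Type::keyu32: f(uint32_t()); break;
        case Type::keyu64: f(uint64_t()); break;
        case Type::keyf32: f(float());    break;
        case Type::keyf64: f(double());   break;
    }
}

int main()
{
    auto call = [&](const auto & t)
    {
        using T = std::decay_t<decltype(t)>;
        std::cout << "dispatched with sizeof(T) = " << sizeof(T) << "\n";
    };
    callWithType(Type::keyf64, call); // prints 8
}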
#pragma once
#include <Columns/IColumn.h>
#include <Common/SortedLookupPODArray.h>
#include <optional>
#include <variant>
#include <list>
#include <mutex>
#include <algorithm>
namespace DB
{
......
@@ -32,14 +32,68 @@ struct RowRefList : RowRef
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};

+/**
+ * This class is intended to push sortable data into.
+ * When looking up values the container ensures that it is sorted for log(N) lookup
+ *
+ * After calling any of the lookup methods, it is no longer allowed to insert more data as this would invalidate the
+ * references that can be returned by the lookup methods
+ */
+template <typename T>
+class SortedLookupVector
+{
+public:
+    using Base = std::vector<T>;
+
+    // First stage, insertions into the vector
+    template <typename U, typename ... TAllocatorParams>
+    void insert(U && x, TAllocatorParams &&... allocator_params)
+    {
+        assert(!sorted.load(std::memory_order_acquire));
+        array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
+    }
+
+    // Transition into second stage, ensures that the vector is sorted
+    typename Base::const_iterator upper_bound(const T & k)
+    {
+        sort();
+        return std::upper_bound(array.cbegin(), array.cend(), k);
+    }
+
+    // After ensuring that the vector is sorted by calling a lookup these are safe to call
+    typename Base::const_iterator cbegin() const { return array.cbegin(); }
+    typename Base::const_iterator cend() const { return array.cend(); }
+
+private:
+    Base array;
+    std::atomic<bool> sorted = false;
+    mutable std::mutex lock;
+
+    // Double checked locking with SC atomics works in C++
+    // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
+    // The first thread that calls one of the lookup methods sorts the data
+    // After calling the first lookup method it is no longer allowed to insert any data
+    // the array becomes immutable
+    void sort()
+    {
+        if (!sorted.load(std::memory_order_acquire))
+        {
+            std::lock_guard<std::mutex> l(lock);
+            if (!sorted.load(std::memory_order_relaxed))
+            {
+                std::sort(array.begin(), array.end());
+                sorted.store(true, std::memory_order_release);
+            }
+        }
+    }
+};
+
class AsofRowRefs
{
public:
    template <typename T>
    struct Entry
    {
-        using LookupType = SortedLookupPODArray<Entry<T>>;
+        using LookupType = SortedLookupVector<Entry<T>>;
+        using LookupPtr = std::unique_ptr<LookupType>;

        T asof_value;
        RowRef row_ref;
......
@@ -53,16 +107,10 @@ public:
    };

    using Lookups = std::variant<
-        Entry<UInt32>::LookupType,
-        Entry<UInt64>::LookupType,
-        Entry<Float32>::LookupType,
-        Entry<Float64>::LookupType>;
-
-    struct LookupLists
-    {
-        mutable std::mutex mutex;
-        std::list<Lookups> lookups;
-    };
+        Entry<UInt32>::LookupPtr,
+        Entry<UInt64>::LookupPtr,
+        Entry<Float32>::LookupPtr,
+        Entry<Float64>::LookupPtr>;

    enum class Type
    {
......
@@ -72,13 +120,23 @@ public:
        keyf64,
    };

    AsofRowRefs() {}
+    AsofRowRefs(Type t);

    static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);

-    void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num);
-    const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const;
+    // This will be synchronized by the rwlock mutex in Join.h
+    void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);
+    // This will internally synchronize
+    const RowRef * findAsof(Type type, const IColumn * asof_column, size_t row_num) const;

private:
-    Lookups * lookups = nullptr;
+    // Lookups can be stored in a HashTable because it is memmovable
+    // A std::variant contains a currently active type id (memmovable), together with a union of the types
+    // The types are all std::unique_ptr, which contains a single pointer, which is memmovable.
+    // Source: https://github.com/yandex/ClickHouse/issues/4906
+    Lookups lookups;
};

}
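SortedLookupVector::sort() above relies on double-checked locking: an acquire load outside the mutex, a relaxed re-check and release store inside it, as the linked Preshing article describes. A stripped-down, runnable version of the same sort-once idea is sketched below (the names are illustrative, not ClickHouse code); the greatestNotAbove helper mirrors the upper_bound-then-decrement lookup that findAsof performs.

#include <algorithm>
#include <atomic>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Sort-once container: reader threads may race to trigger the sort, but only the
// first one inside the lock actually sorts; later readers see sorted == true via
// the acquire load and skip the mutex entirely. After that the data is immutable.
class SortOnce
{
public:
    void insert(int x) { data.push_back(x); } // single-threaded build phase

    int greatestNotAbove(int key)
    {
        sortIfNeeded();
        auto it = std::upper_bound(data.cbegin(), data.cend(), key);
        return it == data.cbegin() ? -1 : *(--it);
    }

private:
    void sortIfNeeded()
    {
        if (!sorted.load(std::memory_order_acquire))
        {
            std::lock_guard<std::mutex> guard(lock);
            if (!sorted.load(std::memory_order_relaxed))
            {
                std::sort(data.begin(), data.end());
                sorted.store(true, std::memory_order_release);
            }
        }
    }

    std::vector<int> data;
    std::atomic<bool> sorted{false};
    std::mutex lock;
};

int main()
{
    SortOnce v;
    for (int x : {5, 1, 9, 3})
        v.insert(x);

    std::thread a([&] { std::cout << v.greatestNotAbove(4) << "\n"; }); // 3
    std::thread b([&] { std::cout << v.greatestNotAbove(8) << "\n"; }); // 5
    a.join();
    b.join();
}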