提交 4709b744 编写于 作者: M Martijn Bakker

address the code review issues

上级 3ac66dfd
#pragma once
#include <Common/PODArray.h>
namespace DB {
/**
* This class is intended to push sortable data into.
* When looking up values the container ensures that it is sorted for log(N) lookup
*
* Note, this is only efficient when the insertions happen in one stage, followed by all retrievals
* This way the data only gets sorted once.
*/
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>>
class SortedLookupPODArray : private PaddedPODArray<T, INITIAL_SIZE, TAllocator>
{
public:
using Base = PaddedPODArray<T, INITIAL_SIZE, TAllocator>;
using Base::PODArray;
using Base::cbegin;
using Base::cend;
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)
{
Base::push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
sorted = false;
}
typename Base::const_iterator upper_bound (const T& k)
{
if (!sorted)
this->sort();
return std::upper_bound(this->cbegin(), this->cend(), k);
}
private:
void sort()
{
std::sort(this->begin(), this->end());
sorted = true;
}
bool sorted = false;
};
}
\ No newline at end of file
......@@ -363,98 +363,19 @@ void Join::setSampleBlock(const Block & block)
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
}
void Join::AsofRowRefs::Lookups::create(Join::AsofRowRefs::AsofType which)
{
switch (which)
{
case AsofType::EMPTY: break;
#define M(NAME, TYPE) \
case AsofType::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
template<typename T>
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<T, T, T, false>;
void Join::AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool)
{
assert(!sorted);
switch (type)
{
case AsofType::EMPTY: break;
#define M(NAME, TYPE) \
case AsofType::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
auto entry = Entry<TYPE>(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \
lookups.NAME->push_back(entry); \
break; \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const
{
if (!sorted)
{
// sort whenever needed
switch (type)
{
case AsofType::EMPTY: break;
#define M(NAME, TYPE) \
case AsofType::NAME: std::sort(lookups.NAME->begin(), lookups.NAME->end()); break;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
sorted = true;
}
switch (type)
{
case AsofType::EMPTY: return nullptr;
#define M(NAME, TYPE) \
case AsofType::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
TYPE key = asof_getter.getKey(row_num, pool); \
auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), Entry<TYPE>(key)); \
if (it == lookups.NAME->cbegin()) \
return nullptr; \
return &((--it)->row_ref); \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
std::optional<std::pair<Join::AsofRowRefs::AsofType, size_t>> Join::AsofRowRefs::getTypeSize(const IColumn * asof_column)
{
#define M(NAME, TYPE) \
if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \
return std::make_pair(AsofType::NAME,sizeof(TYPE));
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
return {};
}
namespace
{
/// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN.
template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter>
struct Inserter
{
static void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
static void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
};
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Any, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
......@@ -466,7 +387,7 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
......@@ -492,13 +413,13 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(const Join * join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
static ALWAYS_INLINE void insert(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename Map::mapped_type(join->getAsofType());
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
time_series_map->insert(asof_column, stored_block, i, pool);
}
};
......@@ -506,7 +427,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
const IColumn * asof_column [[maybe_unused]] = nullptr;
......@@ -530,7 +451,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(
const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
if (null_map)
......@@ -542,7 +463,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(
const Join * join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
switch (type)
......@@ -640,7 +561,7 @@ bool Join::insertFromBlock(const Block & block)
{
dispatch([&](auto, auto strictness_, auto & map)
{
insertFromBlockImpl<strictness_>(this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImpl<strictness_>(*this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
});
}
......@@ -775,14 +696,16 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
auto & mapped = find_result.getMapped();
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
if (const Join::RowRef * found = mapped.findAsof(asof_column, i, pool))
{
if (const RowRef * found = mapped.findAsof(asof_column, i, pool))
{
filter[i] = 1;
mapped.setUsed();
added_columns.appendFromBlock(*found->block, found->row_num);
filter[i] = 1;
mapped.setUsed();
added_columns.appendFromBlock(*found->block, found->row_num);
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
}
else
{
filter[i] = 1;
......
......@@ -6,6 +6,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/RowRefs.h>
#include <Core/SettingsCommon.h>
#include <Common/Arena.h>
......@@ -130,88 +131,7 @@ public:
size_t getTotalByteCount() const;
ASTTableJoin::Kind getKind() const { return kind; }
/// Reference to the row in block.
struct RowRef
{
const Block * block = nullptr;
size_t row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
RowRefList * next = nullptr;
RowRefList() {}
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
struct AsofRowRefs
{
/// Different types of asof join keys
#define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \
M(key32, UInt32) \
M(key64, UInt64) \
M(keyf32, Float32) \
M(keyf64, Float64)
enum class AsofType
{
EMPTY,
#define M(NAME, TYPE) NAME,
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
};
static std::optional<std::pair<AsofType, size_t>> getTypeSize(const IColumn * asof_column);
template<typename T>
struct Entry
{
T asof_value;
RowRef row_ref;
Entry(T v) : asof_value(v) {}
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
bool operator< (const Entry& o) const
{
return asof_value < o.asof_value;
}
};
struct Lookups
{
#define M(NAME, TYPE) \
std::unique_ptr<PODArray<Entry<TYPE>>> NAME;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
void create(AsofType which);
};
AsofRowRefs() : type(AsofType::EMPTY) {}
AsofRowRefs(AsofType t) : type(t)
{
lookups.create(t);
}
void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool);
const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const;
private:
const AsofType type;
mutable Lookups lookups;
mutable bool sorted = false;
};
AsofRowRefs::AsofType getAsofType() const { return asof_type; }
AsofRowRefs::Type getAsofType() const { return asof_type; }
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
......@@ -446,7 +366,7 @@ private:
private:
Type type = Type::EMPTY;
AsofRowRefs::AsofType asof_type = AsofRowRefs::AsofType::EMPTY;
AsofRowRefs::Type asof_type = AsofRowRefs::Type::EMPTY;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
......
#include <Interpreters/RowRefs.h>
#include <Common/ColumnsHashing.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <optional>
namespace DB
{
void AsofRowRefs::Lookups::create(AsofRowRefs::Type which)
{
switch (which)
{
case Type::EMPTY: break;
#define M(NAME, TYPE) \
case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
template<typename T>
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<T, T, T, false>;
void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool)
{
switch (type)
{
case Type::EMPTY: break;
#define M(NAME, TYPE) \
case Type::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
auto entry = Entry<TYPE>(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \
lookups.NAME->insert(entry); \
break; \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const
{
switch (type)
{
case Type::EMPTY: return nullptr;
#define M(NAME, TYPE) \
case Type::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
TYPE key = asof_getter.getKey(row_num, pool); \
auto it = lookups.NAME->upper_bound(Entry<TYPE>(key)); \
if (it == lookups.NAME->cbegin()) \
return nullptr; \
return &((--it)->row_ref); \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
std::optional<std::pair<AsofRowRefs::Type, size_t>> AsofRowRefs::getTypeSize(const IColumn * asof_column)
{
#define M(NAME, TYPE) \
if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \
return std::make_pair(Type::NAME,sizeof(TYPE));
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
return {};
}
}
\ No newline at end of file
#pragma once
#include <Columns/IColumn.h>
#include <Common/SortedLookupPODArray.h>
#include <optional>
namespace DB
{
class Block;
/// Reference to the row in block.
struct RowRef
{
const Block * block = nullptr;
size_t row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
RowRefList * next = nullptr;
RowRefList() {}
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
class AsofRowRefs
{
public:
/// Different types of asof join keys
#define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \
M(key32, UInt32) \
M(key64, UInt64) \
M(keyf32, Float32) \
M(keyf64, Float64)
enum class Type
{
EMPTY,
#define M(NAME, TYPE) NAME,
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
};
static std::optional<std::pair<Type, size_t>> getTypeSize(const IColumn * asof_column);
template<typename T>
struct Entry
{
T asof_value;
RowRef row_ref;
Entry(T v) : asof_value(v) {}
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
bool operator< (const Entry& o) const
{
return asof_value < o.asof_value;
}
};
struct Lookups
{
#define M(NAME, TYPE) \
std::unique_ptr<SortedLookupPODArray<Entry<TYPE>>> NAME;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
void create(Type which);
};
AsofRowRefs() : type(Type::EMPTY) {}
AsofRowRefs(Type t) : type(t)
{
lookups.create(t);
}
void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool);
const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const;
private:
const Type type;
mutable Lookups lookups;
};
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册