未验证 提交 f87e3ae4 编写于 作者: A Artem Zuikov 提交者: GitHub

Merge pull request #4863 from Gladdy/martijn-asof-nomap

Perform ASOF join with sorting PODArray once and support multiple column types
#pragma once
#include <Common/PODArray.h>
namespace DB
{
/**
* This class is intended to push sortable data into.
* When looking up values the container ensures that it is sorted for log(N) lookup
*
* Note, this is only efficient when the insertions happen in one stage, followed by all retrievals
* This way the data only gets sorted once.
*/
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>>
class SortedLookupPODArray : private PaddedPODArray<T, INITIAL_SIZE, TAllocator>
{
public:
using Base = PaddedPODArray<T, INITIAL_SIZE, TAllocator>;
using Base::PODArray;
using Base::cbegin;
using Base::cend;
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)
{
Base::push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
sorted = false;
}
typename Base::const_iterator upper_bound (const T& k)
{
if (!sorted)
this->sort();
return std::upper_bound(this->cbegin(), this->cend(), k);
}
private:
void sort()
{
std::sort(this->begin(), this->end());
sorted = true;
}
bool sorted = false;
};
}
\ No newline at end of file
......@@ -298,12 +298,18 @@ void Join::setSampleBlock(const Block & block)
if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner)
throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED);
if (key_columns.back()->sizeOfValueIfFixed() != sizeof(ASOFTimeType))
const IColumn * asof_column = key_columns.back();
size_t asof_size;
if (auto t = AsofRowRefs::getTypeSize(asof_column))
std::tie(asof_type, asof_size) = *t;
else
{
std::string msg = "ASOF join column needs to have size ";
msg += std::to_string(sizeof(ASOFTimeType));
std::string msg = "ASOF join not supported for type";
msg += asof_column->getFamilyName();
throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD);
}
key_columns.pop_back();
if (key_columns.empty())
......@@ -314,7 +320,7 @@ void Join::setSampleBlock(const Block & block)
/// Therefore, add it back in such that it can be extracted appropriately from the full stored
/// key_columns and key_sizes
init(chooseMethod(key_columns, key_sizes));
key_sizes.push_back(sizeof(ASOFTimeType));
key_sizes.push_back(asof_size);
}
else
{
......@@ -357,47 +363,19 @@ void Join::setSampleBlock(const Block & block)
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
}
void Join::TSRowRef::insert(Join::ASOFTimeType t, const Block * block, size_t row_num)
{
ts.insert(std::pair(t, RowRef(block, row_num)));
}
std::string Join::TSRowRef::dumpStructure() const
{
std::stringstream ss;
for (auto const& x : ts)
{
ss << "(t=" << x.first << " row_num=" << x.second.row_num << " ptr=" << x.second.block << "),";
}
return ss.str();
}
size_t Join::TSRowRef::size() const
{
return ts.size();
}
std::optional<std::pair<Join::ASOFTimeType, Join::RowRef>> Join::TSRowRef::findAsof(Join::ASOFTimeType t) const
{
auto it = ts.upper_bound(t);
if (it == ts.cbegin())
return {};
return *(--it);
}
namespace
{
/// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN.
template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter>
struct Inserter
{
static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
static void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
};
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Any, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
......@@ -409,7 +387,7 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
......@@ -435,26 +413,21 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
template<typename AsofGetter>
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, AsofGetter & asof_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
{
time_series_map = new (time_series_map) typename Map::mapped_type();
}
auto k = asof_getter.getKey(i, pool);
time_series_map->insert(k, stored_block, i);
// std::cout << "inserted key into time series map=" << k << " result=" << time_series_map->dumpStructure() << std::endl;
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
time_series_map->insert(asof_column, stored_block, i, pool);
}
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
const IColumn * asof_column [[maybe_unused]] = nullptr;
......@@ -469,30 +442,28 @@ namespace
continue;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
auto asof_getter = Join::AsofGetterType(asof_column);
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, asof_getter, stored_block, i, pool);
} else
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
Inserter<STRICTNESS, Map, KeyGetter>::insert(join, map, key_getter, stored_block, i, pool, asof_column);
else
Inserter<STRICTNESS, Map, KeyGetter>::insert(join, map, key_getter, stored_block, i, pool);
}
}
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
if (null_map)
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
else
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
}
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(
Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
switch (type)
......@@ -503,7 +474,7 @@ namespace
#define M(TYPE) \
case Join::Type::TYPE: \
insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
*maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
......@@ -590,7 +561,7 @@ bool Join::insertFromBlock(const Block & block)
{
dispatch([&](auto, auto strictness_, auto & map)
{
insertFromBlockImpl<strictness_>(type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImpl<strictness_>(*this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
});
}
......@@ -678,20 +649,6 @@ void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added,
}
};
template <typename Map>
bool addFoundRowAsof(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]], Join::ASOFTimeType asof_key)
{
if (auto v = mapped.findAsof(asof_key))
{
std::pair<Join::ASOFTimeType, Join::RowRef> res = *v;
// std::cout << "Adder::addFound" << " to_add" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << " found=" << res.first << std::endl;
added.appendFromBlock(*res.second.block, res.second.row_num);
return true;
}
// std::cout << "Adder::addFound" << " not found in map" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << std::endl;
return false;
}
template <bool _add_missing>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{
......@@ -740,14 +697,11 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
Join::AsofGetterType asof_getter(asof_column);
auto asof_key = asof_getter.getKey(i, pool);
bool actually_found = addFoundRowAsof<Map>(mapped, added_columns, current_offset, asof_key);
if (actually_found)
if (const RowRef * found = mapped.findAsof(asof_column, i, pool))
{
filter[i] = 1;
mapped.setUsed();
added_columns.appendFromBlock(*found->block, found->row_num);
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
......
......@@ -6,6 +6,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/RowRefs.h>
#include <Core/SettingsCommon.h>
#include <Common/Arena.h>
......@@ -130,42 +131,7 @@ public:
size_t getTotalByteCount() const;
ASTTableJoin::Kind getKind() const { return kind; }
/// Reference to the row in block.
struct RowRef
{
const Block * block = nullptr;
size_t row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
RowRefList * next = nullptr;
RowRefList() {}
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
/// Map for a time series
using ASOFTimeType = UInt32;
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<ASOFTimeType, ASOFTimeType, ASOFTimeType, false>;
struct TSRowRef
{
// TODO use the arena allocator to get memory for this
// This would require ditching std::map because std::allocator is incompatible with the arena allocator
std::map<ASOFTimeType, RowRef> ts;
TSRowRef() {}
void insert(ASOFTimeType t, const Block * block, size_t row_num);
std::optional<std::pair<ASOFTimeType, RowRef>> findAsof(ASOFTimeType t) const;
std::string dumpStructure() const;
size_t size() const;
};
AsofRowRefs::Type getAsofType() const { return asof_type; }
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
......@@ -297,7 +263,7 @@ public:
using MapsAnyFull = MapsTemplate<WithFlags<true, false, RowRef>>;
using MapsAnyFullOverwrite = MapsTemplate<WithFlags<true, true, RowRef>>;
using MapsAllFull = MapsTemplate<WithFlags<true, false, RowRefList>>;
using MapsAsof = MapsTemplate<WithFlags<false, false, TSRowRef>>;
using MapsAsof = MapsTemplate<WithFlags<false, false, AsofRowRefs>>;
template <ASTTableJoin::Kind KIND>
struct KindTrait
......@@ -400,6 +366,7 @@ private:
private:
Type type = Type::EMPTY;
AsofRowRefs::Type asof_type = AsofRowRefs::Type::EMPTY;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
......
#include <Interpreters/RowRefs.h>
#include <Common/ColumnsHashing.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <optional>
namespace DB
{
void AsofRowRefs::Lookups::create(AsofRowRefs::Type which)
{
switch (which)
{
case Type::EMPTY: break;
#define M(NAME, TYPE) \
case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
template<typename T>
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<T, T, T, false>;
void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool)
{
switch (type)
{
case Type::EMPTY: break;
#define M(NAME, TYPE) \
case Type::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
auto entry = Entry<TYPE>(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \
lookups.NAME->insert(entry); \
break; \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const
{
switch (type)
{
case Type::EMPTY: return nullptr;
#define M(NAME, TYPE) \
case Type::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
TYPE key = asof_getter.getKey(row_num, pool); \
auto it = lookups.NAME->upper_bound(Entry<TYPE>(key)); \
if (it == lookups.NAME->cbegin()) \
return nullptr; \
return &((--it)->row_ref); \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
__builtin_unreachable();
}
std::optional<std::pair<AsofRowRefs::Type, size_t>> AsofRowRefs::getTypeSize(const IColumn * asof_column)
{
#define M(NAME, TYPE) \
if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \
return std::make_pair(Type::NAME,sizeof(TYPE));
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
return {};
}
}
\ No newline at end of file
#pragma once
#include <Columns/IColumn.h>
#include <Common/SortedLookupPODArray.h>
#include <optional>
namespace DB
{
class Block;
/// Reference to the row in block.
struct RowRef
{
const Block * block = nullptr;
size_t row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
RowRefList * next = nullptr;
RowRefList() {}
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
class AsofRowRefs
{
public:
/// Different types of asof join keys
#define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \
M(key32, UInt32) \
M(key64, UInt64) \
M(keyf32, Float32) \
M(keyf64, Float64)
enum class Type
{
EMPTY,
#define M(NAME, TYPE) NAME,
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
};
static std::optional<std::pair<Type, size_t>> getTypeSize(const IColumn * asof_column);
template<typename T>
struct Entry
{
T asof_value;
RowRef row_ref;
Entry(T v) : asof_value(v) {}
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
bool operator< (const Entry& o) const
{
return asof_value < o.asof_value;
}
};
struct Lookups
{
#define M(NAME, TYPE) \
std::unique_ptr<SortedLookupPODArray<Entry<TYPE>>> NAME;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
void create(Type which);
};
AsofRowRefs() : type(Type::EMPTY) {}
AsofRowRefs(Type t) : type(t)
{
lookups.create(t);
}
void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool);
const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const;
private:
const Type type;
mutable Lookups lookups;
};
}
\ No newline at end of file
2 1 1 0
2 3 3 3
2 5 5 3
2 1 1 0
2 3 3 3
2 5 5 3
2 1 1 0
2 3 3 3
2 5 5 3
2 1 1 0
2 3 3 3
2 5 5 3
#!/usr/bin/env bash
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT -q "USE test;"
for typename in "UInt32" "UInt64" "Float64" "Float32"
do
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS A;"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS B;"
$CLICKHOUSE_CLIENT -q "CREATE TABLE A(k UInt32, t ${typename}, a Float64) ENGINE = MergeTree() ORDER BY (k, t);"
$CLICKHOUSE_CLIENT -q "INSERT INTO A(k,t,a) VALUES (2,1,1),(2,3,3),(2,5,5);"
$CLICKHOUSE_CLIENT -q "CREATE TABLE B(k UInt32, t ${typename}, b Float64) ENGINE = MergeTree() ORDER BY (k, t);"
$CLICKHOUSE_CLIENT -q "INSERT INTO B(k,t,b) VALUES (2,3,3);"
$CLICKHOUSE_CLIENT -q "SELECT k, t, a, b FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (k,t);"
done
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册