提交 f6987cfa 编写于 作者: M Martijn Bakker

have the parser recognize asof joins and feed them through to the executor

insert the time series into a struct ready for joining

working version that inserts the data into the hash table using the existing dispatching machinery for various types

working asof left join in clickhouse

add a test for the asof join

do some asof join cleanup

revisit the logic in case the values match between left and right side
上级 f7513eed
......@@ -120,6 +120,31 @@ Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_siz
}
struct ASOFSplit {
ColumnRawPtrs key_columns;
Sizes key_sizes;
ColumnRawPtrs asof_columns;
Sizes asof_sizes;
};
template<ASTTableJoin::Strictness STRICTNESS>
ASOFSplit split_asof_columns(const ColumnRawPtrs & key_columns, const Sizes & key_sizes) {
ASOFSplit spl;
spl.key_columns = key_columns;
spl.key_sizes = key_sizes;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) {
spl.asof_columns.push_back(spl.key_columns.back());
spl.key_columns.pop_back();
spl.asof_sizes.push_back(spl.key_sizes.back());
spl.key_sizes.pop_back();
}
return spl;
}
template <Join::Type type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;
......@@ -227,6 +252,7 @@ static void convertColumnToNullable(ColumnWithTypeAndName & column)
void Join::setSampleBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "setSampleBlock: " << block.dumpStructure());
if (!empty())
return;
......@@ -251,8 +277,35 @@ void Join::setSampleBlock(const Block & block)
key_columns[i] = &static_cast<const ColumnNullable &>(*key_columns[i]).getNestedColumn();
}
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
if(strictness == ASTTableJoin::Strictness::Asof) {
if( kind != ASTTableJoin::Kind::Left) {
throw Exception("ASOF only supports LEFT as base join", ErrorCodes::NOT_IMPLEMENTED);
}
if( key_columns.back()->sizeOfValueIfFixed() != sizeof(ASOFTimeType)) {
std::string msg = "ASOF join columns need to have size ";
msg += std::to_string(sizeof(ASOFTimeType));
throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD);
}
key_columns.pop_back();
if( key_columns.empty() ) {
throw Exception("ASOF join cannot be done without a joining column", ErrorCodes::LOGICAL_ERROR);
}
/// this is going to set up the appropriate hash table for the direct lookup part of the join
/// However, this does not depend on the size of the asof join key (as that goes into the BST)
/// Therefore, add it back in such that it can be extracted appropriately from the full stored
/// key_columns and key_sizes
init(chooseMethod(key_columns, key_sizes));
key_sizes.push_back(sizeof(ASOFTimeType));
}
else
{
/// Choose data structure to use for JOIN.
init(chooseMethod(key_columns, key_sizes));
}
sample_block_with_columns_to_add = materializeBlock(block);
......@@ -288,6 +341,30 @@ void Join::setSampleBlock(const Block & block)
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
}
void Join::TSRowRef::insert(Join::ASOFTimeType t, const Block * block, size_t row_num) {
ts.insert(std::pair(t, RowRef(block, row_num)));
}
std::string Join::TSRowRef::dumpStructure() const {
std::stringstream ss;
for (auto const& x : ts)
{
ss << "(t=" << x.first << " row_num=" << x.second.row_num << " ptr=" << x.second.block << "),";
}
return ss.str();
}
size_t Join::TSRowRef::size() const {
return ts.size();
}
std::optional<std::pair<Join::ASOFTimeType, Join::RowRef>> Join::TSRowRef::find_asof(Join::ASOFTimeType t) const {
auto it = ts.upper_bound(t);
if(it == ts.cbegin()) {
return {};
}
return *(--it);
}
namespace
{
......@@ -336,20 +413,44 @@ namespace
}
};
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
template<typename AsofGetter>
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, AsofGetter & asof_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted()) {
time_series_map = new (time_series_map) typename Map::mapped_type();
}
auto k = asof_getter.getKey(i, pool);
time_series_map->insert(k, stored_block, i);
std::cout << "inserted key into time series map=" << k << " result=" << time_series_map->dumpStructure() << std::endl;
}
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
KeyGetter key_getter(key_columns, key_sizes, nullptr);
auto spl = split_asof_columns<STRICTNESS>(key_columns, key_sizes);
KeyGetter key_getter(spl.key_columns, spl.key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
continue;
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) {
Join::AsofGetterType asof_getter(spl.asof_columns, spl.asof_sizes, nullptr);
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, asof_getter, stored_block, i, pool);
} else {
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
}
}
}
......@@ -387,10 +488,10 @@ namespace
}
}
bool Join::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
......@@ -515,6 +616,7 @@ public:
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
}
void appendDefaultRow()
{
for (size_t j = 0; j < right_indexes.size(); ++j)
......@@ -553,6 +655,21 @@ void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added,
}
};
template <typename Map>
void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]], Join::ASOFTimeType asof_key)
{
if ( auto v = mapped.find_asof(asof_key) ) {
std::pair<Join::ASOFTimeType, Join::RowRef> res = *v;
// std::cout << "Adder::addFound" << " to_add" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << " found=" << res.first << std::endl;
added.appendFromBlock(*res.second.block, res.second.row_num);
}
else
{
// std::cout << "Adder::addFound" << " not found in map" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << std::endl;
added.appendDefaultRow();
}
}
template <bool _add_missing>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{
......@@ -567,16 +684,20 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <bool _add_missing, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool _has_null_map>
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
const Map & map, size_t rows, KeyGetter & key_getter,
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter)
{
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
Arena pool;
auto spl = split_asof_columns<STRICTNESS>(key_columns, key_sizes);
KeyGetter key_getter(spl.key_columns, spl.key_sizes, nullptr);
IColumn::Offset current_offset = 0;
for (size_t i = 0; i < rows; ++i)
{
if (_has_null_map && (*null_map)[i])
......@@ -589,10 +710,21 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if (find_result.isFound())
{
filter[i] = 1;
auto & mapped = find_result.getMapped();
mapped.setUsed();
addFoundRow<STRICTNESS, Map>(mapped, added_columns, current_offset);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) {
assert(!spl.asof_columns.empty());
assert(!spl.asof_sizes.empty());
Join::AsofGetterType asof_getter(spl.asof_columns, spl.asof_sizes, nullptr);
auto asof_key = asof_getter.getKey(i, pool);
addFoundRow<STRICTNESS, Map>(mapped, added_columns, current_offset, asof_key);
} else {
filter[i] = 1;
mapped.setUsed();
addFoundRow<STRICTNESS, Map>(mapped, added_columns, current_offset);
}
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
......@@ -613,14 +745,13 @@ IColumn::Filter joinRightColumns(
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
IColumn::Filter filter(rows, 0);
KeyGetter key_getter(key_columns, key_sizes, nullptr);
if (null_map)
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_getter, added_columns, null_map, filter);
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
else
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_getter, added_columns, null_map, filter);
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
return filter;
}
......@@ -722,7 +853,7 @@ void Join::joinBlockImpl(
NameSet needed_key_names_right = requiredRightKeys(key_names_right, columns_added_by_join);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof)
{
constexpr bool inner_or_right = static_in_v<KIND, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Right>;
if constexpr (inner_or_right)
......@@ -943,9 +1074,8 @@ void Join::joinGet(Block & block, const String & column_name) const
void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const
{
// std::cerr << "joinBlock: " << block.dumpStructure() << "\n";
std::shared_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
checkTypesOfKeys(block, key_names_left, sample_block_with_keys);
......@@ -1027,6 +1157,14 @@ struct AdderNonJoined<ASTTableJoin::Strictness::All, Mapped>
}
};
template <typename Mapped>
struct AdderNonJoined<ASTTableJoin::Strictness::Asof, Mapped>
{
static void add(const Mapped & /*mapped*/, size_t & /*rows_added*/, MutableColumns & /*columns_left*/, MutableColumns & /*columns_right*/)
{
// If we have a leftover match in the right hand side, not required to join because we are doing asof left
}
};
/// Stream from not joined earlier rows of the right table.
class NonJoinedBlockInputStream : public IBlockInputStream
......
#pragma once
#include <optional>
#include <shared_mutex>
#include <Parsers/ASTTablesInSelectQuery.h>
......@@ -150,6 +151,21 @@ public:
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
/// Map for a time series
using ASOFTimeType = UInt32;
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<ASOFTimeType, ASOFTimeType, ASOFTimeType, false>;
struct TSRowRef
{
// TODO use the arena allocator to get memory for this
// This would require ditching std::map because std::allocator is incompatible with the arena allocator
std::map<ASOFTimeType, RowRef> ts;
TSRowRef() {}
void insert(ASOFTimeType t, const Block * block, size_t row_num);
std::optional<std::pair<ASOFTimeType, RowRef>> find_asof(ASOFTimeType t) const;
std::string dumpStructure() const;
size_t size() const;
};
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
......@@ -181,7 +197,6 @@ public:
bool getUsed() const { return true; }
};
/// Different types of keys for maps.
#define APPLY_FOR_JOIN_VARIANTS(M) \
M(key8) \
......@@ -282,6 +297,7 @@ public:
using MapsAnyFull = MapsTemplate<WithFlags<true, false, RowRef>>;
using MapsAnyFullOverwrite = MapsTemplate<WithFlags<true, true, RowRef>>;
using MapsAllFull = MapsTemplate<WithFlags<true, false, RowRefList>>;
using MapsAsof = MapsTemplate<WithFlags<false, false, TSRowRef>>;
template <ASTTableJoin::Kind KIND>
struct KindTrait
......@@ -300,7 +316,7 @@ public:
template <ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness, bool overwrite>
using Map = typename MapGetterImpl<KindTrait<kind>::fill_right, strictness, overwrite>::Map;
static constexpr std::array<ASTTableJoin::Strictness, 2> STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All};
static constexpr std::array<ASTTableJoin::Strictness, 3> STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All, ASTTableJoin::Strictness::Asof};
static constexpr std::array<ASTTableJoin::Kind, 4> KINDS
= {ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Full, ASTTableJoin::Kind::Right};
......@@ -377,7 +393,7 @@ private:
*/
BlocksList blocks;
std::variant<MapsAny, MapsAnyOverwrite, MapsAll, MapsAnyFull, MapsAnyFullOverwrite, MapsAllFull> maps;
std::variant<MapsAny, MapsAnyOverwrite, MapsAll, MapsAnyFull, MapsAnyFullOverwrite, MapsAllFull, MapsAsof> maps;
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
Arena pool;
......@@ -454,4 +470,10 @@ struct Join::MapGetterImpl<true, ASTTableJoin::Strictness::All, false>
using Map = MapsAllFull;
};
template <bool fill_right>
struct Join::MapGetterImpl<fill_right, ASTTableJoin::Strictness::Asof, false>
{
using Map = MapsAsof;
};
}
......@@ -145,6 +145,9 @@ void ASTTableJoin::formatImplBeforeTable(const FormatSettings & settings, Format
case Strictness::All:
settings.ostr << "ALL ";
break;
case Strictness::Asof:
settings.ostr << "ASOF ";
break;
}
}
......
......@@ -75,7 +75,8 @@ struct ASTTableJoin : public IAST
{
Unspecified,
Any, /// If there are many suitable rows to join, use any from them (also known as unique JOIN).
All /// If there are many suitable rows to join, use all of them and replicate rows of "left" table (usual semantic of JOIN).
All, /// If there are many suitable rows to join, use all of them and replicate rows of "left" table (usual semantic of JOIN).
Asof, /// For the last JOIN column, pick the latest value
};
/// Join method.
......
......@@ -1113,6 +1113,7 @@ const char * ParserAlias::restricted_keywords[] =
"INNER",
"FULL",
"CROSS",
"ASOF",
"JOIN",
"GLOBAL",
"ANY",
......
......@@ -135,6 +135,8 @@ bool ParserTablesInSelectQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expec
table_join->strictness = ASTTableJoin::Strictness::Any;
else if (ParserKeyword("ALL").ignore(pos))
table_join->strictness = ASTTableJoin::Strictness::All;
else if (ParserKeyword("ASOF").ignore(pos))
table_join->strictness = ASTTableJoin::Strictness::Asof;
else
table_join->strictness = ASTTableJoin::Strictness::Unspecified;
......
......@@ -333,6 +333,9 @@ private:
columns[j]->insertFrom(*it->getSecond().block->getByPosition(column_indices[j]).column.get(), it->getSecond().row_num);
++rows_added;
}
else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) {
throw Exception("ASOF join storage is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
}
else
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(it->getSecond()); current != nullptr;
current = current->next)
......
1 1970-01-01 01:00:05 1 1.5 2
1 1970-01-01 01:00:06 1 1.51 2
1 1970-01-01 01:00:10 11 11.5 12
1 1970-01-01 01:00:11 11 11.51 12
1 1970-01-01 01:00:15 5 5.5 6
1 1970-01-01 01:00:16 5 5.6 6
1 1970-01-01 01:00:20 7 7.5 8
2 1970-01-01 01:00:05 11 2.5 12
2 1970-01-01 01:00:06 11 2.51 12
2 1970-01-01 01:00:10 21 12.5 22
2 1970-01-01 01:00:11 21 12.51 22
2 1970-01-01 01:00:15 5 6.5 6
2 1970-01-01 01:00:16 5 5.6 6
2 1970-01-01 01:00:20 17 8.5 18
USE test;
DROP TABLE IF EXISTS md;
DROP TABLE IF EXISTS tv;
CREATE TABLE md(key UInt32, t DateTime, bid Float64, ask Float64) ENGINE = MergeTree() ORDER BY (key, t);
INSERT INTO test.md(key,t,bid,ask) VALUES (1,20,7,8),(1,5,1,2),(1,10,11,12),(1,15,5,6);
INSERT INTO test.md(key,t,bid,ask) VALUES (2,20,17,18),(2,5,11,12),(2,10,21,22),(2,15,5,6);
CREATE TABLE tv(key UInt32, t DateTime, tv Float64) ENGINE = MergeTree() ORDER BY (key, t);
INSERT INTO test.tv(key,t,tv) VALUES (1,5,1.5),(1,6,1.51),(1,10,11.5),(1,11,11.51),(1,15,5.5),(1,16,5.6),(1,20,7.5);
INSERT INTO test.tv(key,t,tv) VALUES (2,5,2.5),(2,6,2.51),(2,10,12.5),(2,11,12.51),(2,15,6.5),(2,16,5.6),(2,20,8.5);
SELECT tv.key, tv.t, md.bid, tv.tv, md.ask FROM tv ASOF LEFT JOIN md USING(key,t) ORDER BY (tv.key, tv.t);
DROP TABLE md;
DROP TABLE tv;
\ No newline at end of file
1 1970-01-01 01:00:01 1 0 0000-00-00 00:00:00 0
1 1970-01-01 01:00:02 2 2 1970-01-01 01:00:02 1
1 1970-01-01 01:00:03 3 2 1970-01-01 01:00:03 1
1 1970-01-01 01:00:04 4 4 1970-01-01 01:00:04 1
1 1970-01-01 01:00:05 5 4 1970-01-01 01:00:05 1
2 1970-01-01 01:00:01 1 0 0000-00-00 00:00:00 0
2 1970-01-01 01:00:02 2 0 0000-00-00 00:00:00 0
2 1970-01-01 01:00:03 3 3 1970-01-01 01:00:03 2
2 1970-01-01 01:00:04 4 3 1970-01-01 01:00:04 2
2 1970-01-01 01:00:05 5 3 1970-01-01 01:00:05 2
USE test;
DROP TABLE IF EXISTS A;
DROP TABLE IF EXISTS B;
CREATE TABLE A(k UInt32, t DateTime, a Float64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO A(k,t,a) VALUES (1,1,1),(1,2,2),(1,3,3),(1,4,4),(1,5,5);
INSERT INTO A(k,t,a) VALUES (2,1,1),(2,2,2),(2,3,3),(2,4,4),(2,5,5);
CREATE TABLE B(k UInt32, t DateTime, b Float64) ENGINE = MergeTree() ORDER BY (k, t);
INSERT INTO B(k,t,b) VALUES (1,2,2),(1,4,4);
INSERT INTO B(k,t,b) VALUES (2,3,3);
SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t);
DROP TABLE A;
DROP TABLE B;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册