diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 3336f175ebefd0591e3f8eea94647261322e2036..fd8616a1819a5f0ee764746b1920d4e7d0be681b 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -120,6 +120,31 @@ Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_siz } +struct ASOFSplit { + ColumnRawPtrs key_columns; + Sizes key_sizes; + + ColumnRawPtrs asof_columns; + Sizes asof_sizes; +}; + +template +ASOFSplit split_asof_columns(const ColumnRawPtrs & key_columns, const Sizes & key_sizes) { + ASOFSplit spl; + spl.key_columns = key_columns; + spl.key_sizes = key_sizes; + + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { + spl.asof_columns.push_back(spl.key_columns.back()); + spl.key_columns.pop_back(); + + spl.asof_sizes.push_back(spl.key_sizes.back()); + spl.key_sizes.pop_back(); + } + + return spl; +} + template struct KeyGetterForTypeImpl; @@ -227,6 +252,7 @@ static void convertColumnToNullable(ColumnWithTypeAndName & column) void Join::setSampleBlock(const Block & block) { std::unique_lock lock(rwlock); + LOG_DEBUG(log, "setSampleBlock: " << block.dumpStructure()); if (!empty()) return; @@ -251,8 +277,35 @@ void Join::setSampleBlock(const Block & block) key_columns[i] = &static_cast(*key_columns[i]).getNestedColumn(); } - /// Choose data structure to use for JOIN. - init(chooseMethod(key_columns, key_sizes)); + if(strictness == ASTTableJoin::Strictness::Asof) { + if( kind != ASTTableJoin::Kind::Left) { + throw Exception("ASOF only supports LEFT as base join", ErrorCodes::NOT_IMPLEMENTED); + } + + if( key_columns.back()->sizeOfValueIfFixed() != sizeof(ASOFTimeType)) { + std::string msg = "ASOF join columns need to have size "; + msg += std::to_string(sizeof(ASOFTimeType)); + throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD); + } + key_columns.pop_back(); + + if( key_columns.empty() ) { + throw Exception("ASOF join cannot be done without a joining column", ErrorCodes::LOGICAL_ERROR); + } + + /// this is going to set up the appropriate hash table for the direct lookup part of the join + /// However, this does not depend on the size of the asof join key (as that goes into the BST) + /// Therefore, add it back in such that it can be extracted appropriately from the full stored + /// key_columns and key_sizes + init(chooseMethod(key_columns, key_sizes)); + key_sizes.push_back(sizeof(ASOFTimeType)); + } + else + { + /// Choose data structure to use for JOIN. + init(chooseMethod(key_columns, key_sizes)); + } + sample_block_with_columns_to_add = materializeBlock(block); @@ -288,6 +341,30 @@ void Join::setSampleBlock(const Block & block) convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i)); } +void Join::TSRowRef::insert(Join::ASOFTimeType t, const Block * block, size_t row_num) { + ts.insert(std::pair(t, RowRef(block, row_num))); +} + +std::string Join::TSRowRef::dumpStructure() const { + std::stringstream ss; + + for (auto const& x : ts) + { + ss << "(t=" << x.first << " row_num=" << x.second.row_num << " ptr=" << x.second.block << "),"; + } + + return ss.str(); +} +size_t Join::TSRowRef::size() const { + return ts.size(); +} +std::optional> Join::TSRowRef::find_asof(Join::ASOFTimeType t) const { + auto it = ts.upper_bound(t); + if(it == ts.cbegin()) { + return {}; + } + return *(--it); +} namespace { @@ -336,20 +413,44 @@ namespace } }; + template + struct Inserter + { + template + static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, AsofGetter & asof_getter, Block * stored_block, size_t i, Arena & pool) + { + auto emplace_result = key_getter.emplaceKey(map, i, pool); + typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); + + if (emplace_result.isInserted()) { + time_series_map = new (time_series_map) typename Map::mapped_type(); + } + auto k = asof_getter.getKey(i, pool); + time_series_map->insert(k, stored_block, i); + std::cout << "inserted key into time series map=" << k << " result=" << time_series_map->dumpStructure() << std::endl; + } + }; + template void NO_INLINE insertFromBlockImplTypeCase( Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { - KeyGetter key_getter(key_columns, key_sizes, nullptr); + auto spl = split_asof_columns(key_columns, key_sizes); + KeyGetter key_getter(spl.key_columns, spl.key_sizes, nullptr); for (size_t i = 0; i < rows; ++i) { if (has_null_map && (*null_map)[i]) continue; - Inserter::insert(map, key_getter, stored_block, i, pool); + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { + Join::AsofGetterType asof_getter(spl.asof_columns, spl.asof_sizes, nullptr); + Inserter::insert(map, key_getter, asof_getter, stored_block, i, pool); + } else { + Inserter::insert(map, key_getter, stored_block, i, pool); + } } } @@ -387,10 +488,10 @@ namespace } } - bool Join::insertFromBlock(const Block & block) { std::unique_lock lock(rwlock); + LOG_DEBUG(log, "joinBlock: " << block.dumpStructure()); if (empty()) throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR); @@ -515,6 +616,7 @@ public: columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num); } + void appendDefaultRow() { for (size_t j = 0; j < right_indexes.size(); ++j) @@ -553,6 +655,21 @@ void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added, } }; +template +void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]], Join::ASOFTimeType asof_key) +{ + if ( auto v = mapped.find_asof(asof_key) ) { + std::pair res = *v; +// std::cout << "Adder::addFound" << " to_add" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << " found=" << res.first << std::endl; + added.appendFromBlock(*res.second.block, res.second.row_num); + } + else + { +// std::cout << "Adder::addFound" << " not found in map" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << std::endl; + added.appendDefaultRow(); + } +} + template void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]]) { @@ -567,16 +684,20 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur /// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS). template std::unique_ptr NO_INLINE joinRightIndexedColumns( - const Map & map, size_t rows, KeyGetter & key_getter, + const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter) { std::unique_ptr offsets_to_replicate; if constexpr (STRICTNESS == ASTTableJoin::Strictness::All) offsets_to_replicate = std::make_unique(rows); - IColumn::Offset current_offset = 0; Arena pool; + auto spl = split_asof_columns(key_columns, key_sizes); + KeyGetter key_getter(spl.key_columns, spl.key_sizes, nullptr); + + IColumn::Offset current_offset = 0; + for (size_t i = 0; i < rows; ++i) { if (_has_null_map && (*null_map)[i]) @@ -589,10 +710,21 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( if (find_result.isFound()) { - filter[i] = 1; + auto & mapped = find_result.getMapped(); - mapped.setUsed(); - addFoundRow(mapped, added_columns, current_offset); + + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { + assert(!spl.asof_columns.empty()); + assert(!spl.asof_sizes.empty()); + Join::AsofGetterType asof_getter(spl.asof_columns, spl.asof_sizes, nullptr); + auto asof_key = asof_getter.getKey(i, pool); + + addFoundRow(mapped, added_columns, current_offset, asof_key); + } else { + filter[i] = 1; + mapped.setUsed(); + addFoundRow(mapped, added_columns, current_offset); + } } else addNotFoundRow<_add_missing>(added_columns, current_offset); @@ -613,14 +745,13 @@ IColumn::Filter joinRightColumns( constexpr bool left_or_full = static_in_v; IColumn::Filter filter(rows, 0); - KeyGetter key_getter(key_columns, key_sizes, nullptr); if (null_map) offsets_to_replicate = joinRightIndexedColumns( - map, rows, key_getter, added_columns, null_map, filter); + map, rows, key_columns, key_sizes, added_columns, null_map, filter); else offsets_to_replicate = joinRightIndexedColumns( - map, rows, key_getter, added_columns, null_map, filter); + map, rows, key_columns, key_sizes, added_columns, null_map, filter); return filter; } @@ -722,7 +853,7 @@ void Join::joinBlockImpl( NameSet needed_key_names_right = requiredRightKeys(key_names_right, columns_added_by_join); - if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof) { constexpr bool inner_or_right = static_in_v; if constexpr (inner_or_right) @@ -943,9 +1074,8 @@ void Join::joinGet(Block & block, const String & column_name) const void Join::joinBlock(Block & block, const Names & key_names_left, const NamesAndTypesList & columns_added_by_join) const { -// std::cerr << "joinBlock: " << block.dumpStructure() << "\n"; - std::shared_lock lock(rwlock); + LOG_DEBUG(log, "joinBlock: " << block.dumpStructure()); checkTypesOfKeys(block, key_names_left, sample_block_with_keys); @@ -1027,6 +1157,14 @@ struct AdderNonJoined } }; +template +struct AdderNonJoined +{ + static void add(const Mapped & /*mapped*/, size_t & /*rows_added*/, MutableColumns & /*columns_left*/, MutableColumns & /*columns_right*/) + { + // If we have a leftover match in the right hand side, not required to join because we are doing asof left + } +}; /// Stream from not joined earlier rows of the right table. class NonJoinedBlockInputStream : public IBlockInputStream diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index ef60f712b1fe527ea74b989bb71767b1c7fbeaff..91000f9a33d93307e89019d5c4373cc873d7dd6e 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -150,6 +151,21 @@ public: RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} }; + /// Map for a time series + using ASOFTimeType = UInt32; + using AsofGetterType = ColumnsHashing::HashMethodOneNumber; + struct TSRowRef + { + // TODO use the arena allocator to get memory for this + // This would require ditching std::map because std::allocator is incompatible with the arena allocator + std::map ts; + + TSRowRef() {} + void insert(ASOFTimeType t, const Block * block, size_t row_num); + std::optional> find_asof(ASOFTimeType t) const; + std::string dumpStructure() const; + size_t size() const; + }; /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again @@ -181,7 +197,6 @@ public: bool getUsed() const { return true; } }; - /// Different types of keys for maps. #define APPLY_FOR_JOIN_VARIANTS(M) \ M(key8) \ @@ -282,6 +297,7 @@ public: using MapsAnyFull = MapsTemplate>; using MapsAnyFullOverwrite = MapsTemplate>; using MapsAllFull = MapsTemplate>; + using MapsAsof = MapsTemplate>; template struct KindTrait @@ -300,7 +316,7 @@ public: template using Map = typename MapGetterImpl::fill_right, strictness, overwrite>::Map; - static constexpr std::array STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All}; + static constexpr std::array STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All, ASTTableJoin::Strictness::Asof}; static constexpr std::array KINDS = {ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Full, ASTTableJoin::Kind::Right}; @@ -377,7 +393,7 @@ private: */ BlocksList blocks; - std::variant maps; + std::variant maps; /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows. Arena pool; @@ -454,4 +470,10 @@ struct Join::MapGetterImpl using Map = MapsAllFull; }; +template +struct Join::MapGetterImpl +{ + using Map = MapsAsof; +}; + } diff --git a/dbms/src/Parsers/ASTTablesInSelectQuery.cpp b/dbms/src/Parsers/ASTTablesInSelectQuery.cpp index fb2780463774b7d89c7f189ea2805756d7b3aca6..98cf6254a4ffbbfb90085b3b6b55c38c2b73588e 100644 --- a/dbms/src/Parsers/ASTTablesInSelectQuery.cpp +++ b/dbms/src/Parsers/ASTTablesInSelectQuery.cpp @@ -145,6 +145,9 @@ void ASTTableJoin::formatImplBeforeTable(const FormatSettings & settings, Format case Strictness::All: settings.ostr << "ALL "; break; + case Strictness::Asof: + settings.ostr << "ASOF "; + break; } } diff --git a/dbms/src/Parsers/ASTTablesInSelectQuery.h b/dbms/src/Parsers/ASTTablesInSelectQuery.h index 5565d0ba3f818a88c4baedde103e93703e73f773..1e14d5778c63c3eb75dfac38c16bef107e6e661c 100644 --- a/dbms/src/Parsers/ASTTablesInSelectQuery.h +++ b/dbms/src/Parsers/ASTTablesInSelectQuery.h @@ -75,7 +75,8 @@ struct ASTTableJoin : public IAST { Unspecified, Any, /// If there are many suitable rows to join, use any from them (also known as unique JOIN). - All /// If there are many suitable rows to join, use all of them and replicate rows of "left" table (usual semantic of JOIN). + All, /// If there are many suitable rows to join, use all of them and replicate rows of "left" table (usual semantic of JOIN). + Asof, /// For the last JOIN column, pick the latest value }; /// Join method. diff --git a/dbms/src/Parsers/ExpressionElementParsers.cpp b/dbms/src/Parsers/ExpressionElementParsers.cpp index 4cde7219fdba6a389856efa8afdadb27447d893c..2741aa0d4918bbf0a3707cc19ba1c7116c9dad2b 100644 --- a/dbms/src/Parsers/ExpressionElementParsers.cpp +++ b/dbms/src/Parsers/ExpressionElementParsers.cpp @@ -1113,6 +1113,7 @@ const char * ParserAlias::restricted_keywords[] = "INNER", "FULL", "CROSS", + "ASOF", "JOIN", "GLOBAL", "ANY", diff --git a/dbms/src/Parsers/ParserTablesInSelectQuery.cpp b/dbms/src/Parsers/ParserTablesInSelectQuery.cpp index 0ba2a403a94d2da17eef356798ed6068cb5de718..6b970b0565fe1108bdf9f1352462dcea7fc6d375 100644 --- a/dbms/src/Parsers/ParserTablesInSelectQuery.cpp +++ b/dbms/src/Parsers/ParserTablesInSelectQuery.cpp @@ -135,6 +135,8 @@ bool ParserTablesInSelectQueryElement::parseImpl(Pos & pos, ASTPtr & node, Expec table_join->strictness = ASTTableJoin::Strictness::Any; else if (ParserKeyword("ALL").ignore(pos)) table_join->strictness = ASTTableJoin::Strictness::All; + else if (ParserKeyword("ASOF").ignore(pos)) + table_join->strictness = ASTTableJoin::Strictness::Asof; else table_join->strictness = ASTTableJoin::Strictness::Unspecified; diff --git a/dbms/src/Storages/StorageJoin.cpp b/dbms/src/Storages/StorageJoin.cpp index cfa2a9a2933957f5f53e1c7fce9a08abfb17111d..02a6c8d39b3fa0c36f11ff57141519e112c77b6c 100644 --- a/dbms/src/Storages/StorageJoin.cpp +++ b/dbms/src/Storages/StorageJoin.cpp @@ -333,6 +333,9 @@ private: columns[j]->insertFrom(*it->getSecond().block->getByPosition(column_indices[j]).column.get(), it->getSecond().row_num); ++rows_added; } + else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { + throw Exception("ASOF join storage is not implemented yet", ErrorCodes::NOT_IMPLEMENTED); + } else for (auto current = &static_cast(it->getSecond()); current != nullptr; current = current->next) diff --git a/dbms/tests/queries/0_stateless/00926_asof_joins.reference b/dbms/tests/queries/0_stateless/00926_asof_joins.reference new file mode 100644 index 0000000000000000000000000000000000000000..6334228175314f8c4e8f0df44e305c1ea5d53462 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00926_asof_joins.reference @@ -0,0 +1,14 @@ +1 1970-01-01 01:00:05 1 1.5 2 +1 1970-01-01 01:00:06 1 1.51 2 +1 1970-01-01 01:00:10 11 11.5 12 +1 1970-01-01 01:00:11 11 11.51 12 +1 1970-01-01 01:00:15 5 5.5 6 +1 1970-01-01 01:00:16 5 5.6 6 +1 1970-01-01 01:00:20 7 7.5 8 +2 1970-01-01 01:00:05 11 2.5 12 +2 1970-01-01 01:00:06 11 2.51 12 +2 1970-01-01 01:00:10 21 12.5 22 +2 1970-01-01 01:00:11 21 12.51 22 +2 1970-01-01 01:00:15 5 6.5 6 +2 1970-01-01 01:00:16 5 5.6 6 +2 1970-01-01 01:00:20 17 8.5 18 diff --git a/dbms/tests/queries/0_stateless/00926_asof_joins.sql b/dbms/tests/queries/0_stateless/00926_asof_joins.sql new file mode 100644 index 0000000000000000000000000000000000000000..d8f62d1af4c8f6f0afc9710e97a98c6d2fdcfdb7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00926_asof_joins.sql @@ -0,0 +1,17 @@ +USE test; + +DROP TABLE IF EXISTS md; +DROP TABLE IF EXISTS tv; + +CREATE TABLE md(key UInt32, t DateTime, bid Float64, ask Float64) ENGINE = MergeTree() ORDER BY (key, t); +INSERT INTO test.md(key,t,bid,ask) VALUES (1,20,7,8),(1,5,1,2),(1,10,11,12),(1,15,5,6); +INSERT INTO test.md(key,t,bid,ask) VALUES (2,20,17,18),(2,5,11,12),(2,10,21,22),(2,15,5,6); + +CREATE TABLE tv(key UInt32, t DateTime, tv Float64) ENGINE = MergeTree() ORDER BY (key, t); +INSERT INTO test.tv(key,t,tv) VALUES (1,5,1.5),(1,6,1.51),(1,10,11.5),(1,11,11.51),(1,15,5.5),(1,16,5.6),(1,20,7.5); +INSERT INTO test.tv(key,t,tv) VALUES (2,5,2.5),(2,6,2.51),(2,10,12.5),(2,11,12.51),(2,15,6.5),(2,16,5.6),(2,20,8.5); + +SELECT tv.key, tv.t, md.bid, tv.tv, md.ask FROM tv ASOF LEFT JOIN md USING(key,t) ORDER BY (tv.key, tv.t); + +DROP TABLE md; +DROP TABLE tv; \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference new file mode 100644 index 0000000000000000000000000000000000000000..20369ceda37b5cdf21ea2da0370e8ab963440a7d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.reference @@ -0,0 +1,10 @@ +1 1970-01-01 01:00:01 1 0 0000-00-00 00:00:00 0 +1 1970-01-01 01:00:02 2 2 1970-01-01 01:00:02 1 +1 1970-01-01 01:00:03 3 2 1970-01-01 01:00:03 1 +1 1970-01-01 01:00:04 4 4 1970-01-01 01:00:04 1 +1 1970-01-01 01:00:05 5 4 1970-01-01 01:00:05 1 +2 1970-01-01 01:00:01 1 0 0000-00-00 00:00:00 0 +2 1970-01-01 01:00:02 2 0 0000-00-00 00:00:00 0 +2 1970-01-01 01:00:03 3 3 1970-01-01 01:00:03 2 +2 1970-01-01 01:00:04 4 3 1970-01-01 01:00:04 2 +2 1970-01-01 01:00:05 5 3 1970-01-01 01:00:05 2 diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.sql b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.sql new file mode 100644 index 0000000000000000000000000000000000000000..b8815466f2701e0b8f1171d8f3b3ef1d06f39c57 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_noninclusive.sql @@ -0,0 +1,18 @@ +USE test; + +DROP TABLE IF EXISTS A; +DROP TABLE IF EXISTS B; + +CREATE TABLE A(k UInt32, t DateTime, a Float64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO A(k,t,a) VALUES (1,1,1),(1,2,2),(1,3,3),(1,4,4),(1,5,5); +INSERT INTO A(k,t,a) VALUES (2,1,1),(2,2,2),(2,3,3),(2,4,4),(2,5,5); + +CREATE TABLE B(k UInt32, t DateTime, b Float64) ENGINE = MergeTree() ORDER BY (k, t); +INSERT INTO B(k,t,b) VALUES (1,2,2),(1,4,4); +INSERT INTO B(k,t,b) VALUES (2,3,3); + +SELECT A.k, A.t, A.a, B.b, B.t, B.k FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (A.k, A.t); + + +DROP TABLE A; +DROP TABLE B; \ No newline at end of file