diff --git a/dbms/src/Dictionaries/MySQLDictionarySource.cpp b/dbms/src/Dictionaries/MySQLDictionarySource.cpp index 46188b431df1f6843b2b392fac585a45b858eb14..4ca237881331cf7ebfb4e78b5d15a895a6124acf 100644 --- a/dbms/src/Dictionaries/MySQLDictionarySource.cpp +++ b/dbms/src/Dictionaries/MySQLDictionarySource.cpp @@ -1,3 +1,5 @@ +#include +#include #include #if USE_MYSQL @@ -5,6 +7,7 @@ #include #include +#include namespace DB @@ -26,7 +29,8 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st sample_block{sample_block}, pool{config, config_prefix}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, - load_all_query{query_builder.composeLoadAllQuery()} + load_all_query{query_builder.composeLoadAllQuery()}, + invalidate_query{config.getString(config_prefix + ".invalidate_query", "")} { } @@ -41,7 +45,8 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other sample_block{other.sample_block}, pool{other.pool}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, - load_all_query{other.load_all_query}, last_modification{other.last_modification} + load_all_query{other.load_all_query}, last_modification{other.last_modification}, + invalidate_query{other.invalidate_query}, invalidate_query_response{other.invalidate_query_response} { } @@ -72,6 +77,15 @@ BlockInputStreamPtr MySQLDictionarySource::loadKeys( bool MySQLDictionarySource::isModified() const { + if (!invalidate_query.empty()) + { + auto response = doInvalidateQuery(invalidate_query); + if (response == invalidate_query_response) + return false; + invalidate_query_response = response; + return true; + } + if (dont_check_update_time) return true; @@ -162,7 +176,16 @@ LocalDateTime MySQLDictionarySource::getLastModification() const return update_time; } +std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request) const +{ + Block sample_block; + ColumnPtr column(std::make_shared()); + sample_block.insert(ColumnWithTypeAndName(column, std::make_shared(), "Sample Block")); + MySQLBlockInputStream blockInputStream(pool.Get(), request, sample_block, 1); + return readInvalidateQuery(blockInputStream); +} } #endif + diff --git a/dbms/src/Dictionaries/MySQLDictionarySource.h b/dbms/src/Dictionaries/MySQLDictionarySource.h index fdc87223bf557474d51161b90511ba161b1d5860..5229fb36ce9cebb8cb650142f395c67e621477c0 100644 --- a/dbms/src/Dictionaries/MySQLDictionarySource.h +++ b/dbms/src/Dictionaries/MySQLDictionarySource.h @@ -42,12 +42,14 @@ public: std::string toString() const override; private: - Poco::Logger * log; - static std::string quoteForLike(const std::string s); LocalDateTime getLastModification() const; + // execute invalidate_query. expects single cell in result + std::string doInvalidateQuery(const std::string & request) const; + + Poco::Logger * log; const DictionaryStructure dict_struct; const std::string db; const std::string table; @@ -58,6 +60,8 @@ private: ExternalQueryBuilder query_builder; const std::string load_all_query; LocalDateTime last_modification; + std::string invalidate_query; + mutable std::string invalidate_query_response; }; } diff --git a/dbms/src/Dictionaries/ODBCBlockInputStream.cpp b/dbms/src/Dictionaries/ODBCBlockInputStream.cpp index f7c098f07ed0816c0d4a3f7cf8886d91f464aae0..2e9cd1e112683193f784327e3072a97ba7be3e09 100644 --- a/dbms/src/Dictionaries/ODBCBlockInputStream.cpp +++ b/dbms/src/Dictionaries/ODBCBlockInputStream.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -24,7 +25,8 @@ ODBCBlockInputStream::ODBCBlockInputStream( statement{(this->session << query_str, Poco::Data::Keywords::now)}, result{statement}, iterator{result.begin()}, - max_block_size{max_block_size} + max_block_size{max_block_size}, + log(&Logger::get("ODBCBlockInputStream")) { if (sample_block.columns() != result.columnCount()) throw Exception{ @@ -80,6 +82,7 @@ Block ODBCBlockInputStream::readImpl() for (const auto i : ext::range(0, columns.size())) columns[i] = block.safeGetByPosition(i).column.get(); + size_t num_rows = 0; while (iterator != result.end()) { @@ -95,11 +98,12 @@ Block ODBCBlockInputStream::readImpl() insertDefaultValue(columns[idx], *description.sample_columns[idx]); } + ++iterator; + ++num_rows; if (num_rows == max_block_size) break; - ++iterator; } return block; diff --git a/dbms/src/Dictionaries/ODBCBlockInputStream.h b/dbms/src/Dictionaries/ODBCBlockInputStream.h index a01987272e858d47f79adaed5549c8666f9688d0..7e4949efb76883967b178e6c3abddb801c511f66 100644 --- a/dbms/src/Dictionaries/ODBCBlockInputStream.h +++ b/dbms/src/Dictionaries/ODBCBlockInputStream.h @@ -41,6 +41,8 @@ private: const std::size_t max_block_size; ExternalResultDescription description; + + Poco::Logger * log; }; } diff --git a/dbms/src/Dictionaries/ODBCDictionarySource.cpp b/dbms/src/Dictionaries/ODBCDictionarySource.cpp index c2bad4eacca5388d560506c4947c6c647d80ff5d..01571843724df1e9d122c14439623504b66d30f5 100644 --- a/dbms/src/Dictionaries/ODBCDictionarySource.cpp +++ b/dbms/src/Dictionaries/ODBCDictionarySource.cpp @@ -1,8 +1,11 @@ +#include +#include #include #include #include #include #include +#include namespace DB @@ -22,7 +25,8 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru where{config.getString(config_prefix + ".where", "")}, sample_block{sample_block}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None}, /// NOTE Better to obtain quoting style via ODBC interface. - load_all_query{query_builder.composeLoadAllQuery()} + load_all_query{query_builder.composeLoadAllQuery()}, + invalidate_query{config.getString(config_prefix + ".invalidate_query", "")} { pool = createAndCheckResizePocoSessionPool([&] () { return std::make_shared( config.getString(config_prefix + ".connector", "ODBC"), @@ -40,7 +44,8 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other) sample_block{other.sample_block}, pool{other.pool}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None}, - load_all_query{other.load_all_query} + load_all_query{other.load_all_query}, + invalidate_query{other.invalidate_query}, invalidate_query_response{other.invalidate_query_response} { } @@ -78,11 +83,6 @@ BlockInputStreamPtr ODBCDictionarySource::loadKeys( return std::make_shared(pool->get(), query, sample_block, max_block_size); } -bool ODBCDictionarySource::isModified() const -{ - return true; -} - bool ODBCDictionarySource::supportsSelectiveLoad() const { return true; @@ -98,5 +98,26 @@ std::string ODBCDictionarySource::toString() const return "ODBC: " + db + '.' + table + (where.empty() ? "" : ", where: " + where); } +bool ODBCDictionarySource::isModified() const +{ + if (!invalidate_query.empty()) + { + auto response = doInvalidateQuery(invalidate_query); + if (invalidate_query_response == response) + return false; + invalidate_query_response = response; + } + return true; +} + + +std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request) const +{ + Block sample_block; + ColumnPtr column(std::make_shared()); + sample_block.insert(ColumnWithTypeAndName(column, std::make_shared(), "Sample Block")); + ODBCBlockInputStream blockInputStream(pool->get(), request, sample_block, 1); + return readInvalidateQuery(blockInputStream); +} } diff --git a/dbms/src/Dictionaries/ODBCDictionarySource.h b/dbms/src/Dictionaries/ODBCDictionarySource.h index 8e1ea57cc87e9c088ed25941b5e4c172b9f5a10b..0a26bd7914e357ff444a462f48cdfa6514e55ecf 100644 --- a/dbms/src/Dictionaries/ODBCDictionarySource.h +++ b/dbms/src/Dictionaries/ODBCDictionarySource.h @@ -52,6 +52,9 @@ public: std::string toString() const override; private: + // execute invalidate_query. expects single cell in result + std::string doInvalidateQuery(const std::string & request) const; + Poco::Logger * log; const DictionaryStructure dict_struct; @@ -62,6 +65,8 @@ private: std::shared_ptr pool = nullptr; ExternalQueryBuilder query_builder; const std::string load_all_query; + std::string invalidate_query; + mutable std::string invalidate_query_response; using PocoSessionPoolConstructor = std::function()>; diff --git a/dbms/src/Dictionaries/readInvalidateQuery.cpp b/dbms/src/Dictionaries/readInvalidateQuery.cpp new file mode 100644 index 0000000000000000000000000000000000000000..51bb1300cd129aa41a76547f2c0d4b40eec959b2 --- /dev/null +++ b/dbms/src/Dictionaries/readInvalidateQuery.cpp @@ -0,0 +1,56 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int TOO_MUCH_COLUMNS; +extern const int TOO_MUCH_ROWS; +extern const int RECEIVED_EMPTY_DATA; +} + +std::string readInvalidateQuery(IProfilingBlockInputStream & blockInputStream) +{ + blockInputStream.readPrefix(); + std::string response; + + try + { + Block block = blockInputStream.read(); + if (!block) + throw Exception("Empty response", ErrorCodes::RECEIVED_EMPTY_DATA); + + auto columns = block.columns(); + if (columns > 1) + throw Exception("Expected single column in resultset, got " + std::to_string(columns), ErrorCodes::TOO_MUCH_COLUMNS); + + auto rows = block.rows(); + if (rows == 0) + throw Exception("Expected single row in resultset, got 0", ErrorCodes::RECEIVED_EMPTY_DATA); + if (rows > 1) + throw Exception("Expected single row in resultset, got at least " + std::to_string(rows), ErrorCodes::TOO_MUCH_ROWS); + + auto column = block.getByPosition(0).column; + response = column->getDataAt(0).toString(); + + while (block = blockInputStream.read()) + { + if (block.rows() > 0) + throw Exception("Expected single row in resultset, got at least " + std::to_string(rows + 1), ErrorCodes::TOO_MUCH_ROWS); + } + } + catch (Exception& exception) + { + blockInputStream.cancel(); + blockInputStream.readSuffix(); + throw; + } + + blockInputStream.readSuffix(); + + return response; +} + +} diff --git a/dbms/src/Dictionaries/readInvalidateQuery.h b/dbms/src/Dictionaries/readInvalidateQuery.h new file mode 100644 index 0000000000000000000000000000000000000000..14743e23fb331850bbf51233bc004b545eae7109 --- /dev/null +++ b/dbms/src/Dictionaries/readInvalidateQuery.h @@ -0,0 +1,13 @@ +#pragma once +#include + +class IProfilingBlockInputStream; + +namespace DB +{ + +// Using in MySQLDictionarySource and ODBCDictionarySource after processing invalidate_query +std::string readInvalidateQuery(IProfilingBlockInputStream & blockInputStream); + + +}