提交 d7e5e650 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #801 from yandex/dictionary-invalidate-query

Dictionary invalidate query
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Common/config.h> #include <Common/config.h>
#if USE_MYSQL #if USE_MYSQL
...@@ -5,6 +7,7 @@ ...@@ -5,6 +7,7 @@
#include <Dictionaries/MySQLDictionarySource.h> #include <Dictionaries/MySQLDictionarySource.h>
#include <Dictionaries/MySQLBlockInputStream.h> #include <Dictionaries/MySQLBlockInputStream.h>
#include <Dictionaries/readInvalidateQuery.h>
namespace DB namespace DB
...@@ -26,7 +29,8 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st ...@@ -26,7 +29,8 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st
sample_block{sample_block}, sample_block{sample_block},
pool{config, config_prefix}, pool{config, config_prefix},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
load_all_query{query_builder.composeLoadAllQuery()} load_all_query{query_builder.composeLoadAllQuery()},
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
{ {
} }
...@@ -41,7 +45,8 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other ...@@ -41,7 +45,8 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
sample_block{other.sample_block}, sample_block{other.sample_block},
pool{other.pool}, pool{other.pool},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
load_all_query{other.load_all_query}, last_modification{other.last_modification} load_all_query{other.load_all_query}, last_modification{other.last_modification},
invalidate_query{other.invalidate_query}, invalidate_query_response{other.invalidate_query_response}
{ {
} }
...@@ -72,6 +77,15 @@ BlockInputStreamPtr MySQLDictionarySource::loadKeys( ...@@ -72,6 +77,15 @@ BlockInputStreamPtr MySQLDictionarySource::loadKeys(
bool MySQLDictionarySource::isModified() const bool MySQLDictionarySource::isModified() const
{ {
if (!invalidate_query.empty())
{
auto response = doInvalidateQuery(invalidate_query);
if (response == invalidate_query_response)
return false;
invalidate_query_response = response;
return true;
}
if (dont_check_update_time) if (dont_check_update_time)
return true; return true;
...@@ -162,7 +176,16 @@ LocalDateTime MySQLDictionarySource::getLastModification() const ...@@ -162,7 +176,16 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
return update_time; return update_time;
} }
std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request) const
{
Block sample_block;
ColumnPtr column(std::make_shared<ColumnString>());
sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
MySQLBlockInputStream blockInputStream(pool.Get(), request, sample_block, 1);
return readInvalidateQuery(blockInputStream);
}
} }
#endif #endif
...@@ -42,12 +42,14 @@ public: ...@@ -42,12 +42,14 @@ public:
std::string toString() const override; std::string toString() const override;
private: private:
Poco::Logger * log;
static std::string quoteForLike(const std::string s); static std::string quoteForLike(const std::string s);
LocalDateTime getLastModification() const; LocalDateTime getLastModification() const;
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
Poco::Logger * log;
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
const std::string db; const std::string db;
const std::string table; const std::string table;
...@@ -58,6 +60,8 @@ private: ...@@ -58,6 +60,8 @@ private:
ExternalQueryBuilder query_builder; ExternalQueryBuilder query_builder;
const std::string load_all_query; const std::string load_all_query;
LocalDateTime last_modification; LocalDateTime last_modification;
std::string invalidate_query;
mutable std::string invalidate_query_response;
}; };
} }
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <common/logger_useful.h>
#include <ext/range.hpp> #include <ext/range.hpp>
#include <vector> #include <vector>
...@@ -24,7 +25,8 @@ ODBCBlockInputStream::ODBCBlockInputStream( ...@@ -24,7 +25,8 @@ ODBCBlockInputStream::ODBCBlockInputStream(
statement{(this->session << query_str, Poco::Data::Keywords::now)}, statement{(this->session << query_str, Poco::Data::Keywords::now)},
result{statement}, result{statement},
iterator{result.begin()}, iterator{result.begin()},
max_block_size{max_block_size} max_block_size{max_block_size},
log(&Logger::get("ODBCBlockInputStream"))
{ {
if (sample_block.columns() != result.columnCount()) if (sample_block.columns() != result.columnCount())
throw Exception{ throw Exception{
...@@ -80,6 +82,7 @@ Block ODBCBlockInputStream::readImpl() ...@@ -80,6 +82,7 @@ Block ODBCBlockInputStream::readImpl()
for (const auto i : ext::range(0, columns.size())) for (const auto i : ext::range(0, columns.size()))
columns[i] = block.safeGetByPosition(i).column.get(); columns[i] = block.safeGetByPosition(i).column.get();
size_t num_rows = 0; size_t num_rows = 0;
while (iterator != result.end()) while (iterator != result.end())
{ {
...@@ -95,11 +98,12 @@ Block ODBCBlockInputStream::readImpl() ...@@ -95,11 +98,12 @@ Block ODBCBlockInputStream::readImpl()
insertDefaultValue(columns[idx], *description.sample_columns[idx]); insertDefaultValue(columns[idx], *description.sample_columns[idx]);
} }
++iterator;
++num_rows; ++num_rows;
if (num_rows == max_block_size) if (num_rows == max_block_size)
break; break;
++iterator;
} }
return block; return block;
......
...@@ -41,6 +41,8 @@ private: ...@@ -41,6 +41,8 @@ private:
const std::size_t max_block_size; const std::size_t max_block_size;
ExternalResultDescription description; ExternalResultDescription description;
Poco::Logger * log;
}; };
} }
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Poco/Data/SessionPool.h> #include <Poco/Data/SessionPool.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Dictionaries/ODBCDictionarySource.h> #include <Dictionaries/ODBCDictionarySource.h>
#include <Dictionaries/ODBCBlockInputStream.h> #include <Dictionaries/ODBCBlockInputStream.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Dictionaries/readInvalidateQuery.h>
namespace DB namespace DB
...@@ -22,7 +25,8 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru ...@@ -22,7 +25,8 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru
where{config.getString(config_prefix + ".where", "")}, where{config.getString(config_prefix + ".where", "")},
sample_block{sample_block}, sample_block{sample_block},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None}, /// NOTE Better to obtain quoting style via ODBC interface. query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None}, /// NOTE Better to obtain quoting style via ODBC interface.
load_all_query{query_builder.composeLoadAllQuery()} load_all_query{query_builder.composeLoadAllQuery()},
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
{ {
pool = createAndCheckResizePocoSessionPool([&] () { return std::make_shared<Poco::Data::SessionPool>( pool = createAndCheckResizePocoSessionPool([&] () { return std::make_shared<Poco::Data::SessionPool>(
config.getString(config_prefix + ".connector", "ODBC"), config.getString(config_prefix + ".connector", "ODBC"),
...@@ -40,7 +44,8 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other) ...@@ -40,7 +44,8 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
sample_block{other.sample_block}, sample_block{other.sample_block},
pool{other.pool}, pool{other.pool},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None}, query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None},
load_all_query{other.load_all_query} load_all_query{other.load_all_query},
invalidate_query{other.invalidate_query}, invalidate_query_response{other.invalidate_query_response}
{ {
} }
...@@ -78,11 +83,6 @@ BlockInputStreamPtr ODBCDictionarySource::loadKeys( ...@@ -78,11 +83,6 @@ BlockInputStreamPtr ODBCDictionarySource::loadKeys(
return std::make_shared<ODBCBlockInputStream>(pool->get(), query, sample_block, max_block_size); return std::make_shared<ODBCBlockInputStream>(pool->get(), query, sample_block, max_block_size);
} }
bool ODBCDictionarySource::isModified() const
{
return true;
}
bool ODBCDictionarySource::supportsSelectiveLoad() const bool ODBCDictionarySource::supportsSelectiveLoad() const
{ {
return true; return true;
...@@ -98,5 +98,26 @@ std::string ODBCDictionarySource::toString() const ...@@ -98,5 +98,26 @@ std::string ODBCDictionarySource::toString() const
return "ODBC: " + db + '.' + table + (where.empty() ? "" : ", where: " + where); return "ODBC: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
} }
bool ODBCDictionarySource::isModified() const
{
if (!invalidate_query.empty())
{
auto response = doInvalidateQuery(invalidate_query);
if (invalidate_query_response == response)
return false;
invalidate_query_response = response;
}
return true;
}
std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request) const
{
Block sample_block;
ColumnPtr column(std::make_shared<ColumnString>());
sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
ODBCBlockInputStream blockInputStream(pool->get(), request, sample_block, 1);
return readInvalidateQuery(blockInputStream);
}
} }
...@@ -52,6 +52,9 @@ public: ...@@ -52,6 +52,9 @@ public:
std::string toString() const override; std::string toString() const override;
private: private:
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
Poco::Logger * log; Poco::Logger * log;
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
...@@ -62,6 +65,8 @@ private: ...@@ -62,6 +65,8 @@ private:
std::shared_ptr<Poco::Data::SessionPool> pool = nullptr; std::shared_ptr<Poco::Data::SessionPool> pool = nullptr;
ExternalQueryBuilder query_builder; ExternalQueryBuilder query_builder;
const std::string load_all_query; const std::string load_all_query;
std::string invalidate_query;
mutable std::string invalidate_query_response;
using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>; using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>;
......
#include <Dictionaries/readInvalidateQuery.h>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MUCH_COLUMNS;
extern const int TOO_MUCH_ROWS;
extern const int RECEIVED_EMPTY_DATA;
}
std::string readInvalidateQuery(IProfilingBlockInputStream & blockInputStream)
{
blockInputStream.readPrefix();
std::string response;
try
{
Block block = blockInputStream.read();
if (!block)
throw Exception("Empty response", ErrorCodes::RECEIVED_EMPTY_DATA);
auto columns = block.columns();
if (columns > 1)
throw Exception("Expected single column in resultset, got " + std::to_string(columns), ErrorCodes::TOO_MUCH_COLUMNS);
auto rows = block.rows();
if (rows == 0)
throw Exception("Expected single row in resultset, got 0", ErrorCodes::RECEIVED_EMPTY_DATA);
if (rows > 1)
throw Exception("Expected single row in resultset, got at least " + std::to_string(rows), ErrorCodes::TOO_MUCH_ROWS);
auto column = block.getByPosition(0).column;
response = column->getDataAt(0).toString();
while (block = blockInputStream.read())
{
if (block.rows() > 0)
throw Exception("Expected single row in resultset, got at least " + std::to_string(rows + 1), ErrorCodes::TOO_MUCH_ROWS);
}
}
catch (Exception& exception)
{
blockInputStream.cancel();
blockInputStream.readSuffix();
throw;
}
blockInputStream.readSuffix();
return response;
}
}
#pragma once
#include <string>
class IProfilingBlockInputStream;
namespace DB
{
// Using in MySQLDictionarySource and ODBCDictionarySource after processing invalidate_query
std::string readInvalidateQuery(IProfilingBlockInputStream & blockInputStream);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册