未验证 提交 295864e6 编写于 作者: A Amos Bird

better scalar query

上级 04a6c6ac
...@@ -365,7 +365,7 @@ private: ...@@ -365,7 +365,7 @@ private:
Stopwatch watch; Stopwatch watch;
RemoteBlockInputStream stream( RemoteBlockInputStream stream(
*(*connection_entries[connection_index]), *(*connection_entries[connection_index]),
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage); query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage);
Progress progress; Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
......
...@@ -850,9 +850,10 @@ bool TCPHandler::receivePacket() ...@@ -850,9 +850,10 @@ bool TCPHandler::receivePacket()
return true; return true;
case Protocol::Client::Data: case Protocol::Client::Data:
case Protocol::Client::Scalar:
if (state.empty()) if (state.empty())
receiveUnexpectedData(); receiveUnexpectedData();
return receiveData(); return receiveData(packet_type == Protocol::Client::Scalar);
case Protocol::Client::Ping: case Protocol::Client::Ping:
writeVarUInt(Protocol::Server::Pong, *out); writeVarUInt(Protocol::Server::Pong, *out);
...@@ -957,18 +958,22 @@ void TCPHandler::receiveUnexpectedQuery() ...@@ -957,18 +958,22 @@ void TCPHandler::receiveUnexpectedQuery()
throw NetException("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); throw NetException("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
} }
bool TCPHandler::receiveData() bool TCPHandler::receiveData(bool scalar)
{ {
initBlockInput(); initBlockInput();
/// The name of the temporary table for writing data, default to empty string /// The name of the temporary table for writing data, default to empty string
String external_table_name; String name;
readStringBinary(external_table_name, *in); readStringBinary(name, *in);
/// Read one block from the network and write it down /// Read one block from the network and write it down
Block block = state.block_in->read(); Block block = state.block_in->read();
if (block) if (block)
{
if (scalar)
query_context->addScalar(name, block);
else
{ {
/// If there is an insert request, then the data should be written directly to `state.io.out`. /// If there is an insert request, then the data should be written directly to `state.io.out`.
/// Otherwise, we write the blocks in the temporary `external_table_name` table. /// Otherwise, we write the blocks in the temporary `external_table_name` table.
...@@ -976,12 +981,12 @@ bool TCPHandler::receiveData() ...@@ -976,12 +981,12 @@ bool TCPHandler::receiveData()
{ {
StoragePtr storage; StoragePtr storage;
/// If such a table does not exist, create it. /// If such a table does not exist, create it.
if (!(storage = query_context->tryGetExternalTable(external_table_name))) if (!(storage = query_context->tryGetExternalTable(name)))
{ {
NamesAndTypesList columns = block.getNamesAndTypesList(); NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns}, ConstraintsDescription{}); storage = StorageMemory::create("_external", name, ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup(); storage->startup();
query_context->addExternalTable(external_table_name, storage); query_context->addExternalTable(name, storage);
} }
/// The data will be written directly to the table. /// The data will be written directly to the table.
state.io.out = storage->write(ASTPtr(), *query_context); state.io.out = storage->write(ASTPtr(), *query_context);
...@@ -990,6 +995,7 @@ bool TCPHandler::receiveData() ...@@ -990,6 +995,7 @@ bool TCPHandler::receiveData()
state.block_for_input = block; state.block_for_input = block;
else else
state.io.out->write(block); state.io.out->write(block);
}
return true; return true;
} }
else else
......
...@@ -153,7 +153,7 @@ private: ...@@ -153,7 +153,7 @@ private:
void receiveHello(); void receiveHello();
bool receivePacket(); bool receivePacket();
void receiveQuery(); void receiveQuery();
bool receiveData(); bool receiveData(bool scalar);
bool readDataNext(const size_t & poll_interval, const int & receive_timeout); bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
void readData(const Settings & global_settings); void readData(const Settings & global_settings);
std::tuple<size_t, int> getReadTimeouts(const Settings & global_settings); std::tuple<size_t, int> getReadTimeouts(const Settings & global_settings);
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric SendScalars;
extern const Metric SendExternalTables; extern const Metric SendExternalTables;
} }
...@@ -441,7 +442,7 @@ void Connection::sendCancel() ...@@ -441,7 +442,7 @@ void Connection::sendCancel()
} }
void Connection::sendData(const Block & block, const String & name) void Connection::sendData(const Block & block, const String & name, bool scalar)
{ {
//LOG_TRACE(log_wrapper.get(), "Sending data"); //LOG_TRACE(log_wrapper.get(), "Sending data");
...@@ -455,6 +456,9 @@ void Connection::sendData(const Block & block, const String & name) ...@@ -455,6 +456,9 @@ void Connection::sendData(const Block & block, const String & name)
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty()); block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
} }
if (scalar)
writeVarUInt(Protocol::Client::Scalar, *out);
else
writeVarUInt(Protocol::Client::Data, *out); writeVarUInt(Protocol::Client::Data, *out);
writeStringBinary(name, *out); writeStringBinary(name, *out);
...@@ -484,6 +488,44 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String ...@@ -484,6 +488,44 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
} }
void Connection::sendScalarsData(Scalars & data)
{
if (data.empty())
return;
Stopwatch watch;
size_t out_bytes = out ? out->count() : 0;
size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0;
size_t rows = 0;
CurrentMetrics::Increment metric_increment{CurrentMetrics::SendScalars};
for (auto & elem : data)
{
rows += elem.second.rows();
sendData(elem.second, elem.first, true /* scalar */);
}
out_bytes = out->count() - out_bytes;
maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
double elapsed = watch.elapsedSeconds();
std::stringstream msg;
msg << std::fixed << std::setprecision(3);
msg << "Sent data for " << data.size() << " scalars, total " << rows << " rows in " << elapsed << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
<< maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
if (compression == Protocol::Compression::Enable)
msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
<< out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
else
msg << ", no compression.";
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
void Connection::sendExternalTablesData(ExternalTablesData & data) void Connection::sendExternalTablesData(ExternalTablesData & data)
{ {
if (data.empty()) if (data.empty())
......
...@@ -133,7 +133,9 @@ public: ...@@ -133,7 +133,9 @@ public:
void sendCancel(); void sendCancel();
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name. /// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
void sendData(const Block & block, const String & name = ""); void sendData(const Block & block, const String & name = "", bool scalar = false);
/// Send all scalars.
void sendScalarsData(Scalars & data);
/// Send all contents of external (temporary) tables. /// Send all contents of external (temporary) tables.
void sendExternalTablesData(ExternalTablesData & data); void sendExternalTablesData(ExternalTablesData & data);
......
...@@ -51,6 +51,21 @@ MultiplexedConnections::MultiplexedConnections( ...@@ -51,6 +51,21 @@ MultiplexedConnections::MultiplexedConnections(
active_connection_count = connections.size(); active_connection_count = connections.size();
} }
void MultiplexedConnections::sendScalarsData(Scalars & data)
{
std::lock_guard lock(cancel_mutex);
if (!sent_query)
throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
connection->sendScalarsData(data);
}
}
void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data) void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{ {
std::lock_guard lock(cancel_mutex); std::lock_guard lock(cancel_mutex);
......
...@@ -27,6 +27,8 @@ public: ...@@ -27,6 +27,8 @@ public:
std::vector<IConnectionPool::Entry> && connections, std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler_); const Settings & settings_, const ThrottlerPtr & throttler_);
/// Send all scalars to replicas.
void sendScalarsData(Scalars & data);
/// Send all content of external tables to replicas. /// Send all content of external tables to replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data); void sendExternalTablesData(std::vector<ExternalTablesData> & data);
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
M(OpenFileForWrite, "Number of files open for writing") \ M(OpenFileForWrite, "Number of files open for writing") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \ M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \ M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \
M(SendScalars, "Number of connections that are sending data for scalars to remote servers.") \
M(SendExternalTables, "Number of connections that are sending data for external tables to remote servers. External tables are used to implement GLOBAL IN and GLOBAL JOIN operators with distributed subqueries.") \ M(SendExternalTables, "Number of connections that are sending data for external tables to remote servers. External tables are used to implement GLOBAL IN and GLOBAL JOIN operators with distributed subqueries.") \
M(QueryThread, "Number of query processing threads") \ M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \ M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
......
...@@ -459,6 +459,8 @@ namespace ErrorCodes ...@@ -459,6 +459,8 @@ namespace ErrorCodes
extern const int DICTIONARY_ACCESS_DENIED = 482; extern const int DICTIONARY_ACCESS_DENIED = 482;
extern const int TOO_MANY_REDIRECTS = 483; extern const int TOO_MANY_REDIRECTS = 483;
extern const int INTERNAL_REDIS_ERROR = 484; extern const int INTERNAL_REDIS_ERROR = 484;
extern const int SCALAR_ALREADY_EXISTS = 485;
extern const int UNKNOWN_SCALAR = 486;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;
......
...@@ -112,7 +112,8 @@ namespace Protocol ...@@ -112,7 +112,8 @@ namespace Protocol
Cancel = 3, /// Cancel the query execution. Cancel = 3, /// Cancel the query execution.
Ping = 4, /// Check that connection to the server is alive. Ping = 4, /// Check that connection to the server is alive.
TablesStatusRequest = 5, /// Check status of tables on the server. TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6 /// Keep the connection alive KeepAlive = 6, /// Keep the connection alive
Scalar = 7 /// A block of data (compressed or not).
}; };
inline const char * toString(UInt64 packet) inline const char * toString(UInt64 packet)
......
...@@ -379,6 +379,8 @@ struct Settings : public SettingsCollection<Settings> ...@@ -379,6 +379,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \ M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \ M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
\ \
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.") \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\ \
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13") \ M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13") \
......
...@@ -23,8 +23,8 @@ namespace ErrorCodes ...@@ -23,8 +23,8 @@ namespace ErrorCodes
RemoteBlockInputStream::RemoteBlockInputStream( RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection, Connection & connection,
const String & query_, const Block & header_, const Context & context_, const Settings * settings, const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_) : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
{ {
if (settings) if (settings)
context.setSettings(*settings); context.setSettings(*settings);
...@@ -38,8 +38,8 @@ RemoteBlockInputStream::RemoteBlockInputStream( ...@@ -38,8 +38,8 @@ RemoteBlockInputStream::RemoteBlockInputStream(
RemoteBlockInputStream::RemoteBlockInputStream( RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections, std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, const Context & context_, const Settings * settings, const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_) : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
{ {
if (settings) if (settings)
context.setSettings(*settings); context.setSettings(*settings);
...@@ -54,8 +54,8 @@ RemoteBlockInputStream::RemoteBlockInputStream( ...@@ -54,8 +54,8 @@ RemoteBlockInputStream::RemoteBlockInputStream(
RemoteBlockInputStream::RemoteBlockInputStream( RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool, const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, const Context & context_, const Settings * settings, const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_) : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
{ {
if (settings) if (settings)
context.setSettings(*settings); context.setSettings(*settings);
...@@ -120,6 +120,11 @@ void RemoteBlockInputStream::cancel(bool kill) ...@@ -120,6 +120,11 @@ void RemoteBlockInputStream::cancel(bool kill)
tryCancel("Cancelling query"); tryCancel("Cancelling query");
} }
void RemoteBlockInputStream::sendScalars()
{
multiplexed_connections->sendScalarsData(scalars);
}
void RemoteBlockInputStream::sendExternalTables() void RemoteBlockInputStream::sendExternalTables()
{ {
size_t count = multiplexed_connections->size(); size_t count = multiplexed_connections->size();
...@@ -308,6 +313,8 @@ void RemoteBlockInputStream::sendQuery() ...@@ -308,6 +313,8 @@ void RemoteBlockInputStream::sendQuery()
established = false; established = false;
sent_query = true; sent_query = true;
if (settings.enable_scalar_subquery_optimization)
sendScalars();
sendExternalTables(); sendExternalTables();
} }
......
...@@ -25,7 +25,7 @@ public: ...@@ -25,7 +25,7 @@ public:
RemoteBlockInputStream( RemoteBlockInputStream(
Connection & connection, Connection & connection,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(), const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Accepts several connections already taken from pool. /// Accepts several connections already taken from pool.
...@@ -33,7 +33,7 @@ public: ...@@ -33,7 +33,7 @@ public:
RemoteBlockInputStream( RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections, std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(), const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it. /// Takes a pool and gets one or several connections from it.
...@@ -41,7 +41,7 @@ public: ...@@ -41,7 +41,7 @@ public:
RemoteBlockInputStream( RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool, const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr, const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(), const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
~RemoteBlockInputStream() override; ~RemoteBlockInputStream() override;
...@@ -71,6 +71,9 @@ public: ...@@ -71,6 +71,9 @@ public:
Block getHeader() const override { return header; } Block getHeader() const override { return header; }
protected: protected:
/// Send all scalars to remote servers
void sendScalars();
/// Send all temporary tables to remote servers /// Send all temporary tables to remote servers
void sendExternalTables(); void sendExternalTables();
...@@ -103,6 +106,8 @@ private: ...@@ -103,6 +106,8 @@ private:
String query_id = ""; String query_id = "";
Context context; Context context;
/// Scalars needed to be sent to remote servers
Scalars scalars;
/// Temporary tables needed to be sent to remote servers /// Temporary tables needed to be sent to remote servers
Tables external_tables; Tables external_tables;
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
......
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
#include <Core/Field.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
}
/** Get scalar value of sub queries from query context via IAST::Hash.
*/
class FunctionGetScalar : public IFunction
{
public:
static constexpr auto name = "__getScalar";
static FunctionPtr create(const Context & context)
{
return std::make_shared<FunctionGetScalar>(context);
}
FunctionGetScalar(const Context & context_) : context(context_) {}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1 || !isString(arguments[0].type) || !isColumnConst(*arguments[0].column))
throw Exception("Function " + getName() + " accepts one const string argument", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto scalar_name = assert_cast<const ColumnConst &>(*arguments[0].column).getField().get<String>();
scalar = context.getScalar(scalar_name).getByPosition(0);
return scalar.type;
}
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
{
block.getByPosition(result).column = ColumnConst::create(scalar.column, input_rows_count);
}
private:
mutable ColumnWithTypeAndName scalar;
const Context & context;
};
void registerFunctionGetScalar(FunctionFactory & factory)
{
factory.registerFunction<FunctionGetScalar>();
}
}
...@@ -52,6 +52,7 @@ void registerFunctionEvalMLMethod(FunctionFactory &); ...@@ -52,6 +52,7 @@ void registerFunctionEvalMLMethod(FunctionFactory &);
void registerFunctionBasename(FunctionFactory &); void registerFunctionBasename(FunctionFactory &);
void registerFunctionTransform(FunctionFactory &); void registerFunctionTransform(FunctionFactory &);
void registerFunctionGetMacro(FunctionFactory &); void registerFunctionGetMacro(FunctionFactory &);
void registerFunctionGetScalar(FunctionFactory &);
#if USE_ICU #if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &); void registerFunctionConvertCharset(FunctionFactory &);
...@@ -106,6 +107,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory) ...@@ -106,6 +107,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionBasename(factory); registerFunctionBasename(factory);
registerFunctionTransform(factory); registerFunctionTransform(factory);
registerFunctionGetMacro(factory); registerFunctionGetMacro(factory);
registerFunctionGetScalar(factory);
#if USE_ICU #if USE_ICU
registerFunctionConvertCharset(factory); registerFunctionConvertCharset(factory);
......
...@@ -33,11 +33,13 @@ SelectStreamFactory::SelectStreamFactory( ...@@ -33,11 +33,13 @@ SelectStreamFactory::SelectStreamFactory(
const Block & header_, const Block & header_,
QueryProcessingStage::Enum processed_stage_, QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_, QualifiedTableName main_table_,
const Scalars & scalars_,
const Tables & external_tables_) const Tables & external_tables_)
: header(header_), : header(header_),
processed_stage{processed_stage_}, processed_stage{processed_stage_},
main_table(std::move(main_table_)), main_table(std::move(main_table_)),
table_func_ptr{nullptr}, table_func_ptr{nullptr},
scalars{scalars_},
external_tables{external_tables_} external_tables{external_tables_}
{ {
} }
...@@ -46,10 +48,12 @@ SelectStreamFactory::SelectStreamFactory( ...@@ -46,10 +48,12 @@ SelectStreamFactory::SelectStreamFactory(
const Block & header_, const Block & header_,
QueryProcessingStage::Enum processed_stage_, QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_, ASTPtr table_func_ptr_,
const Scalars & scalars_,
const Tables & external_tables_) const Tables & external_tables_)
: header(header_), : header(header_),
processed_stage{processed_stage_}, processed_stage{processed_stage_},
table_func_ptr{table_func_ptr_}, table_func_ptr{table_func_ptr_},
scalars{scalars_},
external_tables{external_tables_} external_tables{external_tables_}
{ {
} }
...@@ -92,7 +96,8 @@ void SelectStreamFactory::createForShard( ...@@ -92,7 +96,8 @@ void SelectStreamFactory::createForShard(
auto emplace_remote_stream = [&]() auto emplace_remote_stream = [&]()
{ {
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage); auto stream = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY); stream->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr) if (!table_func_ptr)
stream->setMainTable(main_table); stream->setMainTable(main_table);
...@@ -190,8 +195,8 @@ void SelectStreamFactory::createForShard( ...@@ -190,8 +195,8 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [ auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler, pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage, main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
local_delay]() stage = processed_stage, local_delay]()
-> BlockInputStreamPtr -> BlockInputStreamPtr
{ {
auto current_settings = context.getSettingsRef(); auto current_settings = context.getSettingsRef();
...@@ -233,7 +238,7 @@ void SelectStreamFactory::createForShard( ...@@ -233,7 +238,7 @@ void SelectStreamFactory::createForShard(
connections.emplace_back(std::move(try_result.entry)); connections.emplace_back(std::move(try_result.entry));
return std::make_shared<RemoteBlockInputStream>( return std::make_shared<RemoteBlockInputStream>(
std::move(connections), query, header, context, nullptr, throttler, external_tables, stage); std::move(connections), query, header, context, nullptr, throttler, scalars, external_tables, stage);
} }
}; };
......
...@@ -18,6 +18,7 @@ public: ...@@ -18,6 +18,7 @@ public:
const Block & header_, const Block & header_,
QueryProcessingStage::Enum processed_stage_, QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_, QualifiedTableName main_table_,
const Scalars & scalars_,
const Tables & external_tables); const Tables & external_tables);
/// TableFunction in a query. /// TableFunction in a query.
...@@ -25,6 +26,7 @@ public: ...@@ -25,6 +26,7 @@ public:
const Block & header_, const Block & header_,
QueryProcessingStage::Enum processed_stage_, QueryProcessingStage::Enum processed_stage_,
ASTPtr table_func_ptr_, ASTPtr table_func_ptr_,
const Scalars & scalars_,
const Tables & external_tables_); const Tables & external_tables_);
void createForShard( void createForShard(
...@@ -38,6 +40,7 @@ private: ...@@ -38,6 +40,7 @@ private:
QueryProcessingStage::Enum processed_stage; QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table; QualifiedTableName main_table;
ASTPtr table_func_ptr; ASTPtr table_func_ptr;
Scalars scalars;
Tables external_tables; Tables external_tables;
}; };
......
...@@ -88,6 +88,8 @@ namespace ErrorCodes ...@@ -88,6 +88,8 @@ namespace ErrorCodes
extern const int SESSION_IS_LOCKED; extern const int SESSION_IS_LOCKED;
extern const int CANNOT_GET_CREATE_TABLE_QUERY; extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int SCALAR_ALREADY_EXISTS;
extern const int UNKNOWN_SCALAR;
} }
...@@ -862,6 +864,21 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const ...@@ -862,6 +864,21 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const
} }
const Scalars & Context::getScalars() const
{
return scalars;
}
const Block & Context::getScalar(const String & name) const
{
auto it = scalars.find(name);
if (scalars.end() == it)
throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)", ErrorCodes::UNKNOWN_SCALAR);
return it->second;
}
Tables Context::getExternalTables() const Tables Context::getExternalTables() const
{ {
auto lock = getLock(); auto lock = getLock();
...@@ -959,6 +976,19 @@ void Context::addExternalTable(const String & table_name, const StoragePtr & sto ...@@ -959,6 +976,19 @@ void Context::addExternalTable(const String & table_name, const StoragePtr & sto
external_tables[table_name] = std::pair(storage, ast); external_tables[table_name] = std::pair(storage, ast);
} }
void Context::addScalar(const String & name, const Block & block)
{
scalars[name] = block;
}
bool Context::hasScalar(const String & name) const
{
return scalars.count(name);
}
StoragePtr Context::tryRemoveExternalTable(const String & table_name) StoragePtr Context::tryRemoveExternalTable(const String & table_name)
{ {
TableAndCreateASTs::const_iterator it = external_tables.find(table_name); TableAndCreateASTs::const_iterator it = external_tables.find(table_name);
......
...@@ -105,6 +105,9 @@ using InputInitializer = std::function<void(Context &, const StoragePtr &)>; ...@@ -105,6 +105,9 @@ using InputInitializer = std::function<void(Context &, const StoragePtr &)>;
/// Callback for reading blocks of data from client for function input() /// Callback for reading blocks of data from client for function input()
using InputBlocksReader = std::function<Block(Context &)>; using InputBlocksReader = std::function<Block(Context &)>;
/// Scalar results of sub queries
using Scalars = std::map<String, Block>;
/// An empty interface for an arbitrary object that may be attached by a shared pointer /// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library. /// to query context, when using ClickHouse as a library.
struct IHostContext struct IHostContext
...@@ -144,6 +147,7 @@ private: ...@@ -144,6 +147,7 @@ private:
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification. String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
/// Thus, used in HTTP interface. If not specified - then some globally default format is used. /// Thus, used in HTTP interface. If not specified - then some globally default format is used.
TableAndCreateASTs external_tables; /// Temporary tables. TableAndCreateASTs external_tables; /// Temporary tables.
Scalars scalars;
StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views
Tables table_function_results; /// Temporary tables obtained by execution of table functions. Keyed by AST tree id. Tables table_function_results; /// Temporary tables obtained by execution of table functions. Keyed by AST tree id.
Context * query_context = nullptr; Context * query_context = nullptr;
...@@ -264,11 +268,15 @@ public: ...@@ -264,11 +268,15 @@ public:
void assertDatabaseDoesntExist(const String & database_name) const; void assertDatabaseDoesntExist(const String & database_name) const;
void checkDatabaseAccessRights(const std::string & database_name) const; void checkDatabaseAccessRights(const std::string & database_name) const;
const Scalars & getScalars() const;
const Block & getScalar(const String & name) const;
Tables getExternalTables() const; Tables getExternalTables() const;
StoragePtr tryGetExternalTable(const String & table_name) const; StoragePtr tryGetExternalTable(const String & table_name) const;
StoragePtr getTable(const String & database_name, const String & table_name) const; StoragePtr getTable(const String & database_name, const String & table_name) const;
StoragePtr tryGetTable(const String & database_name, const String & table_name) const; StoragePtr tryGetTable(const String & database_name, const String & table_name) const;
void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {}); void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {});
void addScalar(const String & name, const Block & block);
bool hasScalar(const String & name) const;
StoragePtr tryRemoveExternalTable(const String & table_name); StoragePtr tryRemoveExternalTable(const String & table_name);
StoragePtr executeTableFunction(const ASTPtr & table_expression); StoragePtr executeTableFunction(const ASTPtr & table_expression);
......
...@@ -12,8 +12,11 @@ ...@@ -12,8 +12,11 @@
#include <Interpreters/addTypeConversionToAST.h> #include <Interpreters/addTypeConversionToAST.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnTuple.h>
namespace DB namespace DB
{ {
...@@ -53,8 +56,29 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data) ...@@ -53,8 +56,29 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data)
visit(*t, ast, data); visit(*t, ast, data);
} }
/// Converting to literal values might take a fair amount of overhead when the value is large, (e.g.
/// Array, BitMap, etc.), This conversion is required for constant folding, index lookup, branch
/// elimination. However, these optimizations should never be related to large values, thus we
/// blacklist them here.
static bool worthConvertingToLiteral(const Block & scalar)
{
auto scalar_type_name = scalar.safeGetByPosition(0).type->getFamilyName();
std::set<String> useless_literal_types = {"Array", "Tuple", "AggregateFunction", "Function", "Set", "LowCardinality"};
return !useless_literal_types.count(scalar_type_name);
}
void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data) void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data)
{ {
auto hash = subquery.getTreeHash();
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
Block scalar;
if (data.context.hasQueryContext() && data.context.getQueryContext().hasScalar(scalar_query_hash_str))
scalar = data.context.getQueryContext().getScalar(scalar_query_hash_str);
else if (data.scalars.count(scalar_query_hash_str))
scalar = data.scalars[scalar_query_hash_str];
else
{
Context subquery_context = data.context; Context subquery_context = data.context;
Settings subquery_settings = data.context.getSettings(); Settings subquery_settings = data.context.getSettings();
subquery_settings.max_result_rows = 1; subquery_settings.max_result_rows = 1;
...@@ -90,32 +114,40 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr ...@@ -90,32 +114,40 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
throw; throw;
} }
block = materializeBlock(block);
size_t columns = block.columns(); size_t columns = block.columns();
if (columns == 1) if (columns == 1)
scalar = block;
else
{
ColumnWithTypeAndName ctn;
ctn.type = std::make_shared<DataTypeTuple>(block.getDataTypes());
ctn.column = ColumnTuple::create(block.getColumns());
scalar.insert(ctn);
}
}
const Settings & settings = data.context.getSettingsRef();
// Always convert to literals when there is no query context.
if (!settings.enable_scalar_subquery_optimization || worthConvertingToLiteral(scalar) || !data.context.hasQueryContext())
{ {
auto lit = std::make_unique<ASTLiteral>((*block.safeGetByPosition(0).column)[0]); auto lit = std::make_unique<ASTLiteral>((*scalar.safeGetByPosition(0).column)[0]);
lit->alias = subquery.alias; lit->alias = subquery.alias;
lit->prefer_alias_to_column_name = subquery.prefer_alias_to_column_name; lit->prefer_alias_to_column_name = subquery.prefer_alias_to_column_name;
ast = addTypeConversionToAST(std::move(lit), block.safeGetByPosition(0).type->getName()); ast = addTypeConversionToAST(std::move(lit), scalar.safeGetByPosition(0).type->getName());
} }
else else
{ {
auto tuple = std::make_shared<ASTFunction>(); auto func = makeASTFunction("__getScalar", std::make_shared<ASTLiteral>(scalar_query_hash_str));
tuple->alias = subquery.alias; func->alias = subquery.alias;
ast = tuple; func->prefer_alias_to_column_name = subquery.prefer_alias_to_column_name;
tuple->name = "tuple"; ast = std::move(func);
auto exp_list = std::make_shared<ASTExpressionList>();
tuple->arguments = exp_list;
tuple->children.push_back(tuple->arguments);
exp_list->children.resize(columns);
for (size_t i = 0; i < columns; ++i)
{
exp_list->children[i] = addTypeConversionToAST(
std::make_unique<ASTLiteral>((*block.safeGetByPosition(i).column)[0]),
block.safeGetByPosition(i).type->getName());
}
} }
data.scalars[scalar_query_hash_str] = std::move(scalar);
} }
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data) void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)
......
#pragma once #pragma once
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Core/Block.h>
#include <Interpreters/InDepthNodeVisitor.h> #include <Interpreters/InDepthNodeVisitor.h>
namespace DB namespace DB
...@@ -36,6 +37,7 @@ public: ...@@ -36,6 +37,7 @@ public:
{ {
const Context & context; const Context & context;
size_t subquery_depth; size_t subquery_depth;
Scalars & scalars;
}; };
static bool needChildVisit(ASTPtr & node, const ASTPtr &); static bool needChildVisit(ASTPtr & node, const ASTPtr &);
......
...@@ -305,6 +305,12 @@ InterpreterSelectQuery::InterpreterSelectQuery( ...@@ -305,6 +305,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList()); query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
/// Save scalar sub queries's results in the query context
if (context.hasQueryContext())
for (const auto & it : syntax_analyzer_result->getScalars())
context.getQueryContext().addScalar(it.first, it.second);
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>( query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, query_ptr, syntax_analyzer_result, context,
NameSet(required_result_column_names.begin(), required_result_column_names.end()), NameSet(required_result_column_names.begin(), required_result_column_names.end()),
......
...@@ -220,10 +220,10 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query, ...@@ -220,10 +220,10 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
} }
/// Replacing scalar subqueries with constant values. /// Replacing scalar subqueries with constant values.
void executeScalarSubqueries(ASTPtr & query, const Context & context, size_t subquery_depth) void executeScalarSubqueries(ASTPtr & query, const Context & context, size_t subquery_depth, Scalars & scalars)
{ {
LogAST log; LogAST log;
ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth}; ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth, scalars};
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query); ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
} }
...@@ -871,7 +871,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze( ...@@ -871,7 +871,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates); removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
/// Executing scalar subqueries - replacing them with constant values. /// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, context, subquery_depth); executeScalarSubqueries(query, context, subquery_depth, result.scalars);
/// Optimize if with constant condition after constants was substituted instead of scalar subqueries. /// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query); OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
......
#pragma once #pragma once
#include <Core/Block.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Interpreters/Aliases.h> #include <Interpreters/Aliases.h>
#include <Interpreters/SelectQueryOptions.h> #include <Interpreters/SelectQueryOptions.h>
...@@ -14,6 +15,7 @@ class ASTFunction; ...@@ -14,6 +15,7 @@ class ASTFunction;
class AnalyzedJoin; class AnalyzedJoin;
class Context; class Context;
struct SelectQueryOptions; struct SelectQueryOptions;
using Scalars = std::map<String, Block>;
struct SyntaxAnalyzerResult struct SyntaxAnalyzerResult
{ {
...@@ -43,8 +45,12 @@ struct SyntaxAnalyzerResult ...@@ -43,8 +45,12 @@ struct SyntaxAnalyzerResult
/// Predicate optimizer overrides the sub queries /// Predicate optimizer overrides the sub queries
bool rewrite_subqueries = false; bool rewrite_subqueries = false;
/// Results of scalar sub queries
Scalars scalars;
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns); void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
Names requiredSourceColumns() const { return required_source_columns.getNames(); } Names requiredSourceColumns() const { return required_source_columns.getNames(); }
const Scalars & getScalars() const { return scalars; }
}; };
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>; using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
......
...@@ -323,11 +323,13 @@ BlockInputStreams StorageDistributed::read( ...@@ -323,11 +323,13 @@ BlockInputStreams StorageDistributed::read(
Block header = Block header =
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock(); InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock();
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory( ? ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, context.getExternalTables()) header, processed_stage, remote_table_function_ptr, scalars, context.getExternalTables())
: ClusterProxy::SelectStreamFactory( : ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables()); header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, context.getExternalTables());
if (settings.optimize_skip_unused_shards) if (settings.optimize_skip_unused_shards)
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册