提交 7cc2871e 编写于 作者: A Alexey Milovidov

dbms: allowed to send custom settings with query; sending current settings to...

dbms: allowed to send custom settings with query; sending current settings to remote servers for distributed query processing [#CONV-2944].

上级 f8c17efe
......@@ -18,6 +18,8 @@
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Interpreters/Settings.h>
namespace DB
{
......@@ -72,7 +74,9 @@ public:
String getServerAddress() const;
/// query_id не должен быть равен 0.
void sendQuery(const String & query, UInt64 query_id_ = 1, UInt64 stage = QueryProcessingStage::Complete);
void sendQuery(const String & query, UInt64 query_id_ = 1, UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = NULL);
void sendCancel();
void sendData(const Block & block);
......
......@@ -22,5 +22,7 @@
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 128
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS 28558
/// Суффикс имени для столбца, содержащего смещения массива.
#define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size"
......@@ -65,7 +65,8 @@ namespace Protocol
enum Enum
{
Hello = 0, /// Имя, версия, ревизия, БД по-умолчанию.
Query = 1, /** Идентификатор запроса, информация, до какой стадии исполнять запрос,
Query = 1, /** Идентификатор запроса, настройки на отдельный запрос,
* информация, до какой стадии исполнять запрос,
* использовать ли сжатие, текст запроса (без данных для INSERT-а).
*/
Data = 2, /// Блок данных со сжатием или без.
......
......@@ -15,21 +15,35 @@ namespace DB
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
public:
RemoteBlockInputStream(Connection & connection_, const String & query_,
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: connection(connection_), query(query_), stage(stage_),
sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false),
log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")"))
{
if (settings_)
{
send_settings = true;
settings = *settings_;
}
else
send_settings = false;
}
/// Захватывает владение соединением из пула.
RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_,
RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_,
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool_entry(pool_entry_), connection(*pool_entry), query(query_), stage(stage_),
sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false),
log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")"))
{
if (settings_)
{
send_settings = true;
settings = *settings_;
}
else
send_settings = false;
}
......@@ -38,8 +52,8 @@ public:
BlockInputStreamPtr clone()
{
return pool_entry.isNull()
? new RemoteBlockInputStream(connection, query, stage)
: new RemoteBlockInputStream(pool_entry, query, stage);
? new RemoteBlockInputStream(connection, query, send_settings ? &settings : NULL, stage)
: new RemoteBlockInputStream(pool_entry, query, send_settings ? &settings : NULL, stage);
}
......@@ -127,7 +141,7 @@ protected:
{
if (!sent_query)
{
connection.sendQuery(query, 1, stage);
connection.sendQuery(query, 1, stage, send_settings ? &settings : NULL);
sent_query = true;
}
......@@ -173,6 +187,8 @@ private:
Connection & connection;
const String query;
bool send_settings;
Settings settings;
QueryProcessingStage::Enum stage;
/// Отправили запрос (это делается перед получением первого блока).
......
......@@ -130,6 +130,7 @@ public:
void setGlobalContext(Context & context_) { global_context = &context_; }
const Settings & getSettingsRef() const { return settings; };
Settings & getSettingsRef() { return settings; };
};
......
......@@ -4,6 +4,11 @@
#include <DB/Core/Defines.h>
#include <DB/Core/Field.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
......@@ -19,13 +24,23 @@ struct Limits
/// Что делать, если ограничение превышено.
enum OverflowMode
{
THROW, /// Кинуть исключение.
BREAK, /// Прервать выполнение запроса, вернуть что есть.
ANY, /** Только для GROUP BY: не добавлять новые строки в набор,
* но продолжать агрегировать для ключей, успевших попасть в набор.
*/
THROW = 0, /// Кинуть исключение.
BREAK = 1, /// Прервать выполнение запроса, вернуть что есть.
ANY = 2, /** Только для GROUP BY: не добавлять новые строки в набор,
* но продолжать агрегировать для ключей, успевших попасть в набор.
*/
};
static String toString(OverflowMode mode)
{
const char * strings[] = { "throw", "break", "any" };
if (mode < THROW || mode > ANY)
throw Exception("Unknown overflow mode", ErrorCodes::UNKNOWN_OVERFLOW_MODE);
return strings[mode];
}
/** Ограничения на чтение из самых "глубоких" источников.
* То есть, только в самом глубоком подзапросе.
* При чтении с удалённого сервера, проверяется только на удалённом сервере.
......@@ -121,7 +136,82 @@ struct Limits
return true;
}
/// Установить настройку по имени. Прочитать сериализованное значение из буфера.
bool trySet(const String & name, ReadBuffer & buf)
{
if ( name == "max_rows_to_read"
|| name == "max_bytes_to_read"
|| name == "max_rows_to_group_by"
|| name == "max_rows_to_sort"
|| name == "max_bytes_to_sort"
|| name == "max_result_rows"
|| name == "max_result_bytes"
|| name == "max_execution_time"
|| name == "min_execution_speed"
|| name == "timeout_before_checking_execution_speed"
|| name == "max_columns_to_read"
|| name == "max_temporary_columns"
|| name == "max_temporary_non_const_columns"
|| name == "max_subquery_depth"
|| name == "max_pipeline_depth"
|| name == "max_ast_depth"
|| name == "max_ast_elements"
|| name == "readonly")
{
UInt64 value = 0;
readVarUInt(value, buf);
if (!trySet(name, value))
throw Exception("Logical error: unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}
else if (name == "read_overflow_mode"
|| name == "group_by_overflow_mode"
|| name == "sort_overflow_mode"
|| name == "result_overflow_mode"
|| name == "timeout_overflow_mode")
{
String value;
readBinary(value, buf);
if (!trySet(name, value))
throw Exception("Logical error: unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}
else
return false;
return true;
}
private:
friend class Settings;
/// Записать все настройки в буфер. (В отличие от соответствующего метода в Settings, пустая строка на конце не пишется).
void serialize(WriteBuffer & buf) const
{
writeStringBinary("max_rows_to_read", buf); writeVarUInt(max_rows_to_read, buf);
writeStringBinary("max_bytes_to_read", buf); writeVarUInt(max_bytes_to_read, buf);
writeStringBinary("read_overflow_mode", buf); writeStringBinary(toString(read_overflow_mode), buf);
writeStringBinary("max_rows_to_group_by", buf); writeVarUInt(max_rows_to_group_by, buf);
writeStringBinary("group_by_overflow_mode", buf); writeStringBinary(toString(group_by_overflow_mode), buf);
writeStringBinary("max_rows_to_sort", buf); writeVarUInt(max_rows_to_sort, buf);
writeStringBinary("max_bytes_to_sort", buf); writeVarUInt(max_bytes_to_sort, buf);
writeStringBinary("sort_overflow_mode", buf); writeStringBinary(toString(sort_overflow_mode), buf);
writeStringBinary("max_result_rows", buf); writeVarUInt(max_result_rows, buf);
writeStringBinary("max_result_bytes", buf); writeVarUInt(max_result_bytes, buf);
writeStringBinary("max_execution_time", buf); writeVarUInt(max_execution_time.totalSeconds(), buf);
writeStringBinary("timeout_overflow_mode", buf); writeStringBinary(toString(timeout_overflow_mode), buf);
writeStringBinary("min_execution_speed", buf); writeVarUInt(min_execution_speed, buf);
writeStringBinary("timeout_before_checking_execution_speed", buf); writeVarUInt(timeout_before_checking_execution_speed.totalSeconds(), buf);
writeStringBinary("max_columns_to_read", buf); writeVarUInt(max_columns_to_read, buf);
writeStringBinary("max_temporary_columns", buf); writeVarUInt(max_temporary_columns, buf);
writeStringBinary("max_temporary_non_const_columns", buf); writeVarUInt(max_temporary_non_const_columns, buf);
writeStringBinary("max_subquery_depth", buf); writeVarUInt(max_subquery_depth, buf);
writeStringBinary("max_pipeline_depth", buf); writeVarUInt(max_pipeline_depth, buf);
writeStringBinary("max_ast_depth", buf); writeVarUInt(max_ast_depth, buf);
writeStringBinary("max_ast_elements", buf); writeVarUInt(max_ast_elements, buf);
writeStringBinary("readonly", buf); writeVarUInt(readonly, buf);
}
OverflowMode getOverflowModeForGroupBy(const String & s)
{
if (s == "throw") return THROW;
......
......@@ -4,6 +4,11 @@
#include <DB/Core/Defines.h>
#include <DB/Core/Field.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/Limits.h>
......@@ -77,6 +82,70 @@ struct Settings
else if (!limits.trySet(name, value))
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}
/// Установить настройку по имени. Прочитать сериализованное значение из буфера.
void set(const String & name, ReadBuffer & buf)
{
if ( name == "max_block_size"
|| name == "max_threads"
|| name == "max_query_size"
|| name == "asynchronous"
|| name == "interactive_delay"
|| name == "connect_timeout"
|| name == "receive_timeout"
|| name == "send_timeout"
|| name == "poll_interval"
|| name == "connect_timeout_with_failover_ms"
|| name == "max_distributed_connections"
|| name == "distributed_connections_pool_size"
|| name == "connections_with_failover_max_tries")
{
UInt64 value = 0;
readVarUInt(value, buf);
set(name, value);
}
else if (!limits.trySet(name, buf))
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}
/// Прочитать настройки из буфера. Они записаны как набор name-value пар, идущих подряд, заканчивающихся пустым name.
void deserialize(ReadBuffer & buf)
{
while (true)
{
String name;
readBinary(name, buf);
/// Пустая строка - это маркер конца настроек.
if (name.empty())
break;
set(name, buf);
}
}
/// Записать все настройки в буфер.
void serialize(WriteBuffer & buf) const
{
writeStringBinary("max_block_size", buf); writeVarUInt(max_block_size, buf);
writeStringBinary("max_threads", buf); writeVarUInt(max_threads, buf);
writeStringBinary("max_query_size", buf); writeVarUInt(max_query_size, buf);
writeStringBinary("asynchronous", buf); writeVarUInt(asynchronous, buf);
writeStringBinary("interactive_delay", buf); writeVarUInt(interactive_delay, buf);
writeStringBinary("connect_timeout", buf); writeVarUInt(connect_timeout.totalSeconds(), buf);
writeStringBinary("receive_timeout", buf); writeVarUInt(receive_timeout.totalSeconds(), buf);
writeStringBinary("send_timeout", buf); writeVarUInt(send_timeout.totalSeconds(), buf);
writeStringBinary("poll_interval", buf); writeVarUInt(poll_interval, buf);
writeStringBinary("connect_timeout_with_failover_ms", buf); writeVarUInt(connect_timeout_with_failover_ms.totalMilliseconds(), buf);
writeStringBinary("max_distributed_connections", buf); writeVarUInt(max_distributed_connections, buf);
writeStringBinary("distributed_connections_pool_size", buf); writeVarUInt(distributed_connections_pool_size, buf);
writeStringBinary("connections_with_failover_max_tries", buf); writeVarUInt(connections_with_failover_max_tries, buf);
limits.serialize(buf);
/// Пустая строка - это маркер конца настроек.
writeStringBinary("", buf);
}
};
......
......@@ -11,6 +11,8 @@
#include <DB/Parsers/IAST.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Storages/StoragePtr.h>
......@@ -64,12 +66,17 @@ public:
* (Обычно функция только читает столбцы из списка, но в других случаях,
* например, запрос может быть частично обработан на удалённом сервере.)
*
* settings - настройки на один запрос.
* Обычно Storage не заботится об этих настройках, так как они применяются в интерпретаторе.
* Но, например, при распределённой обработке запроса, настройки передаются на удалённый сервер.
*
* threads - рекомендация, сколько потоков возвращать,
* если хранилище может возвращать разное количество потоков.
*/
virtual BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1)
......
......@@ -54,6 +54,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -128,6 +128,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -60,6 +60,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -31,6 +31,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -128,6 +128,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -28,6 +28,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -43,6 +43,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -25,6 +25,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -28,6 +28,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -104,6 +104,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
......
......@@ -185,7 +185,7 @@ bool Connection::ping()
}
void Connection::sendQuery(const String & query, UInt64 query_id_, UInt64 stage)
void Connection::sendQuery(const String & query, UInt64 query_id_, UInt64 stage, const Settings * settings)
{
forceConnected();
......@@ -195,6 +195,16 @@ void Connection::sendQuery(const String & query, UInt64 query_id_, UInt64 stage)
writeVarUInt(Protocol::Client::Query, *out);
writeIntBinary(query_id, *out);
/// Настройки на отдельный запрос.
if (server_revision >= DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS)
{
if (settings)
settings->serialize(*out);
else
writeStringBinary("", *out);
}
writeVarUInt(stage, *out);
writeVarUInt(compression, *out);
......
......@@ -40,7 +40,7 @@ int main(int argc, char ** argv)
DB::StoragePtr table = context.getTable("default", "hits6");
DB::QueryProcessingStage::Enum stage;
DB::BlockInputStreams streams = table->read(column_names, NULL, stage, settings.max_block_size, settings.max_threads);
DB::BlockInputStreams streams = table->read(column_names, NULL, settings, stage, settings.max_block_size, settings.max_threads);
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = new DB::AsynchronousBlockInputStream(streams[i]);
......
......@@ -283,7 +283,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
streams = table->read(required_columns, query_ptr, from_stage, settings.max_block_size, settings.max_threads);
streams = table->read(required_columns, query_ptr, settings, from_stage, settings.max_block_size, settings.max_threads);
else
streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), settings.asynchronous));
......
......@@ -15,7 +15,6 @@ public:
: server(server_)
, log(&Logger::get("HTTPHandler"))
{
LOG_TRACE(log, "In constructor.");
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
......
......@@ -76,8 +76,13 @@ void TCPHandler::runImpl()
SharedPtr<DB::Exception> exception;
try
{
/// Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.)
{
/// Восстанавливаем контекст запроса.
query_context = connection_context;
/** Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.)
* Могут прийти настройки на отдельный запрос, которые модифицируют query_context.
*/
receivePacket();
/// Обрабатываем Query
......@@ -207,7 +212,7 @@ void TCPHandler::processOrdinaryQuery()
while (true)
{
if (async_in.poll(connection_context.getSettingsRef().interactive_delay / 1000))
if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000))
{
block = async_in.read();
break;
......@@ -254,7 +259,6 @@ void TCPHandler::receiveHello()
String client_name;
UInt64 client_version_major = 0;
UInt64 client_version_minor = 0;
UInt64 client_revision = 0;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::Hello)
......@@ -335,6 +339,12 @@ void TCPHandler::receiveQuery()
readIntBinary(state.query_id, *in);
/// Настройки на отдельный запрос.
if (client_revision >= DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS)
{
query_context.getSettingsRef().deserialize(*in);
}
readVarUInt(stage, *in);
state.stage = QueryProcessingStage::Enum(stage);
......@@ -343,7 +353,7 @@ void TCPHandler::receiveQuery()
readStringBinary(state.query, *in);
state.io = executeQuery(state.query, connection_context, state.stage);
state.io = executeQuery(state.query, query_context, state.stage);
}
......@@ -356,12 +366,12 @@ bool TCPHandler::receiveData()
else
state.maybe_compressed_in = in;
state.block_in = connection_context.getFormatFactory().getInput(
state.block_in = query_context.getFormatFactory().getInput(
"Native",
*state.maybe_compressed_in,
state.io.out_sample,
connection_context.getSettingsRef().max_block_size,
connection_context.getDataTypeFactory());
query_context.getSettingsRef().max_block_size,
query_context.getDataTypeFactory());
}
/// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а)
......@@ -381,7 +391,7 @@ bool TCPHandler::isQueryCancelled()
if (state.is_cancelled || state.sent_all_data)
return true;
if (after_check_cancelled.elapsed() / 1000 < connection_context.getSettingsRef().interactive_delay)
if (after_check_cancelled.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay)
return false;
after_check_cancelled.restart();
......@@ -421,7 +431,7 @@ void TCPHandler::sendData(Block & block)
else
state.maybe_compressed_out = out;
state.block_out = connection_context.getFormatFactory().getOutput(
state.block_out = query_context.getFormatFactory().getOutput(
"Native",
*state.maybe_compressed_out,
state.io.in_sample);
......@@ -466,7 +476,7 @@ void TCPHandler::sendProgress(size_t rows, size_t bytes)
if (state.sent_all_data)
return;
if (after_send_progress.elapsed() / 1000 < connection_context.getSettingsRef().interactive_delay)
if (after_send_progress.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay)
return;
after_send_progress.restart();
......
......@@ -69,10 +69,10 @@ class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
TCPHandler(Server & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_), server(server_)
, log(&Logger::get("TCPHandler")), connection_context(server.global_context)
: Poco::Net::TCPServerConnection(socket_), server(server_),
log(&Logger::get("TCPHandler")), client_revision(0),
connection_context(server.global_context), query_context(connection_context)
{
LOG_TRACE(log, "In constructor.");
}
void run();
......@@ -81,7 +81,10 @@ private:
Server & server;
Logger * log;
UInt64 client_revision;
Context connection_context;
Context query_context;
SharedPtr<ReadBufferFromPocoSocket> in;
SharedPtr<WriteBufferFromPocoSocket> out;
......
......@@ -58,6 +58,7 @@ StorageDistributed::StorageDistributed(
BlockInputStreams StorageDistributed::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
......@@ -79,7 +80,7 @@ BlockInputStreams StorageDistributed::read(
BlockInputStreams res;
for (ConnectionPools::iterator it = pools.begin(); it != pools.end(); ++it)
res.push_back(new RemoteBlockInputStream((*it)->get(), modified_query, processed_stage));
res.push_back(new RemoteBlockInputStream((*it)->get(), modified_query, &settings, processed_stage));
return res;
}
......
......@@ -302,6 +302,7 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_name)
BlockInputStreams StorageLog::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
......
......@@ -50,6 +50,7 @@ StorageMemory::StorageMemory(const std::string & name_, NamesAndTypesListPtr col
BlockInputStreams StorageMemory::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
......
......@@ -19,6 +19,7 @@ StorageMerge::StorageMerge(
BlockInputStreams StorageMerge::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
......@@ -52,6 +53,7 @@ BlockInputStreams StorageMerge::read(
BlockInputStreams source_streams = (*it)->read(
column_names,
query,
settings,
tmp_processed_stage,
max_block_size,
selected_tables.size() > threads ? 1 : (threads / selected_tables.size()));
......
......@@ -819,6 +819,7 @@ BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
BlockInputStreams StorageMergeTree::read(
const Names & column_names_to_return,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
......
......@@ -16,7 +16,8 @@ StorageSystemDatabases::StorageSystemDatabases(const std::string & name_, const
BlockInputStreams StorageSystemDatabases::read(
const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
......
......@@ -47,7 +47,8 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_)
BlockInputStreams StorageSystemNumbers::read(
const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
......
......@@ -19,7 +19,8 @@ StorageSystemOne::StorageSystemOne(const std::string & name_)
BlockInputStreams StorageSystemOne::read(
const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
......
......@@ -18,7 +18,8 @@ StorageSystemTables::StorageSystemTables(const std::string & name_, const Contex
BlockInputStreams StorageSystemTables::read(
const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
const Names & column_names, ASTPtr query, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
......
......@@ -224,6 +224,7 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & new_na
BlockInputStreams StorageTinyLog::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册