提交 38fa1af5 编写于 作者: V Vitaly Baranov

Disable quota consumptions for the system queries and for selecting

from the `system.quota` and `system.quotas` tables.
上级 d9b1a733
...@@ -22,6 +22,9 @@ public: ...@@ -22,6 +22,9 @@ public:
virtual bool canExecuteWithProcessors() const { return false; } virtual bool canExecuteWithProcessors() const { return false; }
virtual bool ignoreQuota() const { return false; }
virtual bool ignoreLimits() const { return false; }
virtual ~IInterpreter() {} virtual ~IInterpreter() {}
}; };
......
...@@ -419,6 +419,17 @@ InterpreterSelectQuery::InterpreterSelectQuery( ...@@ -419,6 +419,17 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// null non-const columns to avoid useless memory allocations. However, a valid block sample /// null non-const columns to avoid useless memory allocations. However, a valid block sample
/// requires all columns to be of size 0, thus we need to sanitize the block here. /// requires all columns to be of size 0, thus we need to sanitize the block here.
sanitizeBlock(result_header); sanitizeBlock(result_header);
/// Remove limits for some tables in the `system` database.
if (storage && (storage->getDatabaseName() == "system"))
{
String table_name = storage->getTableName();
if ((table_name == "quotas") || (table_name == "quota_usage") || (table_name == "one"))
{
options.ignore_quota = true;
options.ignore_limits = true;
}
}
} }
...@@ -1783,7 +1794,7 @@ void InterpreterSelectQuery::executeFetchColumns( ...@@ -1783,7 +1794,7 @@ void InterpreterSelectQuery::executeFetchColumns(
if (!options.ignore_limits) if (!options.ignore_limits)
stream->setLimits(limits); stream->setLimits(limits);
if (options.to_stage == QueryProcessingStage::Complete) if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
stream->setQuota(quota); stream->setQuota(quota);
} }
...@@ -1793,7 +1804,7 @@ void InterpreterSelectQuery::executeFetchColumns( ...@@ -1793,7 +1804,7 @@ void InterpreterSelectQuery::executeFetchColumns(
if (!options.ignore_limits) if (!options.ignore_limits)
pipe.setLimits(limits); pipe.setLimits(limits);
if (options.to_stage == QueryProcessingStage::Complete) if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
pipe.setQuota(quota); pipe.setQuota(quota);
} }
} }
......
...@@ -74,6 +74,9 @@ public: ...@@ -74,6 +74,9 @@ public:
QueryPipeline executeWithProcessors() override; QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; } bool canExecuteWithProcessors() const override { return true; }
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
Block getSampleBlock(); Block getSampleBlock();
void ignoreWithTotals(); void ignoreWithTotals();
...@@ -260,7 +263,7 @@ private: ...@@ -260,7 +263,7 @@ private:
*/ */
void initSettings(); void initSettings();
const SelectQueryOptions options; SelectQueryOptions options;
ASTPtr query_ptr; ASTPtr query_ptr;
std::shared_ptr<Context> context; std::shared_ptr<Context> context;
SyntaxAnalyzerResultPtr syntax_analyzer_result; SyntaxAnalyzerResultPtr syntax_analyzer_result;
......
...@@ -107,6 +107,19 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( ...@@ -107,6 +107,19 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
result_header = getCommonHeaderForUnion(headers); result_header = getCommonHeaderForUnion(headers);
} }
/// InterpreterSelectWithUnionQuery ignores limits if all nested interpreters ignore limits.
bool all_nested_ignore_limits = true;
bool all_nested_ignore_quota = true;
for (auto & interpreter : nested_interpreters)
{
if (!interpreter->ignoreLimits())
all_nested_ignore_limits = false;
if (!interpreter->ignoreQuota())
all_nested_ignore_quota = false;
}
options.ignore_limits |= all_nested_ignore_limits;
options.ignore_quota |= all_nested_ignore_quota;
} }
......
...@@ -34,6 +34,9 @@ public: ...@@ -34,6 +34,9 @@ public:
QueryPipeline executeWithProcessors() override; QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; } bool canExecuteWithProcessors() const override { return true; }
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
Block getSampleBlock(); Block getSampleBlock();
static Block getSampleBlock( static Block getSampleBlock(
...@@ -45,7 +48,7 @@ public: ...@@ -45,7 +48,7 @@ public:
ASTPtr getQuery() const { return query_ptr; } ASTPtr getQuery() const { return query_ptr; }
private: private:
const SelectQueryOptions options; SelectQueryOptions options;
ASTPtr query_ptr; ASTPtr query_ptr;
std::shared_ptr<Context> context; std::shared_ptr<Context> context;
......
...@@ -121,7 +121,9 @@ void startStopAction(Context & context, ASTSystemQuery & query, StorageActionBlo ...@@ -121,7 +121,9 @@ void startStopAction(Context & context, ASTSystemQuery & query, StorageActionBlo
InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, Context & context_) InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_->clone()), context(context_), log(&Poco::Logger::get("InterpreterSystemQuery")) {} : query_ptr(query_ptr_->clone()), context(context_), log(&Poco::Logger::get("InterpreterSystemQuery"))
{
}
BlockIO InterpreterSystemQuery::execute() BlockIO InterpreterSystemQuery::execute()
......
...@@ -20,6 +20,9 @@ public: ...@@ -20,6 +20,9 @@ public:
BlockIO execute() override; BlockIO execute() override;
bool ignoreQuota() const override { return true; }
bool ignoreLimits() const override { return true; }
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context & context; Context & context;
......
...@@ -24,19 +24,16 @@ struct SelectQueryOptions ...@@ -24,19 +24,16 @@ struct SelectQueryOptions
{ {
QueryProcessingStage::Enum to_stage; QueryProcessingStage::Enum to_stage;
size_t subquery_depth; size_t subquery_depth;
bool only_analyze; bool only_analyze = false;
bool modify_inplace; bool modify_inplace = false;
bool remove_duplicates; bool remove_duplicates = false;
bool ignore_limits; bool ignore_quota = false;
bool ignore_limits = false;
SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0) SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0)
: to_stage(stage) : to_stage(stage), subquery_depth(depth)
, subquery_depth(depth) {
, only_analyze(false) }
, modify_inplace(false)
, remove_duplicates(false)
, ignore_limits(false)
{}
SelectQueryOptions copy() const { return *this; } SelectQueryOptions copy() const { return *this; }
......
...@@ -271,10 +271,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl( ...@@ -271,10 +271,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Check the limits. /// Check the limits.
checkASTSizeLimits(*ast, settings); checkASTSizeLimits(*ast, settings);
auto quota = context.getQuota();
quota->used(Quota::QUERIES, 1);
quota->checkExceeded(Quota::ERRORS);
/// Put query to process list. But don't put SHOW PROCESSLIST query itself. /// Put query to process list. But don't put SHOW PROCESSLIST query itself.
ProcessList::EntryPtr process_list_entry; ProcessList::EntryPtr process_list_entry;
if (!internal && !ast->as<ASTShowProcesslistQuery>()) if (!internal && !ast->as<ASTShowProcesslistQuery>())
...@@ -312,6 +308,21 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl( ...@@ -312,6 +308,21 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto interpreter = InterpreterFactory::get(ast, context, stage); auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors(); bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();
QuotaContextPtr quota;
if (!interpreter->ignoreQuota())
{
quota = context.getQuota();
quota->used(Quota::QUERIES, 1);
quota->checkExceeded(Quota::ERRORS);
}
IBlockInputStream::LocalLimits limits;
if (!interpreter->ignoreLimits())
{
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
if (use_processors) if (use_processors)
pipeline = interpreter->executeWithProcessors(); pipeline = interpreter->executeWithProcessors();
else else
...@@ -338,17 +349,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl( ...@@ -338,17 +349,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Hold element of process list till end of query execution. /// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry; res.process_list_entry = process_list_entry;
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
if (use_processors) if (use_processors)
{ {
pipeline.setProgressCallback(context.getProgressCallback());
pipeline.setProcessListElement(context.getProcessListElement());
/// Limits on the result, the quota on the result, and also callback for progress. /// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result. /// Limits apply only to the final result.
pipeline.setProgressCallback(context.getProgressCallback());
pipeline.setProcessListElement(context.getProcessListElement());
if (stage == QueryProcessingStage::Complete) if (stage == QueryProcessingStage::Complete)
{ {
pipeline.resize(1); pipeline.resize(1);
...@@ -362,17 +368,18 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl( ...@@ -362,17 +368,18 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
} }
else else
{ {
/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (res.in) if (res.in)
{ {
res.in->setProgressCallback(context.getProgressCallback()); res.in->setProgressCallback(context.getProgressCallback());
res.in->setProcessListElement(context.getProcessListElement()); res.in->setProcessListElement(context.getProcessListElement());
/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete) if (stage == QueryProcessingStage::Complete)
{ {
res.in->setLimits(limits); if (!interpreter->ignoreQuota())
res.in->setQuota(quota); res.in->setQuota(quota);
if (!interpreter->ignoreLimits())
res.in->setLimits(limits);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册