未验证 提交 753f5b84 编写于 作者: V Vitaly Baranov 提交者: GitHub

Merge pull request #20106 from vitlibar/fix-quota-consuming

Fix quota consuming
...@@ -26,10 +26,6 @@ struct EnabledQuota::Impl ...@@ -26,10 +26,6 @@ struct EnabledQuota::Impl
std::chrono::seconds duration, std::chrono::seconds duration,
std::chrono::system_clock::time_point end_of_interval) std::chrono::system_clock::time_point end_of_interval)
{ {
std::function<String(UInt64)> amount_to_string = [](UInt64 amount) { return std::to_string(amount); };
if (resource_type == Quota::EXECUTION_TIME)
amount_to_string = [&](UInt64 amount) { return ext::to_string(std::chrono::nanoseconds(amount)); };
const auto & type_info = Quota::ResourceTypeInfo::get(resource_type); const auto & type_info = Quota::ResourceTypeInfo::get(resource_type);
throw Exception( throw Exception(
"Quota for user " + backQuote(user_name) + " for " + ext::to_string(duration) + " has been exceeded: " "Quota for user " + backQuote(user_name) + " for " + ext::to_string(duration) + " has been exceeded: "
...@@ -39,35 +35,47 @@ struct EnabledQuota::Impl ...@@ -39,35 +35,47 @@ struct EnabledQuota::Impl
} }
/// Returns the end of the current interval. If the passed `current_time` is greater than that end,
/// the function automatically recalculates the interval's end by adding the interval's duration
/// one or more times until the interval's end is greater than `current_time`.
/// If that recalculation occurs the function also resets amounts of resources used and sets the variable
/// `counters_were_reset`.
static std::chrono::system_clock::time_point getEndOfInterval( static std::chrono::system_clock::time_point getEndOfInterval(
const Interval & interval, std::chrono::system_clock::time_point current_time, bool * counters_were_reset = nullptr) const Interval & interval, std::chrono::system_clock::time_point current_time, bool & counters_were_reset)
{ {
auto & end_of_interval = interval.end_of_interval; auto & end_of_interval = interval.end_of_interval;
auto end_loaded = end_of_interval.load(); auto end_loaded = end_of_interval.load();
auto end = std::chrono::system_clock::time_point{end_loaded}; auto end = std::chrono::system_clock::time_point{end_loaded};
if (current_time < end) if (current_time < end)
{ {
if (counters_were_reset) counters_were_reset = false;
*counters_were_reset = false;
return end; return end;
} }
const auto duration = interval.duration; /// We reset counters only if the interval's end has been calculated before.
/// If it hasn't we just calculate the interval's end for the first time and don't reset counters yet.
bool need_reset_counters = (end_loaded.count() != 0);
do do
{ {
end = end + (current_time - end + duration) / duration * duration; /// Calculate the end of the next interval:
/// | X |
/// end current_time next_end = end + duration * n
/// where n is an integer number, n >= 1.
const auto duration = interval.duration;
UInt64 n = static_cast<UInt64>((current_time - end + duration) / duration);
end = end + duration * n;
if (end_of_interval.compare_exchange_strong(end_loaded, end.time_since_epoch())) if (end_of_interval.compare_exchange_strong(end_loaded, end.time_since_epoch()))
{
boost::range::fill(interval.used, 0);
break; break;
}
end = std::chrono::system_clock::time_point{end_loaded}; end = std::chrono::system_clock::time_point{end_loaded};
} }
while (current_time >= end); while (current_time >= end);
if (counters_were_reset) if (need_reset_counters)
*counters_were_reset = true; {
boost::range::fill(interval.used, 0);
counters_were_reset = true;
}
return end; return end;
} }
...@@ -89,7 +97,7 @@ struct EnabledQuota::Impl ...@@ -89,7 +97,7 @@ struct EnabledQuota::Impl
if (used > max) if (used > max)
{ {
bool counters_were_reset = false; bool counters_were_reset = false;
auto end_of_interval = getEndOfInterval(interval, current_time, &counters_were_reset); auto end_of_interval = getEndOfInterval(interval, current_time, counters_were_reset);
if (counters_were_reset) if (counters_were_reset)
{ {
used = (interval.used[resource_type] += amount); used = (interval.used[resource_type] += amount);
...@@ -116,9 +124,9 @@ struct EnabledQuota::Impl ...@@ -116,9 +124,9 @@ struct EnabledQuota::Impl
continue; continue;
if (used > max) if (used > max)
{ {
bool used_counters_reset = false; bool counters_were_reset = false;
std::chrono::system_clock::time_point end_of_interval = getEndOfInterval(interval, current_time, &used_counters_reset); std::chrono::system_clock::time_point end_of_interval = getEndOfInterval(interval, current_time, counters_were_reset);
if (!used_counters_reset) if (!counters_were_reset)
throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval); throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
} }
} }
...@@ -177,7 +185,8 @@ std::optional<QuotaUsage> EnabledQuota::Intervals::getUsage(std::chrono::system_ ...@@ -177,7 +185,8 @@ std::optional<QuotaUsage> EnabledQuota::Intervals::getUsage(std::chrono::system_
auto & out = usage.intervals.back(); auto & out = usage.intervals.back();
out.duration = in.duration; out.duration = in.duration;
out.randomize_interval = in.randomize_interval; out.randomize_interval = in.randomize_interval;
out.end_of_interval = Impl::getEndOfInterval(in, current_time); bool counters_were_reset = false;
out.end_of_interval = Impl::getEndOfInterval(in, current_time, counters_were_reset);
for (auto resource_type : ext::range(MAX_RESOURCE_TYPE)) for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
{ {
if (in.max[resource_type]) if (in.max[resource_type])
......
...@@ -20,6 +20,11 @@ public: ...@@ -20,6 +20,11 @@ public:
BlockIO execute() override; BlockIO execute() override;
/// We ignore the quota and limits here because execute() will rewrite a show query as a SELECT query and then
/// the SELECT query will checks the quota and limits.
bool ignoreQuota() const override { return true; }
bool ignoreLimits() const override { return true; }
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context & context; Context & context;
......
...@@ -20,6 +20,11 @@ public: ...@@ -20,6 +20,11 @@ public:
BlockIO execute() override; BlockIO execute() override;
/// We ignore the quota and limits here because execute() will rewrite a show query as a SELECT query and then
/// the SELECT query will checks the quota and limits.
bool ignoreQuota() const override { return true; }
bool ignoreLimits() const override { return true; }
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context & context; Context & context;
......
...@@ -37,9 +37,6 @@ public: ...@@ -37,9 +37,6 @@ public:
BlockIO execute() override; BlockIO execute() override;
bool ignoreQuota() const override { return true; }
bool ignoreLimits() const override { return true; }
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context & context; Context & context;
......
...@@ -137,6 +137,9 @@ void StorageSystemQuotaUsage::fillDataImpl( ...@@ -137,6 +137,9 @@ void StorageSystemQuotaUsage::fillDataImpl(
column_quota_name.insertData(quota_name.data(), quota_name.length()); column_quota_name.insertData(quota_name.data(), quota_name.length());
column_quota_key.insertData(quota_key.data(), quota_key.length()); column_quota_key.insertData(quota_key.data(), quota_key.length());
if (add_column_is_current)
column_is_current->push_back(quota_id == current_quota_id);
if (!interval) if (!interval)
{ {
column_start_time.insertDefault(); column_start_time.insertDefault();
...@@ -171,9 +174,6 @@ void StorageSystemQuotaUsage::fillDataImpl( ...@@ -171,9 +174,6 @@ void StorageSystemQuotaUsage::fillDataImpl(
addValue(*column_max[resource_type], *column_max_null_map[resource_type], interval->max[resource_type], type_info); addValue(*column_max[resource_type], *column_max_null_map[resource_type], interval->max[resource_type], type_info);
addValue(*column_usage[resource_type], *column_usage_null_map[resource_type], interval->used[resource_type], type_info); addValue(*column_usage[resource_type], *column_usage_null_map[resource_type], interval->used[resource_type], type_info);
} }
if (add_column_is_current)
column_is_current->push_back(quota_id == current_quota_id);
}; };
auto add_rows = [&](const String & quota_name, const UUID & quota_id, const String & quota_key, const std::vector<QuotaUsage::Interval> & intervals) auto add_rows = [&](const String & quota_name, const UUID & quota_id, const String & quota_key, const std::vector<QuotaUsage::Interval> & intervals)
......
<yandex>
<users>
<user_with_no_quota>
<no_password/>
<networks>
<ip>::/0</ip>
</networks>
</user_with_no_quota>
</users>
</yandex>
...@@ -7,9 +7,10 @@ from helpers.cluster import ClickHouseCluster ...@@ -7,9 +7,10 @@ from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry, TSV from helpers.test_tools import assert_eq_with_retry, TSV
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', user_configs=["configs/users.d/assign_myquota.xml", instance = cluster.add_instance('instance', user_configs=["configs/users.d/assign_myquota_to_default_user.xml",
"configs/users.d/drop_default_quota.xml", "configs/users.d/drop_default_quota.xml",
"configs/users.d/quota.xml"]) "configs/users.d/myquota.xml",
"configs/users.d/user_with_no_quota.xml"])
def check_system_quotas(canonical): def check_system_quotas(canonical):
...@@ -49,9 +50,11 @@ def system_quotas_usage(canonical): ...@@ -49,9 +50,11 @@ def system_quotas_usage(canonical):
def copy_quota_xml(local_file_name, reload_immediately=True): def copy_quota_xml(local_file_name, reload_immediately=True):
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
instance.copy_file_to_container(os.path.join(script_dir, local_file_name), instance.copy_file_to_container(os.path.join(script_dir, local_file_name),
'/etc/clickhouse-server/users.d/quota.xml') '/etc/clickhouse-server/users.d/myquota.xml')
if reload_immediately: if reload_immediately:
instance.query("SYSTEM RELOAD CONFIG") # We use the special user 'user_with_no_quota' here because
# we don't want SYSTEM RELOAD CONFIG to mess our quota consuming checks.
instance.query("SYSTEM RELOAD CONFIG", user='user_with_no_quota')
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="module", autouse=True)
...@@ -71,12 +74,12 @@ def started_cluster(): ...@@ -71,12 +74,12 @@ def started_cluster():
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def reset_quotas_and_usage_info(): def reset_quotas_and_usage_info():
try: try:
yield
finally:
copy_quota_xml('simpliest.xml') # To reset usage info.
instance.query("DROP QUOTA IF EXISTS qA, qB") instance.query("DROP QUOTA IF EXISTS qA, qB")
copy_quota_xml('simpliest.xml') # To reset usage info. copy_quota_xml('simpliest.xml') # To reset usage info.
copy_quota_xml('normal_limits.xml') copy_quota_xml('normal_limits.xml')
yield
finally:
pass
def test_quota_from_users_xml(): def test_quota_from_users_xml():
...@@ -379,4 +382,11 @@ def test_query_inserts(): ...@@ -379,4 +382,11 @@ def test_query_inserts():
instance.query("INSERT INTO test_table values(1)") instance.query("INSERT INTO test_table values(1)")
system_quota_usage( system_quota_usage(
[["myQuota", "default", 31556952, 1, 1000, 0, 500, 1, 500, 0, "\\N", 0, "\\N", 0, "\\N", 0, 1000, 0, "\\N", "\\N"]]) [["myQuota", "default", 31556952, 1, 1000, 0, 500, 1, 500, 0, "\\N", 0, "\\N", 0, "\\N", 0, 1000, 0, "\\N", "\\N"]])
\ No newline at end of file
def test_consumption_show_tables_quota():
instance.query("SHOW TABLES")
assert re.match(
"myQuota\\tdefault\\t.*\\t31556952\\t1\\t1000\\t1\\t500\\t0\\t500\\t0\\t\\\\N\\t1\\t\\\\N\\t19\\t\\\\N\\t1\\t1000\\t35\\t\\\\N\\t.*\\t\\\\N\n",
instance.query("SHOW QUOTA"))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册