EnabledQuota.cpp 9.3 KB
Newer Older
1
#include <Access/EnabledQuota.h>
2
#include <Access/QuotaUsage.h>
3 4 5 6
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <ext/chrono_io.h>
#include <ext/range.h>
7
#include <boost/smart_ptr/make_shared.hpp>
8 9 10 11 12 13 14 15 16 17
#include <boost/range/algorithm/fill.hpp>


namespace DB
{
/// Error codes used in this file. They are only declared here (`extern`);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    /// Code attached to the exception thrown by EnabledQuota::Impl::throwQuotaExceed().
    extern const int QUOTA_EXPIRED;
}

18
struct EnabledQuota::Impl
19 20 21 22 23 24 25 26 27 28 29 30 31 32
{
    [[noreturn]] static void throwQuotaExceed(
        const String & user_name,
        const String & quota_name,
        ResourceType resource_type,
        ResourceAmount used,
        ResourceAmount max,
        std::chrono::seconds duration,
        std::chrono::system_clock::time_point end_of_interval)
    {
        std::function<String(UInt64)> amount_to_string = [](UInt64 amount) { return std::to_string(amount); };
        if (resource_type == Quota::EXECUTION_TIME)
            amount_to_string = [&](UInt64 amount) { return ext::to_string(std::chrono::nanoseconds(amount)); };

33
        const auto & type_info = Quota::ResourceTypeInfo::get(resource_type);
34 35
        throw Exception(
            "Quota for user " + backQuote(user_name) + " for " + ext::to_string(duration) + " has been exceeded: "
36
                + type_info.outputWithAmount(used) + "/" + type_info.amountToString(max) + ". "
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
                + "Interval will end at " + ext::to_string(end_of_interval) + ". " + "Name of quota template: " + backQuote(quota_name),
            ErrorCodes::QUOTA_EXPIRED);
    }


    static std::chrono::system_clock::time_point getEndOfInterval(
        const Interval & interval, std::chrono::system_clock::time_point current_time, bool * counters_were_reset = nullptr)
    {
        auto & end_of_interval = interval.end_of_interval;
        auto end_loaded = end_of_interval.load();
        auto end = std::chrono::system_clock::time_point{end_loaded};
        if (current_time < end)
        {
            if (counters_were_reset)
                *counters_were_reset = false;
            return end;
        }

        const auto duration = interval.duration;

        do
        {
            end = end + (current_time - end + duration) / duration * duration;
            if (end_of_interval.compare_exchange_strong(end_loaded, end.time_since_epoch()))
            {
                boost::range::fill(interval.used, 0);
                break;
            }
            end = std::chrono::system_clock::time_point{end_loaded};
        }
        while (current_time >= end);

        if (counters_were_reset)
            *counters_were_reset = true;
        return end;
    }


    static void used(
        const String & user_name,
        const Intervals & intervals,
        ResourceType resource_type,
        ResourceAmount amount,
        std::chrono::system_clock::time_point current_time,
        bool check_exceeded)
    {
        for (const auto & interval : intervals.intervals)
        {
            ResourceAmount used = (interval.used[resource_type] += amount);
            ResourceAmount max = interval.max[resource_type];
87
            if (!max)
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
                continue;
            if (used > max)
            {
                bool counters_were_reset = false;
                auto end_of_interval = getEndOfInterval(interval, current_time, &counters_were_reset);
                if (counters_were_reset)
                {
                    used = (interval.used[resource_type] += amount);
                    if ((used > max) && check_exceeded)
                        throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
                }
                else if (check_exceeded)
                    throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
            }
        }
    }

    static void checkExceeded(
        const String & user_name,
        const Intervals & intervals,
        ResourceType resource_type,
        std::chrono::system_clock::time_point current_time)
    {
        for (const auto & interval : intervals.intervals)
        {
            ResourceAmount used = interval.used[resource_type];
            ResourceAmount max = interval.max[resource_type];
115
            if (!max)
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
                continue;
            if (used > max)
            {
                bool used_counters_reset = false;
                std::chrono::system_clock::time_point end_of_interval = getEndOfInterval(interval, current_time, &used_counters_reset);
                if (!used_counters_reset)
                    throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
            }
        }
    }

    static void checkExceeded(
        const String & user_name,
        const Intervals & intervals,
        std::chrono::system_clock::time_point current_time)
    {
132
        for (auto resource_type : ext::range(Quota::MAX_RESOURCE_TYPE))
133 134 135 136 137
            checkExceeded(user_name, intervals, resource_type, current_time);
    }
};


138 139 140 141 142 143 144 145 146 147
/// Starts every resource type with a zero limit and a zero usage counter.
EnabledQuota::Interval::Interval()
{
    for (auto type : ext::range(MAX_RESOURCE_TYPE))
    {
        max[type] = 0;
        used[type].store(0);
    }
}


148
/// Copies the settings, the end of interval and all per-resource limits and counters from `src`.
/// Atomic members are transferred with an explicit load from `src` followed by a store.
EnabledQuota::Interval & EnabledQuota::Interval::operator =(const Interval & src)
{
    if (this != &src)
    {
        duration = src.duration;
        randomize_interval = src.randomize_interval;
        end_of_interval.store(src.end_of_interval.load());

        for (auto type : ext::range(MAX_RESOURCE_TYPE))
        {
            used[type].store(src.used[type].load());
            max[type] = src.max[type];
        }
    }
    return *this;
}


165
std::optional<QuotaUsage> EnabledQuota::Intervals::getUsage(std::chrono::system_clock::time_point current_time) const
166
{
167 168 169 170 171 172 173
    if (!quota_id)
        return {};
    QuotaUsage usage;
    usage.quota_id = *quota_id;
    usage.quota_name = quota_name;
    usage.quota_key = quota_key;
    usage.intervals.reserve(intervals.size());
174 175
    for (const auto & in : intervals)
    {
176 177
        usage.intervals.push_back({});
        auto & out = usage.intervals.back();
178 179 180 181 182
        out.duration = in.duration;
        out.randomize_interval = in.randomize_interval;
        out.end_of_interval = Impl::getEndOfInterval(in, current_time);
        for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
        {
183 184
            if (in.max[resource_type])
                out.max[resource_type] = in.max[resource_type];
185 186 187
            out.used[resource_type] = in.used[resource_type];
        }
    }
188
    return usage;
189 190 191
}


192
/// Stores a copy of the given parameters; nothing else is initialized here.
EnabledQuota::EnabledQuota(const Params & params_)
    : params(params_)
{
}

196
/// Compiler-generated destructor, defaulted out of line in this source file.
EnabledQuota::~EnabledQuota() = default;
197 198


199
/// Records consumption of `amount` of a single resource; forwards to the pair-based overload.
void EnabledQuota::used(ResourceType resource_type, ResourceAmount amount, bool check_exceeded) const
{
    const std::pair<ResourceType, ResourceAmount> resource{resource_type, amount};
    used(resource, check_exceeded);
}


205
void EnabledQuota::used(const std::pair<ResourceType, ResourceAmount> & resource, bool check_exceeded) const
206
{
207
    auto loaded = intervals.load();
208
    auto current_time = std::chrono::system_clock::now();
209
    Impl::used(getUserName(), *loaded, resource.first, resource.second, current_time, check_exceeded);
210 211 212
}


213
void EnabledQuota::used(const std::pair<ResourceType, ResourceAmount> & resource1, const std::pair<ResourceType, ResourceAmount> & resource2, bool check_exceeded) const
214
{
215
    auto loaded = intervals.load();
216
    auto current_time = std::chrono::system_clock::now();
217 218
    Impl::used(getUserName(), *loaded, resource1.first, resource1.second, current_time, check_exceeded);
    Impl::used(getUserName(), *loaded, resource2.first, resource2.second, current_time, check_exceeded);
219 220 221
}


222
void EnabledQuota::used(const std::pair<ResourceType, ResourceAmount> & resource1, const std::pair<ResourceType, ResourceAmount> & resource2, const std::pair<ResourceType, ResourceAmount> & resource3, bool check_exceeded) const
223
{
224
    auto loaded = intervals.load();
225
    auto current_time = std::chrono::system_clock::now();
226 227 228
    Impl::used(getUserName(), *loaded, resource1.first, resource1.second, current_time, check_exceeded);
    Impl::used(getUserName(), *loaded, resource2.first, resource2.second, current_time, check_exceeded);
    Impl::used(getUserName(), *loaded, resource3.first, resource3.second, current_time, check_exceeded);
229 230 231
}


232
void EnabledQuota::used(const std::vector<std::pair<ResourceType, ResourceAmount>> & resources, bool check_exceeded) const
233
{
234
    auto loaded = intervals.load();
235 236
    auto current_time = std::chrono::system_clock::now();
    for (const auto & resource : resources)
237
        Impl::used(getUserName(), *loaded, resource.first, resource.second, current_time, check_exceeded);
238 239 240
}


241
void EnabledQuota::checkExceeded() const
242
{
243
    auto loaded = intervals.load();
244
    Impl::checkExceeded(getUserName(), *loaded, std::chrono::system_clock::now());
245 246 247
}


248
/// Checks a single resource type against its limits at the current time, without consuming anything.
/// Throws from Impl::checkExceeded() when the limit is exceeded.
void EnabledQuota::checkExceeded(ResourceType resource_type) const
{
    const auto snapshot = intervals.load();
    const auto now = std::chrono::system_clock::now();
    Impl::checkExceeded(getUserName(), *snapshot, resource_type, now);
}


255
std::optional<QuotaUsage> EnabledQuota::getUsage() const
256
{
257
    auto loaded = intervals.load();
258
    return loaded->getUsage(std::chrono::system_clock::now());
259 260 261
}


262
std::shared_ptr<const EnabledQuota> EnabledQuota::getUnlimitedQuota()
263
{
264 265 266 267 268 269 270
    static const std::shared_ptr<const EnabledQuota> res = []
    {
        auto unlimited_quota = std::shared_ptr<EnabledQuota>(new EnabledQuota);
        unlimited_quota->intervals = boost::make_shared<Intervals>();
        return unlimited_quota;
    }();
    return res;
271 272 273
}

}