EnabledQuota.cpp 10.0 KB
Newer Older
1
#include <Access/EnabledQuota.h>
2
#include <Access/QuotaUsage.h>
3 4 5 6
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <ext/chrono_io.h>
#include <ext/range.h>
7
#include <boost/smart_ptr/make_shared.hpp>
8 9 10 11 12 13 14 15 16 17
#include <boost/range/algorithm/fill.hpp>


namespace DB
{
namespace ErrorCodes
{
    extern const int QUOTA_EXPIRED;
}

18
struct EnabledQuota::Impl
19 20 21 22 23 24 25 26 27 28
{
    [[noreturn]] static void throwQuotaExceed(
        const String & user_name,
        const String & quota_name,
        ResourceType resource_type,
        ResourceAmount used,
        ResourceAmount max,
        std::chrono::seconds duration,
        std::chrono::system_clock::time_point end_of_interval)
    {
29
        const auto & type_info = Quota::ResourceTypeInfo::get(resource_type);
30 31
        throw Exception(
            "Quota for user " + backQuote(user_name) + " for " + ext::to_string(duration) + " has been exceeded: "
32
                + type_info.outputWithAmount(used) + "/" + type_info.amountToString(max) + ". "
33 34 35 36 37
                + "Interval will end at " + ext::to_string(end_of_interval) + ". " + "Name of quota template: " + backQuote(quota_name),
            ErrorCodes::QUOTA_EXPIRED);
    }


38 39 40 41 42
    /// Returns the end of the current interval. If the passed `current_time` is greater than that end,
    /// the function automatically recalculates the interval's end by adding the interval's duration
    /// one or more times until the interval's end is greater than `current_time`.
    /// If that recalculation occurs the function also resets amounts of resources used and sets the variable
    /// `counters_were_reset`.
43
    static std::chrono::system_clock::time_point getEndOfInterval(
44
        const Interval & interval, std::chrono::system_clock::time_point current_time, bool & counters_were_reset)
45 46 47 48 49 50
    {
        auto & end_of_interval = interval.end_of_interval;
        auto end_loaded = end_of_interval.load();
        auto end = std::chrono::system_clock::time_point{end_loaded};
        if (current_time < end)
        {
51
            counters_were_reset = false;
52 53 54
            return end;
        }

55 56 57
        /// We reset counters only if the interval's end has been calculated before.
        /// If it hasn't we just calculate the interval's end for the first time and don't reset counters yet.
        bool need_reset_counters = (end_loaded.count() != 0);
58 59 60

        do
        {
61 62 63 64 65 66 67
            /// Calculate the end of the next interval:
            ///  |                     X                                 |
            /// end               current_time                next_end = end + duration * n
            /// where n is an integer number, n >= 1.
            const auto duration = interval.duration;
            UInt64 n = static_cast<UInt64>((current_time - end + duration) / duration);
            end = end + duration * n;
68 69 70 71 72 73
            if (end_of_interval.compare_exchange_strong(end_loaded, end.time_since_epoch()))
                break;
            end = std::chrono::system_clock::time_point{end_loaded};
        }
        while (current_time >= end);

74 75 76 77 78
        if (need_reset_counters)
        {
            boost::range::fill(interval.used, 0);
            counters_were_reset = true;
        }
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
        return end;
    }


    static void used(
        const String & user_name,
        const Intervals & intervals,
        ResourceType resource_type,
        ResourceAmount amount,
        std::chrono::system_clock::time_point current_time,
        bool check_exceeded)
    {
        for (const auto & interval : intervals.intervals)
        {
            ResourceAmount used = (interval.used[resource_type] += amount);
            ResourceAmount max = interval.max[resource_type];
95
            if (!max)
96 97 98 99
                continue;
            if (used > max)
            {
                bool counters_were_reset = false;
100
                auto end_of_interval = getEndOfInterval(interval, current_time, counters_were_reset);
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
                if (counters_were_reset)
                {
                    used = (interval.used[resource_type] += amount);
                    if ((used > max) && check_exceeded)
                        throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
                }
                else if (check_exceeded)
                    throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
            }
        }
    }

    static void checkExceeded(
        const String & user_name,
        const Intervals & intervals,
        ResourceType resource_type,
        std::chrono::system_clock::time_point current_time)
    {
        for (const auto & interval : intervals.intervals)
        {
            ResourceAmount used = interval.used[resource_type];
            ResourceAmount max = interval.max[resource_type];
123
            if (!max)
124 125 126
                continue;
            if (used > max)
            {
127 128 129
                bool counters_were_reset = false;
                std::chrono::system_clock::time_point end_of_interval = getEndOfInterval(interval, current_time, counters_were_reset);
                if (!counters_were_reset)
130 131 132 133 134 135 136 137 138 139
                    throwQuotaExceed(user_name, intervals.quota_name, resource_type, used, max, interval.duration, end_of_interval);
            }
        }
    }

    static void checkExceeded(
        const String & user_name,
        const Intervals & intervals,
        std::chrono::system_clock::time_point current_time)
    {
140
        for (auto resource_type : ext::range(Quota::MAX_RESOURCE_TYPE))
141 142 143 144 145
            checkExceeded(user_name, intervals, resource_type, current_time);
    }
};


146 147 148 149 150 151 152 153 154 155
EnabledQuota::Interval::Interval()
{
    for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
    {
        used[resource_type].store(0);
        max[resource_type] = 0;
    }
}


156
EnabledQuota::Interval & EnabledQuota::Interval::operator =(const Interval & src)
157
{
A
Alexey Milovidov 已提交
158 159 160
    if (this == &src)
        return *this;

161 162 163 164 165 166 167 168 169 170 171 172
    randomize_interval = src.randomize_interval;
    duration = src.duration;
    end_of_interval.store(src.end_of_interval.load());
    for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
    {
        max[resource_type] = src.max[resource_type];
        used[resource_type].store(src.used[resource_type].load());
    }
    return *this;
}


173
std::optional<QuotaUsage> EnabledQuota::Intervals::getUsage(std::chrono::system_clock::time_point current_time) const
174
{
175 176 177 178 179 180 181
    if (!quota_id)
        return {};
    QuotaUsage usage;
    usage.quota_id = *quota_id;
    usage.quota_name = quota_name;
    usage.quota_key = quota_key;
    usage.intervals.reserve(intervals.size());
182 183
    for (const auto & in : intervals)
    {
184 185
        usage.intervals.push_back({});
        auto & out = usage.intervals.back();
186 187
        out.duration = in.duration;
        out.randomize_interval = in.randomize_interval;
188 189
        bool counters_were_reset = false;
        out.end_of_interval = Impl::getEndOfInterval(in, current_time, counters_were_reset);
190 191
        for (auto resource_type : ext::range(MAX_RESOURCE_TYPE))
        {
192 193
            if (in.max[resource_type])
                out.max[resource_type] = in.max[resource_type];
194 195 196
            out.used[resource_type] = in.used[resource_type];
        }
    }
197
    return usage;
198 199 200
}


201
EnabledQuota::EnabledQuota(const Params & params_) : params(params_)
202 203 204
{
}

205
EnabledQuota::~EnabledQuota() = default;
206 207


208
void EnabledQuota::used(ResourceType resource_type, ResourceAmount amount, bool check_exceeded) const
209 210 211 212 213
{
    used({resource_type, amount}, check_exceeded);
}


214
void EnabledQuota::used(const std::pair<ResourceType, ResourceAmount> & resource, bool check_exceeded) const
215
{
216
    auto loaded = intervals.load();
217
    auto current_time = std::chrono::system_clock::now();
218
    Impl::used(getUserName(), *loaded, resource.first, resource.second, current_time, check_exceeded);
219 220 221
}


222
void EnabledQuota::used(const std::pair<ResourceType, ResourceAmount> & resource1, const std::pair<ResourceType, ResourceAmount> & resource2, bool check_exceeded) const
223
{
224
    auto loaded = intervals.load();
225
    auto current_time = std::chrono::system_clock::now();
226 227
    Impl::used(getUserName(), *loaded, resource1.first, resource1.second, current_time, check_exceeded);
    Impl::used(getUserName(), *loaded, resource2.first, resource2.second, current_time, check_exceeded);
228 229 230
}


231
void EnabledQuota::used(const std::pair<ResourceType, ResourceAmount> & resource1, const std::pair<ResourceType, ResourceAmount> & resource2, const std::pair<ResourceType, ResourceAmount> & resource3, bool check_exceeded) const
232
{
233
    auto loaded = intervals.load();
234
    auto current_time = std::chrono::system_clock::now();
235 236 237
    Impl::used(getUserName(), *loaded, resource1.first, resource1.second, current_time, check_exceeded);
    Impl::used(getUserName(), *loaded, resource2.first, resource2.second, current_time, check_exceeded);
    Impl::used(getUserName(), *loaded, resource3.first, resource3.second, current_time, check_exceeded);
238 239 240
}


241
void EnabledQuota::used(const std::vector<std::pair<ResourceType, ResourceAmount>> & resources, bool check_exceeded) const
242
{
243
    auto loaded = intervals.load();
244 245
    auto current_time = std::chrono::system_clock::now();
    for (const auto & resource : resources)
246
        Impl::used(getUserName(), *loaded, resource.first, resource.second, current_time, check_exceeded);
247 248 249
}


250
void EnabledQuota::checkExceeded() const
251
{
252
    auto loaded = intervals.load();
253
    Impl::checkExceeded(getUserName(), *loaded, std::chrono::system_clock::now());
254 255 256
}


257
void EnabledQuota::checkExceeded(ResourceType resource_type) const
258
{
259
    auto loaded = intervals.load();
260
    Impl::checkExceeded(getUserName(), *loaded, resource_type, std::chrono::system_clock::now());
261 262 263
}


264
std::optional<QuotaUsage> EnabledQuota::getUsage() const
265
{
266
    auto loaded = intervals.load();
267
    return loaded->getUsage(std::chrono::system_clock::now());
268 269 270
}


271
std::shared_ptr<const EnabledQuota> EnabledQuota::getUnlimitedQuota()
272
{
273 274 275 276 277 278 279
    static const std::shared_ptr<const EnabledQuota> res = []
    {
        auto unlimited_quota = std::shared_ptr<EnabledQuota>(new EnabledQuota);
        unlimited_quota->intervals = boost::make_shared<Intervals>();
        return unlimited_quota;
    }();
    return res;
280 281 282
}

}