/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ module thrift.internal.resource_pool; import core.time : Duration, dur, TickDuration; import std.algorithm : minPos, reduce, remove; import std.array : array, empty; import std.exception : enforce; import std.conv : to; import std.random : randomCover, rndGen; import std.range : zip; import thrift.internal.algorithm : removeEqual; /** * A pool of resources, which can be iterated over, and where resources that * have failed too often can be temporarily disabled. * * This class is oblivious to the actual resource type managed. */ final class TResourcePool(Resource) { /** * Constructs a new instance. * * Params: * resources = The initial members of the pool. */ this(Resource[] resources) { resources_ = resources; } /** * Adds a resource to the pool. */ void add(Resource resource) { resources_ ~= resource; } /** * Removes a resource from the pool. * * Returns: Whether the resource could be found in the pool. */ bool remove(Resource resource) { auto oldLength = resources_.length; resources_ = removeEqual(resources_, resource); return resources_.length < oldLength; } /** * Returns an »enriched« input range to iterate over the pool members. */ static struct Range { /** * Whether the range is empty. * * This is the case if all members of the pool have been popped (or skipped * because they were disabled) and TResourcePool.cycle is false, or there * is no element to return in cycle mode because all have been temporarily * disabled. */ bool empty() @property { // If no resources are in the pool, the range will never become non-empty. if (resources_.empty) return true; // If we already got the next resource in the cache, it doesn't matter // whether there are more. if (cached_) return false; size_t examineCount; if (parent_.cycle) { // We want to check all the resources, but not iterate more than once // to avoid spinning in a loop if nothing is available. examineCount = resources_.length; } else { // When not in cycle mode, we just iterate the list exactly once. If all // items have been consumed, the interval below is empty. examineCount = resources_.length - nextIndex_; } foreach (i; 0 .. examineCount) { auto r = resources_[(nextIndex_ + i) % resources_.length]; auto fi = r in parent_.faultInfos_; if (fi && fi.resetTime != fi.resetTime.init) { if (fi.resetTime < parent_.getCurrentTick_()) { // The timeout expired, remove the resource from the list and go // ahead trying it. parent_.faultInfos_.remove(r); } else { // The timeout didn't expire yet, try the next resource. continue; } } cache_ = r; cached_ = true; nextIndex_ = nextIndex_ + i + 1; return false; } // If we get here, all resources are currently inactive or the non-cycle // pool has been exhausted, so there is nothing we can do. nextIndex_ = nextIndex_ + examineCount; return true; } /** * Returns the first resource in the range. */ Resource front() @property { enforce(!empty); return cache_; } /** * Removes the first resource from the range. * * Usually, this is combined with a call to TResourcePool.recordSuccess() * or recordFault(). */ void popFront() { enforce(!empty); cached_ = false; } /** * Returns whether the range will become non-empty at some point in the * future, and provides additional information when this will happen and * what will be the next resource. * * Makes only sense to call on empty ranges. * * Params: * next = The next resource that will become available. * waitTime = The duration until that resource will become available. */ bool willBecomeNonempty(out Resource next, out Duration waitTime) { // If no resources are in the pool, the range will never become non-empty. if (resources_.empty) return false; // If cycle mode is not enabled, a range never becomes non-empty after // being empty once, because all the elements have already been // used/skipped in order to become empty. if (!parent_.cycle) return false; auto fi = parent_.faultInfos_; auto nextPair = minPos!"a[1].resetTime < b[1].resetTime"( zip(fi.keys, fi.values) ).front; next = nextPair[0]; waitTime = to!Duration(nextPair[1].resetTime - parent_.getCurrentTick_()); return true; } private: this(TResourcePool parent, Resource[] resources) { parent_ = parent; resources_ = resources; } TResourcePool parent_; /// All available resources. We keep a copy of it as to not get confused /// when resources are added to/removed from the parent pool. Resource[] resources_; /// After we have determined the next element in empty(), we store it here. Resource cache_; /// Whether there is currently something in the cache. bool cached_; /// The index to start searching from at the next call to empty(). size_t nextIndex_; } /// Ditto Range opSlice() { auto res = resources_; if (permute) { res = array(randomCover(res, rndGen)); } return Range(this, res); } /** * Records a success for an operation on the given resource, cancelling a * fault streak, if any. */ void recordSuccess(Resource resource) { if (resource in faultInfos_) { faultInfos_.remove(resource); } } /** * Records a fault for the given resource. * * If a resource fails consecutively for more than faultDisableCount times, * it is temporarily disabled (no longer considered) until * faultDisableDuration has passed. */ void recordFault(Resource resource) { auto fi = resource in faultInfos_; if (!fi) { faultInfos_[resource] = FaultInfo(); fi = resource in faultInfos_; } ++fi.count; if (fi.count >= faultDisableCount) { // If the resource has hit the fault count limit, disable it for // specified duration. fi.resetTime = getCurrentTick_() + cast(TickDuration)faultDisableDuration; } } /** * Whether to randomly permute the order of the resources in the pool when * taking a range using opSlice(). * * This can be used e.g. as a simple form of load balancing. */ bool permute = true; /** * Whether to keep iterating over the pool members after all have been * returned/have failed once. */ bool cycle = false; /** * The number of consecutive faults after which a resource is disabled until * faultDisableDuration has passed. Zero to never disable resources. * * Defaults to zero. */ ushort faultDisableCount = 0; /** * The duration for which a resource is no longer considered after it has * failed too often. * * Defaults to one second. */ Duration faultDisableDuration = dur!"seconds"(1); private: Resource[] resources_; FaultInfo[Resource] faultInfos_; /// Function to get the current timestamp from some monotonic system clock. /// /// This is overridable to be able to write timing-insensitive unit tests. /// The extra indirection should not matter much performance-wise compared to /// the actual system call, and by its very nature thisshould not be on a hot /// path anyway. typeof(&TickDuration.currSystemTick) getCurrentTick_ = &TickDuration.currSystemTick; } private { struct FaultInfo { ushort count; TickDuration resetTime; } } unittest { auto pool = new TResourcePool!Object([]); enforce(pool[].empty); Object dummyRes; Duration dummyDur; enforce(!pool[].willBecomeNonempty(dummyRes, dummyDur)); } unittest { import std.datetime; import thrift.base; auto a = new Object; auto b = new Object; auto c = new Object; auto objs = [a, b, c]; auto pool = new TResourcePool!Object(objs); pool.permute = false; static Duration fakeClock; pool.getCurrentTick_ = () => cast(TickDuration)fakeClock; Object dummyRes = void; Duration dummyDur = void; { auto r = pool[]; foreach (i, o; objs) { enforce(!r.empty); enforce(r.front == o); r.popFront(); } enforce(r.empty); enforce(!r.willBecomeNonempty(dummyRes, dummyDur)); } { pool.faultDisableCount = 2; enforce(pool[].front == a); pool.recordFault(a); enforce(pool[].front == a); pool.recordSuccess(a); enforce(pool[].front == a); pool.recordFault(a); enforce(pool[].front == a); pool.recordFault(a); auto r = pool[]; enforce(r.front == b); r.popFront(); enforce(r.front == c); r.popFront(); enforce(r.empty); enforce(!r.willBecomeNonempty(dummyRes, dummyDur)); fakeClock += 2.seconds; // Not in cycle mode, has to be still empty after the timeouts expired. enforce(r.empty); enforce(!r.willBecomeNonempty(dummyRes, dummyDur)); foreach (o; objs) pool.recordSuccess(o); } { pool.faultDisableCount = 1; pool.recordFault(a); pool.recordFault(b); pool.recordFault(c); auto r = pool[]; enforce(r.empty); enforce(!r.willBecomeNonempty(dummyRes, dummyDur)); foreach (o; objs) pool.recordSuccess(o); } pool.cycle = true; { auto r = pool[]; foreach (o; objs ~ objs) { enforce(!r.empty); enforce(r.front == o); r.popFront(); } } { pool.faultDisableCount = 2; enforce(pool[].front == a); pool.recordFault(a); enforce(pool[].front == a); pool.recordSuccess(a); enforce(pool[].front == a); pool.recordFault(a); enforce(pool[].front == a); pool.recordFault(a); auto r = pool[]; enforce(r.front == b); r.popFront(); enforce(r.front == c); r.popFront(); enforce(r.front == b); fakeClock += 2.seconds; r.popFront(); enforce(r.front == c); r.popFront(); enforce(r.front == a); enforce(pool[].front == a); foreach (o; objs) pool.recordSuccess(o); } { pool.faultDisableCount = 1; pool.recordFault(a); fakeClock += 1.msecs; pool.recordFault(b); fakeClock += 1.msecs; pool.recordFault(c); auto r = pool[]; enforce(r.empty); // Make sure willBecomeNonempty gets the order right. enforce(r.willBecomeNonempty(dummyRes, dummyDur)); enforce(dummyRes == a); enforce(dummyDur > Duration.zero); foreach (o; objs) pool.recordSuccess(o); } }