Commit b27309e8, authored by Ihor Indyk, committed by TensorFlower Gardener

[tf.data] Adds an upper bound for the total buffer limit of the model in `Model::Optimize` as % of available RAM.

PiperOrigin-RevId: 263789432
Parent 0c9c859a
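
In outline, the commit computes a RAM budget in ModelDatasetOp as a fixed share of available RAM, threads it through Model::Optimize into both autotuning algorithms, and stops increasing buffer sizes once the worst-case total buffer size would exceed that budget. The self-contained C++ sketch below mirrors this arithmetic with made-up numbers; it illustrates the check rather than reproducing the TensorFlow implementation.

#include <cstdint>
#include <iostream>

int main() {
  // Share of available RAM granted to the model's buffers (0.5 in this commit).
  constexpr double kRamBudgetShare = 0.5;

  // Hypothetical numbers, purely for illustration.
  const int64_t available_ram_bytes = 16LL << 30;        // 16 GiB reported free
  const int64_t buffered_bytes = 1LL << 30;              // 1 GiB already buffered
  const int64_t worst_case_buffered_bytes = 10LL << 30;  // all buffers full

  // ModelDatasetOp: ram_budget_ = kRamBudgetShare * port::AvailableRam();
  int64_t ram_budget =
      static_cast<int64_t>(kRamBudgetShare * available_ram_bytes);

  // Model::Optimize*: bytes already sitting in buffers are not part of the
  // "available" RAM measurement but are counted in the worst-case figure,
  // so they are added back before the comparison.
  ram_budget += buffered_bytes;

  // New termination condition: stop growing buffers once the worst case
  // (every buffer filled to its limit) would exceed the budget.
  if (worst_case_buffered_bytes > ram_budget) {
    std::cout << "stop optimization: over RAM budget\n";
  } else {
    std::cout << "keep tuning\n";
  }
  return 0;
}
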
......@@ -874,7 +874,7 @@ class DatasetBaseIterator : public IteratorBase {
void RecordBufferDequeue(IteratorContext* ctx,
const std::vector<Tensor>& element) {
if (collect_resource_usage(ctx)) {
node_->add_buffered_bytes(-GetAllocatedBytes(element));
node_->record_buffer_event(-GetAllocatedBytes(element), -1);
}
}
......@@ -883,7 +883,7 @@ class DatasetBaseIterator : public IteratorBase {
void RecordBufferEnqueue(IteratorContext* ctx,
const std::vector<Tensor>& element) {
if (collect_resource_usage(ctx)) {
node_->add_buffered_bytes(GetAllocatedBytes(element));
node_->record_buffer_event(GetAllocatedBytes(element), 1);
}
}
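
The enqueue/dequeue hooks above now funnel through a single record_buffer_event call that updates a byte counter and an element counter together; tracking element counts is what later makes an average element size computable. A minimal stand-alone sketch of that bookkeeping, using a hypothetical BufferStats type in place of the real Node (which also guards these fields with a mutex):

#include <cstdint>

// BufferStats is a hypothetical stand-in for the per-node counters, not the
// TensorFlow Node class.
struct BufferStats {
  int64_t buffered_bytes = 0;
  int64_t buffered_elements = 0;

  // Mirrors Node::record_buffer_event: a single call adjusts both counters.
  void record_buffer_event(int64_t bytes_delta, int64_t elements_delta) {
    buffered_bytes += bytes_delta;
    buffered_elements += elements_delta;
  }
};

int main() {
  BufferStats stats;
  stats.record_buffer_event(+110, +1);  // enqueue a 110-byte element
  stats.record_buffer_event(-110, -1);  // dequeue it again
  return (stats.buffered_bytes == 0 && stats.buffered_elements == 0) ? 0 : 1;
}
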
......
......@@ -28,8 +28,6 @@ namespace {
// Key of the derivative w.r.t. the last input time in the gradient of
// `OutputTime`.
constexpr char kInputTimeDerivativeKey[] = "last_input_time";
constexpr char kParallelism[] = "parallelism";
constexpr char kBufferSize[] = "buffer_size";
// Wrapper for the square function to reduce verbosity.
inline double Square(double x) { return x * x; }
......@@ -713,13 +711,14 @@ void Model::AddProcessingTime(const string& name, int64 delta) {
}
}
void Model::Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget) {
void Model::Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget,
int64 ram_budget) {
switch (algorithm) {
case AutotuneAlgorithm::HILL_CLIMB:
OptimizeHillClimb(cpu_budget);
OptimizeHillClimb(cpu_budget, ram_budget);
break;
case AutotuneAlgorithm::GRADIENT_DESCENT:
OptimizeGradientDescent(cpu_budget);
OptimizeGradientDescent(cpu_budget, ram_budget);
break;
}
}
......@@ -808,7 +807,7 @@ std::map<string, std::shared_ptr<Parameter>> Model::CollectEssentialParallelism(
return essential_parameters;
}
void Model::OptimizeGradientDescent(int64 cpu_budget) {
void Model::OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget) {
std::shared_ptr<Node> snapshot;
{
tf_shared_lock lock(mu_);
......@@ -817,6 +816,9 @@ void Model::OptimizeGradientDescent(int64 cpu_budget) {
VLOG(2) << "Starting optimization of tunable parameters with GradientDescent";
auto parameters = CollectTunableParameters(snapshot);
auto essential_parameters = CollectEssentialParallelism(snapshot);
// We add the model's currently buffered bytes because they are excluded from
// the memory budget but included in the maximum number of buffered bytes.
ram_budget += TotalBufferedBytes(snapshot);
for (auto& pair : parameters) {
pair.second->value = pair.second->min;
}
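
For a concrete (assumed) example of this adjustment: if half of 8 GiB of available RAM gives a 4 GiB budget and 1 GiB is already held in buffers, the effective budget becomes 5 GiB, because the worst-case figure checked in the loop below also counts that already-allocated 1 GiB; without the adjustment the model would be charged twice for memory it has already claimed.
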
......@@ -841,9 +843,11 @@ void Model::OptimizeGradientDescent(int64 cpu_budget) {
model_parallelism += std::round(pair.second->value);
}
// We terminate once the improvement of the output latency is too small or
// the essential transformations' parallelism reaches the CPU budget.
// the essential transformations' parallelism reaches the CPU budget or the
// worst-case total buffer size exceeds the memory budget.
if (std::abs(output_time - new_output_time) < kOptimizationPrecision ||
model_parallelism > cpu_budget) {
model_parallelism > cpu_budget ||
TotalMaximumBufferedBytes(snapshot) > ram_budget) {
break;
}
double max_abs_derivative = 1.0;
......@@ -879,7 +883,7 @@ void Model::OptimizeGradientDescent(int64 cpu_budget) {
}
}
void Model::OptimizeHillClimb(int64 cpu_budget) {
void Model::OptimizeHillClimb(int64 cpu_budget, int64 ram_budget) {
std::shared_ptr<Node> snapshot;
{
tf_shared_lock lock(mu_);
......@@ -888,6 +892,9 @@ void Model::OptimizeHillClimb(int64 cpu_budget) {
VLOG(2) << "Starting optimization of tunable parameters with HillClimb";
const double processing_time = TotalProcessingTime(snapshot);
auto parameters = CollectTunableParameters(snapshot);
// We add the model's currently buffered bytes because they are excluded from
// the memory budget but included in the maximum number of buffered bytes.
ram_budget += TotalBufferedBytes(snapshot);
// Buffer size parameter will only be incremented if the output latency
// improvement is greater than this constant.
constexpr double kBufferSizeMinDelta = 1.0L;
......@@ -904,7 +911,8 @@ void Model::OptimizeHillClimb(int64 cpu_budget) {
break;
}
}
if (output_time < processing_time / cpu_budget || all_max) {
if (output_time < processing_time / cpu_budget || all_max ||
TotalMaximumBufferedBytes(snapshot) > ram_budget) {
break;
}
double best_delta = -1.0L;
......@@ -955,6 +963,14 @@ double Model::OutputTime(std::shared_ptr<Node> node,
return node->OutputTime(&input_times, gradient);
}
double Model::TotalBufferedBytes(std::shared_ptr<Node> node) {
return node->TotalBufferedBytes();
}
double Model::TotalMaximumBufferedBytes(std::shared_ptr<Node> node) {
return node->TotalMaximumBufferedBytes();
}
double Model::TotalProcessingTime(std::shared_ptr<Node> node) {
return node->TotalProcessingTime(/*processing_times=*/nullptr);
}
......
......@@ -37,6 +37,8 @@ namespace model {
// A constant that can be used to enable auto-tuning.
constexpr int64 kAutotune = -1;
constexpr char kParallelism[] = "parallelism";
constexpr char kBufferSize[] = "buffer_size";
enum class AutotuneAlgorithm {
HILL_CLIMB = 0,
......@@ -126,12 +128,6 @@ class Node {
virtual ~Node() {}
// Increments the bytes buffered by the given delta.
void add_buffered_bytes(int64 delta) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
buffered_bytes_ += delta;
}
// Adds an input.
void add_input(std::shared_ptr<Node> node) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
......@@ -156,6 +152,12 @@ class Node {
return buffered_bytes_;
}
// Returns the number of elements stored in this node's buffer.
int64 buffered_elements() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
return buffered_elements_;
}
// Indicates whether the node has tunable parameters.
bool has_tunable_parameters() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
......@@ -195,6 +197,14 @@ class Node {
return processing_time_;
}
// Records the change in this node's buffer.
void record_buffer_event(int64 bytes_delta, int64 elements_delta)
LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
buffered_bytes_ += bytes_delta;
buffered_elements_ += elements_delta;
}
// Records that the node produced an element.
void record_element() LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
......@@ -294,6 +304,7 @@ class Node {
mutex_lock l2(result->mu_);
result->autotune_ = autotune_;
result->buffered_bytes_ = buffered_bytes_;
result->buffered_elements_ = buffered_elements_;
result->processing_time_ = processing_time_;
result->num_elements_ = num_elements_;
result->parameters_ = parameters_;
......@@ -310,6 +321,49 @@ class Node {
return SelfProcessingTimeLocked();
}
// Returns the total number of bytes buffered in all nodes in the subtree for
// which autotuning is enabled.
double TotalBufferedBytes() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
if (!autotune_) {
return 0;
}
double result = 0;
auto* parameter = gtl::FindOrNull(parameters_, kBufferSize);
if (!parameter) {
parameter = gtl::FindOrNull(parameters_, kParallelism);
}
if (parameter) {
result = buffered_bytes_;
}
for (auto& input : inputs_) {
result += input->TotalBufferedBytes();
}
return result;
}
// Collects the total buffer limit of all nodes in the subtree for which
// autotuning is enabled. This number represents the amount of memory that
// would be used by the subtree nodes if all of their buffers were full.
double TotalMaximumBufferedBytes() const LOCKS_EXCLUDED(mu_) {
tf_shared_lock l(mu_);
if (!autotune_) {
return 0;
}
double result = 0;
auto* parameter = gtl::FindOrNull(parameters_, kBufferSize);
if (!parameter) {
parameter = gtl::FindOrNull(parameters_, kParallelism);
}
if (parameter) {
result = (*parameter)->value * AverageBufferedElementSize();
}
for (auto& input : inputs_) {
result += input->TotalMaximumBufferedBytes();
}
return result;
}
// Returns the per-element CPU time spent in the subtree rooted in this node.
// If `processing_times` is not `nullptr`, collects the per-element CPU time
// spent in each node of the subtree.
......@@ -336,6 +390,15 @@ class Node {
virtual std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const
SHARED_LOCKS_REQUIRED(mu_) = 0;
// Returns the average size of an element buffered in this node.
double AverageBufferedElementSize() const SHARED_LOCKS_REQUIRED(mu_) {
if (buffered_elements_ == 0) {
return 0;
}
return static_cast<double>(buffered_bytes_) /
static_cast<double>(buffered_elements_);
}
// Returns the sum of per-element output time for the inputs of this node and
// if `gradient` is not `nullptr`, collects gradients of output times w.r.t.
// tunable parameters and the last input time.
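
To see how these pieces combine, take the numbers used by the unit tests further down: after record_buffer_event(110, 10), AverageBufferedElementSize() is 110 / 10 = 11 bytes, so an autotuned node whose buffer_size (or, failing that, parallelism) parameter currently has value P contributes P * 11 bytes to TotalMaximumBufferedBytes, which is exactly the 110 * parallelism / 10 the tests expect for the asynchronous nodes.
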
......@@ -433,6 +496,7 @@ class Node {
// from computation of output time and processing time.
bool autotune_ GUARDED_BY(mu_) = true;
int64 buffered_bytes_ GUARDED_BY(mu_) = 0;
int64 buffered_elements_ GUARDED_BY(mu_) = 0;
int64 processing_time_ GUARDED_BY(mu_) = 0;
int64 num_elements_ GUARDED_BY(mu_) = 0;
std::map<std::thread::id, int64> work_start_ GUARDED_BY(mu_);
......@@ -521,7 +585,7 @@ class Model {
void AddProcessingTime(const string& name, int64 delta) LOCKS_EXCLUDED(mu_);
// Uses the given algorithm to perform the autotuning optimization.
void Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget)
void Optimize(AutotuneAlgorithm algorithm, int64 cpu_budget, int64 ram_budget)
LOCKS_EXCLUDED(mu_);
// Records that a node has produced an element.
......@@ -561,7 +625,7 @@ class Model {
// This process is repeated until all parameters reach their maximum values or
// the projected output time is less than or equal to the processing time
// needed to produce an element divided by CPU budget.
void OptimizeHillClimb(int64 cpu_budget);
void OptimizeHillClimb(int64 cpu_budget, int64 ram_budget);
// This optimization algorithm starts by setting all tunable parallelism
// parameters to the minimum value. It then improves current parameters by
......@@ -570,7 +634,7 @@ class Model {
// repeated until either the output time improvement is smaller than threshold
// value or the output time is less than the processing time needed to produce
// an element divided by CPU budget.
void OptimizeGradientDescent(int64 cpu_budget);
void OptimizeGradientDescent(int64 cpu_budget, int64 ram_budget);
// Collects the output time and if `gradient` is not `nullptr`, the output
// time gradient w.r.t. tunable parameters of the subtree rooted in the given
......@@ -581,6 +645,16 @@ class Model {
// Collects the processing time for the given node.
double TotalProcessingTime(std::shared_ptr<Node> node);
// Collects the total number of bytes buffered in all nodes in the subtree
// rooted in the given node for which autotuning is enabled.
double TotalBufferedBytes(std::shared_ptr<Node> node);
// Collects the total buffer limit of all nodes in the subtree rooted in the
// given node for which autotuning is enabled. This number represents the
// amount of memory that would be used by the subtree nodes if all of their
// buffers were full.
double TotalMaximumBufferedBytes(std::shared_ptr<Node> node);
// Used for coordination between different input pipeline threads. Exclusive
// access is required only when adding or removing nodes. Concurrent access to
// existing nodes is protected by a node mutex.
......
......@@ -56,6 +56,12 @@ TEST_P(AsyncInterleaveManyTest, Model) {
async_interleave_many->remove_input(source2);
});
std::vector<double> input_times(1, input_time);
EXPECT_EQ(async_interleave_many->TotalBufferedBytes(), 0);
EXPECT_EQ(async_interleave_many->TotalMaximumBufferedBytes(), 0);
async_interleave_many->record_buffer_event(110, 10);
EXPECT_EQ(async_interleave_many->TotalBufferedBytes(), 110);
EXPECT_EQ(async_interleave_many->TotalMaximumBufferedBytes(),
110 * parallelism / 10);
async_interleave_many->add_processing_time(100);
EXPECT_EQ(async_interleave_many->processing_time(), 100);
EXPECT_EQ(
......@@ -118,6 +124,12 @@ TEST_P(AsyncKnownRatioTest, Model) {
model::MakeSourceNode({2, "source2", async_known_many});
async_known_many->add_input(source2);
std::vector<double> input_times(1, input_time);
EXPECT_EQ(async_known_many->TotalBufferedBytes(), 0);
EXPECT_EQ(async_known_many->TotalMaximumBufferedBytes(), 0);
async_known_many->record_buffer_event(110, 10);
EXPECT_EQ(async_known_many->TotalBufferedBytes(), 110);
EXPECT_EQ(async_known_many->TotalMaximumBufferedBytes(),
110 * parallelism / 10);
source1->add_processing_time(100);
EXPECT_EQ(async_known_many->TotalProcessingTime(/*processing_times=*/nullptr),
0);
......@@ -398,8 +410,19 @@ TEST(SetterGetterTest, Node) {
EXPECT_EQ(node->output(), nullptr);
EXPECT_EQ(node->buffered_bytes(), 0);
node->add_buffered_bytes(42);
EXPECT_EQ(node->buffered_elements(), 0);
EXPECT_EQ(node->TotalBufferedBytes(), 0);
EXPECT_EQ(node->TotalMaximumBufferedBytes(), 0);
node->record_buffer_event(42, 0);
EXPECT_EQ(node->buffered_bytes(), 42);
EXPECT_EQ(node->TotalBufferedBytes(), 0);
EXPECT_EQ(node->TotalMaximumBufferedBytes(), 0);
EXPECT_EQ(node->buffered_elements(), 0);
node->record_buffer_event(0, 11);
EXPECT_EQ(node->buffered_bytes(), 42);
EXPECT_EQ(node->TotalBufferedBytes(), 0);
EXPECT_EQ(node->TotalMaximumBufferedBytes(), 0);
EXPECT_EQ(node->buffered_elements(), 11);
EXPECT_EQ(node->processing_time(), 0);
node->record_start(1);
......@@ -416,6 +439,9 @@ TEST(SetterGetterTest, Node) {
node->add_input(input);
EXPECT_EQ(node->inputs().size(), 1);
EXPECT_EQ(node->inputs().front(), input);
input->record_buffer_event(13, 0);
EXPECT_EQ(node->TotalBufferedBytes(), 0);
EXPECT_EQ(node->TotalMaximumBufferedBytes(), 0);
node->remove_input(input);
EXPECT_EQ(node->inputs().size(), 0);
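
Note the distinction the SetterGetterTest exercises above: buffered_bytes() and buffered_elements() report the raw counters, while TotalBufferedBytes() and TotalMaximumBufferedBytes() remain 0 because neither this node nor the input it is given carries a buffer_size or parallelism parameter, so their buffers are not counted toward the model's memory estimates.
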
......
......@@ -29,6 +29,9 @@ namespace {
constexpr int64 kOptimizationPeriodThresholdMs = 60 * EnvTime::kSecondsToMillis;
// Default share of available RAM that can be used by the model's internal buffers.
constexpr double kRamBudgetShare = 0.5;
class ModelDatasetOp : public UnaryDatasetOpKernel {
public:
explicit ModelDatasetOp(OpKernelConstruction* ctx)
......@@ -47,22 +50,25 @@ class ModelDatasetOp : public UnaryDatasetOpKernel {
OP_REQUIRES(ctx, cpu_budget_ > 0,
errors::InvalidArgument("CPU budget must be positive but is ",
cpu_budget_, "."));
ram_budget_ = kRamBudgetShare * port::AvailableRam();
}
void MakeDataset(OpKernelContext* ctx, DatasetBase* input,
DatasetBase** output) override {
*output = new Dataset(ctx, input, algorithm_, cpu_budget_);
*output = new Dataset(ctx, input, algorithm_, cpu_budget_, ram_budget_);
}
private:
class Dataset : public DatasetBase {
public:
Dataset(OpKernelContext* ctx, const DatasetBase* input,
model::AutotuneAlgorithm algorithm, int64 cpu_budget)
model::AutotuneAlgorithm algorithm, int64 cpu_budget,
int64 ram_budget)
: DatasetBase(DatasetContext(ctx)),
input_(input),
algorithm_(algorithm),
cpu_budget_(cpu_budget) {
cpu_budget_(cpu_budget),
ram_budget_(ram_budget) {
input_->Ref();
}
......@@ -190,7 +196,8 @@ class ModelDatasetOp : public UnaryDatasetOpKernel {
}
if (cancelled_) return;
}
model_->Optimize(dataset()->algorithm_, dataset()->cpu_budget_);
model_->Optimize(dataset()->algorithm_, dataset()->cpu_budget_,
dataset()->ram_budget_);
// Exponentially increase the period of running the optimization
// until a threshold is reached.
if (optimization_period_ms != kOptimizationPeriodThresholdMs) {
......@@ -213,10 +220,12 @@ class ModelDatasetOp : public UnaryDatasetOpKernel {
const DatasetBase* input_;
const model::AutotuneAlgorithm algorithm_;
const int64 cpu_budget_;
const int64 ram_budget_;
};
model::AutotuneAlgorithm algorithm_;
int64 cpu_budget_;
int64 ram_budget_;
};
REGISTER_KERNEL_BUILDER(Name("ModelDataset").Device(DEVICE_CPU),
......