提交 75fb8c8e 编写于 作者: A A. Unique TensorFlower 提交者: TensorFlower Gardener

#tf-data Provide autotune with fresh values for cpu/ram budget

The loop that runs Autotune will fetch current values for available CPU and RAM on each iteration. This helps in situations where the hardware resources available to tf.data may be vertically scaled up or down based on usage during the process' lifetime.

PiperOrigin-RevId: 565197940
上级 7cf3460c
......@@ -16,6 +16,7 @@ limitations under the License.
#include "tensorflow/core/data/root_dataset.h"
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
......@@ -79,21 +80,29 @@ void SetRootDatasetParams(const Options& options, RootDataset::Params* params) {
params->autotune_algorithm =
options.autotune_options().autotune_algorithm();
}
params->autotune_cpu_budget = value_or_default(
options.autotune_options().cpu_budget(), 0, GetCpuBudget());
int64_t cpu_budget_from_options = options.autotune_options().cpu_budget();
if (cpu_budget_from_options == 0) {
params->autotune_cpu_budget_func = [] { return GetCpuBudget(); };
} else {
params->autotune_cpu_budget_func = [cpu_budget_from_options] {
return cpu_budget_from_options;
};
}
params->autotune_ram_budget_from_options =
options.autotune_options().ram_budget();
double ram_budget_share;
if (experiments.contains("autotune_buffer_optimization")) {
// When running this experiment, increase the ram_budget since it already
// takes into account the ram usage in buffer sizing, which is not the
// case for prefetch autotuner. Without this, we see degradation in some
// jobs for lack of buffers while ram usage is low.
params->autotune_ram_budget =
value_or_default(options.autotune_options().ram_budget(), 0,
0.90 * port::AvailableRam());
ram_budget_share = 0.9;
} else {
params->autotune_ram_budget =
value_or_default(options.autotune_options().ram_budget(), 0,
model::kRamBudgetShare * port::AvailableRam());
ram_budget_share = model::kRamBudgetShare;
}
params->autotune_free_memory_func = [ram_budget_share]() {
return ram_budget_share * port::AvailableRam();
};
}
void AddTraceMetadata(const RootDataset::Params& params, const Options& options,
......@@ -102,12 +111,13 @@ void AddTraceMetadata(const RootDataset::Params& params, const Options& options,
trace_metadata->push_back(std::make_pair(
kAlgorithm, model::AutotuneAlgorithm_Name(params.autotune_algorithm)));
trace_metadata->push_back(std::make_pair(
kCpuBudget, strings::Printf("%lld", static_cast<long long>(
params.autotune_cpu_budget))));
kCpuBudget,
strings::Printf("%lld", static_cast<long long>(
params.autotune_cpu_budget_func()))));
int64_t ram_budget = params.ComputeInitialAutotuneRamBudget();
trace_metadata->push_back(std::make_pair(
kRamBudget,
strings::Printf("%lld", static_cast<long long>(
params.autotune_ram_budget / 1.0e6))));
strings::Printf("%lld", static_cast<long long>(ram_budget / 1.0e6))));
}
if (params.max_intra_op_parallelism >= 0) {
trace_metadata->push_back(std::make_pair(
......@@ -181,7 +191,7 @@ class RootDataset::Iterator : public DatasetIterator<RootDataset> {
// so no matter whether dataset()->params_.autotune is on or not
// we need to pass ram_budget_manager_ to the downstream dataset operations
ram_budget_manager_ = std::make_shared<model::RamBudgetManager>(
dataset()->params_.autotune_ram_budget);
dataset()->params_.ComputeInitialAutotuneRamBudget());
if (dataset()->params_.autotune) {
model_ = ctx->model() != nullptr ? ctx->model()
......@@ -300,11 +310,34 @@ class RootDataset::Iterator : public DatasetIterator<RootDataset> {
Status EnsureModelThreadStarted(IteratorContext* ctx) {
mutex_lock l(mu_);
if (!model_thread_) {
model_thread_ = ctx->StartThread("tf_data_model", [this]() {
RunMode run_mode = ctx->run_mode();
model_thread_ = ctx->StartThread("tf_data_model", [this, run_mode]() {
RootDataset::Params params = dataset()->params_;
std::function<int64_t(int64_t)> ram_budget_func;
int64_t ram_budget_from_options =
params.autotune_ram_budget_from_options;
if (ram_budget_from_options > 0) {
ram_budget_func = [ram_budget_from_options](int64_t) {
return ram_budget_from_options;
};
} else {
if (run_mode == RunMode::STANDALONE) {
// Dynamic RAM budget should only apply to tf.data service.
auto free_memory_func = params.autotune_free_memory_func;
ram_budget_func = [free_memory_func](int64_t total_buffered_bytes) {
return free_memory_func() + total_buffered_bytes;
};
} else {
int64_t constant_ram_budget =
params.ComputeInitialAutotuneRamBudget();
ram_budget_func = [constant_ram_budget](int64_t) {
return constant_ram_budget;
};
}
}
Status status = model_->OptimizeLoop(
dataset()->params_.autotune_algorithm,
dataset()->params_.autotune_cpu_budget, *ram_budget_manager_,
cancellation_manager_.get());
params.autotune_algorithm, params.autotune_cpu_budget_func,
ram_budget_func, *ram_budget_manager_, cancellation_manager_.get());
if (!status.ok()) {
LOG(WARNING) << "Optimization loop failed: " << status;
}
......
......@@ -15,11 +15,12 @@ limitations under the License.
#ifndef TENSORFLOW_CORE_DATA_ROOT_DATASET_H_
#define TENSORFLOW_CORE_DATA_ROOT_DATASET_H_
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
#include "tensorflow/core/framework/dataset.h"
#include "tensorflow/core/framework/model.h"
#include "tensorflow/core/framework/model.pb.h"
#include "tensorflow/core/platform/refcount.h"
......@@ -33,10 +34,19 @@ class RootDataset : public DatasetBase {
struct Params {
bool autotune = true;
model::AutotuneAlgorithm autotune_algorithm;
int64_t autotune_cpu_budget = 0;
int64_t autotune_ram_budget = 0;
std::function<int64_t()> autotune_cpu_budget_func;
std::function<int64_t()> autotune_free_memory_func;
int64_t autotune_ram_budget_from_options;
int64_t max_intra_op_parallelism = 1;
int64_t private_threadpool_size = 0;
int64_t ComputeInitialAutotuneRamBudget() const {
if (autotune_ram_budget_from_options > 0) {
return autotune_ram_budget_from_options;
} else {
return autotune_free_memory_func();
}
}
};
static Status FromOptions(const DatasetBase* input, DatasetBase** output);
......
......@@ -847,6 +847,8 @@ class IteratorContext {
bool warm_start() { return params_.warm_start; }
RunMode run_mode() { return params_.run_mode; }
std::unique_ptr<thread::ThreadPool> CreateThreadPool(const string& name,
int num_threads) {
if (params_.thread_pool) {
......
......@@ -27,6 +27,7 @@ limitations under the License.
#include "tensorflow/core/lib/gtl/cleanup.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/platform/host_info.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/statusor.h"
......@@ -2276,7 +2277,9 @@ void Model::FlushMetrics() {
}
}
void Model::Optimize(AutotuneAlgorithm algorithm, int64_t cpu_budget,
void Model::Optimize(AutotuneAlgorithm algorithm,
std::function<int64_t()> cpu_budget_func,
std::function<int64_t(int64_t)> ram_budget_func,
double model_input_time,
RamBudgetManager& ram_budget_manager,
CancellationManager* cancellation_manager) {
......@@ -2285,15 +2288,17 @@ void Model::Optimize(AutotuneAlgorithm algorithm, int64_t cpu_budget,
tf_shared_lock l(mu_);
snapshot = output_->Snapshot();
}
int64_t ram_budget = ram_budget_manager.AvailableModelRam();
int64_t total_ram_budget = ram_budget_func(TotalBufferedBytes(snapshot));
ram_budget_manager.UpdateBudget(total_ram_budget);
int64_t model_ram_budget = ram_budget_manager.AvailableModelRam();
int64_t original_model_bytes = TotalMaximumBufferedBytes(snapshot);
if (!port::JobName().empty()) {
RecordAutotuneRamUsage(ram_budget, original_model_bytes);
RecordAutotuneRamUsage(model_ram_budget, original_model_bytes);
}
OptimizationParams optimization_params;
optimization_params.set_algorithm(algorithm);
optimization_params.set_cpu_budget(cpu_budget);
optimization_params.set_ram_budget(ram_budget);
optimization_params.set_cpu_budget(cpu_budget_func());
optimization_params.set_ram_budget(model_ram_budget);
optimization_params.set_model_input_time(model_input_time);
switch (algorithm) {
case AutotuneAlgorithm::DEFAULT:
......@@ -2407,7 +2412,9 @@ bool Model::ShouldStop(int64_t cpu_budget, int64_t ram_budget,
}
// TODO(jsimsa): Add support for tracking and using the model input time.
Status Model::OptimizeLoop(AutotuneAlgorithm algorithm, int64_t cpu_budget,
Status Model::OptimizeLoop(AutotuneAlgorithm algorithm,
std::function<int64_t()> cpu_budget_func,
std::function<int64_t(int64_t)> ram_budget_func,
RamBudgetManager& ram_budget_manager,
CancellationManager* cancellation_manager) {
std::function<void()> unused;
......@@ -2446,8 +2453,8 @@ Status Model::OptimizeLoop(AutotuneAlgorithm algorithm, int64_t cpu_budget,
if (algorithm == AutotuneAlgorithm::STAGE_BASED) {
model_input_time = ComputeTargetTimeNsec();
}
Optimize(algorithm, cpu_budget, model_input_time, ram_budget_manager,
cancellation_manager);
Optimize(algorithm, cpu_budget_func, ram_budget_func, model_input_time,
ram_budget_manager, cancellation_manager);
int64_t end_ms = EnvTime::NowMicros() / EnvTime::kMillisToMicros;
VLOG(2) << "Optimized for " << end_ms - start_ms << " ms.";
......
......@@ -204,9 +204,15 @@ class RamBudgetManager {
return budget_ - legacy_prefetch_allocated_;
}
void UpdateBudget(int64_t budget) {
mutex_lock l(mu_);
budget_ = budget;
VLOG(2) << "Updated ram budget to " << budget;
}
private:
mutable mutex mu_;
const int64_t budget_;
int64_t budget_ TF_GUARDED_BY(mu_) = 0;
// Number of bytes allocated by legacy prefetch autotuner.
int64_t legacy_prefetch_allocated_ TF_GUARDED_BY(mu_) = 0;
// Number of bytes allocated by the model.
......@@ -897,15 +903,27 @@ class Model {
// Uses the given algorithm and resource budgets to periodically perform the
// autotuning optimization.
//
// `cpu_budget_func` can be used to provide the optimizer with up-to-date
// values in cases where CPUs budgets may be changed by the runtime
// dynamically.
//
// `ram_budget_func` is similar to `cpu_budget_func`. This lambda takes a
// parameter that is the total number of bytes currently buffered by the
// model.
//
// To terminate the execution of the optimization loop, the caller needs to
// invoke `cancellation_mgr->StartCancel()`.
Status OptimizeLoop(AutotuneAlgorithm algorithm, int64_t cpu_budget,
Status OptimizeLoop(AutotuneAlgorithm algorithm,
std::function<int64_t()> cpu_budget_func,
std::function<int64_t(int64_t)> ram_budget_func,
RamBudgetManager& ram_budget_manager,
CancellationManager* cancellation_manager);
// Uses the given algorithm and resource budgets to perform the autotuning
// optimization.
void Optimize(AutotuneAlgorithm algorithm, int64_t cpu_budget,
void Optimize(AutotuneAlgorithm algorithm,
std::function<int64_t()> cpu_budget_func,
std::function<int64_t(int64_t)> ram_budget_func,
double model_input_time, RamBudgetManager& ram_budget_manager,
CancellationManager* cancellation_manager);
......
......@@ -16,7 +16,9 @@ limitations under the License.
#include "tensorflow/core/framework/model.h"
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>
#include <tuple>
......@@ -39,6 +41,14 @@ using ::tensorflow::monitoring::testing::CellReader;
using ::testing::AllOf;
using ::testing::HasSubstr;
std::function<int64_t()> CpuBudgetFunc(int64_t budget) {
return [budget]() { return budget; };
}
std::function<int64_t(int64_t)> RamBudgetFunc(int64_t budget) {
return [budget](int64_t) { return budget; };
}
int64_t CountParametersOnNode(const string& node_name,
const Model::ModelParameters& parameters) {
int64_t cnt = 0;
......@@ -1336,9 +1346,10 @@ TEST_P(OptimizeZeroRamBudgetTest, Model) {
&node3);
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/0);
model.Optimize(algorithm, /*cpu_budget=*/40, /*model_input_time=*/0,
ram_budget_manager, &cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model.Optimize(algorithm, CpuBudgetFunc(40), RamBudgetFunc(0),
/*model_input_time=*/0, ram_budget_manager,
&cancellation_manager);
EXPECT_EQ(node1->parameter_value("parallelism"), 1);
EXPECT_EQ(node2->parameter_value("buffer_size"), 0);
EXPECT_EQ(node3->parameter_value("parallelism"), 1);
......@@ -1384,10 +1395,10 @@ TEST(ModelTest, ModelCollectOptimizationMetrics) {
HasSubstr("autotune: true"), HasSubstr("num_elements: 1"),
HasSubstr("processing_time: 100")));
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/1000);
model.Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/20,
/*model_input_time=*/50, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model.Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(40), RamBudgetFunc(1000),
/*model_input_time=*/50, ram_budget_manager, &cancellation_manager);
model.output()->record_element();
model.output()->record_start(300);
model.output()->record_stop(400);
......@@ -1403,9 +1414,9 @@ TEST(ModelTest, ModelCollectOptimizationMetrics) {
model.RecordIteratorGapTime(12);
// Call optimization again. Metrics collected after the first optimization
// and the added gap times should be returned as well.
model.Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/20,
/*model_input_time=*/50, ram_budget_manager,
&cancellation_manager);
model.Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(20), RamBudgetFunc(1000),
/*model_input_time=*/50, ram_budget_manager, &cancellation_manager);
EXPECT_THAT(
cell_reader.Read(model_id),
AllOf(HasSubstr("key: 0"), HasSubstr("name: \"unknown0\""),
......@@ -2367,11 +2378,11 @@ TEST_F(BufferSizeTest, OptimizeBuffers_PlentyOfMemory) {
EXPECT_EQ(2, node_6->buffered_elements_high());
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/10000);
RamBudgetManager ram_budget_manager(0);
model_->AddExperiment("autotune_buffer_optimization");
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/1000,
/*model_input_time=*/0, ram_budget_manager,
&cancellation_manager);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(1000), RamBudgetFunc(10000),
/*model_input_time=*/0, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(2, node_1->parameter_value(kBufferSize));
EXPECT_EQ(4, node_3->parameter_value(kBufferSize));
......@@ -2515,11 +2526,11 @@ TEST_F(BufferSizeTest, OptimizeBuffers_TightMemory) {
EXPECT_EQ(5, node_4->buffered_elements_high());
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/3000);
RamBudgetManager ram_budget_manager(0);
model_->AddExperiment("autotune_buffer_optimization");
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/1000,
/*model_input_time=*/0, ram_budget_manager,
&cancellation_manager);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(1000), RamBudgetFunc(3000),
/*model_input_time=*/0, ram_budget_manager, &cancellation_manager);
EXPECT_DOUBLE_EQ(7.0, node_1->parameter_value(kBufferSize));
EXPECT_DOUBLE_EQ(7.0, node_2->parameter_value(kBufferSize));
......@@ -2587,10 +2598,10 @@ TEST_F(ModelTimingTest, OptimizeStageBased_OneStage) {
)pb");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/1000);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/20,
/*model_input_time=*/50, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(20), RamBudgetFunc(1000),
/*model_input_time=*/50, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(5, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
}
......@@ -2643,10 +2654,10 @@ TEST_F(ModelTimingTest, OptimizeStageBased_CappedByParameterMax) {
CellReader<int64_t> cell_reader(
"/tensorflow/data/autotune_stopping_criteria");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/1000);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/20,
/*model_input_time=*/50, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(20), RamBudgetFunc(1000),
/*model_input_time=*/50, ram_budget_manager, &cancellation_manager);
// The max value is set to 3. Otherwise, the expected parallelism value is 5.
EXPECT_EQ(3, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
......@@ -2713,10 +2724,10 @@ TEST_F(ModelTimingTest, OptimizeStageBased_TwoStages) {
)pb");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/1000);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/5,
/*model_input_time=*/50, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(5), RamBudgetFunc(1000),
/*model_input_time=*/50, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(5, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
EXPECT_EQ(5, GetNode(/*node_id=*/2)->parameter_value("parallelism"));
......@@ -2867,10 +2878,11 @@ TEST_F(ModelTimingTest, OptimizeStageBased_ParallelInterleaveMaxParallelism) {
CellReader<int64_t> cell_reader(
"/tensorflow/data/autotune_stopping_criteria");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/10000);
RamBudgetManager ram_budget_manager(0);
// Not enough RAM, the original `parallelism` should not change.
model_->AddExperiment("stage_based_autotune_v2");
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/10000,
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(10000),
RamBudgetFunc(10000),
/*model_input_time=*/60, ram_budget_manager,
&cancellation_manager);
EXPECT_EQ(10, GetNode(/*node_id=*/2)->parameter_value("parallelism"));
......@@ -2942,32 +2954,30 @@ TEST_F(ModelTimingTest, OptimizeStageBased_TwoStages_RamBudgetExceeded) {
CellReader<int64_t> cell_reader(
"/tensorflow/data/autotune_stopping_criteria");
CancellationManager cancellation_manager;
RamBudgetManager empty_ram_budget_manager(/*budget=*/0);
RamBudgetManager ram_budget_manager(0);
// Not enough RAM, the original `parallelism` should not change.
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/10,
/*model_input_time=*/100, empty_ram_budget_manager,
&cancellation_manager);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(10), RamBudgetFunc(0),
/*model_input_time=*/100, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(4, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
EXPECT_EQ(4, GetNode(/*node_id=*/2)->parameter_value("parallelism"));
EXPECT_EQ(cell_reader.Delta(
"ram_budget_exceeded:ParallelMapV2[]_Arbitrary[]_Stuff(id:2)"),
1);
RamBudgetManager high_mem_ram_budget_manager(/*budget=*/100000);
// Has enough RAM, the original `parallelism` should increase.
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/10,
/*model_input_time=*/0, high_mem_ram_budget_manager,
&cancellation_manager);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(10), RamBudgetFunc(100000),
/*model_input_time=*/0, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(12, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
EXPECT_EQ(16, GetNode(/*node_id=*/2)->parameter_value("parallelism"));
EXPECT_EQ(
cell_reader.Delta(
"parameter_max_exceeded:ParallelMapV2[]_Arbitrary[]_Stuff(id:2)"),
1);
RamBudgetManager low_mem_ram_budget_manager(/*budget=*/100);
// Not enough RAM, the original `parallelism` should not change.
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/10,
/*model_input_time=*/0, low_mem_ram_budget_manager,
&cancellation_manager);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(10), RamBudgetFunc(100),
/*model_input_time=*/0, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(12, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
EXPECT_EQ(16, GetNode(/*node_id=*/2)->parameter_value("parallelism"));
EXPECT_EQ(cell_reader.Delta(
......@@ -3027,10 +3037,10 @@ TEST_F(ModelTimingTest, OptimizeStageBased_PipelineRatio) {
)pb");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/10000);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/20,
/*model_input_time=*/100, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(20), RamBudgetFunc(10000),
/*model_input_time=*/100, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(3, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
}
......@@ -3087,10 +3097,10 @@ TEST_F(ModelTimingTest, OptimizeStageBased_PipelineRatioLessThanOne) {
)pb");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/10000);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/20,
/*model_input_time=*/50, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(20), RamBudgetFunc(10000),
/*model_input_time=*/50, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(14, GetNode(/*node_id=*/1)->parameter_value("parallelism"));
}
......@@ -3280,11 +3290,11 @@ TEST_F(ModelTimingTest, ComputeSnapshotProcessingTimeSingleStage1) {
)pb");
CancellationManager cancellation_manager;
RamBudgetManager ram_budget_manager(/*budget=*/1);
RamBudgetManager ram_budget_manager(0);
// Ensure the model snapshot is populated.
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/1,
/*model_input_time=*/1000000, ram_budget_manager,
&cancellation_manager);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(1), RamBudgetFunc(1),
/*model_input_time=*/1000000, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(model_->ComputeSnapshotProcessingTimeNsec(), 1200.0);
}
......@@ -3327,10 +3337,10 @@ TEST_F(ModelTimingTest, ComputeSnapshotProcessingTimeSingleStage2) {
CancellationManager cancellation_manager;
// Ensure the model snapshot is populated.
RamBudgetManager ram_budget_manager(/*budget=*/1);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/1,
/*model_input_time=*/1000000, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(1), RamBudgetFunc(1),
/*model_input_time=*/1000000, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(model_->ComputeSnapshotProcessingTimeNsec(), 1500.0);
}
......@@ -3406,10 +3416,10 @@ TEST_F(ModelTimingTest, ComputeSnapshotProcessingTimeMultipleStages) {
CancellationManager cancellation_manager;
// Ensure the model snapshot is populated.
RamBudgetManager ram_budget_manager(/*budget=*/1);
model_->Optimize(AutotuneAlgorithm::STAGE_BASED, /*cpu_budget=*/1,
/*model_input_time=*/1000000, ram_budget_manager,
&cancellation_manager);
RamBudgetManager ram_budget_manager(0);
model_->Optimize(
AutotuneAlgorithm::STAGE_BASED, CpuBudgetFunc(1), RamBudgetFunc(1),
/*model_input_time=*/1000000, ram_budget_manager, &cancellation_manager);
EXPECT_EQ(model_->ComputeSnapshotProcessingTimeNsec(), 1500.0);
}
......@@ -3462,6 +3472,49 @@ TEST_F(ModelTimingTest, SelfTime) {
EXPECT_DOUBLE_EQ(910, node_2->ComputeSelfTime());
}
TEST(RamBudgetManagerTest, Ctor) {
RamBudgetManager rbm(10);
EXPECT_EQ(rbm.AvailableModelRam(), 10);
}
TEST(RamBudgetManagerTest, RequestAllocations) {
RamBudgetManager rbm(10);
// Over budget
EXPECT_FALSE(rbm.RequestModelAllocation(11));
// Model allocations specify total bytes
EXPECT_TRUE(rbm.RequestModelAllocation(10));
EXPECT_TRUE(rbm.RequestModelAllocation(7));
// Still within budget
EXPECT_TRUE(rbm.RequestLegacyPrefetchBytes(3));
// Over budget
EXPECT_FALSE(rbm.RequestLegacyPrefetchBytes(2));
// Reducing the model allocation should make room for more prefetch bytes
EXPECT_TRUE(rbm.RequestModelAllocation(5));
EXPECT_TRUE(rbm.RequestLegacyPrefetchBytes(2));
}
TEST(RamBudgetManagerTest, RequestAllocationsWithBudgetAdjustment) {
RamBudgetManager rbm(10);
// Over budget
EXPECT_FALSE(rbm.RequestModelAllocation(11));
// Update budget
rbm.UpdateBudget(15);
EXPECT_TRUE(rbm.RequestModelAllocation(11));
EXPECT_TRUE(rbm.RequestLegacyPrefetchBytes(3));
// Over budget
EXPECT_FALSE(rbm.RequestLegacyPrefetchBytes(2));
// Reducing the model allocation should make room for more prefetch bytes
EXPECT_TRUE(rbm.RequestModelAllocation(7));
// Ok, since 7 + 3 + 2 <= 15
EXPECT_TRUE(rbm.RequestLegacyPrefetchBytes(2));
// Update budget again
rbm.UpdateBudget(16);
// Over budget 5 > 16 - 7 - (3 + 2)
EXPECT_FALSE(rbm.RequestLegacyPrefetchBytes(5));
// Just fits
EXPECT_TRUE(rbm.RequestLegacyPrefetchBytes(4));
}
} // namespace
} // namespace model
} // namespace data
......
......@@ -14,6 +14,8 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/data/model_dataset_op.h"
#include <cstdint>
#include "tensorflow/core/data/dataset_utils.h"
#include "tensorflow/core/framework/cancellation.h"
......@@ -191,9 +193,15 @@ class ModelDatasetOp::Dataset : public DatasetBase {
auto ram_budget_manager = ctx->ram_budget_manager();
model_thread_ =
ctx->StartThread("tf_data_model", [this, ram_budget_manager]() {
int64_t captured_cpu_budget = cpu_budget_;
int64_t captured_ram_budget = ram_budget_;
Status status = model_->OptimizeLoop(
dataset()->algorithm_, cpu_budget_, *ram_budget_manager,
cancellation_manager_.get());
dataset()->algorithm_,
[captured_cpu_budget]() { return captured_cpu_budget; },
[captured_ram_budget](int64_t) {
return captured_ram_budget;
},
*ram_budget_manager, cancellation_manager_.get());
if (!status.ok()) {
LOG(WARNING)
<< "Optimization loop failed: " << status.ToString();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册