From f9fbff63fbce3fe41f41e8ddda33a36c6335e168 Mon Sep 17 00:00:00 2001 From: Rachel Lim Date: Tue, 19 Mar 2019 09:14:39 -0700 Subject: [PATCH] [tf.data] Add option to control whether vectorization is aggressive (i.e. always vectorizes) or safe (i.e. uses ChooseFastestBranchDataset) PiperOrigin-RevId: 239203789 --- .../core/grappler/optimizers/data/BUILD | 1 + .../optimizers/data/map_vectorization.cc | 52 +++-- .../optimizers/data/map_vectorization.h | 26 ++- .../optimizers/data/map_vectorization_test.cc | 205 +++++++++++++----- .../optimizers/data/meta_optimizer.cc | 54 ++++- .../core/kernels/data/optimize_dataset_op.cc | 20 +- tensorflow/core/ops/dataset_ops.cc | 1 + .../python/data/experimental/__init__.py | 2 + .../optimization/map_vectorization_test.py | 49 ++--- .../experimental/ops/optimization_options.py | 48 +++- tensorflow/python/data/ops/dataset_ops.py | 12 +- ...erimental.-map-vectorization-options.pbtxt | 18 ++ .../v1/tensorflow.data.experimental.pbtxt | 4 + .../api/golden/v1/tensorflow.raw_ops.pbtxt | 2 +- ...erimental.-map-vectorization-options.pbtxt | 18 ++ .../v2/tensorflow.data.experimental.pbtxt | 4 + .../api/golden/v2/tensorflow.raw_ops.pbtxt | 2 +- 17 files changed, 397 insertions(+), 121 deletions(-) create mode 100644 tensorflow/tools/api/golden/v1/tensorflow.data.experimental.-map-vectorization-options.pbtxt create mode 100644 tensorflow/tools/api/golden/v2/tensorflow.data.experimental.-map-vectorization-options.pbtxt diff --git a/tensorflow/core/grappler/optimizers/data/BUILD b/tensorflow/core/grappler/optimizers/data/BUILD index 49f36fc7c55..bb0f163e69c 100644 --- a/tensorflow/core/grappler/optimizers/data/BUILD +++ b/tensorflow/core/grappler/optimizers/data/BUILD @@ -548,6 +548,7 @@ cc_library( hdrs = ["meta_optimizer.h"], deps = [ "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/strings", "//tensorflow/core/grappler/clusters:cluster", "//tensorflow/core/grappler/optimizers:arithmetic_optimizer", "//tensorflow/core/grappler/optimizers:custom_graph_optimizer", diff --git a/tensorflow/core/grappler/optimizers/data/map_vectorization.cc b/tensorflow/core/grappler/optimizers/data/map_vectorization.cc index 983b0436338..0f3bbfd88d7 100644 --- a/tensorflow/core/grappler/optimizers/data/map_vectorization.cc +++ b/tensorflow/core/grappler/optimizers/data/map_vectorization.cc @@ -534,44 +534,66 @@ Status MapVectorization::OptimizeAndCollectStats(Cluster* cluster, AddVectorizedFunction(*map_node, *map_func, library); CHECK_NOTNULL(vectorized_func); - std::vector vectorized_branch; NodeDef* new_batch_node; TF_RETURN_IF_ERROR(AddNewBatchNode( *batch_node, *input_node, *vectorized_func, &graph, &new_batch_node)); - vectorized_branch.push_back(new_batch_node); NodeDef* new_map_node; TF_RETURN_IF_ERROR(AddNewMapNode(*map_node, *batch_node, *new_batch_node, *vectorized_func, &graph, &new_map_node)); - vectorized_branch.push_back(new_map_node); + NodeDef* optional_new_prefetch_node = nullptr; if (optional_prefetch_node) { // If the original pipeline was .map().prefetch().batch(), the new // pipeline is .batch().map().prefetch() - NodeDef* new_prefetch_node; TF_RETURN_IF_ERROR(AddNewPrefetchNode(*optional_prefetch_node, *batch_node, *new_map_node, &graph, - &new_prefetch_node)); - vectorized_branch.push_back(new_prefetch_node); + &optional_new_prefetch_node)); } + std::vector vectorized_branch( + {new_batch_node, new_map_node}); + std::vector original_branch({map_node}); if (optional_prefetch_node) { original_branch.push_back(optional_prefetch_node); + 
vectorized_branch.push_back(optional_new_prefetch_node); } - if (map_node->op() != kExperimentalMapAndBatchOp) { + if (batch_node->op() != kExperimentalMapAndBatchOp) { original_branch.push_back(batch_node); } - NodeDef* new_choose_fastest_node; - TF_RETURN_IF_ERROR(AddNewChooseFastestNode( - input_node, /*ratio_numerator_name=*/new_batch_node->input(1), - std::move(original_branch), std::move(vectorized_branch), &graph, - library, &new_choose_fastest_node)); + // Mark the original nodes for deletion. + for (const auto& n : original_branch) { + nodes_to_delete.insert(n->name()); + } + + if (use_choose_fastest_) { + // Optionally, use ChooseFastestBranch node to mitigate potential + // regressions caused by vectorization. + for (const auto& n : vectorized_branch) { + // Mark the vectorized nodes for deletion, since they will be added in + // the choose fastest dataset branch function separately. + nodes_to_delete.insert(n->name()); + } + NodeDef* new_choose_fastest_node; + TF_RETURN_IF_ERROR(AddNewChooseFastestNode( + input_node, /*ratio_numerator_name=*/new_batch_node->input(1), + std::move(original_branch), std::move(vectorized_branch), &graph, + library, &new_choose_fastest_node)); + // Make output of Batch point to ChooseFastest instead. + TF_RETURN_IF_ERROR(graph.UpdateFanouts(batch_node->name(), + new_choose_fastest_node->name())); + + } else { + // Make output of Batch point to the new Map (or Prefetch) node instead. + TF_RETURN_IF_ERROR(graph.UpdateFanouts( + batch_node->name(), optional_new_prefetch_node + ? optional_new_prefetch_node->name() + : new_map_node->name())); + } - // Make output of Batch point to ChooseFastest instead. - TF_RETURN_IF_ERROR(graph.UpdateFanouts(batch_node->name(), - new_choose_fastest_node->name())); + TF_RETURN_IF_ERROR(graph.DeleteNodes(nodes_to_delete)); stats->num_changes++; } return Status::OK(); diff --git a/tensorflow/core/grappler/optimizers/data/map_vectorization.h b/tensorflow/core/grappler/optimizers/data/map_vectorization.h index 88ec9cfec62..2d3c068e7f6 100644 --- a/tensorflow/core/grappler/optimizers/data/map_vectorization.h +++ b/tensorflow/core/grappler/optimizers/data/map_vectorization.h @@ -16,6 +16,7 @@ limitations under the License. #ifndef TENSORFLOW_CORE_GRAPPLER_OPTIMIZERS_DATA_MAP_VECTORIZATION_H_ #define TENSORFLOW_CORE_GRAPPLER_OPTIMIZERS_DATA_MAP_VECTORIZATION_H_ +#include "tensorflow/core/framework/attr_value.pb.h" #include "tensorflow/core/grappler/optimizers/data/optimizer_base.h" namespace tensorflow { @@ -33,10 +34,11 @@ namespace grappler { // (or map_and_batch) // // To: -// input --> map --> batch --------+ -// | (or map_and_batch) | -// | v -// +-----> batch --> map --> choose_fastest --> output +// input --> batch --> map --> output +// +// If the "ChooseFastest" configuration is enabled, it adds a +// ChooseFastestBranch dataset node to pick between the original map->batch +// branch and the vectorized batch->map branch. 
// class MapVectorization : public TFDataOptimizerBase { public: @@ -47,6 +49,19 @@ class MapVectorization : public TFDataOptimizerBase { Status Init( const tensorflow::RewriterConfig_CustomGraphOptimizer* config) override { + if (!config) return Status::OK(); + + const string& choose_fastest_param = + config->parameter_map().at("use_choose_fastest").s(); + if (choose_fastest_param == "true") { + use_choose_fastest_ = true; + } else if (choose_fastest_param == "false") { + use_choose_fastest_ = false; + } else { + return errors::Internal( + "Received an invalid value for parameter \"use_choose_fastest\"", + choose_fastest_param); + } return Status::OK(); } @@ -56,6 +71,9 @@ class MapVectorization : public TFDataOptimizerBase { void Feedback(Cluster* cluster, const GrapplerItem& item, const GraphDef& optimize_output, double result) override; + + private: + bool use_choose_fastest_ = false; }; } // namespace grappler diff --git a/tensorflow/core/grappler/optimizers/data/map_vectorization_test.cc b/tensorflow/core/grappler/optimizers/data/map_vectorization_test.cc index 884bc17d98f..370bf5abe7c 100644 --- a/tensorflow/core/grappler/optimizers/data/map_vectorization_test.cc +++ b/tensorflow/core/grappler/optimizers/data/map_vectorization_test.cc @@ -53,6 +53,19 @@ constexpr char kAttrNameDtype[] = "dtype"; using test::function::NDef; +Status OptimizeWithMapVectorization(const GrapplerItem& item, GraphDef* output, + bool use_choose_fastest) { + MapVectorization optimizer; + RewriterConfig_CustomGraphOptimizer config; + if (use_choose_fastest) { + (*config.mutable_parameter_map())["use_choose_fastest"].set_s("true"); + } else { + (*config.mutable_parameter_map())["use_choose_fastest"].set_s("false"); + } + TF_RETURN_IF_ERROR(optimizer.Init(&config)); + return optimizer.Optimize(nullptr, item, output); +} + // Adds a simple vectorizable map function that is akin to // dataset.map(lambda x: tf.identity(x)) FunctionDef* AddMapFn(MutableGraphView* graph) { @@ -188,6 +201,35 @@ const FunctionDef* GetFunction(const GraphDef& graph, return &graph.library().function(found); } +void CheckVectorizedWithoutChooseFastest( + const GraphDef& output, gtl::ArraySlice expected_vectorized_branch, + const string& input_name) { + std::vector vectorized_branch; + for (const auto& op : expected_vectorized_branch) { + // This assumes that vectorized op is the only one that exists in the graph. + // For our test cases, this is true (we don't have superfluous map/batch + // nodes in other parts of the pipeline). 
+ ASSERT_EQ(graph_utils::FindAllGraphNodesWithOp(op, output).size(), 1); + vectorized_branch.push_back( + &output.node(graph_utils::FindGraphNodeWithOp(op, output))); + } + + for (int i = 1; i < vectorized_branch.size() - 1; ++i) { + const NodeDef* node = vectorized_branch[i]; + const NodeDef* next_node = vectorized_branch[i + 1]; + ASSERT_EQ(next_node->input(0), node->name()); + } + ASSERT_EQ(vectorized_branch[0]->input(0), input_name); + + const NodeDef* vectorized_map_node = vectorized_branch[1]; + string function_name = + vectorized_map_node->attr().at(kAttrNameF).func().name(); + + const FunctionDef* function = GetFunction(output, function_name); + ASSERT_NE(function, nullptr); + EXPECT_EQ(function->node_def(0).op(), "Identity"); +} + // Checks that a graph has undergone the map_vectorization transformation // successfully, whereby the new graph has the shape: // @@ -198,10 +240,15 @@ const FunctionDef* GetFunction(const GraphDef& graph, // | // +--> old map --> old batch // -void CheckVectorized(const GraphDef& output, - gtl::ArraySlice expected_vectorized_branch, - gtl::ArraySlice expected_original_branch, - const string& input_name) { +void CheckVectorizedWithChooseFastest( + const GraphDef& output, gtl::ArraySlice expected_vectorized_branch, + gtl::ArraySlice expected_original_branch, + const string& input_name) { + for (const auto& op : {kBatchOp, kBatchV2Op, kMapOp, kParallelMapOp, + kExperimentalMapAndBatchOp}) { + // Check that the dataset nodes have been removed from the main graph. + ASSERT_EQ(graph_utils::FindAllGraphNodesWithOp(op, output).size(), 0); + } ASSERT_EQ( graph_utils::FindAllGraphNodesWithOp(kChooseFastestOp, output).size(), 1); const NodeDef& choose_fastest_node = @@ -234,12 +281,13 @@ void CheckVectorized(const GraphDef& output, } class MapThenBatchTest - : public ::testing::TestWithParam> {}; + : public ::testing::TestWithParam> {}; TEST_P(MapThenBatchTest, IsVectorized) { int num_parallel_calls = std::get<0>(GetParam()); bool use_batch_v2 = std::get<1>(GetParam()); int prefetch = std::get<2>(GetParam()); + bool use_choose_fastest = std::get<3>(GetParam()); GrapplerItem item; MutableGraphView graph(&item.graph); auto range_dataset = AddRangeNode(&graph); @@ -251,9 +299,8 @@ TEST_P(MapThenBatchTest, IsVectorized) { dataset = AddPrefetchNode(&graph, dataset->name(), prefetch); } dataset = AddBatchNode(&graph, dataset->name(), use_batch_v2); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, use_choose_fastest)); std::vector expected_original_branch; expected_original_branch.push_back(num_parallel_calls > 0 ? 
kParallelMapOp @@ -272,14 +319,24 @@ TEST_P(MapThenBatchTest, IsVectorized) { expected_vectorized_branch.push_back(kPrefetchOp); } - CheckVectorized(output, expected_vectorized_branch, expected_original_branch, - range_dataset->name()); + if (use_choose_fastest) { + CheckVectorizedWithChooseFastest(output, expected_vectorized_branch, + expected_original_branch, + range_dataset->name()); + + } else { + CheckVectorizedWithoutChooseFastest(output, expected_vectorized_branch, + range_dataset->name()); + } } INSTANTIATE_TEST_SUITE_P(MapThenBatchTest, MapThenBatchTest, ::testing::Combine(::testing::Values(0, 12), ::testing::Bool(), - ::testing::Values(0, 20))); + ::testing::Values(0, 20), + ::testing::Bool())); + +class MapAndBatchTest : public ::testing::TestWithParam {}; NodeDef* AddMapAndBatchNode(MutableGraphView* graph, const string& input_dataset, const string& map_fn, @@ -307,7 +364,7 @@ NodeDef* AddMapAndBatchNode(MutableGraphView* graph, return graph->AddNode(std::move(result)); } -TEST(MapVectorizationTest, VectorizeExperimentalMapAndBatch) { +TEST_P(MapAndBatchTest, VectorizeExperimentalMapAndBatch) { GrapplerItem item; MutableGraphView graph(&item.graph); auto range_node = AddRangeNode(&graph); @@ -316,16 +373,24 @@ TEST(MapVectorizationTest, VectorizeExperimentalMapAndBatch) { map_fn->signature().name()); ASSERT_NE(map_and_batch_node, nullptr); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); + bool use_choose_fastest = GetParam(); - CheckVectorized(output, {kBatchV2Op, kParallelMapOp}, - {kExperimentalMapAndBatchOp}, range_node->name()); + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, use_choose_fastest)); + if (use_choose_fastest) { + CheckVectorizedWithChooseFastest(output, {kBatchV2Op, kParallelMapOp}, + {kExperimentalMapAndBatchOp}, + range_node->name()); + } else { + CheckVectorizedWithoutChooseFastest(output, {kBatchV2Op, kParallelMapOp}, + range_node->name()); + } } +INSTANTIATE_TEST_SUITE_P(MapAndBatchTest, MapAndBatchTest, ::testing::Bool()); + class ChainedMapAndBatchTest - : public ::testing::TestWithParam> {}; + : public ::testing::TestWithParam> {}; // Tests: // 1) map.batch.map.batch @@ -352,52 +417,76 @@ TEST_P(ChainedMapAndBatchTest, IsVectorized) { bool fuse_0 = std::get<0>(GetParam()); bool fuse_1 = std::get<1>(GetParam()); + bool use_choose_fastest = std::get<2>(GetParam()); auto map_and_batch_0 = make_map_and_batch(input_node, fuse_0); auto map_and_batch_1 = make_map_and_batch(map_and_batch_0, fuse_1); ASSERT_NE(map_and_batch_1, nullptr); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, use_choose_fastest)); TF_ASSERT_OK(TopologicalSort(&output)); - std::vector choose_fastest_nodes = - graph_utils::FindAllGraphNodesWithOp(kChooseFastestOp, output); - ASSERT_EQ(choose_fastest_nodes.size(), 2); - - std::vector fused_sequence({kExperimentalMapAndBatchOp}); - std::vector unfused_sequence({kParallelMapOp, kBatchV2Op}); - const NodeDef& range_node = - output.node(graph_utils::FindGraphNodeWithOp(kRangeOp, output)); - const NodeDef& choose_fastest_0 = output.node(choose_fastest_nodes[0]); - ASSERT_EQ(choose_fastest_0.input(0), range_node.name()); - const NodeDef& choose_fastest_1 = output.node(choose_fastest_nodes[1]); - ASSERT_EQ(choose_fastest_1.input(0), choose_fastest_0.name()); - - auto check_branches = [&output](const NodeDef& choose_fastest_node, - gtl::ArraySlice original_ops) { 
- const auto& functions_list = - choose_fastest_node.attr().at("branches").list(); - - // Branch 0: vectorized - const FunctionDef* branch_0 = - GetFunction(output, functions_list.func(0).name()); - ASSERT_NE(branch_0, nullptr); - CheckBranch(*branch_0, {kBatchV2Op, kParallelMapOp}); - - // Branch 1: original - const FunctionDef* branch_1 = - GetFunction(output, functions_list.func(1).name()); - ASSERT_NE(branch_1, nullptr); - CheckBranch(*branch_1, original_ops); - }; - - check_branches(choose_fastest_0, fuse_0 ? fused_sequence : unfused_sequence); - check_branches(choose_fastest_1, fuse_1 ? fused_sequence : unfused_sequence); + if (use_choose_fastest) { + std::vector choose_fastest_nodes = + graph_utils::FindAllGraphNodesWithOp(kChooseFastestOp, output); + ASSERT_EQ(choose_fastest_nodes.size(), 2); + + std::vector fused_sequence({kExperimentalMapAndBatchOp}); + std::vector unfused_sequence({kParallelMapOp, kBatchV2Op}); + const NodeDef& range_node = + output.node(graph_utils::FindGraphNodeWithOp(kRangeOp, output)); + const NodeDef& choose_fastest_0 = output.node(choose_fastest_nodes[0]); + ASSERT_EQ(choose_fastest_0.input(0), range_node.name()); + const NodeDef& choose_fastest_1 = output.node(choose_fastest_nodes[1]); + ASSERT_EQ(choose_fastest_1.input(0), choose_fastest_0.name()); + + auto check_branches = [&output](const NodeDef& choose_fastest_node, + gtl::ArraySlice original_ops) { + const auto& functions_list = + choose_fastest_node.attr().at("branches").list(); + + // Branch 0: vectorized + const FunctionDef* branch_0 = + GetFunction(output, functions_list.func(0).name()); + ASSERT_NE(branch_0, nullptr); + CheckBranch(*branch_0, {kBatchV2Op, kParallelMapOp}); + + // Branch 1: original + const FunctionDef* branch_1 = + GetFunction(output, functions_list.func(1).name()); + ASSERT_NE(branch_1, nullptr); + CheckBranch(*branch_1, original_ops); + }; + + check_branches(choose_fastest_0, + fuse_0 ? fused_sequence : unfused_sequence); + check_branches(choose_fastest_1, + fuse_1 ? 
fused_sequence : unfused_sequence); + } else { + std::vector map_nodes = + graph_utils::FindAllGraphNodesWithOp(kParallelMapOp, output); + std::vector batch_nodes = + graph_utils::FindAllGraphNodesWithOp(kBatchV2Op, output); + ASSERT_EQ(map_nodes.size(), 2); + ASSERT_EQ(batch_nodes.size(), 2); + + const NodeDef& range_node = + output.node(graph_utils::FindGraphNodeWithOp(kRangeOp, output)); + + const NodeDef& batch_node_0 = output.node(batch_nodes[0]); + EXPECT_EQ(batch_node_0.input(0), range_node.name()); + const NodeDef& map_node_0 = output.node(map_nodes[0]); + EXPECT_EQ(map_node_0.input(0), batch_node_0.name()); + const NodeDef& batch_node_1 = output.node(batch_nodes[1]); + EXPECT_EQ(batch_node_1.input(0), map_node_0.name()); + const NodeDef& map_node_1 = output.node(map_nodes[1]); + EXPECT_EQ(map_node_1.input(0), batch_node_1.name()); + } } INSTANTIATE_TEST_SUITE_P(ChainedMapAndBatchTest, ChainedMapAndBatchTest, ::testing::Combine(::testing::Bool(), + ::testing::Bool(), ::testing::Bool())); // Not all dataset types have "output_shapes" and "output_types" @@ -434,9 +523,8 @@ TEST(MapVectorizationTest, VectorizeWithUndefinedOutputShapes) { auto map_node = AddMapNode(&graph, input_node->name(), map_fn->signature().name()); auto batch_node = AddBatchNode(&graph, map_node->name()); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, true)); CheckNotVectorized(output, map_node->op(), batch_node->op(), input_node->name()); } @@ -454,9 +542,8 @@ TEST(MapVectorizationTest, VectorizeWithUnknownRank) { auto map_node = AddMapNode(&graph, input_node->name(), map_fn->signature().name()); auto batch_node = AddBatchNode(&graph, map_node->name()); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, true)); CheckNotVectorized(output, map_node->op(), batch_node->op(), input_node->name()); } @@ -474,9 +561,8 @@ TEST(MapVectorizationTest, VectorizeWithUnknownDim) { auto map_node = AddMapNode(&graph, input_node->name(), map_fn->signature().name()); auto batch_node = AddBatchNode(&graph, map_node->name()); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, true)); CheckNotVectorized(output, map_node->op(), batch_node->op(), input_node->name()); } @@ -493,10 +579,9 @@ TEST(MapVectorizationTest, VectorizeWithUndefinedOutputTypes) { auto map_node = AddMapNode(&graph, input_node->name(), map_fn->signature().name()); auto batch_node = AddBatchNode(&graph, map_node->name()); - MapVectorization optimizer; GraphDef output; - TF_ASSERT_OK(optimizer.Optimize(nullptr, item, &output)); - CheckVectorized( + TF_ASSERT_OK(OptimizeWithMapVectorization(item, &output, true)); + CheckVectorizedWithChooseFastest( output, /*expected_vectorized_branch=*/{batch_node->op(), map_node->op()}, /*expected_original_branch=*/{map_node->op(), batch_node->op()}, input_node->name()); diff --git a/tensorflow/core/grappler/optimizers/data/meta_optimizer.cc b/tensorflow/core/grappler/optimizers/data/meta_optimizer.cc index 584759f85d4..ec98c7ae361 100644 --- a/tensorflow/core/grappler/optimizers/data/meta_optimizer.cc +++ b/tensorflow/core/grappler/optimizers/data/meta_optimizer.cc @@ -15,6 +15,7 @@ limitations under the License. 
#include "tensorflow/core/grappler/optimizers/data/meta_optimizer.h" +#include "absl/strings/str_split.h" #include "tensorflow/core/grappler/clusters/cluster.h" #include "tensorflow/core/grappler/grappler_item.h" #include "tensorflow/core/grappler/optimizers/arithmetic_optimizer.h" @@ -29,6 +30,50 @@ limitations under the License. namespace tensorflow { namespace grappler { +namespace { + +using ConfigMap = + std::map; + +// Parses a list of string optimizer configurations into a map from +// optimizer name -> rewriter config for that optimizer. +Status ToConfigMap( + const tensorflow::RewriterConfig_CustomGraphOptimizer* config, + ConfigMap* result) { + auto found = gtl::FindOrNull(config->parameter_map(), "optimizer_configs"); + if (!found) return Status::OK(); + + auto& options = found->list().s(); + for (const auto& option_string : options) { + // The option string has the format + // :: + std::vector split = absl::StrSplit(option_string, ':'); + if (split.size() != 3) { + return errors::Internal( + "Wrong format for optimizer options. Expect ::, received: ", + option_string); + } + + const string& optimizer_name = split[0]; + const string& config_key = split[1]; + const string& config_value = split[2]; + + auto optimizer_config = gtl::FindOrNull(*result, optimizer_name); + if (!optimizer_config) { + (*result)[optimizer_name] = + tensorflow::RewriterConfig_CustomGraphOptimizer(); + optimizer_config = gtl::FindOrNull(*result, optimizer_name); + } + (*optimizer_config->mutable_parameter_map())[config_key].set_s( + config_value); + } + + return Status::OK(); +} + +} // namespace + Status TFDataMetaOptimizer::Optimize(Cluster* cluster, const GrapplerItem& item, GraphDef* output) { // Stores the optimized item so far. @@ -86,13 +131,16 @@ Status TFDataMetaOptimizer::Init( // Initialize custom tf.data optimizers based on config. auto& optimizers = config->parameter_map().at("optimizers").list().s(); + ConfigMap optimizer_configs; + TF_RETURN_IF_ERROR(ToConfigMap(config, &optimizer_configs)); + for (const auto& optimizer_name : optimizers) { auto optimizer = CustomGraphOptimizerRegistry::CreateByNameOrNull(optimizer_name); if (optimizer) { - // None of our data optimizers implement a meaningful Init function. - // This returns an error in case any of them does. - TF_RETURN_IF_ERROR(optimizer->Init()); + TF_RETURN_IF_ERROR( + optimizer->Init(gtl::FindOrNull(optimizer_configs, optimizer_name))); + enabled_optimizers_[optimizer_name] = std::move(optimizer); } else { // This should never happen. 
diff --git a/tensorflow/core/kernels/data/optimize_dataset_op.cc b/tensorflow/core/kernels/data/optimize_dataset_op.cc index 17094e30017..5064940b64c 100644 --- a/tensorflow/core/kernels/data/optimize_dataset_op.cc +++ b/tensorflow/core/kernels/data/optimize_dataset_op.cc @@ -36,6 +36,8 @@ class OptimizeDatasetOp : public UnaryDatasetOpKernel { graph_def_version_(ctx->graph_def_version()) { OP_REQUIRES_OK(ctx, ctx->GetAttr("output_types", &output_types_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("output_shapes", &output_shapes_)); + OP_REQUIRES_OK(ctx, + ctx->GetAttr("optimization_configs", &optimizer_configs_)); } protected: @@ -44,8 +46,8 @@ class OptimizeDatasetOp : public UnaryDatasetOpKernel { std::vector optimizations; OP_REQUIRES_OK( ctx, ParseVectorArgument(ctx, "optimizations", &optimizations)); - Dataset* dataset = - new Dataset(ctx, input, optimizations, output_types_, output_shapes_); + Dataset* dataset = new Dataset(ctx, input, optimizations, output_types_, + output_shapes_, optimizer_configs_); Status s = dataset->Optimize(ctx); if (s.ok()) { *output = dataset; @@ -61,9 +63,11 @@ class OptimizeDatasetOp : public UnaryDatasetOpKernel { Dataset(OpKernelContext* ctx, const DatasetBase* input, const std::vector& optimizations, const DataTypeVector& output_types, - const std::vector& output_shapes) + const std::vector& output_shapes, + const std::vector& optimizer_configs) : GraphRewriteDataset(ctx, input, output_types, output_shapes), - optimizations_(optimizations) {} + optimizations_(optimizations), + optimizer_configs_(optimizer_configs) {} string DebugString() const override { return "OptimizeDatasetOp::Dataset"; } @@ -81,15 +85,23 @@ class OptimizeDatasetOp : public UnaryDatasetOpKernel { for (const auto& opt : optimizations_) { custom_optimizations_list->add_s(opt); } + auto* config_list = + (*custom_optimizer->mutable_parameter_map())["optimizer_configs"] + .mutable_list(); + for (const auto& config : optimizer_configs_) { + config_list->add_s(config); + } return rewriter_config; } const std::vector optimizations_; + const std::vector optimizer_configs_; }; const int graph_def_version_; DataTypeVector output_types_; std::vector output_shapes_; + std::vector optimizer_configs_; }; REGISTER_KERNEL_BUILDER(Name("OptimizeDataset").Device(DEVICE_CPU), diff --git a/tensorflow/core/ops/dataset_ops.cc b/tensorflow/core/ops/dataset_ops.cc index 89c1204cf0e..6390d9dbfee 100644 --- a/tensorflow/core/ops/dataset_ops.cc +++ b/tensorflow/core/ops/dataset_ops.cc @@ -624,6 +624,7 @@ REGISTER_OP("OptimizeDataset") .Output("handle: variant") .Attr("output_types: list(type) >= 1") .Attr("output_shapes: list(shape) >= 1") + .Attr("optimization_configs: list(string) = []") .SetShapeFn(shape_inference::ScalarShape); REGISTER_OP("OptionalFromValue") diff --git a/tensorflow/python/data/experimental/__init__.py b/tensorflow/python/data/experimental/__init__.py index a5da41bdf4a..98454f3ac7d 100644 --- a/tensorflow/python/data/experimental/__init__.py +++ b/tensorflow/python/data/experimental/__init__.py @@ -26,6 +26,7 @@ See [Importing Data](https://tensorflow.org/guide/datasets) for an overview. 
@@CheckpointInputPipelineHook @@CsvDataset @@DatasetStructure +@@MapVectorizationOptions @@NestedStructure @@OptimizationOptions @@Optional @@ -102,6 +103,7 @@ from tensorflow.python.data.experimental.ops.interleave_ops import sample_from_d from tensorflow.python.data.experimental.ops.iterator_ops import CheckpointInputPipelineHook from tensorflow.python.data.experimental.ops.iterator_ops import make_saveable_from_iterator from tensorflow.python.data.experimental.ops.optimization import AUTOTUNE +from tensorflow.python.data.experimental.ops.optimization_options import MapVectorizationOptions from tensorflow.python.data.experimental.ops.optimization_options import OptimizationOptions from tensorflow.python.data.experimental.ops.parsing_ops import parse_example_dataset from tensorflow.python.data.experimental.ops.prefetching_ops import copy_to_device diff --git a/tensorflow/python/data/experimental/kernel_tests/optimization/map_vectorization_test.py b/tensorflow/python/data/experimental/kernel_tests/optimization/map_vectorization_test.py index 8dfcdc7e4b5..7dbfafe22a6 100644 --- a/tensorflow/python/data/experimental/kernel_tests/optimization/map_vectorization_test.py +++ b/tensorflow/python/data/experimental/kernel_tests/optimization/map_vectorization_test.py @@ -321,6 +321,13 @@ def _generate_optimization_test_cases(): @test_util.run_all_in_graph_and_eager_modes class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): + def _enable_map_vectorization(self, dataset, use_choose=True): + options = dataset_ops.Options() + opt_options = options.experimental_optimization + opt_options.map_vectorization.enabled = True + opt_options.map_vectorization.use_choose_fastest = use_choose + return dataset.with_options(options) + def _get_test_datasets(self, base_dataset, map_fn, @@ -361,10 +368,7 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): # to verify the optimization result. optimized = _make_dataset(["ChooseFastestBranch"] if expect_optimized else [map_node_name, "Batch"]) - options = dataset_ops.Options() - options.experimental_optimization.apply_default_optimizations = False - options.experimental_optimization.map_vectorization = True - optimized = optimized.with_options(options) + optimized = self._enable_map_vectorization(optimized) return unoptimized, optimized @parameterized.named_parameters(_generate_optimization_test_cases()) @@ -404,16 +408,12 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): def testOptimizationWithMapAndBatchFusion(self): # Tests that vectorization works on fused map and batch. 
- y = constant_op.constant(1, shape=(2,)) - z = constant_op.constant(2, shape=(2,)) - def map_fn(x): - return x, y, z + return x**2 + base_dataset = dataset_ops.Dataset.range(1000) options = dataset_ops.Options() options.experimental_optimization.apply_default_optimizations = False - base_dataset = dataset_ops.Dataset.from_tensor_slices([[1, 2], - [3, 4]]).repeat(5) base_dataset = base_dataset.with_options(options) def _make_dataset(node_names): @@ -423,9 +423,7 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): unoptimized = _make_dataset(["MapAndBatch"]) optimized = _make_dataset(["ChooseFastestBranch"]) - options = dataset_ops.Options() - options.experimental_optimization.map_vectorization = True - optimized = optimized.with_options(options) + optimized = self._enable_map_vectorization(optimized) self.assertDatasetsEqual(optimized, unoptimized) @parameterized.named_parameters( @@ -474,10 +472,7 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): unoptimized = make_dataset(unoptimized_seq) optimized = make_dataset(["ChooseFastestBranch", "ChooseFastestBranch"]) - options = dataset_ops.Options() - options.experimental_optimization.map_vectorization = True - optimized = optimized.with_options(options) - + optimized = self._enable_map_vectorization(optimized) self.assertDatasetsEqual(optimized, unoptimized) def testOptimizationIgnoreStateful(self): @@ -536,9 +531,7 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): options.experimental_optimization.apply_default_optimizations = False unoptimized = unoptimized.with_options(options) - options = dataset_ops.Options() - options.experimental_optimization.map_vectorization = True - optimized = unoptimized.with_options(options) + optimized = self._enable_map_vectorization(unoptimized) self.assertDatasetsEqual(unoptimized, optimized) def testOptimizationWithSparseTensor(self): @@ -554,10 +547,7 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): options = dataset_ops.Options() options.experimental_optimization.apply_default_optimizations = False unoptimized = unoptimized.with_options(options) - - options = dataset_ops.Options() - options.experimental_optimization.map_vectorization = True - optimized = unoptimized.with_options(options) + optimized = self._enable_map_vectorization(unoptimized) self.assertDatasetsEqual(unoptimized, optimized) def testOptimizationWithPrefetch(self): @@ -565,11 +555,16 @@ class MapVectorizationTest(test_base.DatasetTestBase, parameterized.TestCase): dataset = dataset.map(lambda x: x) dataset = dataset.prefetch(1) dataset = dataset.batch(10) - options = dataset_ops.Options() - options.experimental_optimization.map_vectorization = True - dataset = dataset.with_options(options) + dataset = self._enable_map_vectorization(dataset) self.assertDatasetProduces(dataset, [list(range(10))]) + def testOptimizationWithoutChooseFastest(self): + dataset = dataset_ops.Dataset.range(10) + dataset = dataset.map(lambda x: x**2) + dataset = dataset.batch(10) + dataset = self._enable_map_vectorization(dataset, use_choose=False) + self.assertDatasetProduces(dataset, [[x**2 for x in range(10)]]) + if __name__ == "__main__": test.main() diff --git a/tensorflow/python/data/experimental/ops/optimization_options.py b/tensorflow/python/data/experimental/ops/optimization_options.py index e11fa884409..02d4ecd9c05 100644 --- a/tensorflow/python/data/experimental/ops/optimization_options.py +++ 
b/tensorflow/python/data/experimental/ops/optimization_options.py @@ -17,11 +17,42 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function - from tensorflow.python.data.util import options from tensorflow.python.util.tf_export import tf_export +@tf_export("data.experimental.MapVectorizationOptions") +class MapVectorizationOptions(options.OptionsBase): + """Represents options for the MapVectorization optimization.""" + # TODO(rachelim): Other configuration parameters can go here, for example, + # how many "experiments" to run with ChooseFastestBranchDataset. + enabled = options.create_option( + name="enabled", + ty=bool, + docstring= + "Whether to vectorize map transformations. If None, defaults to False." + ) + + use_choose_fastest = options.create_option( + name="use_choose_fastest", + ty=bool, + docstring="Whether to use ChooseFastestBranchDataset with this " + "transformation. If True, the pipeline picks between the vectorized and " + "original segment at runtime based on their iterations speed. If None, " + "defaults to False.") + + def _static_optimizations(self): + if self.enabled: + return ["map_vectorization"] + return [] + + def _static_optimization_configs(self): + if self.use_choose_fastest: + return ["map_vectorization:use_choose_fastest:true"] + else: + return ["map_vectorization:use_choose_fastest:false"] + + @tf_export("data.experimental.OptimizationOptions") class OptimizationOptions(options.OptionsBase): """Represents options for dataset optimizations. @@ -102,9 +133,11 @@ class OptimizationOptions(options.OptionsBase): map_vectorization = options.create_option( name="map_vectorization", - ty=bool, + ty=MapVectorizationOptions, docstring= - "Whether to vectorize map transformations. If None, defaults to False.") + "The map vectorization options associated with the dataset. See " + "`tf.data.experimental.MapVectorizationOptions` for more details.", + default_factory=MapVectorizationOptions) noop_elimination = options.create_option( name="noop_elimination", @@ -128,7 +161,6 @@ class OptimizationOptions(options.OptionsBase): "map_and_filter_fusion", "map_parallelization", "map_fusion", - "map_vectorization", "noop_elimination", "shuffle_and_repeat_fusion", ] @@ -147,4 +179,12 @@ class OptimizationOptions(options.OptionsBase): for optimization in optimizations_to_disable: if getattr(self, optimization) is not False: result.add(optimization) + + if self.map_vectorization is not None: + result.update(self.map_vectorization._static_optimizations()) # pylint: disable=protected-access return sorted(list(result)) + + def _static_optimization_configs(self): + if self.map_vectorization is not None: + return self.map_vectorization._static_optimization_configs() # pylint: disable=protected-access + return [] diff --git a/tensorflow/python/data/ops/dataset_ops.py b/tensorflow/python/data/ops/dataset_ops.py index a86f74542f0..1a902357ecb 100644 --- a/tensorflow/python/data/ops/dataset_ops.py +++ b/tensorflow/python/data/ops/dataset_ops.py @@ -191,7 +191,8 @@ class DatasetV2(object): "`tf.enable_resource_variables()` at the start of the program." % ", ".join(static_optimizations)) else: - dataset = _OptimizeDataset(dataset, static_optimizations) + dataset = _OptimizeDataset(dataset, static_optimizations, + options._static_optimization_configs()) # pylint: disable=protected-access autotune = True cpu_budget = 0 # Indicates that all CPU cores should be used. 
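Note (illustrative, not part of the patch): the `optimization_configs` list threaded through `_OptimizeDataset` above carries one string per setting, in the `optimizer_name:config_key:config_value` form that `ToConfigMap` in meta_optimizer.cc splits on ':'. Below is a minimal Python sketch of that parsing, included only to show the wire format; the actual parsing is the C++ code earlier in this patch, and the helper name here is hypothetical.

def to_config_map(optimizer_configs):
  """Groups "optimizer:key:value" strings into per-optimizer config dicts."""
  result = {}
  for option_string in optimizer_configs:
    split = option_string.split(":")
    if len(split) != 3:
      raise ValueError("Wrong format for optimizer options. Expected "
                       "optimizer:key:value, received: " + option_string)
    optimizer_name, config_key, config_value = split
    result.setdefault(optimizer_name, {})[config_key] = config_value
  return result

# Example with the string emitted by MapVectorizationOptions._static_optimization_configs():
assert to_config_map(["map_vectorization:use_choose_fastest:true"]) == {
    "map_vectorization": {"use_choose_fastest": "true"}}
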
@@ -2009,6 +2010,10 @@ class Options(options_lib.OptionsBase): result.append("latency_all_edges") return result + def _static_optimization_configs(self): + """Produces the list of configurations for enabled static optimizations.""" + return self.experimental_optimization._static_optimization_configs() # pylint: disable=protected-access + def merge(self, options): """Merges itself with the given `tf.data.Options`. @@ -3295,15 +3300,18 @@ class _ModelDataset(UnaryUnchangedStructureDataset): class _OptimizeDataset(UnaryUnchangedStructureDataset): """A `Dataset` that acts as an identity, and applies optimizations.""" - def __init__(self, input_dataset, optimizations): + def __init__(self, input_dataset, optimizations, optimization_configs=None): self._input_dataset = input_dataset if optimizations is None: optimizations = [] + if optimization_configs is None: + optimization_configs = [] self._optimizations = ops.convert_to_tensor( optimizations, dtype=dtypes.string, name="optimizations") variant_tensor = gen_dataset_ops.optimize_dataset( input_dataset._variant_tensor, # pylint: disable=protected-access self._optimizations, + optimization_configs=optimization_configs, **flat_structure(self)) super(_OptimizeDataset, self).__init__(input_dataset, variant_tensor) diff --git a/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.-map-vectorization-options.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.-map-vectorization-options.pbtxt new file mode 100644 index 00000000000..ae713e8831b --- /dev/null +++ b/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.-map-vectorization-options.pbtxt @@ -0,0 +1,18 @@ +path: "tensorflow.data.experimental.MapVectorizationOptions" +tf_class { + is_instance: "" + is_instance: "" + is_instance: "" + member { + name: "enabled" + mtype: "" + } + member { + name: "use_choose_fastest" + mtype: "" + } + member_method { + name: "__init__" + argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None" + } +} diff --git a/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.pbtxt index 9aa5955a16f..d62c67cfb2a 100644 --- a/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.pbtxt +++ b/tensorflow/tools/api/golden/v1/tensorflow.data.experimental.pbtxt @@ -20,6 +20,10 @@ tf_module { name: "INFINITE_CARDINALITY" mtype: "" } + member { + name: "MapVectorizationOptions" + mtype: "" + } member { name: "NestedStructure" mtype: "" diff --git a/tensorflow/tools/api/golden/v1/tensorflow.raw_ops.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.raw_ops.pbtxt index 489771285c7..717bb8c72ef 100644 --- a/tensorflow/tools/api/golden/v1/tensorflow.raw_ops.pbtxt +++ b/tensorflow/tools/api/golden/v1/tensorflow.raw_ops.pbtxt @@ -2182,7 +2182,7 @@ tf_module { } member_method { name: "OptimizeDataset" - argspec: "args=[\'input_dataset\', \'optimizations\', \'output_types\', \'output_shapes\', \'name\'], varargs=None, keywords=None, defaults=[\'None\'], " + argspec: "args=[\'input_dataset\', \'optimizations\', \'output_types\', \'output_shapes\', \'optimization_configs\', \'name\'], varargs=None, keywords=None, defaults=[\'[]\', \'None\'], " } member_method { name: "OptionalFromValue" diff --git a/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.-map-vectorization-options.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.-map-vectorization-options.pbtxt new file mode 100644 index 00000000000..ae713e8831b --- /dev/null +++ 
b/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.-map-vectorization-options.pbtxt @@ -0,0 +1,18 @@ +path: "tensorflow.data.experimental.MapVectorizationOptions" +tf_class { + is_instance: "" + is_instance: "" + is_instance: "" + member { + name: "enabled" + mtype: "" + } + member { + name: "use_choose_fastest" + mtype: "" + } + member_method { + name: "__init__" + argspec: "args=[\'self\'], varargs=None, keywords=None, defaults=None" + } +} diff --git a/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.pbtxt index 695756890eb..1d7ac210305 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.data.experimental.pbtxt @@ -20,6 +20,10 @@ tf_module { name: "INFINITE_CARDINALITY" mtype: "" } + member { + name: "MapVectorizationOptions" + mtype: "" + } member { name: "NestedStructure" mtype: "" diff --git a/tensorflow/tools/api/golden/v2/tensorflow.raw_ops.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.raw_ops.pbtxt index 489771285c7..717bb8c72ef 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.raw_ops.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.raw_ops.pbtxt @@ -2182,7 +2182,7 @@ tf_module { } member_method { name: "OptimizeDataset" - argspec: "args=[\'input_dataset\', \'optimizations\', \'output_types\', \'output_shapes\', \'name\'], varargs=None, keywords=None, defaults=[\'None\'], " + argspec: "args=[\'input_dataset\', \'optimizations\', \'output_types\', \'output_shapes\', \'optimization_configs\', \'name\'], varargs=None, keywords=None, defaults=[\'[]\', \'None\'], " } member_method { name: "OptionalFromValue" -- GitLab
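
Usage note (not part of the patch): a minimal sketch of how the new option is driven from the Python API, mirroring the `_enable_map_vectorization` helper added to map_vectorization_test.py in this change. The dataset and map function below are placeholders chosen for illustration.

import tensorflow as tf

dataset = tf.data.Dataset.range(10).map(lambda x: x * x).batch(10)

options = tf.data.Options()
options.experimental_optimization.map_vectorization.enabled = True
# Safe mode: wrap the rewrite in a ChooseFastestBranch dataset so the runtime
# can fall back to the original map->batch branch if vectorization is slower.
options.experimental_optimization.map_vectorization.use_choose_fastest = True
dataset = dataset.with_options(options)

# Aggressive mode: set use_choose_fastest = False instead, which always uses
# the vectorized batch->map pipeline without the runtime comparison.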