/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/data/optimize_dataset_op.h"

// On mobile we do not provide optimize dataset op because not all of its
// dependencies are available there. The op is replaced with a no-op.
#if !defined(IS_MOBILE_PLATFORM)
#include <map>

#include "tensorflow/core/framework/partial_tensor_shape.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/kernels/data/dataset_utils.h"
#include "tensorflow/core/kernels/data/rewrite_utils.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/host_info.h"
#include "tensorflow/core/protobuf/rewriter_config.pb.h"

namespace tensorflow {
namespace data {

/* static */ constexpr const char* const OptimizeDatasetOp::kDatasetType;
/* static */ constexpr const char* const OptimizeDatasetOp::kInputDataset;
/* static */ constexpr const char* const OptimizeDatasetOp::kOptimizations;
A
A. Unique TensorFlower 已提交
36 37 38 39 40 41
/* static */ constexpr const char* const
    OptimizeDatasetOp::kOptimizationsEnabled;
/* static */ constexpr const char* const
    OptimizeDatasetOp::kOptimizationsDisabled;
/* static */ constexpr const char* const
    OptimizeDatasetOp::kOptimizationsDefault;
F
Fei Hu 已提交
42 43 44 45
/* static */ constexpr const char* const OptimizeDatasetOp::kOutputTypes;
/* static */ constexpr const char* const OptimizeDatasetOp::kOutputShapes;
/* static */ constexpr const char* const
    OptimizeDatasetOp::kOptimizationConfigs;
A
A. Unique TensorFlower 已提交
46 47
/* static */ constexpr const char* const OptimizeDatasetOp::kOptimizeDatasetV1;
/* static */ constexpr const char* const OptimizeDatasetOp::kOptimizeDatasetV2;
48

F
Fei Hu 已提交
49 50 51
constexpr char kOptimizerName[] = "tf_data_meta_optimizer";
constexpr char kOptimizers[] = "optimizers";
constexpr char kOptimizerConfigs[] = "optimizer_configs";
52

F
Fei Hu 已提交
53 54
OptimizeDatasetOp::OptimizeDatasetOp(OpKernelConstruction* ctx)
    : UnaryDatasetOpKernel(ctx) {
A
A. Unique TensorFlower 已提交
55 56 57 58 59 60
  auto& op_name = ctx->def().op();
  if (op_name == kOptimizeDatasetV1) {
    op_version_ = 1;
  } else if (op_name == kOptimizeDatasetV2) {
    op_version_ = 2;
  }
F
Fei Hu 已提交
61 62 63 64 65 66
  OP_REQUIRES_OK(ctx,
                 ctx->GetAttr(kOptimizationConfigs, &optimization_configs_));
}

void OptimizeDatasetOp::MakeDataset(OpKernelContext* ctx, DatasetBase* input,
                                    DatasetBase** output) {
67
  std::vector<tstring> optimizations;
A
A. Unique TensorFlower 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
  if (op_version_ == 1) {
    OP_REQUIRES_OK(
        ctx, ParseVectorArgument<tstring>(ctx, kOptimizations, &optimizations));
  } else if (op_version_ == 2) {
    std::vector<tstring> optimizations_enabled, optimizations_disabled,
        optimizations_default;
    OP_REQUIRES_OK(ctx, ParseVectorArgument<tstring>(ctx, kOptimizationsEnabled,
                                                     &optimizations_enabled));
    OP_REQUIRES_OK(ctx,
                   ParseVectorArgument<tstring>(ctx, kOptimizationsDisabled,
                                                &optimizations_disabled));
    OP_REQUIRES_OK(ctx, ParseVectorArgument<tstring>(ctx, kOptimizationsDefault,
                                                     &optimizations_default));

    string job_name = port::JobName();
83 84
    // The map that stores the live experiment names and for how much percentage
    // of the Borg jobs, the experiments will be randomly turned on.
85
    // clang-format off
86
    absl::flat_hash_map<string, uint64> live_experiments = {
87
        {"enable_gradient_descent", 100},
88
        {"map_parallelization", 1}
89
    };
90
    // clang-format on
91 92 93 94 95
    auto hash_func = [](const string& str) { return Hash64(str); };
    optimizations = SelectOptimizations(
        job_name, live_experiments, optimizations_enabled,
        optimizations_disabled, optimizations_default, hash_func);

96
    // Log and record the live experiments that will be applied.
97 98 99 100 101 102 103 104
    if (!job_name.empty() && !live_experiments.empty()) {
      VLOG(1) << "The input pipeline is subject to tf.data experiment. "
                 "Please see `go/tf-data-experiments` for more details.";

      for (auto& pair : live_experiments) {
        string experiment = pair.first;
        if (std::find(optimizations.begin(), optimizations.end(), experiment) !=
            optimizations.end()) {
105
          VLOG(1) << "The live experiment \"" << experiment << "\" is applied.";
106
          metrics::RecordTFDataExperiment(experiment);
107 108
        }
      }
A
A. Unique TensorFlower 已提交
109 110
    }
  }
111

112 113 114 115 116 117
  // The vector stores the graduated experiment names which will be turned on
  // for all input pipelines.
  // clang-format off
  std::vector<string> graduated_experiments = {"disable_intra_op_parallelism"};
  // clang-format on

118
  // Add the graduated experiments to the optimization list and log them.
119 120 121 122 123 124 125 126
  for (auto& experiment : graduated_experiments) {
    if (std::find(optimizations.begin(), optimizations.end(), experiment) ==
        optimizations.end()) {
      optimizations.push_back(experiment);
    }
    VLOG(1) << "The graduated experiment \"" << experiment << "\" is applied.";
  }

127 128 129 130 131 132 133
  // If there are no optimizations to be applied, directly return the input.
  if (optimizations.empty()) {
    *output = input;
    input->Ref();
    return;
  }

F
Fei Hu 已提交
134 135 136
  auto config_factory = [this, &optimizations]() {
    return CreateConfig(optimizations, optimization_configs_);
  };
137 138 139 140 141
  Status s = RewriteDataset(ctx, input, std::move(config_factory),
                            /*record_fingerprint=*/true, output);
  if (errors::IsDeadlineExceeded(s)) {
    // Ignore DeadlineExceeded as it implies that the attempted rewrite took too
    // long which should not prevent further computation.
142 143 144 145
    LOG(WARNING) << s.ToString();

    *output = input;
    input->Ref();
146 147 148
    return;
  }
  OP_REQUIRES_OK(ctx, s);
F
Fei Hu 已提交
149
}
150

F
Fei Hu 已提交
151
RewriterConfig OptimizeDatasetOp::CreateConfig(
152
    std::vector<tstring> optimizations,
F
Fei Hu 已提交
153 154 155 156 157 158 159 160 161 162
    std::vector<string> optimizations_configs) {
  RewriterConfig rewriter_config;
  rewriter_config.add_optimizers(kOptimizerName);
  rewriter_config.set_meta_optimizer_iterations(RewriterConfig::ONE);
  rewriter_config.set_fail_on_optimizer_errors(true);
  auto custom_optimizer = rewriter_config.add_custom_optimizers();
  custom_optimizer->set_name(kOptimizerName);
  auto* custom_optimizations_list =
      (*custom_optimizer->mutable_parameter_map())[kOptimizers].mutable_list();
  for (const auto& opt : optimizations) {
163
    custom_optimizations_list->add_s(opt.data(), opt.size());
F
Fei Hu 已提交
164 165 166 167 168
  }
  auto* config_list =
      (*custom_optimizer->mutable_parameter_map())[kOptimizerConfigs]
          .mutable_list();
  for (const auto& config : optimizations_configs) {
169
    config_list->add_s(config.data(), config.size());
F
Fei Hu 已提交
170 171 172 173 174
  }
  return rewriter_config;
}

namespace {
175 176
REGISTER_KERNEL_BUILDER(Name("OptimizeDataset").Device(DEVICE_CPU),
                        OptimizeDatasetOp);
A
A. Unique TensorFlower 已提交
177 178
REGISTER_KERNEL_BUILDER(Name("OptimizeDatasetV2").Device(DEVICE_CPU),
                        OptimizeDatasetOp);
179
}  // namespace
}  // namespace data
}  // namespace tensorflow
#else  // !IS_MOBILE_PLATFORM
namespace tensorflow {
namespace data {

OptimizeDatasetOp::OptimizeDatasetOp(OpKernelConstruction* ctx)
    : UnaryDatasetOpKernel(ctx) {}

void OptimizeDatasetOp::MakeDataset(OpKernelContext* ctx, DatasetBase* input,
                                    DatasetBase** output) {
  input->Ref();
  *output = input;
}

namespace {
REGISTER_KERNEL_BUILDER(Name("OptimizeDataset").Device(DEVICE_CPU),
                        OptimizeDatasetOp);
REGISTER_KERNEL_BUILDER(Name("OptimizeDatasetV2").Device(DEVICE_CPU),
                        OptimizeDatasetOp);
}  // namespace
}  // namespace data
}  // namespace tensorflow
#endif  // !IS_MOBILE_PLATFORM