parallel_do_op.cc 14.4 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Y
Yang Yang 已提交
2

Y
Yang Yang 已提交
3 4 5
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Y
Yang Yang 已提交
6

Y
Yang Yang 已提交
7
    http://www.apache.org/licenses/LICENSE-2.0
Y
Yang Yang 已提交
8

Y
Yang Yang 已提交
9 10 11 12 13
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
Y
Yang Yang 已提交
14 15

#include <vector>
Y
Yang Yang 已提交
16

Y
Yi Wang 已提交
17 18 19 20
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/safe_ref.h"
X
Xin Pan 已提交
21
#include "paddle/fluid/platform/profiler.h"
Y
Yang Yang 已提交
22 23 24 25

namespace paddle {
namespace operators {

Y
Yang Yu 已提交
26 27 28
static constexpr char kInputs[] = "inputs";
static constexpr char kParameters[] = "parameters";
static constexpr char kPlaces[] = "places";
Y
Yang Yang 已提交
29

Y
Yang Yu 已提交
30 31
static constexpr char kOutputs[] = "outputs";
static constexpr char kParallelScopes[] = "parallel_scopes";
Y
Yang Yang 已提交
32

Y
Yang Yu 已提交
33
static constexpr char kParallelBlock[] = "sub_block";
Y
Yang Yang 已提交
34
static constexpr char kUseNCCL[] = "use_nccl";
Y
Yang Yang 已提交
35

Y
Yang Yang 已提交
36
using LoDTensor = framework::LoDTensor;
Y
Yang Yang 已提交
37
using SelectedRows = framework::SelectedRows;
Y
Yang Yang 已提交
38

Y
Yu Yang 已提交
39 40
static void SplitTensorAndMoveTensorToScopes(
    const framework::Scope &scope, std::vector<framework::Scope *> *sub_scopes,
Y
Yang Yang 已提交
41 42
    const std::vector<platform::Place> &places,
    const std::vector<std::string> &names) {
Y
Yu Yang 已提交
43
  size_t num_sub_scopes = 0;
Y
Yang Yang 已提交
44
  for (auto &argu : names) {
Y
Yang Yu 已提交
45 46 47 48
    const auto &tensor =
        detail::Ref(scope.FindVar(argu),
                    "Cannot find variable %s in the parent scope", argu)
            .Get<LoDTensor>();
Y
Yang Yang 已提交
49 50 51
    auto lod_tensors = tensor.SplitLoDTensor(places);

    for (auto &lod : lod_tensors) {
Y
Yang Yang 已提交
52
      VLOG(3) << lod.dims();
Y
Yang Yang 已提交
53
    }
Y
Yu Yang 已提交
54 55 56 57 58 59 60 61 62 63 64 65
    if (num_sub_scopes == 0) {
      num_sub_scopes = lod_tensors.size();
    } else {
      PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size());
    }
    PADDLE_ENFORCE_NE(num_sub_scopes, 0);
    if (sub_scopes->size() == 0) {
      sub_scopes->reserve(num_sub_scopes);
      for (size_t i = 0; i < num_sub_scopes; ++i) {
        sub_scopes->emplace_back(&scope.NewScope());
      }
    }
Y
Yang Yang 已提交
66

Y
Yu Yang 已提交
67
    for (size_t i = 0; i < lod_tensors.size(); ++i) {
Y
Yang Yu 已提交
68 69 70
      *detail::Ref(sub_scopes->at(i)->Var(argu),
                   "Cannot find variable in the sub-scope", argu)
           .GetMutable<LoDTensor>() = lod_tensors[i];
Y
Yang Yang 已提交
71 72 73 74
    }
  }
}

Y
Yang Yang 已提交
75 76 77
inline void CopyOrShare(const framework::Variable &src,
                        const platform::Place &dst_place,
                        framework::Variable *dst) {
Y
Yang Yang 已提交
78 79 80
  if (src.IsType<LoDTensor>()) {
    if (src.Get<LoDTensor>().place() == dst_place) {
      dst->GetMutable<LoDTensor>()->ShareDataWith(src.Get<LoDTensor>());
D
dzhwinter 已提交
81
      dst->GetMutable<LoDTensor>()->set_lod(src.Get<LoDTensor>().lod());
Y
Yang Yang 已提交
82
    } else {
Y
Yi Wang 已提交
83
      TensorCopy(src.Get<LoDTensor>(), dst_place, dst->GetMutable<LoDTensor>());
Y
Yang Yang 已提交
84 85 86 87 88
    }
  } else if (src.IsType<SelectedRows>()) {
    auto &src_sr = src.Get<SelectedRows>();
    auto *dst_sr = dst->GetMutable<SelectedRows>();
    dst_sr->set_height(src_sr.height());
Y
Yang Yang 已提交
89 90
    if (src_sr.value().place() == dst_place) {
      dst_sr->mutable_value()->ShareDataWith(src_sr.value());
D
dzhwinter 已提交
91
      dst_sr->set_rows(src_sr.rows());
Y
Yang Yang 已提交
92
    } else {
Y
Yi Wang 已提交
93
      TensorCopy(src_sr.value(), dst_place, dst_sr->mutable_value());
Y
Yang Yang 已提交
94
    }
Y
Yang Yang 已提交
95 96 97 98 99
  } else {
    PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name());
  }
}

Y
Yang Yang 已提交
100 101 102 103 104 105
void WaitOnPlace(const platform::Place place) {
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
  auto &dev_ctx = *pool.Get(place);
  dev_ctx.Wait();
}

106 107 108 109 110 111 112 113 114
void WaitOnPlaces(const std::vector<platform::Place> places) {
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();

  for (auto &place : places) {
    auto &dev_ctx = *pool.Get(place);
    dev_ctx.Wait();
  }
}

Y
Yang Yang 已提交
115
class ParallelDoOp : public framework::OperatorBase {
Y
Yang Yang 已提交
116 117 118 119 120
 public:
  ParallelDoOp(const std::string &type,
               const framework::VariableNameMap &inputs,
               const framework::VariableNameMap &outputs,
               const framework::AttributeMap &attrs)
Y
Yu Yang 已提交
121
      : framework::OperatorBase(type, inputs, outputs, attrs) {}
Y
Yang Yang 已提交
122

123 124 125
 private:
  void RunImpl(const framework::Scope &scope,
               const platform::Place &place) const override {
Y
Yang Yang 已提交
126 127 128 129 130
    // get device context from pool
    platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
    auto &dev_ctx = *pool.Get(place);

    auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
Y
Yang Yang 已提交
131
    auto *program = block->Program();
Y
Yang Yang 已提交
132

133
    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
Y
Yang Yang 已提交
134

Y
Yang Yang 已提交
135 136 137
    auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
                            ->GetMutable<std::vector<framework::Scope *>>();

138
    // split input
Y
Yu Yang 已提交
139
    SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
Y
Yang Yang 已提交
140
                                     Inputs(kInputs));
Y
Yu Yang 已提交
141

142 143 144 145 146
    // copy parameter
    for (auto &param : Inputs(kParameters)) {
      PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
                     "Only support parameter type as LoDTensor");
      auto &src = scope.FindVar(param)->Get<LoDTensor>();
Y
Yu Yang 已提交
147
      for (size_t i = 0; i < sub_scopes.size(); ++i) {
148 149 150
        auto &place = places[i];
        auto *sub_scope = sub_scopes[i];
        auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
Y
Yi Wang 已提交
151
        framework::TensorCopy(src, place, dst);
152 153 154
      }
    }
    WaitOnPlaces(places);
Y
Yang Yang 已提交
155

Y
Yang Yu 已提交
156 157
    std::vector<std::future<void>> workers;
    workers.reserve(places.size());
Y
Yu Yang 已提交
158
    for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) {
Y
Yang Yang 已提交
159 160 161
      auto &place = places[place_idx];
      auto *cur_scope = sub_scopes[place_idx];

X
Xin Pan 已提交
162 163 164 165 166 167 168 169
      workers.emplace_back(
          framework::Async([program, cur_scope, place, block, place_idx] {
            // Give the thread an id to distinguish parallel block with same id.
            platform::RecordThread rt(static_cast<int>(place_idx) + 1);
            framework::Executor executor(place);
            executor.Run(*program, cur_scope, block->ID(),
                         false /*create_local_scope*/);
          }));
Y
Yang Yang 已提交
170 171
    }
    for (auto &worker : workers) {
Y
Yang Yu 已提交
172
      worker.wait();
Y
Yang Yang 已提交
173
    }
174
    WaitOnPlaces(places);
Y
Yang Yang 已提交
175 176 177 178

    // merge output
    for (auto &o_name : Outputs(kOutputs)) {
      std::vector<const framework::LoDTensor *> lod_tensors;
Y
Yang Yu 已提交
179
      lod_tensors.reserve(sub_scopes.size());
Y
Yang Yang 已提交
180
      for (auto *sub_scope : sub_scopes) {
Y
Yang Yu 已提交
181
        lod_tensors.emplace_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>());
Y
Yang Yang 已提交
182 183 184 185 186 187
      }

      auto *lod_tensor_to_be_merged =
          scope.FindVar(o_name)->GetMutable<LoDTensor>();
      lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
    }
188
    WaitOnPlaces(places);
Y
Yang Yang 已提交
189
  }
Y
Yang Yang 已提交
190 191 192 193
};

class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
 public:
Y
Yang Yang 已提交
194
  ParallelDoOpProtoMaker(OpProto *proto, framework::OpAttrChecker *op_checker)
Y
Yang Yang 已提交
195 196 197 198 199 200
      : OpProtoAndCheckerMaker(proto, op_checker) {
    AddInput(kInputs, "").AsDuplicable();
    AddInput(kParameters, "").AsDuplicable();
    AddInput(kPlaces, "");
    AddOutput(kOutputs, "").AsDuplicable();
    AddOutput(kParallelScopes, "");
Y
Yang Yang 已提交
201
    AddAttr<framework::BlockDesc *>(kParallelBlock, "");
Y
Yang Yang 已提交
202 203
    AddAttr<bool>(kUseNCCL, "true if we use nccl on backward")
        .SetDefault(false);
Y
Yang Yang 已提交
204 205 206 207 208 209
    AddComment(R"DOC(
ParallelDo Operator.
)DOC");
  }
};

Y
Yu Yang 已提交
210
class ParallelDoGradOp : public framework::OperatorBase {
Y
Yang Yang 已提交
211 212 213 214 215
 public:
  ParallelDoGradOp(const std::string &type,
                   const framework::VariableNameMap &inputs,
                   const framework::VariableNameMap &outputs,
                   const framework::AttributeMap &attrs)
Y
Yu Yang 已提交
216
      : framework::OperatorBase(type, inputs, outputs, attrs) {}
Y
Yang Yang 已提交
217

218 219 220
 private:
  void RunImpl(const framework::Scope &scope,
               const platform::Place &place) const override {
Y
Yang Yang 已提交
221
    auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
Y
Yang Yang 已提交
222 223 224 225
    auto *program = block->Program();

    auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
                           ->Get<std::vector<framework::Scope *>>();
226
    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
Y
Yang Yang 已提交
227 228

    // feed output@grad
Y
Yu Yang 已提交
229 230 231
    SplitTensorAndMoveTensorToScopes(
        scope, const_cast<std::vector<framework::Scope *> *>(&sub_scopes),
        places, Inputs(framework::GradVarName(kOutputs)));
232
    WaitOnPlaces(places);
Y
Yang Yang 已提交
233 234

    // exe run
Y
Yang Yu 已提交
235
    std::vector<std::future<void>> workers;
Y
Yu Yang 已提交
236 237 238
    for (size_t i = 0; i < sub_scopes.size(); ++i) {
      auto &place = places[i];
      auto *cur_scope = sub_scopes[i];
Y
Yang Yang 已提交
239 240

      // execute
X
Xin Pan 已提交
241 242 243 244 245 246 247 248
      workers.emplace_back(
          framework::Async([program, cur_scope, place, block, i] {
            // Give the thread an id to distinguish parallel block with same id.
            platform::RecordThread rt(static_cast<int>(i) + 1);
            framework::Executor executor(place);
            executor.Run(*program, cur_scope, block->ID(),
                         false /*create_local_scope*/);
          }));
Y
Yang Yang 已提交
249 250
    }
    for (auto &worker : workers) {
Y
Yang Yu 已提交
251
      worker.wait();
Y
Yang Yang 已提交
252
    }
253
    WaitOnPlaces(places);
Y
Yang Yang 已提交
254

Y
Yang Yang 已提交
255 256 257 258 259 260 261 262 263 264 265
    // NCCL allreduce op will be added by backward,
    // so no need to explicitly accumulate grad
    if (!(Attr<bool>(kUseNCCL))) {
      AccumulateGrad(scope, place, sub_scopes, places);
    } else {
      for (auto &place : places) {
        PADDLE_ENFORCE(platform::is_gpu_place(place),
                       "NCCL only supports cuda place");
      }
    }
    for (auto &s : Outputs(framework::GradVarName(kParameters))) {
266
      if (s == framework::kEmptyVarName) {
Y
Yang Yang 已提交
267 268 269
        continue;
      }
      VLOG(3) << "Moving " << s;
Y
Yang Yang 已提交
270 271 272
      CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
    }
    WaitOnPlaces(places);
Y
Yang Yang 已提交
273 274 275 276 277 278
  }

  void AccumulateGrad(const framework::Scope &scope,
                      const platform::Place &place,
                      const std::vector<framework::Scope *> &sub_scopes,
                      const platform::PlaceList &places) const {
Y
Yang Yang 已提交
279
    for (auto &s : Outputs(framework::GradVarName(kParameters))) {
280
      if (s == framework::kEmptyVarName) {
Y
Yang Yang 已提交
281 282
        continue;
      }
Y
Yang Yang(Tony) 已提交
283 284
      VLOG(3) << "Accumulating " << s;
      if (s == framework::kEmptyVarName) continue;
Y
Yang Yang 已提交
285
      std::string tmp_name;
Y
Yang Yang 已提交
286
      auto *tmp = sub_scopes[0]->Var(&tmp_name);
Y
Yu Yang 已提交
287 288

      for (size_t i = 1; i < sub_scopes.size(); ++i) {
Y
Yang Yang 已提交
289
        CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp);
Y
Yang Yang(Tony) 已提交
290
        WaitOnPlaces(places);
Y
Yang Yang 已提交
291

Y
Yang Yang 已提交
292
        auto sum_op = framework::OpRegistry::CreateOp(
Y
Yu Yang 已提交
293
            "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}},
Y
Yang Yang 已提交
294
            framework::AttributeMap{});
Y
Yang Yang(Tony) 已提交
295
        VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]);
296
        sum_op->Run(*sub_scopes[0], places[0]);
Y
Yang Yang 已提交
297
        WaitOnPlace(places[0]);
Y
Yang Yang 已提交
298 299
      }

Y
Yang Yang 已提交
300
      CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
Y
Yang Yang 已提交
301
    }
Y
Yang Yang 已提交
302
    WaitOnPlaces(places);
Y
Yang Yang 已提交
303
  }
Y
Yang Yang 已提交
304 305
};

Y
Yu Yang 已提交
306 307 308 309 310 311 312
std::ostream &operator<<(std::ostream &sout,
                         const std::vector<std::string> &strs) {
  std::copy(strs.begin(), strs.end(),
            std::ostream_iterator<std::string>(sout, ","));
  return sout;
}

Y
Yang Yang 已提交
313 314 315 316 317
class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
 public:
  using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;

 protected:
Y
Yang Yang 已提交
318 319
  virtual std::unique_ptr<framework::OpDesc> Apply() const {
    auto *grad = new framework::OpDesc();
Y
Yang Yang 已提交
320
    grad->SetType("parallel_do_grad");
Y
Yang Yang 已提交
321
    for (auto &input_param : this->InputNames()) {
Y
Yang Yang 已提交
322
      VLOG(3) << input_param;
Y
Yang Yang 已提交
323
      grad->SetInput(input_param, this->Input(input_param));
324 325 326 327
      if (input_param != kPlaces) {
        grad->SetOutput(framework::GradVarName(input_param),
                        this->InputGrad(input_param, false));
      }
Y
Yang Yang 已提交
328
    }
Y
Yang Yu 已提交
329 330 331 332 333 334 335 336 337 338 339
    auto *g_block = this->grad_block_[0];

    // All variable name that needed by gradient operators
    std::unordered_set<std::string> all_inputs_in_grad_blocks;

    for (size_t i = 0; i < g_block->OpSize(); ++i) {
      auto *op = g_block->Op(i);
      for (auto &var_name : op->InputArgumentNames()) {
        all_inputs_in_grad_blocks.insert(var_name);
      }
    }
Y
Yang Yang 已提交
340 341 342 343 344 345 346 347

    for (auto &output_param : this->OutputNames()) {
      if (output_param == kParallelScopes) {
        grad->SetInput(output_param, this->Output(output_param));
        grad->SetInput(framework::GradVarName(output_param),
                       this->Output(output_param));
      } else {
        grad->SetInput(output_param, this->Output(output_param));
Y
Yang Yu 已提交
348 349 350
        std::vector<std::string> og_names;
        for (auto &og_name : this->OutputGrad(output_param)) {
          if (all_inputs_in_grad_blocks.count(og_name) != 0) {
Y
Yang Yu 已提交
351 352
            // there are some gradient operators who need the OG. So make this
            // OG as an input of parallel.do
Y
Yang Yu 已提交
353 354
            og_names.push_back(og_name);
          }
Y
Yang Yu 已提交
355 356
          // else, there is no operator who need the OG. Do not use this OG as
          // an input
Y
Yang Yu 已提交
357 358
        }
        grad->SetInput(framework::GradVarName(output_param), og_names);
Y
Yang Yang 已提交
359 360 361 362 363
      }
    }
    grad->SetAttrMap(this->Attrs());
    grad->SetBlockAttr(kParallelBlock, *grad_block_[0]);

Y
Yang Yang 已提交
364
    return std::unique_ptr<framework::OpDesc>(grad);
Y
Yang Yang 已提交
365 366 367 368 369 370
  }
};

class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
 public:
  void operator()(framework::InferShapeContext *ctx) const override {
Y
Yu Yang 已提交
371
    PADDLE_ENFORCE(ctx->HasInputs(kParameters));
Y
Yang Yang 已提交
372
    PADDLE_ENFORCE(ctx->HasInputs(kInputs));
Y
Yang Yang(Tony) 已提交
373
    PADDLE_ENFORCE(ctx->HasInputs(kOutputs));
Y
Yu Yang 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387

    ctx->SetOutputsDim(framework::GradVarName(kParameters),
                       ctx->GetInputsDim(kParameters));

    auto i_dims = ctx->GetInputsDim(kInputs);
    auto ig_names = ctx->Outputs(framework::GradVarName(kInputs));

    for (size_t i = 0; i < ig_names.size(); ++i) {
      auto &ig_name = ig_names[i];
      if (ig_name == framework::kEmptyVarName) {
        continue;
      }

      ctx->SetDims({ig_name}, {i_dims[i]});
Y
Yang Yang 已提交
388
    }
Y
Yu Yang 已提交
389

Y
Yang Yang(Tony) 已提交
390 391 392 393 394 395 396 397
    auto p_dims = ctx->GetInputsDim(kParameters);
    auto pg_names = ctx->Outputs(framework::GradVarName(kParameters));
    for (size_t i = 0; i < pg_names.size(); ++i) {
      auto &pg_name = pg_names[i];
      if (pg_name == framework::kEmptyVarName) {
        continue;
      }
      ctx->SetDims({pg_name}, {p_dims[i]});
Y
Yang Yang 已提交
398
    }
Y
Yang Yang 已提交
399 400 401 402 403 404 405 406 407 408 409
  }
};

}  // namespace operators
}  // namespace paddle

REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp,
                  paddle::operators::ParallelDoOpProtoMaker,
                  paddle::operators::ParallelDoGradOpDescMaker);
REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp,
                  paddle::operators::ParallelDoGradOpShapeInference);