parallel_do_op.cc 14.3 KB
Newer Older
Y
Yang Yang 已提交
1 2
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

Y
Yang Yang 已提交
3 4 5
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Y
Yang Yang 已提交
6

Y
Yang Yang 已提交
7
    http://www.apache.org/licenses/LICENSE-2.0
Y
Yang Yang 已提交
8

Y
Yang Yang 已提交
9 10 11 12 13
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
Y
Yang Yang 已提交
14 15

#include <vector>
Y
Yang Yang 已提交
16

Y
Yang Yang 已提交
17 18
#include "paddle/framework/executor.h"
#include "paddle/framework/op_registry.h"
Y
Yang Yu 已提交
19
#include "paddle/framework/threadpool.h"
Y
Yang Yu 已提交
20
#include "paddle/operators/detail/safe_ref.h"
Y
Yang Yang 已提交
21 22 23 24

namespace paddle {
namespace operators {

Y
Yang Yu 已提交
25 26 27
static constexpr char kInputs[] = "inputs";
static constexpr char kParameters[] = "parameters";
static constexpr char kPlaces[] = "places";
Y
Yang Yang 已提交
28

Y
Yang Yu 已提交
29 30
static constexpr char kOutputs[] = "outputs";
static constexpr char kParallelScopes[] = "parallel_scopes";
Y
Yang Yang 已提交
31

Y
Yang Yu 已提交
32
static constexpr char kParallelBlock[] = "sub_block";
Y
Yang Yang 已提交
33
static constexpr char kUseNCCL[] = "use_nccl";
Y
Yang Yang 已提交
34

Y
Yang Yang 已提交
35
using LoDTensor = framework::LoDTensor;
Y
Yang Yang 已提交
36
using SelectedRows = framework::SelectedRows;
Y
Yang Yang 已提交
37

Y
Yu Yang 已提交
38 39
static void SplitTensorAndMoveTensorToScopes(
    const framework::Scope &scope, std::vector<framework::Scope *> *sub_scopes,
Y
Yang Yang 已提交
40 41
    const std::vector<platform::Place> &places,
    const std::vector<std::string> &names) {
Y
Yu Yang 已提交
42
  size_t num_sub_scopes = 0;
Y
Yang Yang 已提交
43
  for (auto &argu : names) {
Y
Yang Yu 已提交
44 45 46 47
    const auto &tensor =
        detail::Ref(scope.FindVar(argu),
                    "Cannot find variable %s in the parent scope", argu)
            .Get<LoDTensor>();
Y
Yang Yang 已提交
48 49 50
    auto lod_tensors = tensor.SplitLoDTensor(places);

    for (auto &lod : lod_tensors) {
Y
Yang Yang 已提交
51
      VLOG(3) << lod.dims();
Y
Yang Yang 已提交
52
    }
Y
Yu Yang 已提交
53 54 55 56 57 58 59 60 61 62 63 64
    if (num_sub_scopes == 0) {
      num_sub_scopes = lod_tensors.size();
    } else {
      PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size());
    }
    PADDLE_ENFORCE_NE(num_sub_scopes, 0);
    if (sub_scopes->size() == 0) {
      sub_scopes->reserve(num_sub_scopes);
      for (size_t i = 0; i < num_sub_scopes; ++i) {
        sub_scopes->emplace_back(&scope.NewScope());
      }
    }
Y
Yang Yang 已提交
65

Y
Yu Yang 已提交
66
    for (size_t i = 0; i < lod_tensors.size(); ++i) {
Y
Yang Yu 已提交
67 68 69
      *detail::Ref(sub_scopes->at(i)->Var(argu),
                   "Cannot find variable in the sub-scope", argu)
           .GetMutable<LoDTensor>() = lod_tensors[i];
Y
Yang Yang 已提交
70 71 72 73
    }
  }
}

Y
Yang Yang 已提交
74 75 76
inline void CopyOrShare(const framework::Variable &src,
                        const platform::Place &dst_place,
                        framework::Variable *dst) {
Y
Yang Yang 已提交
77 78 79
  if (src.IsType<LoDTensor>()) {
    if (src.Get<LoDTensor>().place() == dst_place) {
      dst->GetMutable<LoDTensor>()->ShareDataWith(src.Get<LoDTensor>());
D
dzhwinter 已提交
80
      dst->GetMutable<LoDTensor>()->set_lod(src.Get<LoDTensor>().lod());
Y
Yang Yang 已提交
81 82
    } else {
      Copy(src.Get<LoDTensor>(), dst_place, dst->GetMutable<LoDTensor>());
D
dzhwinter 已提交
83
      framework::LoD lod(src.Get<LoDTensor>().lod());
D
dzhwinter 已提交
84 85
      lod.CopyToPeer(dst_place);
      dst->GetMutable<LoDTensor>()->set_lod(lod);
Y
Yang Yang 已提交
86 87 88 89 90
    }
  } else if (src.IsType<SelectedRows>()) {
    auto &src_sr = src.Get<SelectedRows>();
    auto *dst_sr = dst->GetMutable<SelectedRows>();
    dst_sr->set_height(src_sr.height());
Y
Yang Yang 已提交
91 92
    if (src_sr.value().place() == dst_place) {
      dst_sr->mutable_value()->ShareDataWith(src_sr.value());
D
dzhwinter 已提交
93
      dst_sr->set_rows(src_sr.rows());
Y
Yang Yang 已提交
94 95
    } else {
      Copy(src_sr.value(), dst_place, dst_sr->mutable_value());
D
dzhwinter 已提交
96
      framework::Vector<int64_t> lod(src_sr.rows());
D
dzhwinter 已提交
97 98
      lod.CopyToPeer(dst_place);
      dst_sr->set_rows(lod);
Y
Yang Yang 已提交
99
    }
Y
Yang Yang 已提交
100 101 102 103 104
  } else {
    PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name());
  }
}

Y
Yang Yang 已提交
105 106 107 108 109 110
void WaitOnPlace(const platform::Place place) {
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
  auto &dev_ctx = *pool.Get(place);
  dev_ctx.Wait();
}

111 112 113 114 115 116 117 118 119
void WaitOnPlaces(const std::vector<platform::Place> places) {
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();

  for (auto &place : places) {
    auto &dev_ctx = *pool.Get(place);
    dev_ctx.Wait();
  }
}

Y
Yang Yang 已提交
120
class ParallelDoOp : public framework::OperatorBase {
Y
Yang Yang 已提交
121 122 123 124 125
 public:
  ParallelDoOp(const std::string &type,
               const framework::VariableNameMap &inputs,
               const framework::VariableNameMap &outputs,
               const framework::AttributeMap &attrs)
Y
Yu Yang 已提交
126
      : framework::OperatorBase(type, inputs, outputs, attrs) {}
Y
Yang Yang 已提交
127 128

  void Run(const framework::Scope &scope,
Y
Yang Yang 已提交
129 130 131 132 133 134
           const platform::Place &place) const override {
    // get device context from pool
    platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
    auto &dev_ctx = *pool.Get(place);

    auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
Y
Yang Yang 已提交
135
    auto *program = block->Program();
Y
Yang Yang 已提交
136

137
    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
Y
Yang Yang 已提交
138

Y
Yang Yang 已提交
139 140 141
    auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
                            ->GetMutable<std::vector<framework::Scope *>>();

142
    // split input
Y
Yu Yang 已提交
143
    SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
Y
Yang Yang 已提交
144
                                     Inputs(kInputs));
Y
Yu Yang 已提交
145

146 147 148 149 150
    // copy parameter
    for (auto &param : Inputs(kParameters)) {
      PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
                     "Only support parameter type as LoDTensor");
      auto &src = scope.FindVar(param)->Get<LoDTensor>();
Y
Yu Yang 已提交
151
      for (size_t i = 0; i < sub_scopes.size(); ++i) {
152 153 154 155
        auto &place = places[i];
        auto *sub_scope = sub_scopes[i];
        auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
        framework::Copy(src, place, dst);
D
dzhwinter 已提交
156 157 158
        framework::LoD lod(src.lod());
        lod.CopyToPeer(place);
        dst->set_lod(lod);
159 160 161
      }
    }
    WaitOnPlaces(places);
Y
Yang Yang 已提交
162

Y
Yang Yang 已提交
163
    //    PADDLE_ENFORCE_EQ(places.size(), sub_scopes.size());
Y
Yang Yu 已提交
164 165
    std::vector<std::future<void>> workers;
    workers.reserve(places.size());
Y
Yu Yang 已提交
166
    for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) {
Y
Yang Yang 已提交
167 168 169
      auto &place = places[place_idx];
      auto *cur_scope = sub_scopes[place_idx];

Y
Yang Yu 已提交
170 171
      workers.emplace_back(framework::Async([program, cur_scope, place, block] {
        framework::Executor executor(place);
Y
Yang Yang 已提交
172 173 174 175 176
        executor.Run(*program, cur_scope, block->ID(),
                     false /*create_local_scope*/);
      }));
    }
    for (auto &worker : workers) {
Y
Yang Yu 已提交
177
      worker.wait();
Y
Yang Yang 已提交
178
    }
179
    WaitOnPlaces(places);
Y
Yang Yang 已提交
180 181 182 183

    // merge output
    for (auto &o_name : Outputs(kOutputs)) {
      std::vector<const framework::LoDTensor *> lod_tensors;
Y
Yang Yu 已提交
184
      lod_tensors.reserve(sub_scopes.size());
Y
Yang Yang 已提交
185
      for (auto *sub_scope : sub_scopes) {
Y
Yang Yu 已提交
186
        lod_tensors.emplace_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>());
Y
Yang Yang 已提交
187 188 189 190 191 192
      }

      auto *lod_tensor_to_be_merged =
          scope.FindVar(o_name)->GetMutable<LoDTensor>();
      lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
    }
193
    WaitOnPlaces(places);
Y
Yang Yang 已提交
194
  }
Y
Yang Yang 已提交
195 196 197 198
};

class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
 public:
Y
Yang Yang 已提交
199
  ParallelDoOpProtoMaker(OpProto *proto, framework::OpAttrChecker *op_checker)
Y
Yang Yang 已提交
200 201 202 203 204 205
      : OpProtoAndCheckerMaker(proto, op_checker) {
    AddInput(kInputs, "").AsDuplicable();
    AddInput(kParameters, "").AsDuplicable();
    AddInput(kPlaces, "");
    AddOutput(kOutputs, "").AsDuplicable();
    AddOutput(kParallelScopes, "");
Y
Yang Yang 已提交
206
    AddAttr<framework::BlockDesc *>(kParallelBlock, "");
Y
Yang Yang 已提交
207 208
    AddAttr<bool>(kUseNCCL, "true if we use nccl on backward")
        .SetDefault(false);
Y
Yang Yang 已提交
209 210 211 212 213 214
    AddComment(R"DOC(
ParallelDo Operator.
)DOC");
  }
};

Y
Yu Yang 已提交
215
class ParallelDoGradOp : public framework::OperatorBase {
Y
Yang Yang 已提交
216 217 218 219 220
 public:
  ParallelDoGradOp(const std::string &type,
                   const framework::VariableNameMap &inputs,
                   const framework::VariableNameMap &outputs,
                   const framework::AttributeMap &attrs)
Y
Yu Yang 已提交
221
      : framework::OperatorBase(type, inputs, outputs, attrs) {}
Y
Yang Yang 已提交
222 223

  void Run(const framework::Scope &scope,
Y
Yang Yang 已提交
224 225
           const platform::Place &place) const override {
    auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
Y
Yang Yang 已提交
226 227 228 229
    auto *program = block->Program();

    auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
                           ->Get<std::vector<framework::Scope *>>();
230
    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
Y
Yang Yang 已提交
231
    //    PADDLE_ENFORCE_EQ(places.size(), sub_scopes.size());
Y
Yang Yang 已提交
232 233

    // feed output@grad
Y
Yu Yang 已提交
234 235 236
    SplitTensorAndMoveTensorToScopes(
        scope, const_cast<std::vector<framework::Scope *> *>(&sub_scopes),
        places, Inputs(framework::GradVarName(kOutputs)));
237
    WaitOnPlaces(places);
Y
Yang Yang 已提交
238
    LOG(INFO) << "places " << places.size();
Y
Yang Yang 已提交
239 240

    // exe run
Y
Yang Yu 已提交
241
    std::vector<std::future<void>> workers;
Y
Yu Yang 已提交
242 243 244
    for (size_t i = 0; i < sub_scopes.size(); ++i) {
      auto &place = places[i];
      auto *cur_scope = sub_scopes[i];
Y
Yang Yang 已提交
245
      LOG(INFO) << place;
Y
Yang Yang 已提交
246 247

      // execute
Y
Yang Yu 已提交
248 249
      workers.emplace_back(framework::Async([program, cur_scope, place, block] {
        framework::Executor executor(place);
Y
Yang Yang 已提交
250 251 252 253
        executor.Run(*program, cur_scope, block->ID(),
                     false /*create_local_scope*/);
      }));
    }
Y
Yang Yang 已提交
254
    LOG(INFO) << "places " << places.size();
Y
Yang Yang 已提交
255
    for (auto &worker : workers) {
Y
Yang Yu 已提交
256
      worker.wait();
Y
Yang Yang 已提交
257
    }
258
    WaitOnPlaces(places);
Y
Yang Yang 已提交
259

Y
Yang Yang 已提交
260 261 262 263 264 265 266 267 268 269 270 271 272 273
    // NCCL allreduce op will be added by backward,
    // so no need to explicitly accumulate grad
    if (!(Attr<bool>(kUseNCCL))) {
      AccumulateGrad(scope, place, sub_scopes, places);
    } else {
      for (auto &place : places) {
        PADDLE_ENFORCE(platform::is_gpu_place(place),
                       "NCCL only supports cuda place");
      }
    }
    for (auto &s : Outputs(framework::GradVarName(kParameters))) {
      CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
    }
    WaitOnPlaces(places);
Y
Yang Yang 已提交
274 275 276 277 278 279
  }

  void AccumulateGrad(const framework::Scope &scope,
                      const platform::Place &place,
                      const std::vector<framework::Scope *> &sub_scopes,
                      const platform::PlaceList &places) const {
Y
Yang Yang 已提交
280
    for (auto &s : Outputs(framework::GradVarName(kParameters))) {
Y
Yang Yang(Tony) 已提交
281 282
      VLOG(3) << "Accumulating " << s;
      if (s == framework::kEmptyVarName) continue;
Y
Yang Yang 已提交
283
      std::string tmp_name;
Y
Yang Yang 已提交
284
      auto *tmp = sub_scopes[0]->Var(&tmp_name);
Y
Yu Yang 已提交
285 286

      for (size_t i = 1; i < sub_scopes.size(); ++i) {
Y
Yang Yang 已提交
287
        CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp);
Y
Yang Yang(Tony) 已提交
288
        WaitOnPlaces(places);
Y
Yang Yang 已提交
289

Y
Yang Yang 已提交
290
        auto sum_op = framework::OpRegistry::CreateOp(
Y
Yu Yang 已提交
291
            "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}},
Y
Yang Yang 已提交
292
            framework::AttributeMap{});
Y
Yang Yang(Tony) 已提交
293
        VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]);
294
        sum_op->Run(*sub_scopes[0], places[0]);
Y
Yang Yang 已提交
295
        WaitOnPlace(places[0]);
Y
Yang Yang 已提交
296 297
      }

Y
Yang Yang 已提交
298
      CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s));
Y
Yang Yang 已提交
299
    }
Y
Yang Yang 已提交
300
    WaitOnPlaces(places);
Y
Yang Yang 已提交
301
  }
Y
Yang Yang 已提交
302 303
};

Y
Yu Yang 已提交
304 305 306 307 308 309 310
std::ostream &operator<<(std::ostream &sout,
                         const std::vector<std::string> &strs) {
  std::copy(strs.begin(), strs.end(),
            std::ostream_iterator<std::string>(sout, ","));
  return sout;
}

Y
Yang Yang 已提交
311 312 313 314 315
class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
 public:
  using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;

 protected:
Y
Yang Yang 已提交
316 317
  virtual std::unique_ptr<framework::OpDesc> Apply() const {
    auto *grad = new framework::OpDesc();
Y
Yang Yang 已提交
318
    grad->SetType("parallel_do_grad");
Y
Yang Yang 已提交
319
    for (auto &input_param : this->InputNames()) {
Y
Yang Yang 已提交
320
      VLOG(3) << input_param;
Y
Yang Yang 已提交
321
      grad->SetInput(input_param, this->Input(input_param));
322 323 324 325
      if (input_param != kPlaces) {
        grad->SetOutput(framework::GradVarName(input_param),
                        this->InputGrad(input_param, false));
      }
Y
Yang Yang 已提交
326
    }
Y
Yang Yu 已提交
327 328 329 330 331 332 333 334 335 336 337
    auto *g_block = this->grad_block_[0];

    // All variable name that needed by gradient operators
    std::unordered_set<std::string> all_inputs_in_grad_blocks;

    for (size_t i = 0; i < g_block->OpSize(); ++i) {
      auto *op = g_block->Op(i);
      for (auto &var_name : op->InputArgumentNames()) {
        all_inputs_in_grad_blocks.insert(var_name);
      }
    }
Y
Yang Yang 已提交
338 339 340 341 342 343 344 345

    for (auto &output_param : this->OutputNames()) {
      if (output_param == kParallelScopes) {
        grad->SetInput(output_param, this->Output(output_param));
        grad->SetInput(framework::GradVarName(output_param),
                       this->Output(output_param));
      } else {
        grad->SetInput(output_param, this->Output(output_param));
Y
Yang Yu 已提交
346 347 348
        std::vector<std::string> og_names;
        for (auto &og_name : this->OutputGrad(output_param)) {
          if (all_inputs_in_grad_blocks.count(og_name) != 0) {
Y
Yang Yu 已提交
349 350
            // there are some gradient operators who need the OG. So make this
            // OG as an input of parallel.do
Y
Yang Yu 已提交
351 352
            og_names.push_back(og_name);
          }
Y
Yang Yu 已提交
353 354
          // else, there is no operator who need the OG. Do not use this OG as
          // an input
Y
Yang Yu 已提交
355 356
        }
        grad->SetInput(framework::GradVarName(output_param), og_names);
Y
Yang Yang 已提交
357 358 359 360 361
      }
    }
    grad->SetAttrMap(this->Attrs());
    grad->SetBlockAttr(kParallelBlock, *grad_block_[0]);

Y
Yang Yang 已提交
362
    return std::unique_ptr<framework::OpDesc>(grad);
Y
Yang Yang 已提交
363 364 365 366 367 368
  }
};

class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
 public:
  void operator()(framework::InferShapeContext *ctx) const override {
Y
Yu Yang 已提交
369
    PADDLE_ENFORCE(ctx->HasInputs(kParameters));
Y
Yang Yang 已提交
370
    PADDLE_ENFORCE(ctx->HasInputs(kInputs));
Y
Yang Yang(Tony) 已提交
371
    PADDLE_ENFORCE(ctx->HasInputs(kOutputs));
Y
Yu Yang 已提交
372 373 374 375 376 377 378 379 380 381 382 383 384 385

    ctx->SetOutputsDim(framework::GradVarName(kParameters),
                       ctx->GetInputsDim(kParameters));

    auto i_dims = ctx->GetInputsDim(kInputs);
    auto ig_names = ctx->Outputs(framework::GradVarName(kInputs));

    for (size_t i = 0; i < ig_names.size(); ++i) {
      auto &ig_name = ig_names[i];
      if (ig_name == framework::kEmptyVarName) {
        continue;
      }

      ctx->SetDims({ig_name}, {i_dims[i]});
Y
Yang Yang 已提交
386
    }
Y
Yu Yang 已提交
387

Y
Yang Yang(Tony) 已提交
388 389 390 391 392 393 394 395
    auto p_dims = ctx->GetInputsDim(kParameters);
    auto pg_names = ctx->Outputs(framework::GradVarName(kParameters));
    for (size_t i = 0; i < pg_names.size(); ++i) {
      auto &pg_name = pg_names[i];
      if (pg_name == framework::kEmptyVarName) {
        continue;
      }
      ctx->SetDims({pg_name}, {p_dims[i]});
Y
Yang Yang 已提交
396
    }
Y
Yang Yang 已提交
397 398 399 400 401 402 403 404 405 406 407
  }
};

}  // namespace operators
}  // namespace paddle

REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp,
                  paddle::operators::ParallelDoOpProtoMaker,
                  paddle::operators::ParallelDoGradOpDescMaker);
REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp,
                  paddle::operators::ParallelDoGradOpShapeInference);