reduce_op_handle.cc 11.3 KB
Newer Older
C
chengduoZH 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_op_handle.h"
16

C
chengduoZH 已提交
17
#include "paddle/fluid/framework/details/container_cast.h"
C
chengduoZH 已提交
18
#include "paddle/fluid/framework/details/reduce_and_gather.h"
C
chengduoZH 已提交
19
#include "paddle/fluid/framework/details/variable_visitor.h"
20
#include "paddle/fluid/platform/profiler.h"
C
chengduoZH 已提交
21

22
// Runtime flag read by ReduceOpHandle::RunImpl on the CPU LoDTensor path.
// When false (the default) the inputs are reduced directly into the output
// variable. When true the inputs are first reduced into the variable held by
// local_exec_scopes_[0] and then copied to the output if the buffers differ.
PADDLE_DEFINE_EXPORTED_bool(
    cpu_deterministic, false,
    "Whether to make the result of computation deterministic in CPU side.");

C
chengduoZH 已提交
26 27 28 29
namespace paddle {
namespace framework {
namespace details {

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
// Out-of-class definitions of the static data members of CollectiveContext.
// The class itself is declared outside this file (presumably in
// reduce_op_handle.h -- TODO confirm). RunImpl obtains the context through
// CollectiveContext::GetInstance().
std::once_flag CollectiveContext::init_flag_;
std::unique_ptr<CollectiveContext> CollectiveContext::context_;

// Builds the name "<var_name>_merged_tmp@trainer_<trainer_id>".
//
// @param var_name   Base variable name placed at the front of the result.
// @param trainer_id Integer appended after the "@trainer_" marker.
// @return The formatted name as a new string.
static inline std::string GetRemoteVarName(const std::string &var_name,
                                           int trainer_id) {
  std::string remote_name =
      string::Sprintf("%s_merged_tmp@trainer_%d", var_name, trainer_id);
  return remote_name;
}

// Calls Wait() on every device context in `dev_ctxes`, in the map's
// iteration order.
//
// @param dev_ctxes Map from place to the device context to wait on. Every
//                  mapped pointer is dereferenced, so none may be null.
void ReduceOpHandle::Wait(
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes) {
  // TODO(gongwb): use event wait?
  for (auto it = dev_ctxes.begin(); it != dev_ctxes.end(); ++it) {
    platform::DeviceContext *ctx = it->second;
    ctx->Wait();
  }
}

C
chengduoZH 已提交
46
void ReduceOpHandle::RunImpl() {
47
  platform::RecordEvent record_event(Name());
Y
Yancey1989 已提交
48

C
chengduoZH 已提交
49
  if (places_.size() == 1) return;
C
chengduoZH 已提交
50
  // the input and output may have dummy var.
C
chengduoZH 已提交
51
  auto in_var_handles = DynamicCast<VarHandle>(inputs_);
C
chengduoZH 已提交
52 53 54

  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), places_.size(),
55 56 57 58
      platform::errors::InvalidArgument(
          "The number of inputs should equal to the number of places, but got "
          "the number of inputs is %d and the number of places is %d.",
          in_var_handles.size(), places_.size()));
C
chengduoZH 已提交
59

C
chengduoZH 已提交
60 61 62 63
  VarHandle *out_var_handle;
  {
    auto out_var_handles = DynamicCast<VarHandle>(outputs_);

T
tensor-tang 已提交
64
    PADDLE_ENFORCE_EQ(out_var_handles.size(), 1UL,
65 66 67
                      platform::errors::InvalidArgument(
                          "The number of output should be one, but got %d.",
                          out_var_handles.size()));
C
chengduoZH 已提交
68 69
    out_var_handle = out_var_handles.front();
  }
C
chengduoZH 已提交
70

C
chengduoZH 已提交
71
  auto in_0_handle = in_var_handles[0];
C
chengduoZH 已提交
72

73
  auto &var_scopes = local_exec_scopes_;
C
chengduoZH 已提交
74 75

  auto pre_in_var =
G
gongweibao 已提交
76
      var_scopes.at(in_0_handle->scope_idx())->FindVar(in_0_handle->name());
77 78 79 80

  PADDLE_ENFORCE_NOT_NULL(pre_in_var, platform::errors::NotFound(
                                          "Variable %s is not found in scope.",
                                          in_0_handle->name()));
C
chengduoZH 已提交
81

C
chengduoZH 已提交
82
  // NOTE: The Places of all input tensor must be all on CPU or all on GPU.
C
chengduoZH 已提交
83
  std::vector<platform::Place> in_places;  // used to get dev_ctx
C
chengduoZH 已提交
84
  for (auto *in_handle : in_var_handles) {
G
gongweibao 已提交
85
    in_places.emplace_back(in_handle->place());
C
chengduoZH 已提交
86
    auto in_var =
G
gongweibao 已提交
87
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
88 89 90 91 92

    PADDLE_ENFORCE_NOT_NULL(
        in_var, platform::errors::NotFound("Variable %s is not found in scope.",
                                           in_handle->name()));

C
chengduoZH 已提交
93
    VariableVisitor::EnforceShapeAndDTypeEQ(*pre_in_var, *in_var);
C
chengduoZH 已提交
94
  }
C
chengduoZH 已提交
95

G
gongweibao 已提交
96 97
  auto out_var = var_scopes.at(out_var_handle->scope_idx())
                     ->FindVar(out_var_handle->name());
98 99 100 101

  PADDLE_ENFORCE_NOT_NULL(
      out_var, platform::errors::NotFound("Variable %s is not found in scope.",
                                          out_var_handle->name()));
C
chengduoZH 已提交
102

C
chengduoZH 已提交
103 104 105 106 107
  // NOTE: The tensors' Place of input and output must be all on GPU or all on
  // CPU.
  auto in_p = VariableVisitor::GetMutableTensor(pre_in_var).place();
  platform::Place t_out_p;
  if (platform::is_gpu_place(in_p)) {
108 109 110
    PADDLE_ENFORCE_EQ(platform::is_gpu_place(out_var_handle->place()), true,
                      platform::errors::PreconditionNotMet(
                          "Places of input and output must be all on GPU."));
G
gongweibao 已提交
111
    t_out_p = out_var_handle->place();
C
chengduoZH 已提交
112 113 114
  } else {
    t_out_p = platform::CPUPlace();
  }
C
chengduoZH 已提交
115

C
chengduoZH 已提交
116
  if (pre_in_var->IsType<framework::SelectedRows>()) {
117
    this->RunAndRecordEvent([&] {
118 119
      std::vector<const SelectedRows *> in_selected_rows =
          GetInputValues<SelectedRows>(in_var_handles, var_scopes);
120 121 122 123 124 125 126 127 128

      const CollectiveContext &collective_context =
          *CollectiveContext::GetInstance();
      VLOG(10) << "GatherSelectedRows CollectiveContext:"
               << collective_context.String();

      // TODO(gongwb): add cpu support
      if (collective_context.endpoints_.size() <= 1 ||
          is_cpu_place(in_places[0]) || is_cpu_place(t_out_p)) {
129 130 131 132 133
        GatherLocalSelectedRowsFunctor functor(
            in_selected_rows, in_places, dev_ctxes_, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
        WaitInputVarGenerated();
        functor();
134 135
        return;
      }
136
    });
C
chengduoZH 已提交
137
  } else {
C
chengduoZH 已提交
138 139
    std::vector<const LoDTensor *> lod_tensors =
        GetInputValues<LoDTensor>(in_var_handles, var_scopes);
C
chengduo 已提交
140

C
chengduoZH 已提交
141
    if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
142
      WaitInputVarGenerated();
143
      this->RunAndRecordEvent([&] {
C
chengduo 已提交
144 145 146 147 148 149 150
        // FIXME(zcd): The order of summing is important,
        // especially when the type of data is float or double.
        // For example, the result of `a+b+c+d` may be different
        // with the result of `c+a+b+d`, so the summing order should be fixed.
        if (!FLAGS_cpu_deterministic) {
          ReduceLoDTensor func(lod_tensors,
                               out_var->GetMutable<framework::LoDTensor>());
Y
Yu Yang 已提交
151
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
152 153 154
        } else {
          // We sum lod_tensors to reduce_sum_trg which is in local_scopes_0
          // here, but it doesn't mean reduce_sum_trg must be in local_scopes_0.
155
          auto &reduce_sum_trg = *this->local_exec_scopes_[0]
G
gongweibao 已提交
156
                                      ->FindVar(out_var_handle->name())
C
chengduo 已提交
157 158
                                      ->GetMutable<framework::LoDTensor>();
          ReduceLoDTensor func(lod_tensors, &reduce_sum_trg);
Y
Yu Yang 已提交
159
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
160 161 162 163 164 165

          auto trg = out_var->GetMutable<framework::LoDTensor>();
          if (reduce_sum_trg.data<void>() != trg->data<void>()) {
            TensorCopy(reduce_sum_trg, platform::CPUPlace(), trg);
          }
        }
166
      });
C
chengduoZH 已提交
167
    } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
168
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
C
chengduoZH 已提交
169 170 171
      auto pre_in = pre_in_var->Get<framework::LoDTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
G
gongweibao 已提交
172
          out_var_handle->place(), pre_in.type());
C
chengduoZH 已提交
173

G
gongweibao 已提交
174
      auto out_p = out_var_handle->place();
175
      int root_id = BOOST_GET_CONST(platform::CUDAPlace, out_p).device;
C
chengduoZH 已提交
176
      std::vector<std::function<void()>> all_reduce_calls;
C
chengduoZH 已提交
177
      for (size_t i = 0; i < var_scopes.size(); ++i) {
C
chengduoZH 已提交
178
        auto &p = in_places[i];
C
chengduoZH 已提交
179
        auto &lod_tensor = *lod_tensors[i];
C
chengduoZH 已提交
180

181
        int dev_id = BOOST_GET_CONST(platform::CUDAPlace, p).device;
C
chengduoZH 已提交
182
        auto &nccl_ctx = nccl_ctxs_->at(dev_id);
C
chengduoZH 已提交
183 184 185

        void *buffer = const_cast<void *>(lod_tensor.data<void>());
        void *recvbuffer = nullptr;
C
chengduoZH 已提交
186
        if (root_id == dev_id) {
C
chengduoZH 已提交
187 188
          recvbuffer =
              out_var->GetMutable<framework::LoDTensor>()->mutable_data(
G
gongweibao 已提交
189
                  out_var_handle->place());
C
chengduoZH 已提交
190 191
        }

C
chengduoZH 已提交
192
        int type = platform::ToNCCLDataType(lod_tensor.type());
C
chengduoZH 已提交
193 194 195
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back(
            [buffer, recvbuffer, type, numel, root_id, &nccl_ctx] {
196
              PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclReduce(
C
chengduoZH 已提交
197 198 199
                  buffer, recvbuffer, numel, static_cast<ncclDataType_t>(type),
                  ncclSum, root_id, nccl_ctx.comm_, nccl_ctx.stream()));
            });
C
chengduoZH 已提交
200 201
      }

202
      WaitInputVarGenerated();
C
chengduoZH 已提交
203 204 205 206 207 208
      this->RunAndRecordEvent([&] {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      });
C
chengduoZH 已提交
209
#else
210 211
      PADDLE_THROW(
          platform::errors::PreconditionNotMet("Not compiled with CUDA."));
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
#endif
    } else if (paddle::platform::is_xpu_place(lod_tensors[0]->place())) {
#if defined(PADDLE_WITH_XPU_BKCL)
      auto pre_in = pre_in_var->Get<framework::LoDTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
          out_var_handle->place(), pre_in.type());

      auto out_p = out_var_handle->place();
      int root_id = BOOST_GET_CONST(platform::XPUPlace, out_p).device;
      std::vector<std::function<void()>> all_reduce_calls;
      for (size_t i = 0; i < var_scopes.size(); ++i) {
        auto &p = in_places[i];
        auto &lod_tensor = *lod_tensors[i];

        int dev_id = BOOST_GET_CONST(platform::XPUPlace, p).device;
        auto &bkcl_ctx = bkcl_ctxs_->at(dev_id);

        void *buffer = const_cast<void *>(lod_tensor.data<void>());
        void *recvbuffer = nullptr;
        if (root_id == dev_id) {
          recvbuffer =
              out_var->GetMutable<framework::LoDTensor>()->mutable_data(
                  out_var_handle->place());
        }

        int type = platform::ToBKCLDataType(lod_tensor.type());
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back([buffer, recvbuffer, type, numel, root_id,
                                       &bkcl_ctx] {
          PADDLE_ENFORCE_EQ(bkcl_reduce(bkcl_ctx.comm(), buffer, recvbuffer,
                                        numel, static_cast<BKCLDataType>(type),
                                        BKCL_ADD, root_id, nullptr),
                            BKCL_SUCCESS, platform::errors::Unavailable(
                                              "bkcl_all_reduce failed"));
        });
      }

      WaitInputVarGenerated();
      this->RunAndRecordEvent([&] {
        PADDLE_ENFORCE_EQ(
            bkcl_group_start(), BKCL_SUCCESS,
            platform::errors::Unavailable("bkcl_group_start failed"));
        for (auto &call : all_reduce_calls) {
          call();
        }
        PADDLE_ENFORCE_EQ(
            bkcl_group_end(), BKCL_SUCCESS,
            platform::errors::Unavailable("bkcl_group_end failed"));
      });
#else
      PADDLE_THROW(
          platform::errors::PreconditionNotMet("Not compiled with XPU."));
C
chengduoZH 已提交
265 266
#endif
    } else {
267
      PADDLE_THROW(platform::errors::InvalidArgument(
268 269
          "The place of tensor should be CPUPlace, CUDAPlace or XPUPlace, but "
          "got %s.",
270
          lod_tensors[0]->place()));
C
chengduoZH 已提交
271 272 273
    }
  }
}
C
chengduoZH 已提交
274

C
chengduoZH 已提交
275 276 277
// Collects a const pointer to the T value held by each input variable.
//
// For every handle in `in_var_handles`, the variable named handle->name() is
// looked up in var_scopes.at(handle->scope_idx()) and a pointer to its
// Get<T>() value is appended to the result, preserving handle order.
//
// @tparam T             Value type stored in the variables (RunImpl uses
//                       SelectedRows and LoDTensor).
// @param in_var_handles Handles identifying the input variables.
// @param var_scopes     Scopes indexed by each handle's scope_idx().
// @return One pointer per input handle; the pointees are owned by the scope
//         variables, not by the returned vector.
// @throws NotFound (via PADDLE_ENFORCE_NOT_NULL) when a variable is missing
//         from its scope; std::vector::at throws if scope_idx() is out of
//         range.
template <typename T>
std::vector<const T *> ReduceOpHandle::GetInputValues(
    const std::vector<VarHandle *> &in_var_handles,
    const std::vector<Scope *> &var_scopes) const {
  std::vector<const T *> in_values;
  in_values.reserve(in_var_handles.size());
  for (auto *in_handle : in_var_handles) {
    auto *in_var =
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
    // Check before dereferencing so a missing variable gives a clear error
    // instead of a null dereference.
    PADDLE_ENFORCE_NOT_NULL(
        in_var, platform::errors::NotFound("Variable %s is not found in scope.",
                                           in_handle->name()));
    const T &in_value = in_var->Get<T>();
    in_values.emplace_back(&in_value);
  }
  return in_values;
}

C
chengduoZH 已提交
289 290 291 292
// Returns the fixed name of this op handle, "reduce". RunImpl passes it to
// platform::RecordEvent.
std::string ReduceOpHandle::Name() const {
  return std::string("reduce");
}
}  // namespace details
}  // namespace framework
}  // namespace paddle