/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/pybind/imperative.h"

#include <Python.h>
// Avoid a problem with copysign defined in pyconfig.h on Windows.
#ifdef copysign
#undef copysign
#endif

#include <pybind11/chrono.h>
#include <pybind11/complex.h>
#include <pybind11/functional.h>
#include <pybind11/stl.h>

#include <algorithm>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "paddle/fluid/eager/api/all.h"
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/framework/scope_guard.h"
#include "paddle/fluid/imperative/all_reduce.h"
#include "paddle/fluid/imperative/amp_auto_cast.h"
#include "paddle/fluid/imperative/basic_engine.h"
#include "paddle/fluid/imperative/bkcl_context.h"
#include "paddle/fluid/imperative/data_loader.h"
#include "paddle/fluid/imperative/gloo_context.h"
#include "paddle/fluid/imperative/heter_ccl_context.h"
#include "paddle/fluid/imperative/hooks.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/imperative/nccl_context.h"
#include "paddle/fluid/imperative/partial_grad_engine.h"
#include "paddle/fluid/imperative/profiler.h"
#include "paddle/fluid/imperative/reducer.h"
#include "paddle/fluid/imperative/tracer.h"
#include "paddle/fluid/imperative/type_defs.h"
#include "paddle/fluid/imperative/xccl_context.h"
#include "paddle/fluid/memory/allocation/mmap_allocator.h"
#include "paddle/fluid/operators/utils.h"
#include "paddle/fluid/pybind/cuda_streams_py.h"
#include "paddle/fluid/pybind/eager_utils.h"
#include "paddle/fluid/pybind/pybind_variant_caster.h"
#include "paddle/fluid/pybind/slice_utils.h"
#include "paddle/fluid/pybind/tensor_py.h"
#include "paddle/fluid/pybind/uva_utils.h"
#include "paddle/phi/core/compat/arg_map_context.h"
#include "paddle/phi/core/type_defs.h"

PHI_DECLARE_bool(set_to_1d);
namespace paddle {
namespace pybind {

std::atomic<int> VarBaseUniqueNameID{0};

namespace py = ::pybind11;

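// Cast a raw PyObject* to the requested C++ type via pybind11, throwing an
// InvalidArgument error that names the expected type when the cast fails.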
template <typename T>
static T PyObjectCast(PyObject *obj) {
  try {
    return py::cast<T>(py::handle(obj));
  } catch (py::cast_error &) {
    PADDLE_THROW(platform::errors::InvalidArgument(
        "Python object is not type of %s", typeid(T).name()));
  }
}

class PyVariableWrapperHook : public imperative::VariableWrapperHook {
 public:
  explicit PyVariableWrapperHook(PyObject *func) : py_func_(func) {
    Py_INCREF(py_func_);
  }

  ~PyVariableWrapperHook() override {
    py::gil_scoped_acquire gil;
    Py_DECREF(py_func_);
  }

  std::shared_ptr<imperative::VariableWrapper> operator()(
      const std::shared_ptr<imperative::VariableWrapper> &var) override {
    py::gil_scoped_acquire gil;
    VLOG(3) << "Call PyVariableWrapperHook for var " << var->Name();

    // 1. unpack temp VarBase from VariableWrapper
    std::shared_ptr<imperative::VarBase> tmp_varbase =
        std::make_shared<imperative::VarBase>(var);

    // 2. call hook and return
    PyObject *res = nullptr;
    try {
      res = PyObject_CallFunctionObjArgs(
          py_func_, py::cast(tmp_varbase).ptr(), nullptr);
    } catch (platform::EnforceNotMet &e) {
      throw std::move(e);
    } catch (std::exception &e) {
      PADDLE_THROW(platform::errors::Unavailable(
          "Hook function of Tensor raises an exception: %s.", e.what()));
    } catch (...) {
      PADDLE_THROW(platform::errors::Fatal(
          "Hook function of Tensor raises an unknown exception."));
    }

    PADDLE_ENFORCE_NOT_NULL(res,
                            platform::errors::Unavailable(
                                "Hook function of Tensor return a nullptr."));
    if (res == Py_None) {
      return var;
    }

    auto res_varbase = PyObjectCast<std::shared_ptr<imperative::VarBase>>(res);
    // Here the reference count of `res` is 2, so we decrease the reference
    // count manually to avoid memory leaks
    Py_DECREF(res);
    return res_varbase->SharedVar();
  }

 private:
  PyObject *py_func_;
};

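// Convert a Python place object (CPUPlace, CUDAPlace, XPUPlace, ...) into the
// corresponding platform::Place; unknown place types raise InvalidArgument.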
static const platform::Place PyObjectToPlace(const py::object &place_obj) {
  if (py::isinstance<platform::CPUPlace>(place_obj)) {
    return place_obj.cast<platform::CPUPlace>();
  } else if (py::isinstance<platform::CUDAPlace>(place_obj)) {
    return place_obj.cast<platform::CUDAPlace>();
  } else if (py::isinstance<platform::XPUPlace>(place_obj)) {
    return place_obj.cast<platform::XPUPlace>();
  } else if (py::isinstance<platform::CUDAPinnedPlace>(place_obj)) {
    return place_obj.cast<platform::CUDAPinnedPlace>();
  } else if (py::isinstance<platform::IPUPlace>(place_obj)) {
    return place_obj.cast<platform::IPUPlace>();
  } else if (py::isinstance<platform::Place>(place_obj)) {
    return place_obj.cast<platform::Place>();
  } else if (py::isinstance<platform::CustomPlace>(place_obj)) {
    return place_obj.cast<platform::CustomPlace>();
  } else {
    PADDLE_THROW(platform::errors::InvalidArgument(
        "Place should be one of "
        "Place/CPUPlace/XPUPlace/CUDAPlace/CUDAPinnedPlace/IPUPlace/"
        "CustomPlace"));
  }
}

// only initialize varbase, but not its tensor.
static void InitVarBaseOnly(imperative::VarBase *self,
                            const std::string &name,
                            bool persistable = false,
                            int stop_gradient = -1) {
  auto name_ = name.empty()
                   ? imperative::GetCurrentTracer()->GenerateUniqueName(
                         "generated_tensor")
                   : name;

  VLOG(5) << "Init Tensor as: / name: " << name_
          << " / persistable: " << persistable
          << " / stop_gradient: " << stop_gradient;
  new (self) imperative::VarBase(name_);
  if (stop_gradient != -1) {
    self->SetOverridedStopGradient(stop_gradient);
  }
  self->SetPersistable(persistable);
  self->SetType(framework::proto::VarType::LOD_TENSOR);
}

// initialize varbase and its tensor.
static void InitVarBaseAndTensor(imperative::VarBase *self,
                                 const py::array &array,
                                 const platform::Place &place,
                                 const std::string &name,
                                 bool persistable = false,
                                 bool zero_copy = false,
                                 int stop_gradient = -1) {
  InitVarBaseOnly(self, name, persistable, stop_gradient);
  auto *tensor = self->MutableVar()->GetMutable<phi::DenseTensor>();
  VLOG(4) << "zero_copy: " << zero_copy;
  if (platform::is_cpu_place(place)) {
    SetTensorFromPyArray<platform::CPUPlace>(tensor, array, place, zero_copy);
  } else if (platform::is_xpu_place(place)) {
    SetTensorFromPyArray<platform::XPUPlace>(tensor, array, place, zero_copy);
  } else if (platform::is_gpu_place(place)) {
    SetTensorFromPyArray<platform::CUDAPlace>(tensor, array, place, zero_copy);
  } else if (platform::is_cuda_pinned_place(place)) {
    SetTensorFromPyArray<platform::CUDAPinnedPlace>(
        tensor, array, place, zero_copy);
  } else if (platform::is_ipu_place(place)) {
    SetTensorFromPyArray<platform::IPUPlace>(tensor, array, place, zero_copy);
  } else if (platform::is_custom_place(place)) {
    SetTensorFromPyArray<platform::CustomPlace>(
        tensor, array, place, zero_copy);
  } else {
    PADDLE_THROW(platform::errors::InvalidArgument(
        "Place should be one of "
        "CPUPlace/XPUPlace/CUDAPlace/CUDAPinnedPlace/IPUPlace/"));
  }
  self->SetDataType(framework::TransToProtoVarType(tensor->dtype()));
}

static void InitVarBaseFromNumpyWithKwargs(imperative::VarBase *self,
                                           const py::kwargs &kwargs) {
  VLOG(4) << "Init VarBase from kwargs: ";
  auto persistable = kwargs.contains("persistable")
                         ? kwargs["persistable"].cast<bool>()
                         : false;
  auto zero_copy =
      kwargs.contains("zero_copy") ? kwargs["zero_copy"].cast<bool>() : false;
  auto name = kwargs.contains("name") ? kwargs["name"].cast<std::string>() : "";
  auto stop_gradient = kwargs.contains("stop_gradient")
                           ? kwargs["stop_gradient"].cast<int>()
                           : -1;
  auto default_place = imperative::GetCurrentTracer()->ExpectedPlace();

  if (kwargs.contains("value")) {
    auto array = kwargs["value"].cast<py::array>();
    // place is only used when array is given, otherwise, it is meaningless and
    // ignored
    auto place = kwargs.contains("place") ? PyObjectToPlace(kwargs["place"])
                                          : default_place;
    InitVarBaseAndTensor(
        self, array, place, name, persistable, zero_copy, stop_gradient);
  } else {
    InitVarBaseOnly(self, name, persistable, stop_gradient);
  }
}

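// Initialize a VarBase and its tensor from a numpy array placed on a place of
// the concrete type P.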
template <typename P>
static void InitVarBaseFromNumpyWithArg(imperative::VarBase *self,
                                        const py::array &array,
                                        const P &place,
                                        bool persistable = false,
                                        bool zero_copy = false,
                                        std::string name = "",
                                        int stop_gradient = -1) {
  VLOG(4) << "Init VarBase from Arg: ";
  // 0: self, 1: value, 2: place, 3: persistable, 4: zero_copy, 5: name , 6:
  // stop_gradient
  if (name.empty()) {
    name =
        imperative::GetCurrentTracer()->GenerateUniqueName("generated_tensor");
  }
  VLOG(5) << "Init Tensor as: / name: " << name
          << " / persistable: " << persistable << " / zero_copy: " << zero_copy
          << " / stop_gradient: " << stop_gradient << " / at " << place;
  new (self) imperative::VarBase(name);
  self->SetPersistable(persistable);
  auto *tensor = self->MutableVar()->GetMutable<phi::DenseTensor>();
  if (stop_gradient != -1) {
    self->SetOverridedStopGradient(stop_gradient);
  }
  SetTensorFromPyArray<P>(tensor, array, place, zero_copy);
  self->SetType(framework::proto::VarType::LOD_TENSOR);
  self->SetDataType(framework::TransToProtoVarType(tensor->dtype()));
}

static void InitVarBaseFromNumpyWithArgDefault(imperative::VarBase *self,
                                               const py::array &array) {
  auto place = imperative::GetCurrentTracer()->ExpectedPlace();
  VLOG(4) << "Init VarBase from numpy at " << place;
  InitVarBaseAndTensor(self, array, place, "");
}

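// Build a VarBase from an existing DenseTensor on the tracer's expected place:
// share the underlying storage when the places match, otherwise copy the data.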
static void InitVarBaseFromTensorWithArgDefault(imperative::VarBase *self,
                                                const phi::DenseTensor &tensor,
                                                const std::string &name) {
  VLOG(4) << "Init VarBase";
  auto place = imperative::GetCurrentTracer()->ExpectedPlace();
  auto name_ = name.empty()
                   ? imperative::GetCurrentTracer()->GenerateUniqueName(
                         "generated_tensor")
                   : name;
  new (self) imperative::VarBase(name_);
  self->SetPersistable(false);
  self->SetType(framework::proto::VarType::LOD_TENSOR);
  self->SetDataType(framework::TransToProtoVarType(tensor.dtype()));
  auto *new_tensor = self->MutableVar()->GetMutable<phi::DenseTensor>();
  // Same place, share data directly
  if (place == tensor.place()) {
    new_tensor->ShareDataWith(tensor);
    VLOG(4) << "Same place, do ShareDataWith";
  } else {
    framework::TensorCopy(tensor, place, new_tensor);
    VLOG(4) << "Different place, do TensorCopy";
  }
}

template <typename P>
static void InitVarBaseFromTensorWithArg(imperative::VarBase *self,
                                         const phi::DenseTensor &tensor,
                                         const P &place,
                                         const std::string &name) {
  VLOG(4) << "Init VarBase";
  auto name_ = name.empty()
                   ? imperative::GetCurrentTracer()->GenerateUniqueName(
                         "generated_tensor")
                   : name;
  new (self) imperative::VarBase(name_);
  self->SetPersistable(false);
  self->SetType(framework::proto::VarType::LOD_TENSOR);
  self->SetDataType(framework::TransToProtoVarType(tensor.dtype()));
  auto *new_tensor = self->MutableVar()->GetMutable<phi::DenseTensor>();
  // Same place, share data directly
  if (platform::is_same_place(place, tensor.place())) {
    new_tensor->ShareDataWith(tensor);
    VLOG(4) << "Same place, do ShareDataWith";
  } else {
    framework::TensorCopy(tensor, place, new_tensor);
    VLOG(4) << "Different place, do TensorCopy";
  }
}

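// Return a readable type name for the variable held by a VarBase: "RAW",
// "nullptr" for an uninitialized variable, or the underlying framework type.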
static std::string GetTypeName(const imperative::VarBase &var) {
  if (var.Type() == framework::proto::VarType::RAW) {
    return "RAW";
  } else if (!var.Var().IsInitialized()) {
    return "nullptr";
  } else {
    return framework::ToTypeName(var.Var().Type());
  }
}

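// Extract a scalar slice index from a Python object that wraps a VarBase; any
// other object type is rejected with InvalidArgument.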
Py_ssize_t GetSliceIndexFromPyObject(PyObject *obj) {
  if (py::isinstance<imperative::VarBase>(obj)) {
    VLOG(6) << "Call GetSliceIndexFromTensor in Imperative";
    return GetSliceIndexFromTensor(
        py::cast<std::shared_ptr<imperative::VarBase>>(obj)
            ->Var()
            .Get<phi::DenseTensor>());
  } else {
    PADDLE_THROW(platform::errors::InvalidArgument(
        "We should only get paddle::Tensor or VarBase in this "
        "method, when you reach this means we got another type index."));
  }
}

using PyNameVarBaseMap = std::unordered_map<std::string, py::handle>;

// NOTE(zjl): py::handle is a very light wrapper of PyObject *.
// Unlike py::object, py::handle does not change reference count of PyObject *.
static std::vector<std::shared_ptr<imperative::VarBase>>
GetVarBaseListFromPyHandle(const py::handle &handle) {
  PyObject *py_obj = handle.ptr();  // get underlying PyObject
  // Python None is not nullptr in C++!
  if (!py_obj || py_obj == Py_None) {
    return {};
  }

  std::vector<std::shared_ptr<imperative::VarBase>> result;

  if (PyList_Check(py_obj)) {  // List of VarBase
    size_t len = PyList_GET_SIZE(py_obj);
    result.reserve(len);
    for (size_t i = 0; i < len; ++i) {
      PyObject *py_ivar = PyList_GET_ITEM(py_obj, i);
      PADDLE_ENFORCE_NOT_NULL(
          py_ivar, platform::errors::InvalidArgument("Python Object is NULL"));
      result.emplace_back(
          PyObjectCast<std::shared_ptr<imperative::VarBase>>(py_ivar));
    }
  } else if (PyTuple_Check(py_obj)) {  // Tuple of VarBase
    size_t len = PyTuple_GET_SIZE(py_obj);
    result.reserve(len);
    for (size_t i = 0; i < len; ++i) {
      PyObject *py_ivar = PyTuple_GET_ITEM(py_obj, i);
      PADDLE_ENFORCE_NOT_NULL(
          py_ivar, platform::errors::InvalidArgument("Python Object is NULL"));
      result.emplace_back(
          PyObjectCast<std::shared_ptr<imperative::VarBase>>(py_ivar));
    }
  } else {  // VarBase
    result.emplace_back(
        PyObjectCast<std::shared_ptr<imperative::VarBase>>(py_obj));
  }

  return result;
}

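// Convert a {name -> Python handle} map (each handle holding a VarBase or a
// list/tuple of VarBase) into the NameVarBaseMap consumed by the tracer.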
static imperative::NameVarBaseMap ConvertToNameVarBaseMap(
    const PyNameVarBaseMap &map) {
  imperative::NameVarBaseMap result;
  for (auto &pair : map) {
    auto var_vec = GetVarBaseListFromPyHandle(pair.second);
    if (!var_vec.empty()) {
      result.emplace(pair.first, std::move(var_vec));
    }
  }

  PADDLE_ENFORCE_EQ(
      PyErr_Occurred(),
      nullptr,
      platform::errors::InvalidArgument(py::str(py::handle(PyErr_Occurred()))));
  return result;
}

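// Same as above, but for eager mode: convert the Python handles into lists of
// shared_ptr<egr::EagerVariable> keyed by parameter name.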
paddle::imperative::NameTensorMap ConvertToNameTensorMap(
    const PyNameVarBaseMap &map) {
  paddle::imperative::NameTensorMap result;
  for (auto &pair : map) {
    auto var_vec = CastPyArg2VectorOfTensor(pair.second.ptr(), 0);
    if (!var_vec.empty()) {
      // change vector<Tensor> -> vector<shared_ptr<Tensor>>
      std::vector<std::shared_ptr<egr::EagerVariable>> dst_var_vec;
      for (auto &v : var_vec) {
        dst_var_vec.emplace_back(
            std::make_shared<egr::EagerVariable>(std::move(v)));
      }
      result.emplace(pair.first, std::move(dst_var_vec));
    }
  }

  PADDLE_ENFORCE_EQ(
      PyErr_Occurred(),
      nullptr,
      platform::errors::InvalidArgument(py::str(py::handle(PyErr_Occurred()))));
  return result;
}

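// Deep-copy `src` into the empty VarBase `dst` on `dst_device`, handling both
// DenseTensor and SelectedRows contents; when `blocking` is true, wait on the
// involved device contexts until the copy finishes.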
template <typename P>
static void VarBaseCopy(std::shared_ptr<imperative::VarBase> &src,  // NOLINT
                        imperative::VarBase &dst,                   // NOLINT
                        const P &dst_device,
                        const bool blocking) {
  if (dst.SharedVar()->IsEmpty()) {
    VLOG(3) << "deep copy Variable from " << src->Name() << " to "
            << dst.Name();
    dst.SetPersistable(src->Persistable());
    dst.SetDataType(src->DataType());
    dst.SetType(src->Type());
    dst.SetOverridedStopGradient(src->OverridedStopGradient());
    if (!src->SharedVar()->IsEmpty()) {
      if (src->Var().IsType<phi::DenseTensor>()) {
        auto &src_tensor = src->Var().Get<phi::DenseTensor>();
        auto *dst_tensor = dst.MutableVar()->GetMutable<phi::DenseTensor>();
        dst_tensor->set_lod(src_tensor.lod());
        framework::TensorCopy(src_tensor, dst_device, dst_tensor);
        if (blocking) {
          platform::DeviceContextPool::Instance().Get(dst_device)->Wait();
          auto src_device = src_tensor.place();
          if (!(src_device == dst_device)) {
            platform::DeviceContextPool::Instance().Get(src_device)->Wait();
          }
        }
      } else if (src->Var().IsType<phi::SelectedRows>()) {
        auto &src_selected_rows = src->Var().Get<phi::SelectedRows>();
        auto *dst_selected_rows =
            dst.MutableVar()->GetMutable<phi::SelectedRows>();
        dst_selected_rows->set_height(src_selected_rows.height());
        dst_selected_rows->set_rows(src_selected_rows.rows());
        framework::TensorCopy(src_selected_rows.value(),
                              dst_device,
                              dst_selected_rows->mutable_value());
        if (blocking) {
          platform::DeviceContextPool::Instance().Get(dst_device)->Wait();
          auto src_device = src_selected_rows.value().place();
          if (!(src_device == dst_device)) {
            platform::DeviceContextPool::Instance().Get(src_device)->Wait();
          }
        }
      }

      if (!blocking) {
        IncreaseVarbaseReferenceCountUntilCopyComplete(src, dst_device);
      }

    } else {
      PADDLE_THROW(platform::errors::InvalidArgument(
          "The source Tensor(%s) can not copy when it is empty.", src->Name()));
    }
  } else {
    PADDLE_THROW(platform::errors::InvalidArgument(
        "The destination Tensor(%s) can not copy when it is not empty.",
        dst.Name()));
  }
}

// Bind Methods
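// BindImperative registers the dygraph bindings on the given module: the
// DataLoader / shared-memory helpers, the Tracer and AMP settings,
// ParallelStrategy and the ParallelContext family, the gradient Reducer
// utilities, and the UVA / async copy helpers.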
void BindImperative(py::module *m_ptr) {
  auto &m = *m_ptr;

#ifndef _WIN32
  // Dygraph DataLoader signal handler
  m.def("_set_process_pids", [](int64_t key, py::object &obj) {
    PADDLE_ENFORCE_EQ(
        py::isinstance<py::tuple>(obj) || py::isinstance<py::list>(obj),
        true,
        platform::errors::InvalidArgument(
            "The subprocess ids set in DataLoader is illegal."
            "Expected data type is tuple or list, but received %s",
            obj.get_type()));
    py::list pids = py::cast<py::list>(obj);
    std::set<pid_t> pids_set = {};
    for (auto &&pid : pids) {
      pids_set.insert(pid.cast<pid_t>());
    }
    imperative::SetLoadProcessPIDs(key, pids_set);
  });
  m.def("_erase_process_pids",
        [](int64_t key) { imperative::EraseLoadProcessPIDs(key); });
  m.def("_set_process_signal_handler",
        []() { imperative::SetLoadProcessSignalHandler(); });
  m.def("_throw_error_if_process_failed",
        []() { imperative::ThrowErrorIfLoadProcessFailed(); });
  // Dygraph DataLoader reader process & thread related functions
  m.def(
      "_convert_to_tensor_list",
      [](py::object &obj) -> py::list {
        // 0. input data check
        PADDLE_ENFORCE(
            py::isinstance<py::tuple>(obj) || py::isinstance<py::list>(obj),
            platform::errors::InvalidArgument(
                "The batch data read into DataLoader is illegal."
                "Expected data type is tuple or list, but received %s",
                obj.get_type()));
        py::list batch = py::cast<py::list>(obj);
        py::list tensors;
        for (auto &&item : batch) {
          // 1. cast to python array
          auto array = item.cast<py::array>();
          PADDLE_ENFORCE_NE(
              string::Sprintf("%s", array.dtype()).compare("object"),
              0,
              platform::errors::InvalidArgument(
                  "Failed to convert input data to a regular ndarray.\n  * "
                  "Usually this means the input data contains nested "
                  "lists with different lengths.\n  * Check the reader "
                  "function passed to 'set_(sample/sample_list/batch)"
                  "_generator' to locate the data causes this issue."));
          // 2. construct LoDTensor
          phi::DenseTensor t;
          SetTensorFromPyArray<platform::CPUPlace>(
              &t, array, platform::CPUPlace(), true);
          // 3. allocate shared memory
          void *data_ptr = t.data();
          size_t data_size = t.numel() * phi::SizeOf(t.dtype());
          auto shared_writer_holder =
              memory::allocation::AllocateMemoryMapWriterAllocation(data_size);
          // 4. maintain mmap fd set & backup ipc_name
          const std::string &ipc_name = shared_writer_holder->ipc_name();
          memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
          // 5. copy data & reset holder
          memory::Copy(platform::CPUPlace(),
                       shared_writer_holder->ptr(),
                       platform::CPUPlace(),
                       data_ptr,
                       data_size);
          t.ResetHolder(shared_writer_holder);
          // 6. append to result list
          tensors.append(t);
        }
        return tensors;
      },
      py::return_value_policy::take_ownership);

  m.def(
      "_array_to_share_memory_tensor",
      [](py::object &obj) {
        // 1. cast to python array
        auto array = obj.cast<py::array>();
        PADDLE_ENFORCE_NE(
            string::Sprintf("%s", array.dtype()).compare("object"),
            0,
            platform::errors::InvalidArgument(
                "Failed to convert input data to a regular ndarray.\n  * "
                "Usually this means the input data contains nested "
                "lists with different lengths.\n  * Check the reader "
                "function passed to 'set_(sample/sample_list/batch)"
                "_generator' to locate the data causes this issue."));
        // 2. construct LoDTensor
        phi::DenseTensor t;
        SetTensorFromPyArray<platform::CPUPlace>(
            &t, array, platform::CPUPlace(), true);
        // 3. allocate shared memory
        void *data_ptr = t.data();
        size_t data_size = t.numel() * phi::SizeOf(t.dtype());
        auto shared_writer_holder =
            memory::allocation::AllocateMemoryMapWriterAllocation(data_size);
        // 4. maintain mmap fd set & backup ipc_name
        const std::string &ipc_name = shared_writer_holder->ipc_name();
        memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
        // 5. copy data & reset holder
        memory::Copy(platform::CPUPlace(),
                     shared_writer_holder->ptr(),
                     platform::CPUPlace(),
                     data_ptr,
                     data_size);
        t.ResetHolder(shared_writer_holder);

        return t;
      },
      py::return_value_policy::take_ownership);

  m.def("_remove_tensor_list_mmap_fds", [](py::list &tensor_list) {
    for (auto &&tensor : tensor_list) {
      auto t = tensor.cast<phi::DenseTensor>();
      auto *mmap_writer_allocation =
          dynamic_cast<memory::allocation::MemoryMapWriterAllocation *>(
              t.Holder().get());
      PADDLE_ENFORCE_NOT_NULL(
          mmap_writer_allocation,
          platform::errors::NotFound("The shared memory of LoDTensor in "
                                     "DataLoader's child process has been "
                                     "released."));
      memory::allocation::MemoryMapFdSet::Instance().Remove(
          mmap_writer_allocation->ipc_name());
    }
  });

  m.def("_cleanup_mmap_fds",
        []() { memory::allocation::MemoryMapFdSet::Instance().Clear(); });

  m.def("_set_max_memory_map_allocation_pool_size", [](int32_t size) {
    memory::allocation::MemoryMapAllocationPool::Instance().SetMaxPoolSize(
        size);
  });
#endif

  m.def("start_imperative_gperf_profiler",
        []() { imperative::StartProfile(); });
  m.def("_set_eager_tracer",
        [](const std::shared_ptr<imperative::Tracer> &tracer) {
          egr::Controller::Instance().SetCurrentTracer(tracer);
        });
  m.def("stop_imperative_gperf_profiler", []() { imperative::StopProfile(); });

  m.def("_is_dygraph_debug_enabled",
        []() { return imperative::IsDebugEnabled(); });
  m.def("_dygraph_debug_level", []() { return imperative::GetDebugLevel(); });
  m.def("_switch_tracer",
        [](const std::shared_ptr<imperative::Tracer> &tracer) {
          egr::Controller::Instance().SetCurrentTracer(tracer);
          imperative::SetCurrentTracer(tracer);
        });
  py::class_<imperative::jit::ProgramDescTracer>(m, "ProgramDescTracer", "")
      .def("create_program_desc",
           &imperative::jit::ProgramDescTracer::CreateProgramDesc)
      .def("reset", &imperative::jit::ProgramDescTracer::Reset);

  py::enum_<paddle::imperative::AmpLevel>(m, "AmpLevel", py::arithmetic())
      .value("O0", paddle::imperative::AmpLevel::O0)
      .value("OD", paddle::imperative::AmpLevel::OD)
      .value("O1", paddle::imperative::AmpLevel::O1)
      .value("O2", paddle::imperative::AmpLevel::O2)
      .value("O3", paddle::imperative::AmpLevel::O3)
      .export_values();

  py::class_<imperative::Tracer, std::shared_ptr<imperative::Tracer>>(
      m, "Tracer", R"DOC()DOC")
      .def("__init__",
           [](imperative::Tracer &self) { new (&self) imperative::Tracer(); })
      .def_property("_enable_program_desc_tracing",
                    &imperative::Tracer::IsProgramDescTracingEnabled,
                    &imperative::Tracer::SetEnableProgramDescTracing)
      .def_property("_use_promote",
                    &imperative::Tracer::GetUsePromote,
                    &imperative::Tracer::SetUsePromote)
      .def_property("_amp_level",
                    &imperative::Tracer::GetAmpLevel,
                    &imperative::Tracer::SetAmpLevel)
      .def_property("_amp_dtype",
                    &imperative::Tracer::GetAmpDtype,
                    &imperative::Tracer::SetAmpDtype)
      .def_property("_has_grad",
                    &imperative::Tracer::HasGrad,
                    &imperative::Tracer::SetHasGrad)
      .def_property(
          "_expected_place",
          [](const imperative::Tracer &self) -> py::object {
            return py::cast(self.ExpectedPlace());
          },
          [](imperative::Tracer &self, const py::object &obj) {
            if (py::isinstance<platform::CUDAPlace>(obj)) {
              auto p = obj.cast<platform::CUDAPlace *>();
              self.SetExpectedPlace(*p);
              // TODO(jiabin): Support eager here when we need to make all
              // dygraph in eager mode
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else if (py::isinstance<platform::XPUPlace>(obj)) {
              auto p = obj.cast<platform::XPUPlace *>();
              self.SetExpectedPlace(*p);
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else if (py::isinstance<platform::CPUPlace>(obj)) {
              auto p = obj.cast<platform::CPUPlace *>();
              self.SetExpectedPlace(*p);
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else if (py::isinstance<platform::CUDAPinnedPlace>(obj)) {
              auto p = obj.cast<platform::CUDAPinnedPlace *>();
              self.SetExpectedPlace(*p);
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else if (py::isinstance<platform::IPUPlace>(obj)) {
              auto p = obj.cast<platform::IPUPlace *>();
              self.SetExpectedPlace(*p);
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else if (py::isinstance<platform::CustomPlace>(obj)) {
              auto p = obj.cast<platform::CustomPlace *>();
              self.SetExpectedPlace(*p);
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else if (py::isinstance<platform::Place>(obj)) {
              auto p = obj.cast<platform::Place *>();
              self.SetExpectedPlace(*p);
              VLOG(4) << "Tracer(" << &self << ")"
                      << " set expected place " << *p;
            } else {
              PADDLE_THROW(platform::errors::InvalidArgument(
                  "Incompatible Place Type: supports XPUPlace, CUDAPlace, "
                  "CPUPlace, IPUPlace"
                  "and CUDAPinnedPlace, "
                  "but got Unknown Type!"));
            }
          })
      .def("_get_program_desc_tracer",
           &imperative::Tracer::GetProgramDescTracer,
           py::return_value_policy::reference)
      .def("_generate_unique_name",
           &imperative::Tracer::GenerateUniqueName,
           py::arg("key") = "dygraph_tmp")
      .def("_set_amp_op_list",
           [](imperative::Tracer &self,
              std::unordered_set<std::string> &allow_ops,
              std::unordered_set<std::string> &block_ops) {
             // NOTE(zhiqiu): The automatic conversion in pybind11 between
             // C++ STL and Python set/list/dict involves a copy operation
             // that prevents pass-by-reference semantics, so it is ok to
             // swap here. The reason we do not directly pass
             // std::shared_ptr<std::unordered_set<std::string>> is that
             // pybind11 forbids shared_ptr<T> where T is not a custom type.
             imperative::AmpOperators::Instance().GetMutableAllowOps()->swap(
                 allow_ops);
             imperative::AmpOperators::Instance().GetMutableBlockOps()->swap(
                 block_ops);
             VLOG(5) << "AMP operators changed, "
                     << imperative::AmpOperators::Instance();
           })
      .def("_get_amp_op_list",
           [](imperative::Tracer &self) {
             return std::make_tuple(
                 *(imperative::AmpOperators::Instance().GetMutableAllowOps()),
                 *(imperative::AmpOperators::Instance().GetMutableBlockOps()));
           })
      .def("_get_kernel_signature",
           [](imperative::Tracer &self,
              const std::string &type,
              const PyNameVarBaseMap &ins,
              const PyNameVarBaseMap &outs,
              framework::AttributeMap attrs) {
             // TODO(xiongkun): move this function outside of tracer.
             auto ins_map = ConvertToNameTensorMap(ins);
             auto outs_map = ConvertToNameTensorMap(outs);
             {
               auto input_to_vector =
                   [](paddle::small_vector<const char *> &vec) {
                     return std::vector<std::string>(vec.begin(), vec.end());
                   };
               auto output_to_vector =
                   [](paddle::small_vector<const char *> &vec) {
                     return std::vector<std::string>(vec.begin(), vec.end());
                   };
               auto attr_to_vector =
                   [](paddle::small_vector<const char *> &vec) {
                     return std::vector<std::string>(vec.begin(), vec.end());
                   };
               auto ret = self.GetExpectedKernelSignature(
                   type, ins_map, outs_map, attrs);
               auto kernelsig_ins = input_to_vector(ret.input_names);
               auto kernelsig_attrs = attr_to_vector(ret.attr_names);
               auto kernelsig_outs = output_to_vector(ret.output_names);
               return std::make_tuple(
                   kernelsig_ins, kernelsig_attrs, kernelsig_outs);
             }
           });

  // define parallel context
  py::class_<imperative::ParallelStrategy> parallel_strategy(
      m, "ParallelStrategy", "");
  parallel_strategy.def(py::init())
      .def_property(
          "nranks",
          [](const imperative::ParallelStrategy &self) { return self.nranks_; },
          [](imperative::ParallelStrategy &self, int nranks) {
            self.nranks_ = nranks;
          })
      .def_property(
          "local_rank",
          [](const imperative::ParallelStrategy &self) {
            return self.local_rank_;
          },
          [](imperative::ParallelStrategy &self, int local_rank) {
            self.local_rank_ = local_rank;
          })
      .def_property(
          "trainer_endpoints",
          [](const imperative::ParallelStrategy &self) {
            return self.trainer_endpoints_;
          },
          [](imperative::ParallelStrategy &self, std::vector<std::string> eps) {
            self.trainer_endpoints_ = eps;
          })
      .def_property(
          "current_endpoint",
          [](const imperative::ParallelStrategy &self) {
            return self.current_endpoint_;
          },
          [](imperative::ParallelStrategy &self, const std::string &ep) {
            self.current_endpoint_ = ep;
          })
      .def_property(
          "nrings",
          [](const imperative::ParallelStrategy &self) { return self.nrings_; },
          [](imperative::ParallelStrategy &self, int nrings) {
            self.nrings_ = nrings;
          });

  m.def("varbase_copy", &VarBaseCopy<platform::Place>);
  m.def("varbase_copy", &VarBaseCopy<platform::CPUPlace>);
  m.def("varbase_copy", &VarBaseCopy<platform::CUDAPlace>);
  m.def("varbase_copy", &VarBaseCopy<platform::XPUPlace>);
  m.def("varbase_copy", &VarBaseCopy<platform::CUDAPinnedPlace>);
  m.def("varbase_copy", &VarBaseCopy<platform::CustomPlace>);

  m.def(
      "dygraph_partial_grad",
      [](const std::vector<std::shared_ptr<imperative::VarBase>> &input_targets,
         const std::vector<std::shared_ptr<imperative::VarBase>>
             &output_targets,
         const std::vector<std::shared_ptr<imperative::VarBase>> &output_grads,
         const std::vector<std::shared_ptr<imperative::VarBase>> &no_grad_vars,
         const platform::Place &place,
         bool create_graph,
         bool retain_graph,
         bool allow_unused,
         bool only_inputs) {
        imperative::PartialGradEngine engine(input_targets,
                                             output_targets,
                                             output_grads,
                                             no_grad_vars,
                                             place,
                                             create_graph,
                                             retain_graph,
                                             allow_unused,
                                             only_inputs);
        engine.Execute();
        return engine.GetResult();
      },
      py::call_guard<py::gil_scoped_release>());

  m.def(
      "dygraph_run_backward",
      [](const std::vector<std::shared_ptr<imperative::VarBase>> &tensors,
         const std::vector<std::shared_ptr<imperative::VarBase>> &grad_tensors,
         bool retain_graph,
         const imperative::Tracer &tracer) {
        auto *engine = tracer.GetEngine();
        engine->Init(tensors, grad_tensors, retain_graph);
        VLOG(3) << "Start backward";
        engine->Execute();
        VLOG(3) << "Finish backward";
      },
      py::call_guard<py::gil_scoped_release>());

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) ||     \
    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) || \
    defined(PADDLE_WITH_CUSTOM_DEVICE)
  py::class_<imperative::ParallelContext,
             std::shared_ptr<imperative::ParallelContext>>(m,
                                                           "ParallelContext");

  py::class_<imperative::Reducer, std::shared_ptr<imperative::Reducer>>(
      m, "Reducer", R"DOC()DOC")
      .def(py::init<const std::vector<std::shared_ptr<imperative::VarBase>> &,
                    const std::vector<std::vector<size_t>> &,
                    const std::vector<bool> &,
                    std::shared_ptr<imperative::ParallelContext>,
                    const std::vector<size_t> &,
                    bool>())
      .def("prepare_for_backward",
           &imperative::Reducer::PrepareForBackward,
           py::arg("vars"),
           py::call_guard<py::gil_scoped_release>());

  m.def("assign_group_by_size",
        &imperative::AssignGroupBySize,
        py::arg("vars"),
        py::arg("is_sparse_gradient"),
        py::arg("group_size_limits") = std::vector<size_t>{25 * 1024 * 1024},
        py::arg("tensor_indices") = std::vector<int64_t>{},
        py::call_guard<py::gil_scoped_release>());
#endif

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
  py::class_<imperative::NCCLParallelContext,
             imperative::ParallelContext,
             std::shared_ptr<imperative::NCCLParallelContext>>(
      m, "NCCLParallelContext")
      .def(py::init<const imperative::ParallelStrategy &,
                    const platform::CUDAPlace &>())
      .def("init", [](imperative::NCCLParallelContext &self) { self.Init(); })
      .def("init_with_ring_id",
           &imperative::NCCLParallelContext::InitWithRingID,
           py::arg("ring_id"));
#endif

#if defined(PADDLE_WITH_CUSTOM_DEVICE)
  py::class_<imperative::XCCLParallelContext,
             imperative::ParallelContext,
             std::shared_ptr<imperative::XCCLParallelContext>>(
      m, "XCCLParallelContext")
      .def(py::init<const imperative::ParallelStrategy &,
                    const platform::CustomPlace &>())
      .def("init", [](imperative::XCCLParallelContext &self) { self.Init(); })
      .def("init_with_ring_id",
           &imperative::XCCLParallelContext::InitWithRingID,
           py::arg("ring_id"));
#endif

#if defined(PADDLE_WITH_XPU_BKCL)
  py::class_<imperative::BKCLParallelContext,
             imperative::ParallelContext,
             std::shared_ptr<imperative::BKCLParallelContext>>(
      m, "BKCLParallelContext")
      .def(py::init<const imperative::ParallelStrategy &,
                    const platform::XPUPlace &>())
      .def("init", [](imperative::BKCLParallelContext &self) { self.Init(); })
      .def("init_with_ring_id",
           &imperative::BKCLParallelContext::InitWithRingID,
           py::arg("ring_id"));
#endif

#if defined(PADDLE_WITH_GLOO)
  // xiongkun
  py::class_<imperative::GLOOParallelContext,
             imperative::ParallelContext,
             std::shared_ptr<imperative::GLOOParallelContext>>(
      m, "GLOOParallelContext")
      .def(py::init<const imperative::ParallelStrategy &,
                    const platform::CPUPlace &>())
      .def("init", [](imperative::GLOOParallelContext &self) { self.Init(); })
      .def("init_with_ring_id",
           &imperative::GLOOParallelContext::InitWithRingID,
           py::arg("ring_id"));
#endif

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_CUSTOM_DEVICE)
  py::class_<imperative::HeterParallelContext,
             imperative::ParallelContext,
             std::shared_ptr<imperative::HeterParallelContext>>(
      m, "HeterParallelContext")
      .def(py::init<const imperative::ParallelStrategy &, const int &>())
      .def("init", [](imperative::HeterParallelContext &self) { self.Init(); });
#endif

#if defined(PADDLE_WITH_CUDA)
  m.def(
      "to_uva_tensor",
      [](const py::object &obj, int device_id) {
        const auto &tracer = imperative::GetCurrentTracer();
        auto new_tensor = std::shared_ptr<imperative::VarBase>(
            new imperative::VarBase(tracer->GenerateUniqueName()));
        auto array = obj.cast<py::array>();
        if (py::isinstance<py::array_t<int32_t>>(array)) {
          SetUVATensorFromPyArray<int32_t>(new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<int64_t>>(array)) {
          SetUVATensorFromPyArray<int64_t>(new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<float>>(array)) {
          SetUVATensorFromPyArray<float>(new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<double>>(array)) {
          SetUVATensorFromPyArray<double>(new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<int8_t>>(array)) {
          SetUVATensorFromPyArray<int8_t>(new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<int16_t>>(array)) {
          SetUVATensorFromPyArray<int16_t>(new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<paddle::platform::float16>>(
                       array)) {
          SetUVATensorFromPyArray<paddle::platform::float16>(
              new_tensor, array, device_id);
        } else if (py::isinstance<py::array_t<bool>>(array)) {
          SetUVATensorFromPyArray<bool>(new_tensor, array, device_id);
        } else {
          // obj may be of any type, and obj.cast<py::array>() may fail;
          // in that case array.dtype will be a string of unknown meaning.
          PADDLE_THROW(platform::errors::InvalidArgument(
              "Input object type error or incompatible array data type. "
              "tensor.set() supports array with bool, float16, float32, "
              "float64, int8, int16, int32, int64,"
              "please check your input or input array data type."));
        }
        return new_tensor;
      },
      py::arg("obj"),
      py::arg("device_id") = 0,
      py::return_value_policy::reference,
      R"DOC(
  Returns a tensor with UVA (unified virtual addressing) created from a numpy array.

  Args:
      obj(numpy.ndarray): The input numpy array, supporting bool, float16, float32,
                          float64, int8, int16, int32, int64 dtype currently.

      device_id(int, optional): The destination GPU device id.
                                Default: 0, means current device.

  Returns:

      new_tensor(paddle.Tensor): Return the UVA Tensor with the same dtype and
                                 shape with the input numpy array.

  Examples:
      .. code-block:: python

        # required: gpu
        import numpy as np
        import paddle

        data = np.random.randint(10, size=(3, 4))
        tensor = paddle.fluid.core.to_uva_tensor(data)
        print(tensor)
)DOC");

#endif

#if defined(PADDLE_WITH_CUDA)
  m.def(
      "async_write",
      [](const imperative::VarBase &src,
         imperative::VarBase &dst,
         const imperative::VarBase &offset,
         const imperative::VarBase &count) {
        PADDLE_ENFORCE_EQ(
            platform::is_gpu_place(src.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `src` device should be CUDAPlace, but received %d. ",
                src.Place()));
        PADDLE_ENFORCE_EQ(
            platform::is_cuda_pinned_place(dst.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `dst` device should be CUDAPinnedPlace, "
                "but received %d. ",
                dst.Place()));
        PADDLE_ENFORCE_EQ(
1064 1065
            platform::is_cpu_place(offset.Place()),
            true,
1066 1067 1068 1069
            platform::errors::InvalidArgument("Required `offset` device should "
                                              "be CPUPlace, but received %d. ",
                                              offset.Place()));
        PADDLE_ENFORCE_EQ(
1070 1071
            platform::is_cpu_place(count.Place()),
            true,
1072 1073 1074 1075 1076 1077
            platform::errors::InvalidArgument(
                "Required `count` device should be CPUPlace, but received %d. ",
                count.Place()));

        // TODO(daisiming): In future, add index as arguments following
        // async_read.
        auto &src_tensor = src.Var().Get<phi::DenseTensor>();
        auto *dst_tensor = dst.MutableVar()->GetMutable<phi::DenseTensor>();
        auto &offset_tensor = offset.Var().Get<phi::DenseTensor>();
        auto &count_tensor = count.Var().Get<phi::DenseTensor>();
        const auto &deviceId = paddle::platform::GetCurrentDeviceId();

        PADDLE_ENFORCE_EQ(offset_tensor.dims().size(),
                          1,
                          platform::errors::InvalidArgument(
                              "`offset` tensor should be one-dimensional."));
        PADDLE_ENFORCE_EQ(count_tensor.dims().size(),
                          1,
                          platform::errors::InvalidArgument(
                              "`count` tensor should be one-dimensional."));
        PADDLE_ENFORCE_EQ(offset_tensor.numel(),
                          count_tensor.numel(),
                          platform::errors::InvalidArgument(
                              "`offset` and `count` tensor size dismatch."));
        PADDLE_ENFORCE_EQ(
            src_tensor.dims().size(),
            dst_tensor->dims().size(),
            platform::errors::InvalidArgument(
                "`src` and `dst` should have the same tensor shape, "
                "except for the first dimension."));
        for (int i = 1; i < src_tensor.dims().size(); i++) {
          PADDLE_ENFORCE_EQ(
              src_tensor.dims()[i],
              dst_tensor->dims()[i],
              platform::errors::InvalidArgument(
                  "`src` and `dst` should have the same tensor shape, "
                  "except for the first dimension."));
        }

        auto stream =
            paddle::platform::get_current_stream(deviceId)->raw_stream();

        int64_t size = src_tensor.numel() / src_tensor.dims()[0];
        auto *src_data = src_tensor.data<float>();
        auto *dst_data = dst_tensor->mutable_data<float>(dst.Place());
        const int64_t *offset_data = offset_tensor.data<int64_t>();
        const int64_t *count_data = count_tensor.data<int64_t>();
        int64_t src_offset = 0, dst_offset, c;
        for (int64_t i = 0; i < offset_tensor.numel(); i++) {
          dst_offset = offset_data[i], c = count_data[i];
          PADDLE_ENFORCE_LE(src_offset + c,
                            src_tensor.dims()[0],
                            platform::errors::InvalidArgument(
                                "Invalid offset or count index"));
          PADDLE_ENFORCE_LE(dst_offset + c,
                            dst_tensor->dims()[0],
                            platform::errors::InvalidArgument(
                                "Invalid offset or count index"));
          cudaMemcpyAsync(dst_data + (dst_offset * size),
                          src_data + (src_offset * size),
                          c * size * sizeof(float),
                          cudaMemcpyDeviceToHost,
                          stream);
          src_offset += c;
        }
      },
      R"DOC(
  This API provides a way to write pieces of the source tensor to the destination
  tensor in place and asynchronously. `offset` and `count` determine where to copy:
  `offset` gives the begin points of the copy pieces of `src`, and `count` gives the
  lengths of the copy pieces of `src`. Note that the copy runs asynchronously from
  CUDA memory to pinned memory. We can simply remember this as
  "gpu async_write to pin_memory".

  Arguments:

    src (Tensor): The source tensor, and the data type should be `float32` currently.
                  Besides, `src` should be placed on CUDAPlace.

    dst (Tensor): The destination tensor, and the data type should be `float32` currently.
                  Besides, `dst` should be placed on CUDAPinnedPlace. The shape of `dst`
                  should be the same with `src` except for the first dimension.

    offset (Tensor): The offset tensor, and the data type should be `int64` currently.
                     Besides, `offset` should be placed on CPUPlace. The shape of `offset`
                     should be one-dimensional.

    count (Tensor): The count tensor, and the data type should be `int64` currently.
                    Besides, `count` should be placed on CPUPlace. The shape of `count`
                    should be one-dimensional.

  Examples:
      .. code-block:: python

          import numpy as np
          import paddle
          from paddle.fluid import core
          from paddle.device import cuda

          if core.is_compiled_with_cuda():
              src = paddle.rand(shape=[100, 50, 50])
              dst = paddle.empty(shape=[200, 50, 50]).pin_memory()
              offset = paddle.to_tensor(
                  np.array([0, 60], dtype="int64"), place=paddle.CPUPlace())
              count = paddle.to_tensor(
                  np.array([40, 60], dtype="int64"), place=paddle.CPUPlace())

              stream = cuda.Stream()
              with cuda.stream_guard(stream):
                  core.async_write(src, dst, offset, count)

              offset_a = paddle.gather(dst, paddle.to_tensor(np.arange(0, 40)))
              offset_b = paddle.gather(dst, paddle.to_tensor(np.arange(60, 120)))
              offset_array = paddle.concat([offset_a, offset_b], axis=0)
              print(np.allclose(src.numpy(), offset_array.numpy())) # True
)DOC");

  m.def(
      "async_read",
      [](const imperative::VarBase &src,
         imperative::VarBase &dst,
         const imperative::VarBase &index,
         imperative::VarBase &buffer,
         const imperative::VarBase &offset,
         const imperative::VarBase &count) {
        PADDLE_ENFORCE_EQ(platform::is_cuda_pinned_place(src.Place()),
                          true,
                          platform::errors::InvalidArgument(
                              "Required `src` device should be "
                              "CUDAPinnedPlace, but received %d.",
                              src.Place()));
        PADDLE_ENFORCE_EQ(
            platform::is_gpu_place(dst.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `dst` device should be CUDAPlace, but received %d.",
                dst.Place()));
        PADDLE_ENFORCE_EQ(
            platform::is_cpu_place(index.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `index` device should be CPUPlace, but received %d.",
                index.Place()));
        PADDLE_ENFORCE_EQ(
            platform::is_cuda_pinned_place(buffer.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `buffer` device should be CUDAPinnedPlace, "
                "but received %d.",
                buffer.Place()));
        PADDLE_ENFORCE_EQ(
            platform::is_cpu_place(offset.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `offset` device should be CPUPlace, but received %d.",
                offset.Place()));
        PADDLE_ENFORCE_EQ(
            platform::is_cpu_place(count.Place()),
            true,
            platform::errors::InvalidArgument(
                "Required `count` device should be CPUPlace, but received %d.",
                count.Place()));

        auto &src_tensor = src.Var().Get<phi::DenseTensor>();
        auto *dst_tensor = dst.MutableVar()->GetMutable<phi::DenseTensor>();
        auto &index_tensor = index.Var().Get<phi::DenseTensor>();
        auto *buffer_tensor =
            buffer.MutableVar()->GetMutable<phi::DenseTensor>();
        auto &offset_tensor = offset.Var().Get<phi::DenseTensor>();
        auto &count_tensor = count.Var().Get<phi::DenseTensor>();
        auto *dst_data = dst_tensor->mutable_data<float>(dst.Place());
        const auto &deviceId = paddle::platform::GetCurrentDeviceId();

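        // `src`, `dst` and `buffer` must have the same rank and agree on every
        // dimension except the first (the row dimension being sliced).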
        PADDLE_ENFORCE_EQ(src_tensor.dims().size(),
                          dst_tensor->dims().size(),
                          platform::errors::InvalidArgument(
                              "`src` and `dst` should have same tensor shape, "
                              "except for the first dimension."));
        PADDLE_ENFORCE_EQ(
            src_tensor.dims().size(),
            buffer_tensor->dims().size(),
            platform::errors::InvalidArgument(
                "`src` and `buffer` should have same tensor shape, "
                "except for the first dimension."));
        for (int i = 1; i < src_tensor.dims().size(); i++) {
          PADDLE_ENFORCE_EQ(
              src_tensor.dims()[i],
              dst_tensor->dims()[i],
              platform::errors::InvalidArgument(
                  "`src` and `dst` should have the same tensor shape, "
                  "except for the first dimension."));
          PADDLE_ENFORCE_EQ(
              src_tensor.dims()[i],
              buffer_tensor->dims()[i],
              platform::errors::InvalidArgument(
                  "`src` and `buffer` should have the same tensor shape, "
                  "except for the first dimension."));
        }
        PADDLE_ENFORCE_EQ(index_tensor.dims().size(),
                          1,
                          platform::errors::InvalidArgument(
                              "`index` tensor should be one-dimensional."));

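        // Issue all copies below on the raw CUDA stream bound to the current
        // device so they run asynchronously with respect to the host.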
        auto stream =
            paddle::platform::get_current_stream(deviceId)->raw_stream();

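        // `copy_flag` is the number of (offset, count) pieces to read; when it is
        // zero, only the index-selected rows are copied.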
        int64_t numel = 0;  // total copy length
        int64_t copy_flag = offset_tensor.dims()[0];
        int64_t size = src_tensor.numel() / src_tensor.dims()[0];

        if (copy_flag != 0) {
          PADDLE_ENFORCE_EQ(offset_tensor.dims().size(),
                            1,
                            platform::errors::InvalidArgument(
                                "`offset` tensor should be one-dimensional."));
          PADDLE_ENFORCE_EQ(count_tensor.dims().size(),
                            1,
                            platform::errors::InvalidArgument(
                                "`count` tensor should be one-dimensional."));
          PADDLE_ENFORCE_EQ(offset_tensor.numel(),
                            count_tensor.numel(),
                            platform::errors::InvalidArgument(
                                "`offset` and `count` tensor size mismatch."));
          auto *offset_data = offset_tensor.data<int64_t>();
          auto *count_data = count_tensor.data<int64_t>();
          for (int64_t i = 0; i < count_tensor.numel(); i++) {
            numel += count_data[i];
          }
          PADDLE_ENFORCE_LE(numel + index_tensor.numel(),
                            buffer_tensor->dims()[0],
                            platform::errors::InvalidArgument(
                                "Buffer tensor size is too small."));
          PADDLE_ENFORCE_LE(numel + index_tensor.numel(),
                            dst_tensor->dims()[0],
                            platform::errors::InvalidArgument(
                                "Target tensor size is too small."));

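          // Copy each (offset, count) piece of the pinned `src` to device memory;
          // the pieces are packed contiguously at the front of `dst`.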
          int64_t src_offset, dst_offset = 0, c;
          auto *src_data = src_tensor.data<float>();
          for (int64_t i = 0; i < offset_tensor.numel(); i++) {
            src_offset = offset_data[i], c = count_data[i];
            PADDLE_ENFORCE_LE(src_offset + c,
                              src_tensor.dims()[0],
                              platform::errors::InvalidArgument(
                                  "Invalid offset or count index."));
            PADDLE_ENFORCE_LE(dst_offset + c,
                              dst_tensor->dims()[0],
                              platform::errors::InvalidArgument(
                                  "Invalid offset or count index."));
            cudaMemcpyAsync(dst_data + (dst_offset * size),
                            src_data + (src_offset * size),
                            c * size * sizeof(float),
                            cudaMemcpyHostToDevice,
                            stream);
            dst_offset += c;
          }
        } else {
          PADDLE_ENFORCE_LE(index_tensor.numel(),
                            buffer_tensor->dims()[0],
                            platform::errors::InvalidArgument(
                                "Buffer tensor size is too small."));
        }

        // Select the index data to the buffer
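        // The gather runs on the CPU: rows of the pinned `src` selected by `index`
        // are memcpy'ed into the pinned `buffer` before the async copy below.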
        auto index_select = [](const phi::DenseTensor &src_tensor,
                               const phi::DenseTensor &index_tensor,
                               phi::DenseTensor *buffer_tensor) {
          auto *src_data = src_tensor.data<float>();
          auto *index_data = index_tensor.data<int64_t>();
          auto *buffer_data =
              buffer_tensor->mutable_data<float>(buffer_tensor->place());
          const int &slice_size = src_tensor.numel() / src_tensor.dims()[0];
          const int &copy_bytes = slice_size * sizeof(float);
          int64_t c = 0;
          for (int64_t i = 0; i < index_tensor.numel(); i++) {
            std::memcpy(buffer_data + c * slice_size,
                        src_data + index_data[i] * slice_size,
                        copy_bytes);
            c += 1;
          }
        };
        index_select(src_tensor, index_tensor, buffer_tensor);

        // Copy the data to device memory
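        // The index-selected rows are appended right after the `numel` rows
        // written by the (offset, count) copies above.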
        cudaMemcpyAsync(dst_data + (numel * size),
                        buffer_tensor->data<float>(),
                        index_tensor.numel() * size * sizeof(float),
                        cudaMemcpyHostToDevice,
                        stream);
      },
      R"DOC(
  This API provides a way to read pieces of the source tensor into the destination
  tensor asynchronously. `index`, `offset` and `count` determine where to read:
  `index` gives the row indices of `src` to gather, while `offset` and `count` give
  the begin points and lengths of the contiguous pieces of `src` to read. Note that
  the copy runs asynchronously from pinned memory to the CUDA place. We can simply
  remember this as "cuda async_read from pin_memory".

  Arguments:

    src (Tensor): The source tensor, and the data type should be `float32` currently.
                  Besides, `src` should be placed on CUDAPinnedPlace.

    dst (Tensor): The destination tensor, and the data type should be `float32` currently.
                  Besides, `dst` should be placed on CUDAPlace. The shape of `dst` should
                  be the same with `src` except for the first dimension.

    index (Tensor): The index tensor, and the data type should be `int64` currently.
                    Besides, `index` should be placed on CPUPlace. The shape of `index` should
                    be one-dimensional.

    buffer (Tensor): The buffer tensor, used to temporarily stage the rows of `src` selected by `index`.
                     The data type should be `float32` currently, and should be placed
                     on CUDAPinnedPlace. The shape of `buffer` should be the same with `src` except for the first dimension.

    offset (Tensor): The offset tensor, and the data type should be `int64` currently.
                     Besides, `offset` should be placed on CPUPlace. The shape of `offset`
                     should be one-dimensional.

    count (Tensor): The count tensor, and the data type should be `int64` currently.
                    Besides, `count` should be placed on CPUPlace. The shape of `count`
                    should be one-dimensional.

  Examples:
      .. code-block:: python

          import numpy as np
          import paddle
          from paddle.fluid import core
          from paddle.device import cuda

          if core.is_compiled_with_cuda():
              src = paddle.rand(shape=[100, 50, 50], dtype="float32").pin_memory()
              dst = paddle.empty(shape=[110, 50, 50], dtype="float32")  # holds sum(count) + len(index) rows
              offset = paddle.to_tensor(
                  np.array([0, 60], dtype="int64"), place=paddle.CPUPlace())
              count = paddle.to_tensor(
                  np.array([40, 60], dtype="int64"), place=paddle.CPUPlace())
              buffer = paddle.empty(shape=[110, 50, 50], dtype="float32").pin_memory()  # holds sum(count) + len(index) rows
              index = paddle.to_tensor(
                  np.array([1, 3, 5, 7, 9], dtype="int64")).cpu()

              stream = cuda.Stream()
              with cuda.stream_guard(stream):
                  core.async_read(src, dst, index, buffer, offset, count)

)DOC");
#endif
}

}  // namespace pybind
}  // namespace paddle