diff --git a/paddle/fluid/framework/garbage_collector.cc b/paddle/fluid/framework/garbage_collector.cc
index f69ada080676cddfa4f31c6cbc450b8eca28b3ac..e4142d89e59f819abdd67cf3aefabe0be6916718 100644
--- a/paddle/fluid/framework/garbage_collector.cc
+++ b/paddle/fluid/framework/garbage_collector.cc
@@ -107,6 +107,15 @@ void StreamGarbageCollector::ClearCallback(
     const std::function<void()> &callback) {
   callback_manager_->AddCallback(callback);
 }
+
+CUDAPinnedGarbageCollector::CUDAPinnedGarbageCollector(
+    const platform::CUDAPinnedPlace &place, size_t max_memory_size)
+    : GarbageCollector(place, max_memory_size) {}
+
+void CUDAPinnedGarbageCollector::ClearCallback(
+    const std::function<void()> &callback) {
+  callback();
+}
 #endif
 
 int64_t GetEagerDeletionThreshold() {
diff --git a/paddle/fluid/framework/garbage_collector.h b/paddle/fluid/framework/garbage_collector.h
index 0b5fdc4745c24875a360e715ba30f23343d6f41b..9148d2f2520a265b559b728966bed2a6e471addb 100644
--- a/paddle/fluid/framework/garbage_collector.h
+++ b/paddle/fluid/framework/garbage_collector.h
@@ -119,6 +119,15 @@ class StreamGarbageCollector : public GarbageCollector {
   cudaStream_t stream_;
   std::unique_ptr<platform::StreamCallbackManager> callback_manager_;
 };
+
+class CUDAPinnedGarbageCollector : public GarbageCollector {
+ public:
+  CUDAPinnedGarbageCollector(const platform::CUDAPinnedPlace &place,
+                             size_t max_memory_size);
+
+ protected:
+  void ClearCallback(const std::function<void()> &callback) override;
+};
 #endif
 
 template <typename Container, typename Callback>
diff --git a/paddle/fluid/imperative/layer.cc b/paddle/fluid/imperative/layer.cc
index 57cde16a8800f0e9a0ebc0a23eba72d8c92c6081..e82bf02f5d678584c1d715adf81090d6596b1e8c 100644
--- a/paddle/fluid/imperative/layer.cc
+++ b/paddle/fluid/imperative/layer.cc
@@ -25,6 +25,7 @@
 #include "paddle/fluid/imperative/infer_var_type_context.h"
 #include "paddle/fluid/imperative/op_base.h"
 #include "paddle/fluid/imperative/prepared_operator.h"
+#include "paddle/fluid/imperative/tracer.h"
 #include "paddle/fluid/operators/math/math_function.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/enforce.h"
@@ -231,9 +232,9 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
       true, platform::errors::InvalidArgument(
                 "Variable is not initialized or Variable's type is not "
                 "LoDTensor or SelectedRows when getting numpy tensor"));
+
   if (Var().IsType<framework::LoDTensor>()) {
     auto& src_tensor = Var().Get<framework::LoDTensor>();
-
     // TODO(Jiabin): change this after move unique_name generator to CXX
     auto new_var = std::make_shared<VarBase>(
         true, Name() + std::to_string(copied_counter_++));
@@ -252,10 +253,8 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
         platform::DeviceContextPool::Instance().Get(src_place)->Wait();
       }
     }
-
-    if (platform::is_gpu_place(dst_place)) {
-      VLOG(3) << "copy tensor " << Name() << " from gpu";
-    }
+    VLOG(4) << "copy tensor " << Name() << " from " << Place() << " to "
+            << dst_place;
     return new_var;
   } else {
     auto& src_selected_rows = Var().Get<framework::SelectedRows>();
@@ -276,9 +275,8 @@
     }
     dst_selected_rows->set_height(src_selected_rows.height());
     dst_selected_rows->set_rows(src_selected_rows.rows());
-    if (platform::is_gpu_place(dst_place)) {
-      VLOG(3) << "copy selected rows " << Name() << " from gpu";
-    }
+    VLOG(4) << "copy tensor " << Name() << " from " << Place() << " to "
+            << dst_place;
    return new_var;
   }
 }
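Review note: the new `CUDAPinnedGarbageCollector::ClearCallback` above runs its callback inline, while `StreamGarbageCollector` hands it to the stream's callback manager; either way, the fix below only needs the callback object to hold a reference. A minimal standalone sketch of that lifetime-extension idiom (plain C++, not Paddle code; the `Buffer` type and the `pending` vector standing in for the collector's queue are illustrative assumptions):

```cpp
// Capturing a std::shared_ptr in a callback keeps the pointee alive until the
// callback has run and been destroyed, even after all other owners are gone.
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Buffer {
  ~Buffer() { std::cout << "Buffer freed\n"; }
};

int main() {
  std::vector<std::function<void()>> pending;  // stands in for the GC queue
  {
    auto buf = std::make_shared<Buffer>();
    // Empty body on purpose: the callback exists only to hold a reference.
    pending.emplace_back([buf] {});
  }  // `buf` goes out of scope here, but the Buffer is still alive.
  std::cout << "callback not run yet, Buffer still alive\n";
  pending.clear();  // only now is the Buffer freed, after the "async work"
  return 0;
}
```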
diff --git a/paddle/fluid/imperative/tracer.cc b/paddle/fluid/imperative/tracer.cc
index 4747d08a9484398b0d1db610d5930f15317c5afc..68c79f77e561b93cfde1da6f19b0cbb731fa0cce 100644
--- a/paddle/fluid/imperative/tracer.cc
+++ b/paddle/fluid/imperative/tracer.cc
@@ -56,6 +56,78 @@ static void PassStopGradient(const NameVarBaseMap& outs, bool generate_grad) {
   }
 }
 
+void IncreaseVarbaseReferenceCountUntilCopyComplete(
+    const std::shared_ptr<imperative::VarBase>& var,
+    const platform::Place& place) {
+  // Note(zhiqiu): Follow the logic of TensorCopy to determine the place that we
+  // need to add callback, see tensor_utils.cc:245
+  auto place_ = platform::is_gpu_place(place) ? place : var->Place();
+
+  auto tracer = imperative::GetCurrentTracer();
+  auto gc = tracer->MutableGarbageCollectorIfNotExists(place_);
+
+  // Note(zhiqiu): This is an empty callback, the only way is to "reference"
+  // var, so it will not be destructed until the kernels launched at current
+  // stream of given place is finished.
+  auto callback = [var, place_]() {
+    VLOG(4) << "Run callback of var:" << var->Name() << " at place " << place_;
+  };
+
+  gc->DirectClearCallback(callback);
+}
+
+paddle::framework::GarbageCollector* Tracer::MutableGarbageCollectorIfNotExists(
+    const platform::Place& place) {
+  // if not exists, create a new GarbageCollector at given place
+  if (gcs_.count(place) == 0) {
+    std::unique_ptr<framework::GarbageCollector> gc;
+    if (platform::is_gpu_place(place)) {
+#ifdef PADDLE_WITH_CUDA
+      gc.reset(new framework::DefaultStreamGarbageCollector(
+          BOOST_GET_CONST(platform::CUDAPlace, place), 0));
+
+      VLOG(10) << "Created GarbageCollector at " << place;
+#else
+      PADDLE_THROW(platform::errors::PermissionDenied(
+          "Paddle can't use CUDA device since it's not compiled with CUDA,"
+          "Please recompile or reinstall Paddle with GPU support."));
+#endif
+    } else if (platform::is_cuda_pinned_place(place)) {
+#ifdef PADDLE_WITH_CUDA
+      gc.reset(new framework::CUDAPinnedGarbageCollector(
+          BOOST_GET_CONST(platform::CUDAPinnedPlace, place), 0));
+
+      VLOG(10) << "Created GarbageCollector at " << place;
+#else
+      PADDLE_THROW(platform::errors::PermissionDenied(
+          "Paddle can't use CUDAPinned device since it's not compiled with "
+          "CUDA,"
+          "Please recompile or reinstall Paddle with GPU support."));
+#endif
+    } else if (platform::is_xpu_place(place)) {
+#if defined(PADDLE_WITH_XPU)
+      gc.reset(new framework::XPUGarbageCollector(
+          BOOST_GET_CONST(platform::XPUPlace, place), 0));
+      VLOG(10) << "Created GarbageCollector at " << place;
+#else
+      PADDLE_THROW(platform::errors::PermissionDenied(
+          "Paddle can't use XPU device since it's not compiled with XPU,"
+          "Please recompile or reinstall Paddle with XPU support."));
+#endif
+    } else if (platform::is_cpu_place(place)) {
+      gc.reset(new framework::CPUGarbageCollector(
+          BOOST_GET_CONST(platform::CPUPlace, place), 0));
+      VLOG(10) << "Created GarbageCollector at " << place;
+    } else {
+      PADDLE_THROW(platform::errors::PreconditionNotMet(
+          "Unsupported place for garbage collection"));
+    }
+    gcs_.emplace(place, std::move(gc));
+  }
+
+  return gcs_.at(place).get();
+}
+
 void Tracer::TraceOp(const std::string& type, const NameVarBaseMap& ins,
                      const NameVarBaseMap& outs, framework::AttributeMap attrs,
                      const platform::Place& place, bool trace_backward) {
diff --git a/paddle/fluid/imperative/tracer.h b/paddle/fluid/imperative/tracer.h
index dd3950e7e0347fe41ad6de60e23956f0f87d6a55..601645a8445155cf03a3913b6cdff26cf806b97c 100644
--- a/paddle/fluid/imperative/tracer.h
+++ b/paddle/fluid/imperative/tracer.h
@@ -16,12 +16,14 @@
 
 #include <atomic>
 #include <future>  // NOLINT
+#include <map>
 #include <memory>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include "ThreadPool.h"
+#include "paddle/fluid/framework/garbage_collector.h"
 #include "paddle/fluid/imperative/basic_engine.h"
 #include "paddle/fluid/imperative/jit/program_desc_tracer.h"
 #include "paddle/fluid/imperative/layer.h"
@@ -30,6 +32,10 @@
 namespace paddle {
 namespace imperative {
 
+using GarbageCollectorMap =
+    std::map<platform::Place,
+             std::unique_ptr<paddle::framework::GarbageCollector>>;
+
 class UniqueNameGenerator {
  public:
   explicit UniqueNameGenerator(std::string prefix = "") : prefix_(prefix) {}
@@ -102,6 +108,9 @@ class Tracer {
 
   bool IsAutoCastEnabled() const { return enable_autocast_; }
 
+  paddle::framework::GarbageCollector* MutableGarbageCollectorIfNotExists(
+      const platform::Place& place);
+
  private:
   std::unique_ptr<BasicEngine> basic_engine_;
   std::unique_ptr<jit::ProgramDescTracer> program_desc_tracer_;
@@ -110,11 +119,15 @@
   platform::Place expected_place_;
   bool has_grad_{true};
   bool enable_autocast_{false};
+  GarbageCollectorMap gcs_;
 };
 
 // To access static variable current_tracer
 const std::shared_ptr<Tracer>& GetCurrentTracer();
 void SetCurrentTracer(const std::shared_ptr<Tracer>& tracer_);
+void IncreaseVarbaseReferenceCountUntilCopyComplete(
+    const std::shared_ptr<imperative::VarBase>& var,
+    const platform::Place& place);
 
 }  // namespace imperative
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc
index 25ade963cbe6526a280578481e7912f7c172cae1..505d94559d0b3ec4e3d2acd18ba31a32efb39942 100644
--- a/paddle/fluid/pybind/imperative.cc
+++ b/paddle/fluid/pybind/imperative.cc
@@ -1060,21 +1060,52 @@ void BindImperative(py::module *m_ptr) {
        )DOC")
       .def("copy_", &imperative::VarBase::CopyFrom)
       .def("_copy_to",
-           [](const imperative::VarBase &self, const platform::CPUPlace &place,
-              bool blocking) { return self.NewVarBase(place, blocking); },
+           [](const std::shared_ptr<imperative::VarBase> &self,
+              const platform::CPUPlace &place, bool blocking) {
+             auto new_var = self->NewVarBase(place, blocking);
+             // Note(zhiqiu): Since NewVarBase may use GpuCopyAsync to
+             // copy data from the tensor of self to the tensor of new varbase,
+             // we need to ensure that the varbase self is not destructed until
+             // the GpuCopyAsync is completed. Otherwise, the memory may be
+             // freed
+             // when varbase self is destructed.
+             // To do that, we increase the reference count of self by 1 and
+             // add a cuda event to wait the GpuCopyAsync's completion.
+             if (!blocking) {
+               IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
+             }
+             return new_var;
+           },
            py::return_value_policy::copy)
       .def("_copy_to",
-           [](const imperative::VarBase &self,
-              const platform::CUDAPinnedPlace &place,
-              bool blocking) { return self.NewVarBase(place, blocking); },
+           [](const std::shared_ptr<imperative::VarBase> &self,
+              const platform::CUDAPinnedPlace &place, bool blocking) {
+             auto new_var = self->NewVarBase(place, blocking);
+             if (!blocking) {
+               IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
+             }
+             return new_var;
+           },
            py::return_value_policy::copy)
       .def("_copy_to",
-           [](const imperative::VarBase &self, const platform::XPUPlace &place,
-              bool blocking) { return self.NewVarBase(place, blocking); },
+           [](const std::shared_ptr<imperative::VarBase> &self,
+              const platform::XPUPlace &place, bool blocking) {
+             auto new_var = self->NewVarBase(place, blocking);
+             if (!blocking) {
+               IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
+             }
+             return new_var;
+           },
            py::return_value_policy::copy)
       .def("_copy_to",
-           [](const imperative::VarBase &self, const platform::CUDAPlace &place,
-              bool blocking) { return self.NewVarBase(place, blocking); },
+           [](const std::shared_ptr<imperative::VarBase> &self,
+              const platform::CUDAPlace &place, bool blocking) {
+             auto new_var = self->NewVarBase(place, blocking);
+             if (!blocking) {
+               IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
+             }
+             return new_var;
+           },
            py::return_value_policy::copy)
       .def("value", [](imperative::VarBase &self) { return self.MutableVar(); },
            py::return_value_policy::reference)
diff --git a/python/paddle/fluid/tests/unittests/test_var_base.py b/python/paddle/fluid/tests/unittests/test_var_base.py
index 58ac8aab2db2c212013df6b758305bb37e1280bb..2f4a9c8e37e59bfcca8a4db1c717ce5712f7e3ad 100644
--- a/python/paddle/fluid/tests/unittests/test_var_base.py
+++ b/python/paddle/fluid/tests/unittests/test_var_base.py
@@ -156,6 +156,24 @@ class TestVarBase(unittest.TestCase):
             _test_place(core.CUDAPlace(0))
             _test_place("gpu:0")
 
+    def test_to_tensor_change_place(self):
+        if core.is_compiled_with_cuda():
+            a_np = np.random.rand(1024, 1024)
+            with paddle.fluid.dygraph.guard(core.CPUPlace()):
+                a = paddle.to_tensor(a_np, place=paddle.CUDAPinnedPlace())
+                a = paddle.to_tensor(a)
+                self.assertEqual(a.place.__repr__(), "CPUPlace")
+
+            with paddle.fluid.dygraph.guard(core.CUDAPlace(0)):
+                a = paddle.to_tensor(a_np, place=paddle.CUDAPinnedPlace())
+                a = paddle.to_tensor(a)
+                self.assertEqual(a.place.__repr__(), "CUDAPlace(0)")
+
+            with paddle.fluid.dygraph.guard(core.CUDAPlace(0)):
+                a = paddle.to_tensor(a_np, place=paddle.CPUPlace())
+                a = paddle.to_tensor(a, place=paddle.CUDAPinnedPlace())
+                self.assertEqual(a.place.__repr__(), "CUDAPinnedPlace")
+
     def test_to_variable(self):
         with fluid.dygraph.guard():
             var = fluid.dygraph.to_variable(self.array, name="abc")
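Review note: `Tracer::MutableGarbageCollectorIfNotExists` above is a create-on-first-use cache: one garbage collector per place, built lazily on the first non-blocking copy to that place and then reused. A minimal sketch of that pattern under simplified assumptions (a `std::string` key and a trivial `Collector` type stand in for `platform::Place` and `framework::GarbageCollector`):

```cpp
#include <map>
#include <memory>
#include <string>

struct Collector {
  explicit Collector(std::string place) : place(std::move(place)) {}
  std::string place;
};

class Registry {
 public:
  // Returns the collector for `place`, creating it on first use.
  Collector* MutableIfNotExists(const std::string& place) {
    auto it = cache_.find(place);
    if (it == cache_.end()) {
      auto inserted = cache_.emplace(
          place, std::unique_ptr<Collector>(new Collector(place)));
      it = inserted.first;
    }
    return it->second.get();
  }

 private:
  std::map<std::string, std::unique_ptr<Collector>> cache_;
};
```

Keying the map on the place lets the CPU, CUDA, CUDA-pinned and XPU collectors coexist, and repeated copies to the same place reuse one collector instead of creating a new one per call.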
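Review note: the `_copy_to` lambdas above now take `const std::shared_ptr<imperative::VarBase> &self` instead of `const imperative::VarBase &self`, so the binding shares ownership of `self` and can pass that ownership into the GC callback. This relies on `VarBase` being registered with a `std::shared_ptr` holder, which the existing `py::class_` registration in imperative.cc presumably provides (not shown in this diff). A hedged pybind11 sketch of the same binding shape; the `Thing` class and the `example` module are made-up names, not Paddle code:

```cpp
#include <pybind11/pybind11.h>

#include <memory>

namespace py = pybind11;

struct Thing {
  int value{0};
};

PYBIND11_MODULE(example, m) {
  // Registering Thing with a std::shared_ptr holder lets bound lambdas take
  // std::shared_ptr<Thing> as `self` and keep the object alive after the call.
  py::class_<Thing, std::shared_ptr<Thing>>(m, "Thing")
      .def(py::init<>())
      .def("get",
           [](const std::shared_ptr<Thing> &self) { return self->value; });
}
```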