Unverified commit 1f97d61c, authored by Leo Chen, committed by GitHub

Add callback after TensorCopy (#30123)

* change to tensor copy sync

* change to tensor copy sync

* make copy_to safe when use TensorCopy

* refine code

* add ut

* add cudapinned garbagecollector

* add testcase: cpu place -> cuda pinned place
Parent b2483d78
...@@ -107,6 +107,15 @@ void StreamGarbageCollector::ClearCallback(
    const std::function<void()> &callback) {
  callback_manager_->AddCallback(callback);
}
CUDAPinnedGarbageCollector::CUDAPinnedGarbageCollector(
    const platform::CUDAPinnedPlace &place, size_t max_memory_size)
    : GarbageCollector(place, max_memory_size) {}

void CUDAPinnedGarbageCollector::ClearCallback(
    const std::function<void()> &callback) {
  callback();
}
#endif
int64_t GetEagerDeletionThreshold() {
...
...@@ -119,6 +119,15 @@ class StreamGarbageCollector : public GarbageCollector {
  cudaStream_t stream_;
  std::unique_ptr<platform::StreamCallbackManager> callback_manager_;
};
class CUDAPinnedGarbageCollector : public GarbageCollector {
 public:
  CUDAPinnedGarbageCollector(const platform::CUDAPinnedPlace &place,
                             size_t max_memory_size);

 protected:
  void ClearCallback(const std::function<void()> &callback) override;
};
#endif
template <typename Container>
...
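Compared with StreamGarbageCollector above, which queues callbacks on its StreamCallbackManager, the new CUDAPinnedGarbageCollector::ClearCallback invokes the callback right away, since freeing pinned host memory does not have to wait on a CUDA stream. The contrast between the two strategies, reduced to a standalone sketch with made-up class names (not Paddle's actual types):

// Illustrative sketch (not Paddle code): two ClearCallback strategies.
#include <functional>
#include <iostream>
#include <vector>

class CollectorBase {
 public:
  virtual ~CollectorBase() = default;
  // Hook that decides *when* a deallocation callback may run.
  virtual void ClearCallback(const std::function<void()>& callback) = 0;
};

// Pinned/CPU-style: memory can be released immediately, so run the callback now.
class ImmediateCollector : public CollectorBase {
 public:
  void ClearCallback(const std::function<void()>& callback) override {
    callback();
  }
};

// Stream-style: defer callbacks until in-flight device work has finished,
// modeled here by an explicit Flush() instead of a real CUDA stream.
class DeferredCollector : public CollectorBase {
 public:
  void ClearCallback(const std::function<void()>& callback) override {
    pending_.push_back(callback);
  }
  void Flush() {  // stands in for "the stream has completed"
    for (auto& cb : pending_) cb();
    pending_.clear();
  }

 private:
  std::vector<std::function<void()>> pending_;
};

int main() {
  ImmediateCollector pinned;
  DeferredCollector stream;
  pinned.ClearCallback([] { std::cout << "freed pinned buffer\n"; });
  stream.ClearCallback([] { std::cout << "freed device buffer\n"; });
  stream.Flush();  // deferred callbacks only fire once the "stream" is done
}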
...@@ -25,6 +25,7 @@
#include "paddle/fluid/imperative/infer_var_type_context.h"
#include "paddle/fluid/imperative/op_base.h"
#include "paddle/fluid/imperative/prepared_operator.h"
#include "paddle/fluid/imperative/tracer.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
...@@ -231,9 +232,9 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place, ...@@ -231,9 +232,9 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
true, platform::errors::InvalidArgument( true, platform::errors::InvalidArgument(
"Variable is not initialized or Variable's type is not " "Variable is not initialized or Variable's type is not "
"LoDTensor or SelectedRows when getting numpy tensor")); "LoDTensor or SelectedRows when getting numpy tensor"));
if (Var().IsType<framework::LoDTensor>()) { if (Var().IsType<framework::LoDTensor>()) {
auto& src_tensor = Var().Get<framework::LoDTensor>(); auto& src_tensor = Var().Get<framework::LoDTensor>();
// TODO(Jiabin): change this after move unique_name generator to CXX // TODO(Jiabin): change this after move unique_name generator to CXX
auto new_var = std::make_shared<VarBase>( auto new_var = std::make_shared<VarBase>(
true, Name() + std::to_string(copied_counter_++)); true, Name() + std::to_string(copied_counter_++));
...@@ -252,10 +253,8 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
        platform::DeviceContextPool::Instance().Get(src_place)->Wait();
      }
    }
    if (platform::is_gpu_place(dst_place)) {
      VLOG(3) << "copy tensor " << Name() << " from gpu";
    }
    VLOG(4) << "copy tensor " << Name() << " from " << Place() << " to "
            << dst_place;
    return new_var;
  } else {
    auto& src_selected_rows = Var().Get<framework::SelectedRows>();
...@@ -276,9 +275,8 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
    }
    dst_selected_rows->set_height(src_selected_rows.height());
    dst_selected_rows->set_rows(src_selected_rows.rows());
    if (platform::is_gpu_place(dst_place)) {
      VLOG(3) << "copy selected rows " << Name() << " from gpu";
    }
    VLOG(4) << "copy tensor " << Name() << " from " << Place() << " to "
            << dst_place;
    return new_var;
  }
}
...
...@@ -56,6 +56,78 @@ static void PassStopGradient(const NameVarBaseMap& outs, bool generate_grad) {
  }
}
void IncreaseVarbaseReferenceCountUntilCopyComplete(
    const std::shared_ptr<imperative::VarBase>& var,
    const platform::Place& place) {
  // Note(zhiqiu): Follow the logic of TensorCopy to determine the place where
  // we need to add the callback, see tensor_utils.cc:245
  auto place_ = platform::is_gpu_place(place) ? place : var->Place();
  auto tracer = imperative::GetCurrentTracer();
  auto gc = tracer->MutableGarbageCollectorIfNotExists(place_);

  // Note(zhiqiu): This is an empty callback; its only purpose is to "reference"
  // var, so var will not be destructed until the kernels launched on the
  // current stream of the given place have finished.
  auto callback = [var, place_]() {
    VLOG(4) << "Run callback of var:" << var->Name() << " at place " << place_;
  };

  gc->DirectClearCallback(callback);
}
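The callback body is irrelevant; what matters is that the lambda captures the shared_ptr<VarBase> by value, so the VarBase's reference count stays above zero until the garbage collector has run and discarded the callback. A minimal sketch of that lifetime-extension idea, using hypothetical names outside of Paddle:

// Illustrative sketch: capturing a shared_ptr in a callback keeps the object
// alive until the callback itself has been executed and discarded.
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Buffer {
  ~Buffer() { std::cout << "Buffer destroyed\n"; }
};

int main() {
  std::vector<std::function<void()>> pending;  // stands in for the GC's queue

  {
    auto buf = std::make_shared<Buffer>();
    // "Empty" callback whose only purpose is to hold a reference to buf.
    pending.push_back([buf] { std::cout << "copy finished, releasing ref\n"; });
  }  // buf goes out of scope here, but Buffer is NOT destroyed yet

  std::cout << "async copy still running...\n";

  for (auto& cb : pending) cb();  // e.g. the stream callback fires
  pending.clear();                // dropping the callback finally frees Buffer
}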
paddle::framework::GarbageCollector* Tracer::MutableGarbageCollectorIfNotExists(
    const platform::Place& place) {
  // If one does not exist yet, create a new GarbageCollector at the given place.
  if (gcs_.count(place) == 0) {
    std::unique_ptr<framework::GarbageCollector> gc;
    if (platform::is_gpu_place(place)) {
#ifdef PADDLE_WITH_CUDA
      gc.reset(new framework::DefaultStreamGarbageCollector(
          BOOST_GET_CONST(platform::CUDAPlace, place), 0));
      VLOG(10) << "Created GarbageCollector at " << place;
#else
      PADDLE_THROW(platform::errors::PermissionDenied(
          "Paddle can't use CUDA device since it's not compiled with CUDA, "
          "please recompile or reinstall Paddle with GPU support."));
#endif
    } else if (platform::is_cuda_pinned_place(place)) {
#ifdef PADDLE_WITH_CUDA
      gc.reset(new framework::CUDAPinnedGarbageCollector(
          BOOST_GET_CONST(platform::CUDAPinnedPlace, place), 0));
      VLOG(10) << "Created GarbageCollector at " << place;
#else
      PADDLE_THROW(platform::errors::PermissionDenied(
          "Paddle can't use CUDAPinned device since it's not compiled with "
          "CUDA, please recompile or reinstall Paddle with GPU support."));
#endif
    } else if (platform::is_xpu_place(place)) {
#if defined(PADDLE_WITH_XPU)
      gc.reset(new framework::XPUGarbageCollector(
          BOOST_GET_CONST(platform::XPUPlace, place), 0));
      VLOG(10) << "Created GarbageCollector at " << place;
#else
      PADDLE_THROW(platform::errors::PermissionDenied(
          "Paddle can't use XPU device since it's not compiled with XPU, "
          "please recompile or reinstall Paddle with XPU support."));
#endif
    } else if (platform::is_cpu_place(place)) {
      gc.reset(new framework::CPUGarbageCollector(
          BOOST_GET_CONST(platform::CPUPlace, place), 0));
      VLOG(10) << "Created GarbageCollector at " << place;
    } else {
      PADDLE_THROW(platform::errors::PreconditionNotMet(
          "Unsupported place for garbage collection"));
    }
    gcs_.emplace(place, std::move(gc));
  }
  return gcs_.at(place).get();
}
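MutableGarbageCollectorIfNotExists builds at most one collector per place and caches it in gcs_, so repeated non-blocking copies reuse the same collector instead of constructing a new one each time. The create-if-missing pattern in isolation, as a standalone sketch with hypothetical Registry/Collector types:

// Illustrative sketch: lazily create and cache one collector per place.
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct Collector {
  explicit Collector(std::string place) : place_(std::move(place)) {
    std::cout << "Created collector for " << place_ << "\n";
  }
  std::string place_;
};

class Registry {
 public:
  Collector* MutableIfNotExists(const std::string& place) {
    // Only construct a collector the first time a place is seen.
    if (collectors_.count(place) == 0) {
      collectors_.emplace(place, std::make_unique<Collector>(place));
    }
    return collectors_.at(place).get();
  }

 private:
  std::map<std::string, std::unique_ptr<Collector>> collectors_;
};

int main() {
  Registry registry;
  registry.MutableIfNotExists("CUDAPinnedPlace");  // created
  registry.MutableIfNotExists("CUDAPinnedPlace");  // reused, no new message
  registry.MutableIfNotExists("CPUPlace");         // created
}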
void Tracer::TraceOp(const std::string& type, const NameVarBaseMap& ins,
                     const NameVarBaseMap& outs, framework::AttributeMap attrs,
                     const platform::Place& place, bool trace_backward) {
...
...@@ -16,12 +16,14 @@
#include <atomic>
#include <future>  // NOLINT
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/imperative/basic_engine.h" #include "paddle/fluid/imperative/basic_engine.h"
#include "paddle/fluid/imperative/jit/program_desc_tracer.h" #include "paddle/fluid/imperative/jit/program_desc_tracer.h"
#include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/imperative/layer.h"
...@@ -30,6 +32,10 @@ ...@@ -30,6 +32,10 @@
namespace paddle { namespace paddle {
namespace imperative { namespace imperative {
using GarbageCollectorMap =
std::map<platform::Place,
std::unique_ptr<paddle::framework::GarbageCollector>>;
class UniqueNameGenerator {
 public:
  explicit UniqueNameGenerator(std::string prefix = "") : prefix_(prefix) {}
...@@ -102,6 +108,9 @@ class Tracer {
  bool IsAutoCastEnabled() const { return enable_autocast_; }

  paddle::framework::GarbageCollector* MutableGarbageCollectorIfNotExists(
      const platform::Place& place);

 private:
  std::unique_ptr<BasicEngine> basic_engine_;
  std::unique_ptr<jit::ProgramDescTracer> program_desc_tracer_;
...@@ -110,11 +119,15 @@ class Tracer {
  platform::Place expected_place_;
  bool has_grad_{true};
  bool enable_autocast_{false};
  GarbageCollectorMap gcs_;
};

// To access static variable current_tracer
const std::shared_ptr<Tracer>& GetCurrentTracer();
void SetCurrentTracer(const std::shared_ptr<Tracer>& tracer_);
void IncreaseVarbaseReferenceCountUntilCopyComplete(
    const std::shared_ptr<imperative::VarBase>& var,
    const platform::Place& place);

}  // namespace imperative
}  // namespace paddle
...@@ -1060,21 +1060,52 @@ void BindImperative(py::module *m_ptr) {
      )DOC")
      .def("copy_", &imperative::VarBase::CopyFrom)
.def("_copy_to", .def("_copy_to",
[](const imperative::VarBase &self, const platform::CPUPlace &place, [](const std::shared_ptr<imperative::VarBase> &self,
bool blocking) { return self.NewVarBase(place, blocking); }, const platform::CPUPlace &place, bool blocking) {
auto new_var = self->NewVarBase(place, blocking);
// Note(zhiqiu): Since NewVarBase may use GpuCopyAsync to
// copy data from the tensor of self to the tensor of new varbase,
// we need to ensure that the varbase self is not destructed until
// the GpuCopyAsync is completed. Otherwise, the memory may be
// freed
// when varbase self is destructed.
// To do that, we increase the reference count of self by 1 and
// add a cuda event to wait the GpuCopyAsync's completion.
if (!blocking) {
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
}
return new_var;
},
py::return_value_policy::copy) py::return_value_policy::copy)
.def("_copy_to", .def("_copy_to",
[](const imperative::VarBase &self, [](const std::shared_ptr<imperative::VarBase> &self,
const platform::CUDAPinnedPlace &place, const platform::CUDAPinnedPlace &place, bool blocking) {
bool blocking) { return self.NewVarBase(place, blocking); }, auto new_var = self->NewVarBase(place, blocking);
if (!blocking) {
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
}
return new_var;
},
py::return_value_policy::copy) py::return_value_policy::copy)
.def("_copy_to", .def("_copy_to",
[](const imperative::VarBase &self, const platform::XPUPlace &place, [](const std::shared_ptr<imperative::VarBase> &self,
bool blocking) { return self.NewVarBase(place, blocking); }, const platform::XPUPlace &place, bool blocking) {
auto new_var = self->NewVarBase(place, blocking);
if (!blocking) {
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
}
return new_var;
},
py::return_value_policy::copy) py::return_value_policy::copy)
.def("_copy_to", .def("_copy_to",
[](const imperative::VarBase &self, const platform::CUDAPlace &place, [](const std::shared_ptr<imperative::VarBase> &self,
bool blocking) { return self.NewVarBase(place, blocking); }, const platform::CUDAPlace &place, bool blocking) {
auto new_var = self->NewVarBase(place, blocking);
if (!blocking) {
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
}
return new_var;
},
py::return_value_policy::copy) py::return_value_policy::copy)
.def("value", [](imperative::VarBase &self) { return self.MutableVar(); }, .def("value", [](imperative::VarBase &self) { return self.MutableVar(); },
py::return_value_policy::reference) py::return_value_policy::reference)
......
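Note that the `_copy_to` bindings now take `const std::shared_ptr<imperative::VarBase> &self` rather than `const imperative::VarBase &self`, which gives the lambda a shared_ptr it can hand to IncreaseVarbaseReferenceCountUntilCopyComplete. pybind11 supports this when the class is registered with a std::shared_ptr holder; a stripped-down sketch with a hypothetical Var type and stand-in helper (not the real binding):

// Illustrative pybind11 sketch: a bound method receives `self` as a shared_ptr,
// so it can extend the object's lifetime while asynchronous work still uses it.
#include <memory>
#include <vector>

#include <pybind11/pybind11.h>

namespace py = pybind11;

struct Var {
  int value = 0;
};

// Stand-in for IncreaseVarbaseReferenceCountUntilCopyComplete: park the
// reference somewhere until the "copy" is known to have finished.
void KeepAliveUntilCopyComplete(const std::shared_ptr<Var>& var) {
  static std::vector<std::shared_ptr<Var>> keep_alive;
  keep_alive.push_back(var);
}

PYBIND11_MODULE(demo, m) {
  // Registering Var with a std::shared_ptr holder lets pybind11 pass `self`
  // to bound lambdas as std::shared_ptr<Var>.
  py::class_<Var, std::shared_ptr<Var>>(m, "Var")
      .def(py::init<>())
      .def("_copy_to", [](const std::shared_ptr<Var>& self, bool blocking) {
        auto new_var = std::make_shared<Var>(*self);
        if (!blocking) {
          KeepAliveUntilCopyComplete(self);  // hold an extra reference to self
        }
        return new_var;
      });
}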
...@@ -156,6 +156,24 @@ class TestVarBase(unittest.TestCase):
            _test_place(core.CUDAPlace(0))
            _test_place("gpu:0")
    def test_to_tensor_change_place(self):
        if core.is_compiled_with_cuda():
            a_np = np.random.rand(1024, 1024)
            with paddle.fluid.dygraph.guard(core.CPUPlace()):
                a = paddle.to_tensor(a_np, place=paddle.CUDAPinnedPlace())
                a = paddle.to_tensor(a)
                self.assertEqual(a.place.__repr__(), "CPUPlace")

            with paddle.fluid.dygraph.guard(core.CUDAPlace(0)):
                a = paddle.to_tensor(a_np, place=paddle.CUDAPinnedPlace())
                a = paddle.to_tensor(a)
                self.assertEqual(a.place.__repr__(), "CUDAPlace(0)")

            with paddle.fluid.dygraph.guard(core.CUDAPlace(0)):
                a = paddle.to_tensor(a_np, place=paddle.CPUPlace())
                a = paddle.to_tensor(a, place=paddle.CUDAPinnedPlace())
                self.assertEqual(a.place.__repr__(), "CUDAPinnedPlace")
    def test_to_variable(self):
        with fluid.dygraph.guard():
            var = fluid.dygraph.to_variable(self.array, name="abc")
...