diff --git a/paddle/fluid/framework/details/fetch_async_op_handle.cc b/paddle/fluid/framework/details/fetch_async_op_handle.cc index 09aedafc6bb2e1cc0223a4bcb67b8814616da121..98cae9f9e5bce3ad5a8c6ba6c620d63dce95b15c 100644 --- a/paddle/fluid/framework/details/fetch_async_op_handle.cc +++ b/paddle/fluid/framework/details/fetch_async_op_handle.cc @@ -13,8 +13,10 @@ // limitations under the License. #include "paddle/fluid/framework/details/fetch_async_op_handle.h" + #include <string> #include <utility> + #include "paddle/fluid/platform/profiler.h" namespace paddle { @@ -195,7 +197,7 @@ void FetchAsyncOpHandle::FetchMergedLodTensor( void FetchAsyncOpHandle::RunImpl() { platform::RecordEvent record_event(Name()); - WaitInputVarGenerated(); + WaitInputVarGenerated(true); // get src vars auto &scopes = *local_exec_scopes_; diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index 105c37192f57c365abc1429afa7e6627b95eef90..22b7bd17fe429996b0bf4021d27c083598124ea4 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -143,7 +143,7 @@ void OpHandleBase::AddOutput(VarHandleBase *out) { out->AddInput(this, this->Node()); } -void OpHandleBase::WaitInputVarGenerated() { +void OpHandleBase::WaitInputVarGenerated(bool wait_for_feed) { for (auto in_var : inputs_) { if (NeedWait(in_var)) { // Dummy Variable is used to represent dependencies between operators, so @@ -165,6 +165,30 @@ } // There are nothing to do when the place is CPUPlace. } + } else { + // NOTE(zhiqiu): Special case when using fetch_async_op_handle may lead to + // nondeterminism due to parallel execution of cuda memory operation.
Eg: + // execute stream: CPU->GPU copy (feed) + // fetch stream: GPU->CUDAPinned (fetch) + if (in_var && wait_for_feed) { + auto *in_var_handle = dynamic_cast<VarHandle *>(in_var); + if (in_var_handle) { + auto &place = in_var_handle->place(); + if (platform::is_gpu_place(place)) { +#ifdef PADDLE_WITH_CUDA + platform::DeviceContextPool &pool = + platform::DeviceContextPool::Instance(); + auto stream = + static_cast<platform::CUDADeviceContext *>(pool.Get(place)) + ->stream(); + PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); +#else + PADDLE_THROW(platform::errors::PreconditionNotMet( + "Not compiled with CUDA.")); +#endif + } + } + } } } } @@ -172,8 +196,8 @@ void OpHandleBase::WaitInputVarGenerated(const platform::Place &place) { for (auto in_var : inputs_) { if (NeedWait(in_var)) { - // Dummy Variable is used to represent dependencies between operators, so - // there doesn't add event for it. + // Dummy Variable is used to represent dependencies between operators, + // so there doesn't add event for it. auto *in_var_handle = dynamic_cast<VarHandle *>(in_var); if (in_var_handle) { if (platform::is_gpu_place(in_var_handle->place())) { diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h index eb3d9c32ffc1f45ab843a8d9f00b0918acd5e0f6..37e18adf9da9e6f8af0be6d0551c121bbf47c744 100644 --- a/paddle/fluid/framework/details/op_handle_base.h +++ b/paddle/fluid/framework/details/op_handle_base.h @@ -81,12 +81,15 @@ class OpHandleBase { // This method adds the wait events of all the input on all the device // context. - // NODE: This Wait is asynchronous operation. + // NOTE: This Wait is asynchronous operation. + // NOTE: wait_for_feed is added to wait for feed var, since it has + // generated op, no event and cannot perform event wait. It is only + // used in fetch_async_op_handle currently.
+ virtual void WaitInputVarGenerated(bool wait_for_feed = false); // This method adds the wait events of all the input on the specified device // context. - // NODE: This Wait is asynchronous operation. + // NOTE: This Wait is asynchronous operation. virtual void WaitInputVarGenerated(const platform::Place &place); virtual bool NeedWait(VarHandleBase *in_var); diff --git a/paddle/fluid/memory/memcpy.cc b/paddle/fluid/memory/memcpy.cc index 225b6858cc1f2a5afc9d612958694d0d940e2e7b..8a04f74c6de82fc4aceb783390ad505b326b31d6 100644 --- a/paddle/fluid/memory/memcpy.cc +++ b/paddle/fluid/memory/memcpy.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/memory/memcpy.h" #include <cstring> // for memcpy + #include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/profiler.h" @@ -267,6 +268,8 @@ void Copy<platform::CUDAPlace, platform::CUDAPlace>( const void* src, size_t num, cudaStream_t stream) { if (UNLIKELY(num == 0)) return; + VLOG(4) << "memory::Copy " << num << " Bytes from " << src_place << " to " + << dst_place << " by stream(" << stream << ")"; if (dst_place == src_place) { platform::SetDeviceId(src_place.device); if (stream) { @@ -293,6 +296,8 @@ template <> void Copy<platform::CPUPlace, platform::CUDAPinnedPlace>( platform::CPUPlace dst_place, void* dst, platform::CUDAPinnedPlace src_place, const void* src, size_t num) { + VLOG(4) << "memory::Copy " << num << " Bytes from " << src_place << " to " + << dst_place; if (UNLIKELY(num == 0)) return; std::memcpy(dst, src, num); } @@ -301,6 +306,8 @@ template <> void Copy<platform::CUDAPinnedPlace, platform::CPUPlace>( platform::CUDAPinnedPlace dst_place, void* dst, platform::CPUPlace src_place, const void* src, size_t num) { + VLOG(4) << "memory::Copy " << num << " Bytes from " << src_place << " to " + << dst_place; if (UNLIKELY(num == 0)) return; std::memcpy(dst, src, num); } @@ -309,6 +316,8 @@ template <> void Copy<platform::CUDAPinnedPlace, platform::CUDAPinnedPlace>( platform::CUDAPinnedPlace dst_place, void* dst, platform::CUDAPinnedPlace src_place, const void* src, size_t num) { + VLOG(4) << "memory::Copy " << num << " Bytes from " << src_place << " to " + <<
dst_place; if (UNLIKELY(num == 0)) return; std::memcpy(dst, src, num); } @@ -320,6 +329,8 @@ void Copy<platform::CUDAPinnedPlace, platform::CUDAPlace>( cudaStream_t stream) { if (UNLIKELY(num == 0)) return; platform::SetDeviceId(src_place.device); + VLOG(4) << "memory::Copy " << num << " Bytes from " << src_place << " to " + << dst_place << " by stream(" << stream << ")"; if (stream) { platform::RecordEvent record_event("GpuMemcpyAsync:GPU->CUDAPinned"); platform::GpuMemcpyAsync(dst, src, num, cudaMemcpyDeviceToHost, stream); @@ -337,6 +348,8 @@ void Copy<platform::CUDAPlace, platform::CUDAPinnedPlace>( if (UNLIKELY(num == 0)) return; platform::SetDeviceId(dst_place.device); + VLOG(4) << "memory::Copy " << num << " Bytes from " << src_place << " to " + << dst_place << " by stream(" << stream << ")"; if (stream) { platform::RecordEvent record_event("GpuMemcpyAsync:CUDAPinned->GPU"); platform::GpuMemcpyAsync(dst, src, num, cudaMemcpyHostToDevice, stream); diff --git a/python/paddle/fluid/tests/unittests/test_buffer_shared_memory_reuse_pass.py b/python/paddle/fluid/tests/unittests/test_buffer_shared_memory_reuse_pass.py index 4b1a54d3c66a132a1dfcd843aecc68dbf5b7d441..546124bbee899348e7519ce50f28017a698ba963 100644 --- a/python/paddle/fluid/tests/unittests/test_buffer_shared_memory_reuse_pass.py +++ b/python/paddle/fluid/tests/unittests/test_buffer_shared_memory_reuse_pass.py @@ -34,7 +34,6 @@ class InplaceTestBase(unittest.TestCase): def initParameter(self): self.use_cuda = True self.fuse_all_optimizer_ops = False - self.fuse_all_reduce_ops = False def setUp(self): paddle.enable_static() @@ -94,7 +93,6 @@ class InplaceTestBase(unittest.TestCase): build_strategy.memory_optimize = memory_optimize build_strategy.enable_inplace = enable_inplace build_strategy.fuse_all_optimizer_ops = self.fuse_all_optimizer_ops - build_strategy.fuse_all_reduce_ops = self.fuse_all_reduce_ops compiled_prog = fluid.CompiledProgram(prog).with_data_parallel( loss_name=loss.name, build_strategy=build_strategy, @@ -117,15 +115,7 @@ class InplaceTestBase(unittest.TestCase): fetch_val2, =
exe.run(compiled_prog, feed=feed_dict, fetch_list=[fetch_var]) - #NOTE(zhiqiu): Temporally changed from array_equal to allclose. - # The real root is fuse_all_reduce and fuse_all_optimizer_opss may - # result in diff because of the instruction set on the virtual machine. - # And the related unit tests: test_fuse_all_reduce_pass and test_fuse_optimizer_pass use "almostEqual" in their checks. - # There are also some related issues: - # https://github.com/PaddlePaddle/Paddle/issues/21270 - # https://github.com/PaddlePaddle/Paddle/issues/21046 - # https://github.com/PaddlePaddle/Paddle/issues/21045 - self.assertTrue(np.allclose(fetch_val1, fetch_val2)) + self.assertTrue(np.array_equal(fetch_val1, fetch_val2)) def check_multi_card_fetch_var(self): if self.is_invalid_test(): @@ -148,7 +138,6 @@ class InplaceTestBase(unittest.TestCase): build_strategy.memory_optimize = memory_optimize build_strategy.enable_inplace = enable_inplace build_strategy.fuse_all_optimizer_ops = self.fuse_all_optimizer_ops - build_strategy.fuse_all_reduce_ops = self.fuse_all_reduce_ops compiled_program = fluid.CompiledProgram( prog).with_data_parallel( loss_name=loss.name, @@ -170,15 +159,13 @@ class InplaceTestBase(unittest.TestCase): fetch_vals.append(fetch_val) for item in fetch_vals: - # save above - self.assertTrue(np.allclose(fetch_vals[0], item)) + self.assertTrue(np.array_equal(fetch_vals[0], item)) class CUDAInplaceTest(InplaceTestBase): def initParameter(self): self.use_cuda = True self.fuse_all_optimizer_ops = False - self.fuse_all_reduce_ops = False def test_multi_card_fetch_var(self): self.check_multi_card_fetch_var() @@ -191,7 +178,6 @@ class CPUInplaceTest(InplaceTestBase): def initParameter(self): self.use_cuda = False self.fuse_all_optimizer_ops = False - self.fuse_all_reduce_ops = False def test_multi_card_fetch_var(self): self.check_multi_card_fetch_var()