From dbf9f6f4088c8d0e8ddd87cf8110ca9ce745de8b Mon Sep 17 00:00:00 2001
From: Yu Yang
Date: Tue, 23 Oct 2018 10:20:02 +0800
Subject: [PATCH] Fix distribute compile

test=develop
---
 .gitignore                                    |  1 +
 paddle/fluid/framework/tensor.h               |  2 +
 .../fluid/operators/distributed/grpc_serde.cc | 43 +++++-----
 .../operators/distributed/sendrecvop_utils.cc | 80 ++++++++-----------
 .../operators/distributed/sendrecvop_utils.h  | 12 +--
 5 files changed, 61 insertions(+), 77 deletions(-)

diff --git a/.gitignore b/.gitignore
index 90138f996c..3189eb6929 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ paddle/operators/tensor.save
 python/paddle/v2/fluid/tests/book/image_classification_resnet.inference.model/
 python/paddle/v2/fluid/tests/book/image_classification_vgg.inference.model/
 python/paddle/v2/fluid/tests/book/label_semantic_roles.inference.model/
+paddle/fluid/operators/distributed/send_recv.proto
 *.DS_Store
 *.vs
 build/
diff --git a/paddle/fluid/framework/tensor.h b/paddle/fluid/framework/tensor.h
index 0a4aebefac..f00c20a3f7 100644
--- a/paddle/fluid/framework/tensor.h
+++ b/paddle/fluid/framework/tensor.h
@@ -155,6 +155,8 @@ class Tensor {
 
   void clear() { holder_ = nullptr; }
 
+  const std::shared_ptr<memory::Allocation>& Holder() const { return holder_; }
+
  private:
   /*! holds the memory block if allocated. */
   std::shared_ptr<memory::Allocation> holder_;
diff --git a/paddle/fluid/operators/distributed/grpc_serde.cc b/paddle/fluid/operators/distributed/grpc_serde.cc
index bac098b892..2ec1f8e7ac 100644
--- a/paddle/fluid/operators/distributed/grpc_serde.cc
+++ b/paddle/fluid/operators/distributed/grpc_serde.cc
@@ -32,17 +32,21 @@ namespace paddle {
 namespace operators {
 namespace distributed {
 
+static void SerializeDestroyCallback(void* payload) {
+  if (payload != nullptr) {
+    auto* shared_payload =
+        reinterpret_cast<std::shared_ptr<memory::Allocation>*>(payload);
+    delete shared_payload;
+  }
+}
+
 void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
                            const platform::DeviceContext& ctx,
                            ::grpc::ByteBuffer* msg,
                            const std::string& out_name) {
   platform::RecordRPCEvent record_event("serial", &ctx);
-  // Default DestroyCallback does nothing, When using GPU
-  // the CPU buffer need to be freed.
-  DestroyCallback destroy_callback = [](void* backing) {};
   VarMsg request;
-  void* payload = nullptr;
-  size_t payload_size;
+  std::shared_ptr<memory::Allocation>* payload = nullptr;
   request.set_varname(name);
 
   // Note: normally the profiler is enabled in 1 trainer, hence only
@@ -61,10 +65,12 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
   }
   if (var->IsType<framework::LoDTensor>()) {
     request.set_type(::sendrecv::LOD_TENSOR);
-    GetTensorPayload(var, ctx, &request, &payload, &payload_size);
+    payload = new std::shared_ptr<memory::Allocation>(
+        GetTensorPayload(var, ctx, &request));
   } else if (var->IsType<framework::SelectedRows>()) {
     request.set_type(::sendrecv::SELECTED_ROWS);
-    GetSelectedRowsPayload(var, ctx, &request, &payload, &payload_size);
+    payload = new std::shared_ptr<memory::Allocation>(
+        GetSelectedRowsPayload(var, ctx, &request));
 #ifdef PADDLE_WITH_CUDA
   } else if (var->IsType<ncclUniqueId>()) {
     request.set_type(::sendrecv::NCCL_ID);
@@ -74,17 +80,6 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
                  typeid(var->Type()).name());
   }
 
-  if (platform::is_gpu_place(ctx.GetPlace())) {
-#ifdef PADDLE_WITH_CUDA
-    // GPU data is copied to CPU buffer when sending,
-    // free the buffer when possible.
-    destroy_callback = [](void* backing) {
-      platform::CUDAPinnedPlace cuda_pinned;
-      memory::Free(cuda_pinned, backing);
-    };
-#endif
-  }
-
   std::string header;
   request.AppendToString(&header);
   auto buffer = std::unique_ptr<char[]>(new char[1024]);
@@ -108,17 +103,19 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
     return;
   }
 #endif
+  PADDLE_ENFORCE_NOT_NULL(payload);
 
-  e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size);
+  e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber,
+                            payload->get()->size());
   // steal reference of tensor data
   ::grpc::Slice slices[4];  // metadata, tensor, rows meta, rows
   int num_slices = 2;       // only SelectedRows have rows buffer
   slices[0] = ::grpc::Slice(e.size());
   memcpy(const_cast<uint8_t*>(slices[0].begin()), e.data(), e.size());
-  slices[1] = ::grpc::Slice(
-      grpc_slice_new_with_user_data(payload, payload_size, destroy_callback,
-                                    static_cast<char*>(payload)),
-      ::grpc::Slice::STEAL_REF);
+  slices[1] = ::grpc::Slice(grpc_slice_new_with_user_data(
+                                payload->get()->ptr(), payload->get()->size(),
+                                SerializeDestroyCallback, payload),
+                            ::grpc::Slice::STEAL_REF);
 
   if (var->IsType<framework::SelectedRows>()) {
     auto* slr = var->GetMutable<framework::SelectedRows>();
diff --git a/paddle/fluid/operators/distributed/sendrecvop_utils.cc b/paddle/fluid/operators/distributed/sendrecvop_utils.cc
index 6a3f8fd544..323780aa8b 100644
--- a/paddle/fluid/operators/distributed/sendrecvop_utils.cc
+++ b/paddle/fluid/operators/distributed/sendrecvop_utils.cc
@@ -28,16 +28,35 @@ namespace distributed {
 
 using VarMsg = sendrecv::VariableMessage;
 
+static std::shared_ptr<memory::Allocation> GetCommunicationAllocationFromTensor(
+    const platform::DeviceContext& ctx, const framework::Tensor& tensor) {
+  if (is_gpu_place(ctx.GetPlace())) {
 #ifdef PADDLE_WITH_CUDA
-void* GetVarPayLoad(const std::string varname, int64_t size) {
-  platform::CUDAPinnedPlace cuda_pinned;
-  return memory::Alloc(cuda_pinned, size);
-}
-#endif
+    PADDLE_ENFORCE(is_gpu_place(tensor.place()));
+    auto& gpu_dev_ctx =
+        reinterpret_cast<const platform::CUDADeviceContext&>(ctx);
+    auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type());
+    platform::CUDAPinnedPlace cuda_pinned;
+    auto result = memory::AllocShared(
+        cuda_pinned, copy_size, memory::allocation::Allocator::kCrossDevice);
 
-void GetTensorPayload(framework::Variable* var,
-                      const platform::DeviceContext& ctx, VarMsg* request,
-                      void** payload, size_t* payload_size) {
+    memory::Copy(cuda_pinned, result->ptr(),
+                 boost::get<platform::CUDAPlace>(tensor.place()),
+                 reinterpret_cast<const void*>(tensor.data<void>()), copy_size,
+                 gpu_dev_ctx.stream());
+
+    ctx.Wait();
+    return result;
+#else
+    return nullptr;  // THIS SHOULD NOT HAPPEN.
+#endif
+  } else {
+    return tensor.Holder();
+  }
+}
+std::shared_ptr<memory::Allocation> GetTensorPayload(
+    framework::Variable* var, const platform::DeviceContext& ctx,
+    VarMsg* request) {
   auto tensor = var->Get<framework::LoDTensor>();
   // FIXME(wuyi): data types in send_recv.proto is copied from
   // framework.proto
@@ -56,31 +75,12 @@ void GetTensorPayload(framework::Variable* var,
       }
     }
   }
-  if (platform::is_gpu_place(ctx.GetPlace())) {
-#ifdef PADDLE_WITH_CUDA
-    PADDLE_ENFORCE(platform::is_gpu_place(tensor.place()));
-    // platform::CUDAPinnedPlace cuda_pinned;
-    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
-    auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type());
-    *payload = GetVarPayLoad(request->varname(), copy_size);
-
-    platform::CUDAPinnedPlace cuda_pinned;
-    memory::Copy(cuda_pinned, *payload,
-                 boost::get<platform::CUDAPlace>(tensor.place()),
-                 reinterpret_cast<const void*>(tensor.data<void>()), copy_size,
-                 gpu_dev_ctx.stream());
-
-    ctx.Wait();
-#endif
-  } else {
-    *payload = tensor.data<void>();
-  }
-  *payload_size = tensor.numel() * framework::SizeOfType(tensor.type());
+  return GetCommunicationAllocationFromTensor(ctx, tensor);
 }
 
-void GetSelectedRowsPayload(framework::Variable* var,
-                            const platform::DeviceContext& ctx, VarMsg* request,
-                            void** payload, size_t* payload_size) {
+std::shared_ptr<memory::Allocation> GetSelectedRowsPayload(
+    framework::Variable* var, const platform::DeviceContext& ctx,
+    VarMsg* request) {
   auto* slr = var->GetMutable<framework::SelectedRows>();
   request->set_data_type(
       static_cast<VarMsg::Type>(framework::ToDataType(slr->value().type())));
@@ -92,23 +92,7 @@ void GetSelectedRowsPayload(framework::Variable* var,
   }
 
   auto* tensor = slr->mutable_value();
-  if (platform::is_gpu_place(ctx.GetPlace())) {
-#ifdef PADDLE_WITH_CUDA
-    auto& gpu_dev_ctx = static_cast<const platform::CUDADeviceContext&>(ctx);
-    auto copy_size = tensor->numel() * framework::SizeOfType(tensor->type());
-    *payload = GetVarPayLoad(request->varname(), copy_size);
-
-    platform::CUDAPinnedPlace cuda_pinned;
-    memory::Copy(cuda_pinned, *payload,
-                 boost::get<platform::CUDAPlace>(tensor->place()),
-                 reinterpret_cast<const void*>(tensor->data<void>()), copy_size,
-                 gpu_dev_ctx.stream());
-    ctx.Wait();
-#endif
-  } else {
-    *payload = slr->mutable_value()->data<void>();
-  }
-  *payload_size = tensor->numel() * framework::SizeOfType(tensor->type());
+  return GetCommunicationAllocationFromTensor(ctx, *tensor);
 }
 
 }  // namespace distributed
diff --git a/paddle/fluid/operators/distributed/sendrecvop_utils.h b/paddle/fluid/operators/distributed/sendrecvop_utils.h
index 4d08d3c77a..a6ea034520 100644
--- a/paddle/fluid/operators/distributed/sendrecvop_utils.h
+++ b/paddle/fluid/operators/distributed/sendrecvop_utils.h
@@ -33,13 +33,13 @@ namespace distributed {
 
 using VarMsg = sendrecv::VariableMessage;
 
-void GetTensorPayload(framework::Variable* var,
-                      const platform::DeviceContext& ctx, VarMsg* request,
-                      void** payload, size_t* payload_size);
+std::shared_ptr<memory::Allocation> GetTensorPayload(
+    framework::Variable* var, const platform::DeviceContext& ctx,
+    VarMsg* request);
 
-void GetSelectedRowsPayload(framework::Variable* var,
-                            const platform::DeviceContext& ctx, VarMsg* request,
-                            void** payload, size_t* payload_size);
+std::shared_ptr<memory::Allocation> GetSelectedRowsPayload(
+    framework::Variable* var, const platform::DeviceContext& ctx,
+    VarMsg* request);
 
 inline std::type_index ToTypeIndex(sendrecv::VariableMessage::Type type) {
   switch (type) {
-- 
GitLab
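
Note on the ownership pattern this patch adopts: instead of handing gRPC a raw buffer plus an ad-hoc free callback, the serializer heap-allocates a std::shared_ptr<memory::Allocation>, passes the raw pointer to grpc_slice_new_with_user_data as user_data, and SerializeDestroyCallback deletes that shared_ptr once the transport releases the slice. The tensor bytes therefore stay alive for the whole asynchronous send, and the CPU path can share the tensor's own allocation (tensor.Holder()) with no copy. A minimal standalone sketch of the same idea follows; the Allocation stand-in and DestroyCallback below are illustrative only, not Paddle or gRPC APIs.

#include <cstddef>
#include <cstdio>
#include <memory>
#include <vector>

// Stand-in for memory::Allocation: an owned buffer exposing ptr()/size().
struct Allocation {
  explicit Allocation(std::size_t n) : buf(n) {}
  void* ptr() { return buf.data(); }
  std::size_t size() const { return buf.size(); }
  std::vector<char> buf;
};

// Mirrors the role of SerializeDestroyCallback: the transport hands back the
// user_data pointer when it is done with the bytes; deleting the heap-held
// shared_ptr drops the reference and frees the buffer if it was the last one.
static void DestroyCallback(void* user_data) {
  delete static_cast<std::shared_ptr<Allocation>*>(user_data);
}

int main() {
  // Serializer side: keep the payload alive past this scope by copying the
  // shared_ptr onto the heap and shipping the raw pointer with the slice.
  auto* user_data =
      new std::shared_ptr<Allocation>(std::make_shared<Allocation>(1024));
  void* bytes = (*user_data)->ptr();       // what the slice would reference,
  std::size_t len = (*user_data)->size();  // zero-copy, until the callback
  std::printf("sending %zu bytes at %p\n", len, bytes);

  // Transport side (possibly much later, on another thread): release.
  DestroyCallback(user_data);
  return 0;
}

Compared with the removed destroy_callback that called memory::Free on a pinned buffer, tying the lifetime to a shared_ptr lets one code path serve both cases: a freshly pinned staging buffer on GPU and the tensor's existing holder on CPU.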