diff --git a/paddle/fluid/operators/distributed/brpc_sendrecvop_utils.cc b/paddle/fluid/operators/distributed/brpc_sendrecvop_utils.cc index 6fed9ba92c16477c79509a30911920e67b9f9bdd..e4604db3a381616c7420f816f0b49a015c925bd4 100644 --- a/paddle/fluid/operators/distributed/brpc_sendrecvop_utils.cc +++ b/paddle/fluid/operators/distributed/brpc_sendrecvop_utils.cc @@ -16,6 +16,7 @@ limitations under the License. */ #include #endif #include +#include #include // NOLINT #include "paddle/fluid/framework/data_type.h" @@ -31,7 +32,12 @@ namespace distributed { class IOBufWriter { public: - static void Append(butil::IOBuf* iobuf, int k, const char* v, int64_t vlen) { + static void Append(const std::string& varname, butil::IOBuf* iobuf, int k, + const char* v, int64_t vlen) { + if (vlen >= std::numeric_limits::max() || vlen < 0) { + LOG(FATAL) << "AppendZeroCopy varname:" << varname << ", vlen:" << vlen; + } + iobuf->append(reinterpret_cast(&k), 4); iobuf->append(reinterpret_cast(&vlen), 8); iobuf->append(v, vlen); @@ -87,6 +93,10 @@ class IOBufWriter { int k, const char* v, int64_t vlen, bool in_cuda_pinned, void (*destroy)(void*), void* user_data) { + if (vlen >= std::numeric_limits::max() || vlen < 0) { + LOG(FATAL) << "AppendZeroCopy varname:" << varname << ", vlen:" << vlen; + } + #ifdef PADDLE_WITH_BRPC_RDMA IOBufWriter::AppendRdmaZeroCopy(varname, iobuf, k, v, vlen, in_cuda_pinned, destroy, user_data); @@ -134,7 +144,7 @@ void SerializeToIOBuf(const std::string& name, framework::Variable* var, request->set_type(::sendrecv::NCCL_ID); const ncclUniqueId& uid = var->Get(); // TODO(gongwb): use append_zero to avoid data copy. - IOBufWriter::Append(iobuf, + IOBufWriter::Append(name, iobuf, sendrecv::VariableMessage::kSerializedFieldNumber, uid.internal, NCCL_UNIQUE_ID_BYTES); return; @@ -149,7 +159,7 @@ void SerializeToIOBuf(const std::string& name, framework::Variable* var, // FIXME(gongwb): it seems that can use zero copy. if (var_is_not_stable) { IOBufWriter::Append( - iobuf, ::sendrecv::VariableMessage::kSerializedFieldNumber, + name, iobuf, ::sendrecv::VariableMessage::kSerializedFieldNumber, static_cast(payload->ptr()), payload->memory_size()); } else { if (platform::is_gpu_place(ctx.GetPlace())) { @@ -171,10 +181,11 @@ void SerializeToIOBuf(const std::string& name, framework::Variable* var, if (var->IsType()) { auto* slr = var->GetMutable(); - size_t rows_memory_size = - slr->rows().size() * framework::SizeOfType(typeid(int64_t)); + PADDLE_ENFORCE(VectorElemName(slr->rows()) == typeid(int64_t).name()); + size_t rows_memory_size = slr->rows().size() * sizeof(int64_t); - IOBufWriter::Append(iobuf, ::sendrecv::VariableMessage::kRowsFieldNumber, + IOBufWriter::Append(name, iobuf, + ::sendrecv::VariableMessage::kRowsFieldNumber, reinterpret_cast(slr->rows().data()), static_cast(rows_memory_size)); } diff --git a/paddle/fluid/operators/distributed/grpc_serde.cc b/paddle/fluid/operators/distributed/grpc_serde.cc index 299dfe35438c35ec922dcdc75bf774a00f111bbb..a9dea9cfd2eeaa7e7ed8f052d2f51f5893c1e2e3 100644 --- a/paddle/fluid/operators/distributed/grpc_serde.cc +++ b/paddle/fluid/operators/distributed/grpc_serde.cc @@ -15,6 +15,7 @@ limitations under the License. */ #ifdef PADDLE_WITH_CUDA #include #endif +#include #include // NOLINT #include "google/protobuf/io/coded_stream.h" @@ -102,6 +103,10 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload->memory_size()); + if (payload->memory_size() >= std::numeric_limits::max()) { + LOG(FATAL) << "AppendZeroCopy varname:" << name + << ", vlen:" << payload->memory_size(); + } // steal reference of tensor data ::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows int num_slices = 2; // only SelectedRows have rows buffer @@ -115,7 +120,10 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, if (var->IsType()) { auto* slr = var->GetMutable(); ProtoEncodeHelper e2(static_cast(buf), 128); + + PADDLE_ENFORCE(VectorElemName(slr->rows()) == typeid(int64_t).name()); size_t rows_memory_size = slr->rows().size() * sizeof(int64_t); + e2.WriteVarlengthBeginning(VarMsg::kRowsFieldNumber, rows_memory_size); slices[2] = ::grpc::Slice(e2.size()); memcpy(const_cast(slices[2].begin()), e2.data(), e2.size()); diff --git a/paddle/fluid/operators/distributed/sendrecvop_utils.h b/paddle/fluid/operators/distributed/sendrecvop_utils.h index 33eded0e6c0d90dadeeb63594983224d795fa244..6a87178be5daa02444c41a26f6e6c067713dd96f 100644 --- a/paddle/fluid/operators/distributed/sendrecvop_utils.h +++ b/paddle/fluid/operators/distributed/sendrecvop_utils.h @@ -15,6 +15,7 @@ limitations under the License. */ #pragma once #include #include +#include #include #include "paddle/fluid/framework/data_type.h" @@ -23,9 +24,8 @@ limitations under the License. */ #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/var_type.h" -#include "paddle/fluid/platform/port.h" - #include "paddle/fluid/operators/distributed/send_recv.pb.h" +#include "paddle/fluid/platform/port.h" namespace paddle { namespace operators { @@ -83,6 +83,11 @@ inline framework::proto::VarType::Type ToVarType( } } +template