From 20fb01fb00ae21c2d63ceb4539453262d6436648 Mon Sep 17 00:00:00 2001 From: MRXLT Date: Wed, 30 Sep 2020 10:59:15 +0800 Subject: [PATCH] fix distributed error info (#27206) * fix distributed error info * bug fix; notest * error info refine * update error info * update error info * update error info * bug fix * bug fix * bug fix * bug fix --- .../operators/collective/c_allgather_op.cc | 7 +-- .../operators/collective/c_allgather_op.cu.cc | 8 ++- .../operators/collective/c_allreduce_op.h | 6 ++- .../operators/collective/c_broadcast_op.cu.cc | 3 +- .../collective/c_comm_init_all_op.cc | 6 ++- .../operators/collective/c_comm_init_op.cc | 11 ++-- .../operators/collective/c_gen_nccl_id_op.cc | 7 ++- .../collective/c_reducescatter_op.cc | 11 ++-- .../collective/c_reducescatter_op.cu.cc | 3 +- .../operators/collective/c_reducescatter_op.h | 3 +- .../collective/c_sync_calc_stream_op.cc | 8 +-- .../collective/c_sync_comm_stream_op.cc | 6 ++- .../async_sparse_param_update_recorder.h | 6 ++- .../distributed/brpc/brpc_rdma_pool.cc | 10 ++-- .../distributed/brpc/brpc_sendrecvop_utils.cc | 19 +++++-- .../operators/distributed/brpc/brpc_server.cc | 52 +++++++++++-------- .../brpc/brpc_variable_response.cc | 6 ++- .../operators/distributed/collective_server.h | 4 +- .../operators/distributed/communicator.h | 24 ++++++--- .../operators/distributed/grpc/grpc_client.cc | 11 ++-- .../operators/distributed/grpc/grpc_serde.cc | 19 +++++-- .../operators/distributed/grpc/grpc_server.cc | 16 ++++-- .../operators/distributed/grpc/grpc_service.h | 6 ++- .../grpc/grpc_variable_response.cc | 3 +- .../distributed/heart_beat_monitor.cc | 4 +- .../distributed/heart_beat_monitor.h | 3 +- .../distributed/parameter_prefetch.cc | 10 +++- .../operators/distributed/parameter_send.cc | 10 ++-- .../distributed/request_handler_impl.cc | 18 +++++-- .../fluid/operators/distributed/rpc_server.cc | 6 +-- .../operators/distributed/rpc_server_test.cc | 4 +- .../operators/distributed/sendrecvop_utils.cc | 7 ++- .../operators/distributed/sendrecvop_utils.h | 3 +- .../distributed/variable_response.cc | 38 ++++++++++---- 34 files changed, 239 insertions(+), 119 deletions(-) diff --git a/paddle/fluid/operators/collective/c_allgather_op.cc b/paddle/fluid/operators/collective/c_allgather_op.cc index 5604163541b..4111a19c5eb 100644 --- a/paddle/fluid/operators/collective/c_allgather_op.cc +++ b/paddle/fluid/operators/collective/c_allgather_op.cc @@ -23,10 +23,11 @@ class CAllGatherOp : public framework::OperatorWithKernel { public: using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext *ctx) const override { - PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should not be null"); - PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null."); + OP_INOUT_CHECK(ctx->HasInput("X"), "Input", "X", "AllGather"); + OP_INOUT_CHECK(ctx->HasOutput("Out"), "Input", "Out", "AllGather"); int nranks = ctx->Attrs().Get("nranks"); - PADDLE_ENFORCE_GE(nranks, 2, "nranks should be >=2"); + PADDLE_ENFORCE_GE(nranks, 2, platform::errors::InvalidArgument( + "The value of nranks should be >=2.")); framework::DDim dim = ctx->GetInputDim("X"); dim[0] = dim[0] * nranks; if (dim[0] < 0) dim[0] = -1; diff --git a/paddle/fluid/operators/collective/c_allgather_op.cu.cc b/paddle/fluid/operators/collective/c_allgather_op.cu.cc index b20e011f5cb..20cd4dcfdf8 100644 --- a/paddle/fluid/operators/collective/c_allgather_op.cu.cc +++ b/paddle/fluid/operators/collective/c_allgather_op.cu.cc @@ -37,7 +37,10 @@ class CAllGatherOpCUDAKernel : public framework::OpKernel { int rid = ctx.Attr("ring_id"); auto place = ctx.GetPlace(); auto comm = platform::NCCLCommContext::Instance().Get(rid, place); - PADDLE_ENFORCE_EQ(nranks, comm->nranks()); + PADDLE_ENFORCE_EQ( + nranks, comm->nranks(), + platform::errors::InvalidArgument("nranks: %s should equal to %s", + nranks, comm->nranks())); framework::DDim out_dims = in->dims(); out_dims[0] *= nranks; @@ -59,7 +62,8 @@ class CAllGatherOpCUDAKernel : public framework::OpKernel { send_buff, recv_buff, send_numel, static_cast(dtype), comm->comm(), stream)); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_allreduce_op.h b/paddle/fluid/operators/collective/c_allreduce_op.h index be518b3bf0a..24f7f427cf5 100644 --- a/paddle/fluid/operators/collective/c_allreduce_op.h +++ b/paddle/fluid/operators/collective/c_allreduce_op.h @@ -150,13 +150,15 @@ class CAllReduceOpCUDAKernel : public framework::OpKernel { break; default: - PADDLE_THROW("Invalid reduce type: %d", red_type); + PADDLE_THROW(platform::errors::InvalidArgument( + "Invalid reduce type: %d", red_type)); } PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllReduce( sendbuff, recvbuff, numel, dtype, nccl_red_type, comm->comm(), stream)); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_broadcast_op.cu.cc b/paddle/fluid/operators/collective/c_broadcast_op.cu.cc index f95d467345b..b7fc785126b 100644 --- a/paddle/fluid/operators/collective/c_broadcast_op.cu.cc +++ b/paddle/fluid/operators/collective/c_broadcast_op.cu.cc @@ -69,7 +69,8 @@ class CBroadcastOpCUDAKernel : public framework::OpKernel { out->Resize(x->dims()); out->set_lod(x->lod()); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_comm_init_all_op.cc b/paddle/fluid/operators/collective/c_comm_init_all_op.cc index f72d2ac4d33..c4e8f871b04 100644 --- a/paddle/fluid/operators/collective/c_comm_init_all_op.cc +++ b/paddle/fluid/operators/collective/c_comm_init_all_op.cc @@ -50,7 +50,8 @@ class CCommInitAllOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& place) const override { PADDLE_ENFORCE_EQ(is_gpu_place(place), true, - "CCommInitAllOp can run on gpu place only."); + platform::errors::PreconditionNotMet( + "CCommInitAllOp can run on gpu place only")); #if defined(PADDLE_WITH_NCCL) std::vector devices = Attr>("devices"); @@ -62,7 +63,8 @@ class CCommInitAllOp : public framework::OperatorBase { platform::NCCLCommContext::Instance().CreateAllNCCLComms(devices, rid); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_comm_init_op.cc b/paddle/fluid/operators/collective/c_comm_init_op.cc index ccad96320a7..b5fed44cd1c 100644 --- a/paddle/fluid/operators/collective/c_comm_init_op.cc +++ b/paddle/fluid/operators/collective/c_comm_init_op.cc @@ -39,11 +39,13 @@ class CCommInitOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& place) const override { - PADDLE_ENFORCE(is_gpu_place(place), - "CCommInitOp can run on gpu place only."); + PADDLE_ENFORCE_EQ(is_gpu_place(place), true, + platform::errors::PreconditionNotMet( + "CCommInitOp can run on gpu place only.")); auto var = scope.FindVar(Input("X")); - PADDLE_ENFORCE_NOT_NULL(var); + PADDLE_ENFORCE_NOT_NULL( + var, platform::errors::InvalidArgument("Input con not be empty.")); #if defined(PADDLE_WITH_NCCL) ncclUniqueId* nccl_id = var->GetMutable(); @@ -57,7 +59,8 @@ class CCommInitOp : public framework::OperatorBase { platform::NCCLCommContext::Instance().CreateNCCLComm( nccl_id, nranks, rank_id, device_id, rid); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc index 2822129b198..ed478b1f0a0 100644 --- a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc +++ b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc @@ -61,9 +61,12 @@ class CGenNCCLIdOp : public framework::OperatorBase { const platform::DeviceContext& dev_ctx) const { std::string var_name = Output("Out"); auto var = scope->FindVar(var_name); - PADDLE_ENFORCE_NOT_NULL(var); + PADDLE_ENFORCE_NOT_NULL( + var, platform::errors::InvalidArgument("Output can not be Null")); auto id = var->GetMutable(); - PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id)); + PADDLE_ENFORCE_EQ(platform::dynload::ncclGetUniqueId(id), 0, + platform::errors::InvalidArgument( + "ncclGetUniqueId failed with id %s", id)); std::vector endpoint_list = Attr>("other_endpoints"); diff --git a/paddle/fluid/operators/collective/c_reducescatter_op.cc b/paddle/fluid/operators/collective/c_reducescatter_op.cc index 9efb76c1a6a..ada1fd2b127 100644 --- a/paddle/fluid/operators/collective/c_reducescatter_op.cc +++ b/paddle/fluid/operators/collective/c_reducescatter_op.cc @@ -24,14 +24,15 @@ class CReduceScatterOp : public framework::OperatorWithKernel { using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext *ctx) const override { - PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should not be null"); - PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null."); + OP_INOUT_CHECK(ctx->HasInput("X"), "Input", "X", "ReduceScatter"); + OP_INOUT_CHECK(ctx->HasOutput("Out"), "Output", "X", "ReduceScatter"); int nranks = ctx->Attrs().Get("nranks"); framework::DDim dim = ctx->GetInputDim("X"); if (dim[0] > 0 || dim[0] < -1) { - PADDLE_ENFORCE(dim[0] % nranks == 0, - "dim[0] (%d) is not divisible by nranks(%d)", dim[0], - nranks); + PADDLE_ENFORCE_EQ( + dim[0] % nranks, 0, + platform::errors::InvalidArgument( + "dim[0] (%d) is not divisible by nranks(%d)", dim[0], nranks)); dim[0] /= nranks; } ctx->SetOutputDim("Out", dim); diff --git a/paddle/fluid/operators/collective/c_reducescatter_op.cu.cc b/paddle/fluid/operators/collective/c_reducescatter_op.cu.cc index 0d94707513b..af563d022ba 100644 --- a/paddle/fluid/operators/collective/c_reducescatter_op.cu.cc +++ b/paddle/fluid/operators/collective/c_reducescatter_op.cu.cc @@ -61,7 +61,8 @@ class CReduceScatterOpCUDAKernel : public framework::OpKernel { send_buff, recv_buff, recv_numel, static_cast(dtype), ncclSum, comm->comm(), stream)); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_reducescatter_op.h b/paddle/fluid/operators/collective/c_reducescatter_op.h index ee308080677..366d8a3747c 100644 --- a/paddle/fluid/operators/collective/c_reducescatter_op.h +++ b/paddle/fluid/operators/collective/c_reducescatter_op.h @@ -30,7 +30,8 @@ template class CReduceScatterOpCPUKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& ctx) const override { - PADDLE_THROW("Unimplemented cpu kernel for CReduceScatterOp."); + PADDLE_THROW(platform::errors::Unimplemented( + "Unimplemented cpu kernel for CReduceScatterOp.")); } }; diff --git a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc index 00f366e6212..bdffe96acd7 100644 --- a/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc +++ b/paddle/fluid/operators/collective/c_sync_calc_stream_op.cc @@ -34,14 +34,16 @@ class CSyncCalcStreamOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& place) const override { - PADDLE_ENFORCE(is_gpu_place(place), - "Sync stream op can run on gpu place only for now."); + PADDLE_ENFORCE_EQ(is_gpu_place(place), true, + platform::errors::PreconditionNotMet( + "Sync stream op can run on gpu place only for now.")); #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) auto dev_ctx = static_cast( platform::DeviceContextPool::Instance().Get(place)); PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(dev_ctx->stream())); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc index 7e5311a2103..ad9884565b6 100644 --- a/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc +++ b/paddle/fluid/operators/collective/c_sync_comm_stream_op.cc @@ -38,7 +38,8 @@ class CSyncCommStreamOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& place) const override { PADDLE_ENFORCE_EQ(is_gpu_place(place), true, - "Sync stream op can run on gpu place only for now."); + platform::errors::PreconditionNotMet( + "Sync stream op can run on gpu place only for now.")); #if defined(PADDLE_WITH_NCCL) int ring_id = Attr("ring_id"); @@ -46,7 +47,8 @@ class CSyncCommStreamOp : public framework::OperatorBase { platform::NCCLCommContext::Instance().Get(ring_id, place)->stream(); PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); #else - PADDLE_THROW("PaddlePaddle should compile with GPU."); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with GPU.")); #endif } }; diff --git a/paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h b/paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h index cef6590ae21..28a5f2ad6c7 100644 --- a/paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h +++ b/paddle/fluid/operators/distributed/async_sparse_param_update_recorder.h @@ -130,7 +130,11 @@ class AsyncSparseParamUpdateRecorder { std::vector* result) { VLOG(3) << "GetAndClear param: " << param_name << " for trainer: " << trainer_id; - PADDLE_ENFORCE_LT(trainer_id, trainer_num_); + PADDLE_ENFORCE_LT( + trainer_id, trainer_num_, + platform::errors::InvalidArgument( + "The value of trainer_id: %s should less than trainer_num: %s.", + trainer_id, trainer_num_)); param_to_updated_rows_.at(param_name)[trainer_id] ->GetAndClear(result) .wait(); diff --git a/paddle/fluid/operators/distributed/brpc/brpc_rdma_pool.cc b/paddle/fluid/operators/distributed/brpc/brpc_rdma_pool.cc index 85ef6cf2f0f..94f0b9919ac 100644 --- a/paddle/fluid/operators/distributed/brpc/brpc_rdma_pool.cc +++ b/paddle/fluid/operators/distributed/brpc/brpc_rdma_pool.cc @@ -39,8 +39,8 @@ void* RdmaMemPool::Find(const std::string& varname, int64_t size) { auto info = it->second; if (info.data_size != size) { pthread_rwlock_unlock(&access_); - PADDLE_ENFORCE(false, "var:%s size:%ld != %ld", varname, size, - info.data_size); + PADDLE_THROW(platform::errors::InvalidArgument( + "var:%s size:%ld != %ld", varname, size, info.data_size)); return nullptr; } @@ -52,9 +52,9 @@ void RdmaMemPool::Register(const std::string& varname, void* data, int64_t data_size) { void* old = Find(varname, data_size); if (old != nullptr) { - if (data != old) { - PADDLE_ENFORCE(false, "var:%s data:%ld != %ld", varname, data, old); - } + PADDLE_ENFORCE_EQ( + data, old, platform::errors::InvalidArgument("var:%s data:%ld != %ld", + varname, data, old)); VLOG(7) << "Find on rdma:" << varname << " data:" << data << " data_size:" << data_size; return; diff --git a/paddle/fluid/operators/distributed/brpc/brpc_sendrecvop_utils.cc b/paddle/fluid/operators/distributed/brpc/brpc_sendrecvop_utils.cc index d2341c2dce2..d66281ac7c7 100644 --- a/paddle/fluid/operators/distributed/brpc/brpc_sendrecvop_utils.cc +++ b/paddle/fluid/operators/distributed/brpc/brpc_sendrecvop_utils.cc @@ -155,11 +155,15 @@ void SerializeToIOBuf(const std::string& name, framework::Variable* var, return; #endif } else { - PADDLE_THROW("Serialize does not support type: %s", - typeid(var->Type()).name()); + PADDLE_THROW(platform::errors::InvalidArgument( + "Serialize does not support type: %s", typeid(var->Type()).name())); } - PADDLE_ENFORCE_NOT_NULL(payload); + PADDLE_ENFORCE_NOT_NULL( + payload, + platform::errors::InvalidArgument( + "Not support type: %s, need to be LOD_TENSOR or SELECTED_ROWS.", + var->Type())); // FIXME(gongwb): it seems that can use zero copy. if (var_is_not_stable) { @@ -186,7 +190,10 @@ void SerializeToIOBuf(const std::string& name, framework::Variable* var, if (var->IsType()) { auto* slr = var->GetMutable(); - PADDLE_ENFORCE(VectorElemName(slr->rows()) == typeid(int64_t).name()); + PADDLE_ENFORCE_EQ(VectorElemName(slr->rows()), typeid(int64_t).name(), + platform::errors::InvalidArgument( + "Got wrong type: %s, expect type: int64_t", + VectorElemName(slr->rows()))); size_t rows_memory_size = slr->rows().size() * sizeof(int64_t); IOBufWriter::Append(name, iobuf, @@ -202,7 +209,9 @@ void DeserializeFromIOBuf(const ::sendrecv::VariableMessage& meta, const framework::Scope* scope, framework::Variable** var, int* trainer_id) { operators::distributed::BRPCVariableResponse resp(scope, &ctx); - PADDLE_ENFORCE(resp.Parse(iobuf, meta) == 0, "parse iobuf to tensor error!"); + PADDLE_ENFORCE_EQ( + resp.Parse(iobuf, meta), 0, + platform::errors::InvalidArgument("parse iobuf to tensor error!")); *var = resp.GetVar(); *trainer_id = resp.GetTrainerId(); } diff --git a/paddle/fluid/operators/distributed/brpc/brpc_server.cc b/paddle/fluid/operators/distributed/brpc/brpc_server.cc index 64720ded7d2..5ca26f006bf 100644 --- a/paddle/fluid/operators/distributed/brpc/brpc_server.cc +++ b/paddle/fluid/operators/distributed/brpc/brpc_server.cc @@ -90,8 +90,9 @@ class BRPCServiceImpl : public SendRecvService { void _SendVariable(google::protobuf::RpcController* cntl_butil, const VariableMessage* request, VoidMessage* response, google::protobuf::Closure* done) { - PADDLE_ENFORCE(request_send_h_ != nullptr, - "RequestSend handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL( + request_send_h_, platform::errors::PreconditionNotMet( + "RequestSend handler should be registed first!")); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); @@ -103,8 +104,9 @@ class BRPCServiceImpl : public SendRecvService { distributed::BRPCVariableResponse resp(request_send_h_->scope(), request_send_h_->dev_ctx(), request_send_h_->distributed_mode()); - PADDLE_ENFORCE(resp.Parse(cntl->request_attachment(), *request) == 0, - "parse iobuf to tensor error!"); + PADDLE_ENFORCE_EQ( + resp.Parse(cntl->request_attachment(), *request), 0, + platform::errors::InvalidArgument("parse iobuf to tensor error!")); auto scope = resp.GetMutableLocalScope(); auto invar = resp.GetVar(); @@ -132,8 +134,9 @@ class BRPCServiceImpl : public SendRecvService { void _GetVariable(google::protobuf::RpcController* cntl_butil, const VariableMessage* request, VariableMessage* response, google::protobuf::Closure* done) { - PADDLE_ENFORCE(request_get_h_ != nullptr, - "RequestGet handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL( + request_get_h_, platform::errors::PreconditionNotMet( + "RequestGet handler should be registed first!")); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); @@ -164,8 +167,10 @@ class BRPCServiceImpl : public SendRecvService { const VariableMessage* request, VariableMessage* response, google::protobuf::Closure* done) { - PADDLE_ENFORCE(request_getnobarrier_h_ != nullptr, - "RequestGetNoBarrier handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL( + request_getnobarrier_h_, + platform::errors::PreconditionNotMet( + "RequestGetNoBarrier handler should be registed first!")); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); @@ -204,8 +209,9 @@ class BRPCServiceImpl : public SendRecvService { const VariableMessage* request, VariableMessage* response, google::protobuf::Closure* done) { - PADDLE_ENFORCE(request_prefetch_h_ != nullptr, - "kRequestPrefetch handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL(request_prefetch_h_, + platform::errors::PreconditionNotMet( + "kRequestPrefetch handler should be registed first!"); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); @@ -221,8 +227,9 @@ class BRPCServiceImpl : public SendRecvService { distributed::BRPCVariableResponse resp( request_prefetch_h_->scope(), request_prefetch_h_->dev_ctx(), true); - PADDLE_ENFORCE(resp.Parse(cntl->request_attachment(), *request) == 0, - "parse iobuf to tensor error!"); + PADDLE_ENFORCE_EQ(resp.Parse(cntl->request_attachment(), *request), 0, + platform::errors::InvalidArgument( + "parse iobuf to tensor error!")); auto scope = resp.GetMutableLocalScope(); auto invar = scope->FindVar(in_var_name); @@ -248,9 +255,10 @@ class BRPCServiceImpl : public SendRecvService { void _CheckpointNotify(google::protobuf::RpcController* cntl_butil, const VariableMessage* request, VoidMessage* response, google::protobuf::Closure* done) { - PADDLE_ENFORCE( - request_checkpoint_h_ != nullptr, - "kRequestCheckpointNotify handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL( + request_checkpoint_h_, + platform::errors::PreconditionNotMet( + "kRequestCheckpointNotify handler should be registed first!")); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); @@ -277,9 +285,10 @@ class BRPCServiceImpl : public SendRecvService { const VariableMessage* request, VariableMessage* response, google::protobuf::Closure* done) override { - PADDLE_ENFORCE( - request_get_monomer_handler_h_ != nullptr, - "kRequestGetMonomerVariable handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL( + request_get_monomer_handler_h_, + platform::errors::PreconditionNotMet( + "kRequestGetMonomerVariable handler should be registed first!")); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); @@ -309,9 +318,10 @@ class BRPCServiceImpl : public SendRecvService { void GetMonomerBarrier(google::protobuf::RpcController* cntl_butil, const VariableMessage* request, VoidMessage* response, google::protobuf::Closure* done) override { - PADDLE_ENFORCE( - request_get_monomer_barrier_handler_h_ != nullptr, - "RequestGetMonomerBarrier handler should be registed first!"); + PADDLE_ENFORCE_NOT_NULL( + request_get_monomer_barrier_handler_h_, + platform::errors::PreconditionNotMet( + "RequestGetMonomerBarrier handler should be registed first!")); brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast(cntl_butil); diff --git a/paddle/fluid/operators/distributed/brpc/brpc_variable_response.cc b/paddle/fluid/operators/distributed/brpc/brpc_variable_response.cc index eb78917ad2d..49521e8a770 100644 --- a/paddle/fluid/operators/distributed/brpc/brpc_variable_response.cc +++ b/paddle/fluid/operators/distributed/brpc/brpc_variable_response.cc @@ -52,7 +52,8 @@ int BRPCVariableResponse::Parse(Source* source) { PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || meta_.type() == sendrecv::LOD_TENSOR) && meta_.varname() != "", - "meta info should be got first!"); + platform::errors::PreconditionNotMet( + "meta info should be got first!")); if (!CopySelectRowsData(&input, *dev_ctx_, num_bytes)) { return ret; @@ -60,7 +61,8 @@ int BRPCVariableResponse::Parse(Source* source) { break; } default: { - PADDLE_ENFORCE(false, "not surpported %u fieldnumber", field); + PADDLE_THROW(platform::errors::Unavailable( + "not surpported %u fieldnumber", field)); return ret; } } diff --git a/paddle/fluid/operators/distributed/collective_server.h b/paddle/fluid/operators/distributed/collective_server.h index 1015007ba0c..49649232860 100644 --- a/paddle/fluid/operators/distributed/collective_server.h +++ b/paddle/fluid/operators/distributed/collective_server.h @@ -21,7 +21,6 @@ limitations under the License. */ #include // NOLINT #include #include - #include "gflags/gflags.h" #include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/operators/distributed/request_handler.h" @@ -51,7 +50,8 @@ class GetMonomerHandler final : public RequestHandler { VLOG(50) << "GetMonomerHandler recv " << var_name; *outvar = scope->FindVar(var_name); - PADDLE_ENFORCE(outvar != nullptr, "%s not found", var_name); + PADDLE_ENFORCE_NOT_NULL( + outvar, platform::errors::NotFound("var: %s is not found.", var_name)); return true; } diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h index 4e3dd1d07bb..7c4910421f8 100644 --- a/paddle/fluid/operators/distributed/communicator.h +++ b/paddle/fluid/operators/distributed/communicator.h @@ -58,14 +58,19 @@ template class BlockingQueue { public: explicit BlockingQueue(size_t capacity) : capacity_(capacity) { - PADDLE_ENFORCE_GT(capacity_, 0, "The capacity must be greater than 0."); + PADDLE_ENFORCE_GT(capacity_, 0, + platform::errors::InvalidArgument( + "The capacity must be greater than 0.")); } bool Push(const T &elem) { { std::unique_lock lock(mutex_); cv_.wait(lock, [&] { return queue_.size() < capacity_; }); - PADDLE_ENFORCE_LT(queue_.size(), capacity_); + PADDLE_ENFORCE_LT( + queue_.size(), capacity_, + platform::errors::OutOfRange("The queue size: %s out of capacity:%s", + queue_.size(), capacity_)); queue_.push_back(elem); } cv_.notify_one(); @@ -76,7 +81,10 @@ class BlockingQueue { { std::unique_lock lock(mutex_); cv_.wait(lock, [&] { return queue_.size() < capacity_; }); - PADDLE_ENFORCE_LT(queue_.size(), capacity_); + PADDLE_ENFORCE_LT( + queue_.size(), capacity_, + platform::errors::OutOfRange("The queue size: %s out of capacity:%s", + queue_.size(), capacity_)); queue_.emplace_back(std::move(elem)); } cv_.notify_one(); @@ -118,7 +126,8 @@ template inline void MergeVars(const std::string &var_name, const std::vector> &vars, Scope *scope, bool merge_add = true) { - PADDLE_ENFORCE(!vars.empty(), "should have value to merge!"); + PADDLE_ENFORCE_NE(vars.empty(), true, platform::errors::InvalidArgument( + "vector vars are empty.")); auto cpu_place = platform::CPUPlace(); auto &var0 = vars[0]; auto *out_var = scope->Var(var_name); @@ -132,7 +141,9 @@ inline void MergeVars(const std::string &var_name, // check the input dims for (auto &var : vars) { auto &var_t = var->Get(); - PADDLE_ENFORCE_EQ(var_t.dims(), dims, "should have the same dims"); + PADDLE_ENFORCE_EQ( + var_t.dims(), dims, + platform::errors::InvalidArgument("vars should have the same dims")); } // set output tensor to 0. @@ -173,7 +184,8 @@ inline void MergeVars(const std::string &var_name, VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height() << " dims: " << slr0.value().dims() << "; merge add: " << merge_add; } else { - PADDLE_THROW("unsupported var type!"); + PADDLE_THROW(platform::errors::InvalidArgument("unsupported var type: %s!", + var0->Type())); } } diff --git a/paddle/fluid/operators/distributed/grpc/grpc_client.cc b/paddle/fluid/operators/distributed/grpc/grpc_client.cc index 0983b4a406e..9fd828bfa55 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_client.cc @@ -32,8 +32,9 @@ namespace distributed { void GRPCClient::InitImpl() { // start the client process thread // TODO(wuyi): can make this in a threadpool - PADDLE_ENFORCE(client_thread_ == nullptr, - "please not re init proceed thread"); + PADDLE_ENFORCE_EQ(client_thread_ == nullptr, true, + platform::errors::PreconditionNotMet( + "please not re init proceed thread")); client_thread_.reset(new std::thread(std::bind(&GRPCClient::Proceed, this))); } @@ -44,7 +45,8 @@ void GRPCClient::SendComplete() { VLOG(3) << "send complete message to " << it.first; this->AsyncSendComplete(it.first); } - PADDLE_ENFORCE(this->Wait(), "internal grpc error"); + PADDLE_ENFORCE_EQ(this->Wait(), true, platform::errors::PreconditionNotMet( + "internal grpc service error.")); completed_ = true; } } @@ -590,7 +592,8 @@ void GRPCClient::Proceed() { while (!stopped_ && cq_.Next(&tag, &ok)) { BaseProcessor* c = static_cast(tag); GPR_ASSERT(ok); - PADDLE_ENFORCE(c); + PADDLE_ENFORCE_NOT_NULL( + c, platform::errors::PreconditionNotMet("Make BaseProcessor failed.")); if (c->status_.ok()) { VLOG(3) << c->GetVarHandlePtr()->String() << " process"; diff --git a/paddle/fluid/operators/distributed/grpc/grpc_serde.cc b/paddle/fluid/operators/distributed/grpc/grpc_serde.cc index 0372846ce0d..13343ed4a78 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_serde.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_serde.cc @@ -80,8 +80,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, request.set_type(::sendrecv::NCCL_ID); #endif } else { - PADDLE_THROW("Serialize does not support type: %s", - typeid(var->Type()).name()); + PADDLE_THROW(platform::errors::InvalidArgument( + "Serialize does not support type: %s", typeid(var->Type()).name())); } std::string header; request.AppendToString(&header); @@ -106,7 +106,11 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, return; } #endif - PADDLE_ENFORCE_NOT_NULL(payload); + PADDLE_ENFORCE_NOT_NULL( + payload, + platform::errors::InvalidArgument( + "Not support type: %s, need to be LOD_TENSOR or SELECTED_ROWS", + var->Type())); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload->memory_size()); if (payload->memory_size() >= std::numeric_limits::max()) { @@ -128,7 +132,10 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, auto* slr = var->GetMutable(); ProtoEncodeHelper e2(static_cast(buf), 128); - PADDLE_ENFORCE(VectorElemName(slr->rows()) == typeid(int64_t).name()); + PADDLE_ENFORCE_EQ(VectorElemName(slr->rows()), typeid(int64_t).name(), + platform::errors::InvalidArgument( + "Got wrong type %s, expect type: int64_t", + VectorElemName(slr->rows()))); size_t rows_memory_size = slr->rows().size() * sizeof(int64_t); e2.WriteVarlengthBeginning(VarMsg::kRowsFieldNumber, rows_memory_size); @@ -155,7 +162,9 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, framework::Variable** var, int* trainer_id) { platform::RecordRPCEvent record_event("deserial"); operators::distributed::GRPCVariableResponse resp(scope, &ctx); - PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!"); + PADDLE_ENFORCE_EQ( + resp.Parse(msg), 0, + platform::errors::InvalidArgument("parse bytebuffer to tensor error!")); *var = resp.GetVar(); *trainer_id = resp.GetTrainerId(); } diff --git a/paddle/fluid/operators/distributed/grpc/grpc_server.cc b/paddle/fluid/operators/distributed/grpc/grpc_server.cc index 47e114ff4b2..a1cbf7db7e2 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_server.cc @@ -57,7 +57,8 @@ class RequestBase { status_(PROCESS), request_handler_(request_handler), req_id_(req_id) { - PADDLE_ENFORCE(cq_); + PADDLE_ENFORCE_NOT_NULL(cq_, platform::errors::InvalidArgument( + "ServerCompletionQueue cq are empty")); } virtual ~RequestBase() {} virtual void Process() = 0; @@ -550,8 +551,9 @@ void AsyncGRPCServer::StartServer() { sleep(3); } - PADDLE_ENFORCE_NE(selected_port_, 0, "can't bind to address:%s", - bind_address_); + PADDLE_ENFORCE_NE( + selected_port_, 0, + platform::errors::Unavailable("can't bind to address:%s", bind_address_)); std::function f = std::bind(&AsyncGRPCServer::TryToRegisterNewOne, this, @@ -649,7 +651,8 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name, } else if (rpc_name == kRequestSendAndRecv) { b = new RequestSendAndRecv(service_.get(), cq.get(), handler, req_id); } else { - PADDLE_ENFORCE(false, "not supported rpc"); + PADDLE_THROW( + platform::errors::InvalidArgument("not supported rpc: %s", rpc_name)); } reqs[req_id] = b; @@ -677,7 +680,10 @@ void AsyncGRPCServer::HandleRequest( auto& reqs = rpc_reqs_[rpc_name]; RequestBase* base = nullptr; { - PADDLE_ENFORCE(req_id >= 0 && req_id < kRequestBufSize); + PADDLE_ENFORCE_EQ( + (req_id >= 0 && req_id < kRequestBufSize), true, + platform::errors::OutOfRange("request id: %s out of bounds: [0, %s)", + req_id, kRequestBufSize)); std::unique_lock lock(cq_mutex_); base = reqs[req_id]; } diff --git a/paddle/fluid/operators/distributed/grpc/grpc_service.h b/paddle/fluid/operators/distributed/grpc/grpc_service.h index 95b6810ec61..10037c90853 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_service.h +++ b/paddle/fluid/operators/distributed/grpc/grpc_service.h @@ -47,7 +47,8 @@ class SerializationTraits< static Status Serialize( const paddle::operators::distributed::GRPCVariableResponse& msg, grpc_byte_buffer** bp, bool* own_buffer) { - PADDLE_ENFORCE(false, "SerializationTraits::Serialize not implemented!"); + PADDLE_THROW(paddle::platform::errors::Unimplemented( + "SerializationTraits::Serialize not implemented!")); return Status(); } static Status Deserialize( @@ -115,7 +116,8 @@ inline const char* GrpcMethodName(GrpcMethod id) { } // Shouldn't be reached. - PADDLE_ENFORCE(false, "Invalid id: not found valid method name"); + PADDLE_THROW(platform::errors::InvalidArgument( + "Invalid id: not found valid method name")); return nullptr; } diff --git a/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc b/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc index 7d7723f1945..f7679e9fc92 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc @@ -257,7 +257,8 @@ int GRPCVariableResponse::Parse(Source* source) { PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || meta_.type() == sendrecv::LOD_TENSOR) && meta_.varname() != "", - "meta info should be got first!"); + platform::errors::PreconditionNotMet( + "meta info should be got first!")); int num_bytes = 0; if (wt != WIRETYPE_LENGTH_DELIMITED || diff --git a/paddle/fluid/operators/distributed/heart_beat_monitor.cc b/paddle/fluid/operators/distributed/heart_beat_monitor.cc index 84ba9793c4e..fda5fd09a4e 100644 --- a/paddle/fluid/operators/distributed/heart_beat_monitor.cc +++ b/paddle/fluid/operators/distributed/heart_beat_monitor.cc @@ -76,11 +76,11 @@ void HeartBeatMonitor::LostWorkerMonitor() { << timestamp - worker.timestamp; if (timestamp - worker.timestamp >= FLAGS_worker_update_interval_secs) { - PADDLE_THROW( + PADDLE_THROW(platform::errors::ExecutionTimeout( "the latest update of worker %d is %d secs ago, we doubt the " "the worker is not alive and this may have a bad effect on the " "fitting result, please check", - worker.id, FLAGS_worker_update_interval_secs); + worker.id, FLAGS_worker_update_interval_secs)); } } diff --git a/paddle/fluid/operators/distributed/heart_beat_monitor.h b/paddle/fluid/operators/distributed/heart_beat_monitor.h index cfef492de0e..5df14c5a51b 100644 --- a/paddle/fluid/operators/distributed/heart_beat_monitor.h +++ b/paddle/fluid/operators/distributed/heart_beat_monitor.h @@ -56,7 +56,8 @@ class HeartBeatMonitor { is_chief_(is_chief), be_monitored_var_(be_monitored_var), running_(true) { - PADDLE_ENFORCE_GT(workers, 0, "trainers must have one or more"); + PADDLE_ENFORCE_GT(workers, 0, platform::errors::InvalidArgument( + "workers must greater than 0.")); for (auto worker_id = 0; worker_id < workers; worker_id++) { UnderMonitoredWorker worker(worker_id); diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index 6b33c1f5fcd..67aef609865 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -156,8 +156,14 @@ void prefetch_core( const auto *out_var_data = prefetch_out_var.data(); auto &dims = prefetch_out_var.dims(); - PADDLE_ENFORCE_EQ(dims.size(), 2, ""); - PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0]); + PADDLE_ENFORCE_EQ(dims.size(), 2, + platform::errors::InvalidArgument( + "The size of Tensor dims must be 2.")); + PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0], + platform::errors::InvalidArgument( + "The size of ids in this section must equal to " + "dims[0]: %s, but got %s", + dims[0], ids_in_this_section.size())); auto row_numel = dims[1]; diff --git a/paddle/fluid/operators/distributed/parameter_send.cc b/paddle/fluid/operators/distributed/parameter_send.cc index 99af06bc7cc..109514ca254 100644 --- a/paddle/fluid/operators/distributed/parameter_send.cc +++ b/paddle/fluid/operators/distributed/parameter_send.cc @@ -127,9 +127,10 @@ void ParameterSend::operator()(const CommContext &rpc_ctx, outs_dims.reserve(out_num); // infer output shape - PADDLE_ENFORCE_EQ(rpc_ctx.height_sections.size(), out_num, - "tensor split sections size" - "should be equal to output size."); + PADDLE_ENFORCE_EQ( + rpc_ctx.height_sections.size(), out_num, + platform::errors::InvalidArgument("tensor split sections size" + "should be equal to output size.")); for (size_t i = 0; i < out_num; ++i) { auto dim = send_tensor_dims; dim[0] = rpc_ctx.height_sections[i]; @@ -309,7 +310,8 @@ void ParameterSend::operator()(const CommContext &rpc_ctx, } } } else { - PADDLE_THROW("unsupported var type to send!"); + PADDLE_THROW(platform::errors::InvalidArgument( + "unsupported var type: %s to send!", send_var->Type())); } VLOG(4) << "Prepare to send var " << rpc_ctx.var_name; diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index 761a4edc523..0d67fc0021a 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -65,9 +65,9 @@ bool RequestSendHandler::Handle(const std::string &varname, if (distributed_mode_ != DistributedMode::kSync) { VLOG(3) << "async process var: " << varname; if (varname == BATCH_BARRIER_MESSAGE) { - PADDLE_THROW( + PADDLE_THROW(platform::errors::InvalidArgument( "async mode should not recv BATCH_BARRIER_MESSAGE or " - "COMPLETE_MESSAGE"); + "COMPLETE_MESSAGE")); } HeartBeatMonitor::GetInstance()->Update(trainer_id, varname, RUNNING); @@ -78,7 +78,10 @@ bool RequestSendHandler::Handle(const std::string &varname, if (string::Contains(var_name_piece, part_piece)) { auto varname_splits = paddle::string::Split(varname, '@'); - PADDLE_ENFORCE_EQ(varname_splits.size(), 3); + PADDLE_ENFORCE_EQ( + varname_splits.size(), 3, + platform::errors::InvalidArgument( + "varname: %s should be separated into 3 parts by @", varname)); run_varname = varname_splits[0]; scope->Rename(varname, run_varname); } @@ -192,7 +195,11 @@ bool RequestGetHandler::Handle(const std::string &varname, out_dims, origin_tensor.place()); auto width = dims[1]; for (size_t i = 0; i < updated_rows.size(); ++i) { - PADDLE_ENFORCE_LT(updated_rows[i], dims[0]); + PADDLE_ENFORCE_LT( + updated_rows[i], dims[0], + platform::errors::OutOfRange( + "The value of updated_rows: %s out of Tensor %s dims[0]: %s", + updated_rows[i], varname, dims[0])); memcpy(data + i * width, origin_tensor_data + updated_rows[i] * width, sizeof(float) * width); } @@ -225,7 +232,8 @@ bool RequestGetNoBarrierHandler::Handle(const std::string &varname, *outvar = scope_->FindVar(var_name_piece.ToString()); return true; } else { - PADDLE_THROW("GetNoBarrier must contain %s", WITHOUT_BARRIER_MESSAGE); + PADDLE_THROW(platform::errors::InvalidArgument( + "GetNoBarrier must contain %s", WITHOUT_BARRIER_MESSAGE)); } return true; } diff --git a/paddle/fluid/operators/distributed/rpc_server.cc b/paddle/fluid/operators/distributed/rpc_server.cc index 52b4456f7b1..37cf0460fb1 100644 --- a/paddle/fluid/operators/distributed/rpc_server.cc +++ b/paddle/fluid/operators/distributed/rpc_server.cc @@ -159,9 +159,9 @@ void RPCServer::RegisterVar(const std::string& var_name, { std::unique_lock lock(mutex_); - if (var_map_.find(var_name) != var_map_.end()) { - PADDLE_ENFORCE(false, "%s alreay in var_map", var_name); - } + PADDLE_ENFORCE_EQ( + var_map_.find(var_name), var_map_.end(), + platform::errors::AlreadyExists("%s already in var_map.", var_name)); var_map_[var_name] = h; } diff --git a/paddle/fluid/operators/distributed/rpc_server_test.cc b/paddle/fluid/operators/distributed/rpc_server_test.cc index 5ce7ac85269..b6d4d594855 100644 --- a/paddle/fluid/operators/distributed/rpc_server_test.cc +++ b/paddle/fluid/operators/distributed/rpc_server_test.cc @@ -172,7 +172,9 @@ TEST(COMPLETE, CPU) { g_rpc_service.reset(new RPCSERVER_T("127.0.0.1:0", 2)); distributed::RPCClient* client = distributed::RPCClient::GetInstance(0); - PADDLE_ENFORCE(client != nullptr); + PADDLE_ENFORCE_NE(client, nullptr, + platform::errors::InvalidArgument( + "Client Start Fail, Check Your Code & Env")); std::thread server_thread(StartServer, distributed::kRequestSend); g_rpc_service->WaitServerReady(); int port = g_rpc_service->GetSelectedPort(); diff --git a/paddle/fluid/operators/distributed/sendrecvop_utils.cc b/paddle/fluid/operators/distributed/sendrecvop_utils.cc index 2e9d958ebfb..39b4b3daf8c 100644 --- a/paddle/fluid/operators/distributed/sendrecvop_utils.cc +++ b/paddle/fluid/operators/distributed/sendrecvop_utils.cc @@ -40,7 +40,9 @@ static TensorPayload GetCommunicationAllocationFromTensor( const platform::DeviceContext& ctx, const framework::Tensor& tensor) { if (is_gpu_place(ctx.GetPlace())) { #ifdef PADDLE_WITH_CUDA - PADDLE_ENFORCE(is_gpu_place(tensor.place())); + PADDLE_ENFORCE_EQ( + is_gpu_place(tensor.place()), true, + platform::errors::PreconditionNotMet("Please run in gpu place.")); auto& gpu_dev_ctx = reinterpret_cast(ctx); auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type()); @@ -53,7 +55,8 @@ static TensorPayload GetCommunicationAllocationFromTensor( ctx.Wait(); return TensorPayload(result); #else - PADDLE_THROW("This situation should not be happened"); + PADDLE_THROW( + platform::errors::Unavailable("This situation should not be happened")); #endif } else { return TensorPayload(tensor); diff --git a/paddle/fluid/operators/distributed/sendrecvop_utils.h b/paddle/fluid/operators/distributed/sendrecvop_utils.h index 8a382baa5be..84ed1ab0247 100644 --- a/paddle/fluid/operators/distributed/sendrecvop_utils.h +++ b/paddle/fluid/operators/distributed/sendrecvop_utils.h @@ -95,7 +95,8 @@ inline framework::proto::VarType::Type ToVarType( case sendrecv::VariableMessage::BOOL: return framework::proto::VarType::BOOL; // NOLINT default: - PADDLE_THROW("Not support type %d", type); + PADDLE_THROW( + platform::errors::InvalidArgument("Not support type id: %d.", type)); } } diff --git a/paddle/fluid/operators/distributed/variable_response.cc b/paddle/fluid/operators/distributed/variable_response.cc index de77121ee39..4c161f044d8 100644 --- a/paddle/fluid/operators/distributed/variable_response.cc +++ b/paddle/fluid/operators/distributed/variable_response.cc @@ -61,7 +61,8 @@ bool VariableResponse::ReadRaw(::google::protobuf::io::CodedInputStream* input, } gpu_dev_ctx.Wait(); #else - PADDLE_THROW("Unexpected branch"); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "Unexpected branch, please compile with PADDLE_WITH_CUDA")); #endif return true; } else if (platform::is_xpu_place(place)) { @@ -147,7 +148,11 @@ bool VariableResponse::CopyLodTensorData( VLOG(6) << "Tensor.memory_size = " << tensor->memory_size() << ", Buffer Size = " << length << ", dims:" << dims << ", numel:" << tensor->numel(); - PADDLE_ENFORCE_GE(tensor->memory_size(), static_cast(length)); + PADDLE_ENFORCE_GE( + tensor->memory_size(), static_cast(length), + platform::errors::InvalidArgument( + "The memory size of tensor: %s should greater than length: %s", + tensor->memory_size(), length)); return ReadRaw(input, ctx, tensor->place(), tensor_data, length); } @@ -171,7 +176,12 @@ bool VariableResponse::CopySelectRowsTensorData( PADDLE_ENFORCE_EQ( static_cast(tensor->numel()), length / framework::SizeOfType(paddle::operators::distributed::ToVarType( - meta_.data_type()))); + meta_.data_type())), + platform::errors::InvalidArgument( + "length: %s should equal to memory size of tensor: %s", length, + tensor->numel() * + framework::SizeOfType(paddle::operators::distributed::ToVarType( + meta_.data_type())))); void* tensor_data = tensor->mutable_data( ctx.GetPlace(), paddle::operators::distributed::ToVarType(meta_.data_type())); @@ -203,11 +213,12 @@ bool VariableResponse::CopySelectRowsData( bool VariableResponse::ProcSerializedField( int tag, ::google::protobuf::io::CodedInputStream* input, int64_t num_bytes) { - PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || - meta_.type() == sendrecv::LOD_TENSOR || - meta_.type() == sendrecv::NCCL_ID) && - meta_.varname() != "", - "meta info should be got first!"); + PADDLE_ENFORCE( + (meta_.type() == sendrecv::SELECTED_ROWS || + meta_.type() == sendrecv::LOD_TENSOR || + meta_.type() == sendrecv::NCCL_ID) && + meta_.varname() != "", + platform::errors::PreconditionNotMet("meta info should be got first!")); if (meta_.type() == sendrecv::NCCL_ID) { #ifdef PADDLE_WITH_CUDA @@ -221,7 +232,8 @@ bool VariableResponse::ProcSerializedField( } return true; #else - PADDLE_THROW("Not compiled with CUDA!"); + PADDLE_THROW( + platform::errors::PreconditionNotMet("Please compiled with CUDA!")); return false; #endif } @@ -230,7 +242,9 @@ bool VariableResponse::ProcSerializedField( << ", type:" << meta_.type() << std::endl; framework::DDim dims = GetDims(meta_.dims()); if (meta_.type() == sendrecv::LOD_TENSOR) { - PADDLE_ENFORCE(meta_.lod_size() >= 0, "lod info should be got first!"); + PADDLE_ENFORCE_GE( + meta_.lod_size(), 0, + platform::errors::PreconditionNotMet("lod info should be got first!")); if (!CopyLodTensorData(input, *dev_ctx_, dims, num_bytes)) { return false; } @@ -245,7 +259,9 @@ bool VariableResponse::ProcSerializedField( return true; } - PADDLE_ENFORCE("not supported var types:", meta_.varname(), meta_.type()); + PADDLE_THROW(platform::errors::InvalidArgument( + "The type: %s of var: %s is not supported", meta_.type(), + meta_.varname())); return false; } -- GitLab