diff --git a/paddle/fluid/operators/distributed_ops/checkpoint_notify_op.cc b/paddle/fluid/operators/distributed_ops/checkpoint_notify_op.cc
index cd7e4363fa0f3e7e5e4936b8a0d42cd8ec5352da..f0cc2cdcdac20393ba17a7b3824dfd6d3afe7973 100644
--- a/paddle/fluid/operators/distributed_ops/checkpoint_notify_op.cc
+++ b/paddle/fluid/operators/distributed_ops/checkpoint_notify_op.cc
@@ -49,7 +49,10 @@ class CheckpointNotifyOp : public framework::OperatorBase {
       VLOG(3) << "checkpoint notify sending lookup table: "
               << lookup_table_name << " and dir:" << dir << " to " << epmap[i];
     }
-    PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
+    PADDLE_ENFORCE_EQ(
+        rpc_client->Wait(), true,
+        platform::errors::Fatal("Fail to notify checkpoint."
+                                " Internal error occurs in RPCClient."));
   }
 };
 
diff --git a/paddle/fluid/operators/distributed_ops/fake_init_op.cc b/paddle/fluid/operators/distributed_ops/fake_init_op.cc
index f5a34b2dcb12e6f9c3367ea388c09eeeea905f83..1da164175e1daf8e872964d8fad21c4f3dd614e7 100644
--- a/paddle/fluid/operators/distributed_ops/fake_init_op.cc
+++ b/paddle/fluid/operators/distributed_ops/fake_init_op.cc
@@ -19,8 +19,7 @@ namespace operators {
 class FakeInitInferShape : public framework::InferShapeBase {
  public:
   void operator()(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasOutput("Out"),
-                   "Output(Out) of FakeInitOp should not be null.");
+    OP_INOUT_CHECK(ctx->HasOutput("Out"), "Output", "Out", "FakeInit");
     auto &shape = ctx->Attrs().Get<std::vector<int64_t>>("shape");
     ctx->SetOutputDim("Out", framework::make_ddim(shape));
   }
diff --git a/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc b/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc
index 4da1e036e4308922b5c6b4f2dc86dc52c25bd339..e63f882478351cde16bde969b86e020181d6d4e5 100644
--- a/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc
+++ b/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc
@@ -44,9 +44,15 @@ class GenNCCLIdOp : public framework::OperatorBase {
     std::vector<std::string> trainers =
         Attr<std::vector<std::string>>("trainers");
-    PADDLE_ENFORCE(
-        trainer_id >= 0 && trainer_id < static_cast<int>(trainers.size()),
-        "trainer_id:%d must be in trainers.size range", trainer_id);
+    PADDLE_ENFORCE_GE(trainer_id, 0, platform::errors::InvalidArgument(
+                                         "trainer_id %d is less than 0. Its "
+                                         "valid range is [0, trainer_size)"));
+    PADDLE_ENFORCE_LT(
+        trainer_id, static_cast<int>(trainers.size()),
+        platform::errors::OutOfRange("trainer_id %d is out of range. Its valid "
+                                     "range is [0, trainer_size)",
+                                     trainer_id));
+
     std::string endpoint = trainers[trainer_id];
 
     framework::Scope& local_scope = scope.NewScope();
@@ -58,12 +64,20 @@ class GenNCCLIdOp : public framework::OperatorBase {
     int inter_trainer_id = -1;
     int exter_trainer_id = -1;
     if (use_hierarchical_allreduce) {
-      PADDLE_ENFORCE(trainers.size() > 1, "trainers.size():%llu < 1",
-                     trainers.size());
-      PADDLE_ENFORCE(inter_nranks > 1, "inter_nranks:%d < 1", inter_nranks);
-      PADDLE_ENFORCE((trainers.size() % inter_nranks == 0),
-                     "trainers.size():%llu mod inter_nranks:%d != 0",
-                     trainers.size(), inter_nranks);
+      PADDLE_ENFORCE_GT(
+          trainers.size(), 1,
+          platform::errors::PreconditionNotMet(
+              "The number of collective trainers %llu <= 1", trainers.size()));
+      PADDLE_ENFORCE_GT(
+          inter_nranks, 1,
+          platform::errors::PreconditionNotMet(
+              "inter_nranks %d <= 1 while in hierarchical allreduce mode",
+              inter_nranks));
+      PADDLE_ENFORCE_EQ(
+          trainers.size() % inter_nranks, 0,
+          platform::errors::PreconditionNotMet(
+              "The number of trainers %llu mod inter_nranks %d is not equal 0",
+              trainers.size(), inter_nranks));
 
       inter_trainer_id = trainer_id % inter_nranks;
 
@@ -106,10 +120,16 @@ class GenNCCLIdOp : public framework::OperatorBase {
       return;
     }
 
-    PADDLE_ENFORCE(trainers.size() % inter_nranks == 0,
-                   "enpoints.size:%llu mod inter_nranks:%d should ==0",
-                   trainers.size(), inter_nranks);
-    PADDLE_ENFORCE(inter_nranks > 1, "inter_nranks:%d must > 1", inter_nranks);
+    PADDLE_ENFORCE_EQ(
+        trainers.size() % inter_nranks, 0,
+        platform::errors::PreconditionNotMet(
+            "The number of trainers %llu mod inter_nranks %d is not equal 0",
+            trainers.size(), inter_nranks));
+    PADDLE_ENFORCE_GT(
+        inter_nranks, 1,
+        platform::errors::PreconditionNotMet(
+            "inter_nranks %d <= 1 while in hierarchical allreduce mode",
+            inter_nranks));
 
     // hierarchical inter ncclid
     if (inter_trainer_id == 0) {
@@ -156,10 +176,11 @@ class GenNCCLIdOp : public framework::OperatorBase {
                        const std::string& nccl_id_name,
                        const std::vector<std::string>& endpoint_list) const {
     auto var = scope->FindVar(nccl_id_name);
-    PADDLE_ENFORCE_NOT_NULL(var, "can't find nccl_id_var_name:%s",
-                            nccl_id_name);
+    PADDLE_ENFORCE_NOT_NULL(
+        var, platform::errors::NotFound("Variable with name %s is not found",
+                                        nccl_id_name.c_str()));
     auto id = var->GetMutable<ncclUniqueId>();
-    PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id));
+    PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclGetUniqueId(id));
 
     distributed::RPCClient* client =
         distributed::RPCClient::GetInstance<RPCCLIENT_T>(0);
diff --git a/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc b/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc
index ea1b6d43ce72828e49915eddd6e198c1db5e190d..8a8c6420b6a7c600c96ef5a50350c3a0021bbfae 100644
--- a/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc
@@ -315,8 +315,9 @@ void ListenAndServOp::CacheVarsType(const std::vector<std::string> &varnames,
                                     const framework::Scope &scope) const {
   for (const auto &varname : varnames) {
     auto var = scope.FindVar(varname);
-    PADDLE_ENFORCE(var != nullptr,
-                   "Received var should be initialized in the received scope.");
+    PADDLE_ENFORCE_NOT_NULL(
+        var, platform::errors::PreconditionNotMet(
+                 "Received var is not initialized in the received scope."));
     if (var->IsType<framework::SelectedRows>()) {
       sparse_vars_.push_back(varname);
     } else if (var->IsType<framework::LoDTensor>() ||
@@ -344,7 +345,9 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   auto pserver_id = Attr<int>("pserver_id");
   auto inputs = Inputs("X");
 
-  PADDLE_ENFORCE(!rpc_service_);
+  PADDLE_ENFORCE_EQ(rpc_service_, nullptr,
+                    platform::errors::PreconditionNotMet(
+                        "RPC service has been created unexpectedly."));
   std::string endpoint = Attr<std::string>("endpoint");
   int checkpoint_block_id = Attr<int>(kCheckpointBlockId);
   int lr_decay_block_id = Attr<int>(kLRDecayBlockId);
@@ -390,8 +393,10 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
 
   auto optimize_blocks =
       Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
-  PADDLE_ENFORCE(optimize_blocks.size() >= 1,
-                 "optimize blocks should be 1 at least on the pserver side.");
+  PADDLE_ENFORCE_GE(optimize_blocks.size(), 1,
+                    platform::errors::PreconditionNotMet(
+                        "optimize blocks is less than 1. Optimize blocks "
+                        "should be 1 at least on the pserver side."));
   auto *program = optimize_blocks[0]->Program();
   framework::Executor executor(dev_place);
 
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 1cbe12c60e62d27f1b403d2c25b5effe6fd6baaf..6cfef72777af370b5d793741b53fe1cabd9605be 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -49,6 +49,7 @@ if(WIN32)
     LIST(REMOVE_ITEM TEST_OPS test_trainer_desc)
     LIST(REMOVE_ITEM TEST_OPS test_multiprocess_reader_exception)
     LIST(REMOVE_ITEM TEST_OPS test_avoid_twice_initialization)
+    LIST(REMOVE_ITEM TEST_OPS test_checkpoint_notify_op)
 endif()
 
 if (NOT ${WITH_GPU})
diff --git a/python/paddle/fluid/tests/unittests/test_checkpoint_notify_op.py b/python/paddle/fluid/tests/unittests/test_checkpoint_notify_op.py
new file mode 100644
index 0000000000000000000000000000000000000000..839ed5793c9c1f67733378889d08e06919f6cb1a
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_checkpoint_notify_op.py
@@ -0,0 +1,36 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import unittest
+import paddle.fluid as fluid
+
+
+class TestCheckpointNotifyOp(unittest.TestCase):
+    def test_checkpoint_notify_op(self):
+        program = fluid.Program()
+        attrs = {}
+        attrs['epmap'] = []
+        attrs['dir'] = ''
+        attrs['lookup_table'] = ''
+        program.current_block().append_op(
+            type='checkpoint_notify', inputs={}, outputs={}, attrs=attrs)
+
+        exe = fluid.Executor(fluid.CPUPlace())
+        exe.run(program)
+
+
+if __name__ == '__main__':
+    unittest.main()