From 515b206d40abb2fae3455ef89d701bdd2a7e134e Mon Sep 17 00:00:00 2001
From: WangXi
Date: Thu, 9 Jan 2020 10:41:58 +0800
Subject: [PATCH] [Cherry-pick 1.6] fix batch_norm_grad shape=0 & allreduce shape enforce & sync_batch_norm hang in fleet (#22157)

---
 .../framework/details/all_reduce_op_handle.cc |  5 ++++
 paddle/fluid/operators/batch_norm_op.cc       | 19 ++++++++----
 .../incubate/fleet/collective/__init__.py     | 11 +++++++
 .../fluid/tests/unittests/CMakeLists.txt      | 19 ++++++++----
 .../paddle/fluid/tests/unittests/dist_test.sh | 10 ++++++-
 .../fluid/tests/unittests/test_dist_base.py   |  5 ++++
 .../unittests/test_dist_mnist_fleetapi.py     | 30 +++++++++++++++++++
 7 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc
index 8deacf4e7dd..6c3b0923ed0 100644
--- a/paddle/fluid/framework/details/all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc
@@ -84,6 +84,11 @@ void AllReduceOpHandle::AllReduceImpl(
 
     if (i == 0) {
       numel = static_cast<int64_t>(lod_tensor.numel());
+      // only enforce place0, we will enforce other place numel == place0 numel
+      PADDLE_ENFORCE_GT(
+          numel, 0, platform::errors::InvalidArgument(
+                        "The numel of tensor=[%s] must be > 0. But now numel=[%d]",
+                        in_var_handles[i]->name(), numel));
       dtype = lod_tensor.type();
       is_gpu_place = platform::is_gpu_place(lod_tensor.place());
     }
diff --git a/paddle/fluid/operators/batch_norm_op.cc b/paddle/fluid/operators/batch_norm_op.cc
index ee5dffc3f05..25b8775065b 100644
--- a/paddle/fluid/operators/batch_norm_op.cc
+++ b/paddle/fluid/operators/batch_norm_op.cc
@@ -425,11 +425,17 @@ void BatchNormGradOp::InferShape(framework::InferShapeContext *ctx) const {
 
   // check output
   PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("X")), "");
-  if (ctx->HasOutput(framework::GradVarName("Scale"))) {
-    PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("Bias")),
-                   "Output(Scale@GRAD) and Output(Bias@GRAD) should not be "
-                   "null at same time");
-  }
+
+  const bool has_scale_grad = ctx->HasOutput(framework::GradVarName("Scale"));
+  const bool has_bias_grad = ctx->HasOutput(framework::GradVarName("Bias"));
+
+  PADDLE_ENFORCE_EQ((has_scale_grad == has_bias_grad), true,
+                    platform::errors::InvalidArgument(
+                        "Output(Scale@GRAD) and Output(Bias@GRAD) must be null "
+                        "or not be null at the same time. But now, "
+                        "has Scale@GRAD=[%d], has Bias@GRAD=[%d]",
+                        has_scale_grad, has_bias_grad));
+
   const bool use_global_stats = ctx->Attrs().Get<bool>("use_global_stats");
   if (use_global_stats) {
     PADDLE_ENFORCE(!ctx->Attrs().Get<bool>("use_mkldnn"),
@@ -444,7 +450,8 @@ void BatchNormGradOp::InferShape(framework::InferShapeContext *ctx) const {
                                           : x_dims[x_dims.size() - 1]);
 
   ctx->SetOutputDim(framework::GradVarName("X"), x_dims);
-  if (ctx->HasOutput(framework::GradVarName("Scale"))) {
+  // has_scale_grad == has_bias_grad, so checking has_scale_grad is enough
+  if (has_scale_grad) {
     ctx->SetOutputDim(framework::GradVarName("Scale"), {C});
     ctx->SetOutputDim(framework::GradVarName("Bias"), {C});
   }
diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py
index 9c7963a1ec2..e4e9045a2eb 100644
--- a/python/paddle/fluid/incubate/fleet/collective/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py
@@ -297,6 +297,17 @@ class CollectiveOptimizer(DistributedOptimizer):
                 "with multi nccl comm, please export FLAGS_sync_nccl_allreduce = 0"
             )
 
+        # NOTE: enabling sync_batch_norm will hang when using multiple num_threads
+        sync_batch_norm = self._strategy.sync_batch_norm
+        if sync_batch_norm is not None and sync_batch_norm is True:
+            self._strategy.nccl_comm_num = 1
+            self._strategy.use_hierarchical_allreduce = False
+            exec_strategy.num_threads = 1
+            logging.warn(
+                "sync_batch_norm will hang when num_threads > 1, so "
+                "set num_threads=1, nccl_comm_num=1, use_hierarchical_allreduce=False."
+            )
+
         if self.print_config:
             print("node_num:", node_num, "num_threads:",
                   exec_strategy.num_threads, "use_hierarchical_allreduce:",
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index bf614d0c293..65d15219f38 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -126,11 +126,20 @@ function(bash_test_modules TARGET_NAME)
     set(timeout ${bash_test_modules_TIMEOUT})
   endif()
 
-  add_test(NAME ${TARGET_NAME}
-           COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
-           TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${bash_test_modules_ENVS}
-           bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES}
-           WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+  if(WITH_COVERAGE)
+    add_test(NAME ${TARGET_NAME}
+             COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
+             TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${bash_test_modules_ENVS}
+             WITH_COVERAGE=ON COVERAGE_FILE=${PADDLE_BINARY_DIR}/python-coverage.data
+             bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES}
+             WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+  else()
+    add_test(NAME ${TARGET_NAME}
+             COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
+             TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${bash_test_modules_ENVS}
+             bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES}
+             WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
+  endif()
 
   if (bash_test_modules_SERIAL)
     set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1)
diff --git a/python/paddle/fluid/tests/unittests/dist_test.sh b/python/paddle/fluid/tests/unittests/dist_test.sh
index f1f6788ce7b..a245262c74e 100644
--- a/python/paddle/fluid/tests/unittests/dist_test.sh
+++ b/python/paddle/fluid/tests/unittests/dist_test.sh
@@ -20,7 +20,15 @@ rm -f ${name}*.log
 # start the unit test
 run_time=$(( $TEST_TIMEOUT - 10 ))
 echo "run_time: ${run_time}"
-timeout -s SIGKILL ${run_time} python -u ${name}.py > ${name}_run.log 2>&1
+
+if [[ ${WITH_COVERAGE} == "ON" ]]; then
+    PYTHON_EXEC="python -u -m coverage run --branch -p "
+else
+    PYTHON_EXEC="python -u "
+fi
+
+timeout -s SIGKILL ${run_time} ${PYTHON_EXEC} ${name}.py > ${name}_run.log 2>&1
+
 exit_code=$?
 if [[ $exit_code -eq 0 ]]; then
     exit 0
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 0c18fea1bc9..b435d86dbd4 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -136,6 +136,8 @@ class TestDistRunnerBase(object):
             dist_strategy.use_local_sgd = True
         if args.ut4grad_allreduce:
             dist_strategy._ut4grad_allreduce = True
+        if args.sync_batch_norm:
+            dist_strategy.sync_batch_norm = True
 
         role = role_maker.PaddleCloudRoleMaker(is_collective=True)
         fleet.init(role)
@@ -445,6 +447,7 @@ def runtime_main(test_class):
         required=False,
         type=bool,
         default=False)
+    parser.add_argument('--sync_batch_norm', action='store_true')
 
     args = parser.parse_args()
 
@@ -776,6 +779,8 @@ class TestDistBase(unittest.TestCase):
                 tr_cmd += " --use_local_sgd"
             if self._ut4grad_allreduce:
                 tr_cmd += " --ut4grad_allreduce"
+            if hasattr(self, '_sync_batch_norm') and self._sync_batch_norm:
+                tr_cmd += " --sync_batch_norm"
 
         if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
             env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist_fleetapi.py b/python/paddle/fluid/tests/unittests/test_dist_mnist_fleetapi.py
index 30f8592e1da..d5ebe09adca 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist_fleetapi.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist_fleetapi.py
@@ -24,6 +24,7 @@ class TestDistMnistNCCL2FleetApi(TestDistBase):
         self._use_reader_alloc = False
         self._nccl2_mode = True
         self._gpu_fleet_api = True
+        self._sync_batch_norm = True
 
     def test_dist_train(self):
         import paddle.fluid as fluid
@@ -31,5 +32,34 @@ class TestDistMnistNCCL2FleetApi(TestDistBase):
             self.check_with_place("dist_mnist.py", delta=1e-5)
 
 
+class FleetCollectiveTest(unittest.TestCase):
+    def test_open_sync_batch_norm(self):
+        import paddle.fluid as fluid
+        import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+        from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
+
+        if not fluid.core.is_compiled_with_cuda():
+            # Operator "gen_nccl_id" has not been registered
+            return
+
+        data = fluid.layers.data(name='X', shape=[1], dtype='float32')
+        hidden = fluid.layers.fc(input=data, size=10)
+        loss = fluid.layers.mean(hidden)
+
+        optimizer = fluid.optimizer.AdamOptimizer()
+
+        role = role_maker.UserDefinedCollectiveRoleMaker(0, ['127.0.0.1:6170'])
+        fleet.init(role)
+
+        dist_strategy = DistributedStrategy()
+        dist_strategy.sync_batch_norm = True
+
+        dist_optimizer = fleet.distributed_optimizer(
+            optimizer, strategy=dist_strategy)
+        dist_optimizer.minimize(loss)
+
+        self.assertEqual(dist_strategy.exec_strategy.num_threads, 1)
+
+
 if __name__ == "__main__":
     unittest.main()
--
GitLab
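
For context, the sketch below (not part of the patch) shows the user-facing scenario the CollectiveOptimizer change targets: enabling sync_batch_norm through the incubate collective fleet API of this 1.6 branch. Only the DistributedStrategy.sync_batch_norm flag and the fleet calls mirror what the patch itself exercises; the network, the Momentum optimizer, and the launcher-provided environment expected by PaddleCloudRoleMaker are illustrative assumptions.

import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy

# Illustrative network; batch_norm is the op that sync_batch_norm synchronizes
# across GPUs during training.
image = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
hidden = fluid.layers.fc(input=image, size=128, act='relu')
hidden = fluid.layers.batch_norm(input=hidden)
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
loss = fluid.layers.mean(
    fluid.layers.cross_entropy(input=prediction, label=label))

# PaddleCloudRoleMaker reads the trainer id/endpoints environment prepared by a
# distributed launcher; run this under such a launcher, one process per GPU.
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)

dist_strategy = DistributedStrategy()
dist_strategy.sync_batch_norm = True

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
dist_optimizer = fleet.distributed_optimizer(optimizer, strategy=dist_strategy)
dist_optimizer.minimize(loss)

# With this patch applied, CollectiveOptimizer forces num_threads=1 and
# nccl_comm_num=1 and disables hierarchical allreduce when sync_batch_norm is
# on, avoiding the hang described in the commit message.
assert dist_strategy.exec_strategy.num_threads == 1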