From 2c7870e0b71af78be41da9811e5a17ea9d7d19b1 Mon Sep 17 00:00:00 2001
From: Yuang Liu
Date: Fri, 12 Nov 2021 16:00:47 +0800
Subject: [PATCH] [fleet_executor] handle empty addr for single card train (#37150)

---
 .../fleet_executor/fleet_executor.cc          | 24 ++++++----
 .../distributed/fleet_executor/message_bus.cc |  8 ++++
 .../fluid/tests/unittests/CMakeLists.txt      |  1 +
 .../tests/unittests/test_fleet_executor.py    | 13 +----
 .../test_fleet_executor_multi_devices.py      | 47 +++++++++++++++++++
 5 files changed, 72 insertions(+), 21 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_executor_multi_devices.py

diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index 47d0c526c0..8990cae1e2 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -43,24 +43,30 @@ void FleetExecutor::InitMessageBus() {
   std::unordered_map<int64_t, std::string> rank_to_addr;
   std::string addr;
   for (const auto& rank_info : exe_desc_.cluster_info()) {
+    // init the dns map
     int64_t rank = rank_info.rank();
     std::string ip_port = rank_info.ip_port();
     ss << rank << "\t->\t" << ip_port << "\n";
-    // TODO(Yuang): replace the first 'rank' with real interceptor id
+    // TODO(Yuang): init interceptor_id_to_rank out of this loop
     interceptor_id_to_rank.insert(std::make_pair(rank, rank));
     rank_to_addr.insert(std::make_pair(rank, ip_port));
     if (rank == cur_rank) {
       addr = ip_port;
     }
   }
-  PADDLE_ENFORCE_NE(
-      addr, "",
-      platform::errors::NotFound(
-          "Current rank is %s, which ip_port cannot be found in the config.",
-          cur_rank));
-  VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is " << addr
-          << ".";
-  VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << ".";
+  if (addr == "") {
+    PADDLE_ENFORCE_EQ(
+        rank_to_addr.size(), 0,
+        platform::errors::NotFound("Empty address is not valid for "
+                                   "paddle.distributed.launch method."));
+    PADDLE_ENFORCE_EQ(
+        cur_rank, 0,
+        platform::errors::NotFound("Address is empty but cur rank is not 0."));
+  }
+  VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is "
+          << (addr == "" ? "empty" : addr) << ".";
+  VLOG(3) << "The number of ranks are "
+          << (rank_to_addr.size() == 0 ? 1 : rank_to_addr.size()) << ".";
   VLOG(5) << ss.str();
   MessageBus& message_bus_instance = MessageBus::Instance();
   if (!message_bus_instance.IsInit()) {
diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc
index 2a8afb99ba..27a1f90767 100644
--- a/paddle/fluid/distributed/fleet_executor/message_bus.cc
+++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc
@@ -85,6 +85,10 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
 }
 
 void MessageBus::ListenPort() {
+  if (addr_ == "") {
+    VLOG(3) << "No need listen to port since training on single card.";
+    return;
+  }
 #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
     !defined(PADDLE_WITH_ASCEND_CL)
   // function keep listen the port and handle the message
@@ -121,6 +125,10 @@ bool MessageBus::IsSameRank(int64_t src_id, int64_t dst_id) {
       dst_rank, interceptor_id_to_rank_.end(),
       platform::errors::NotFound(
           "Cannot find rank for dst interceptor id %lld. Init error.", dst_id));
Init error.", dst_id)); + if (addr_ == "") { + // single card training, must be same rank + return true; + } const auto& src_ip = rank_to_addr_.find(src_rank->second); PADDLE_ENFORCE_NE(src_ip, rank_to_addr_.end(), platform::errors::NotFound( diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 9e02347a13..97af987ae3 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -142,6 +142,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_fleet_gradient_scale) LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor) + LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_multi_devices) endif() # Temporally disable test_deprecated_decorator diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor.py b/python/paddle/fluid/tests/unittests/test_fleet_executor.py index 538f7bb875..48952cab3d 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_executor.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor.py @@ -13,7 +13,6 @@ # limitations under the License. import unittest -import os import paddle import paddle.fluid as fluid @@ -32,17 +31,7 @@ class TestFleetExecutor(unittest.TestCase): } exe.run(empty_program, feed={'x': [1]}) - def test_executor_on_multi_devices(self): - places = [fluid.CPUPlace()] - if fluid.is_compiled_with_cuda(): - places.append(fluid.CUDAPlace(0)) - for place in places: - self.run_fleet_executor(place) - - def test_dist_executor_on_multi_devices(self): - os.environ["PADDLE_TRAINER_ID"] = "0" - os.environ[ - "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002" + def test_executor_on_single_device(self): places = [fluid.CPUPlace()] if fluid.is_compiled_with_cuda(): places.append(fluid.CUDAPlace(0)) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_multi_devices.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_multi_devices.py new file mode 100644 index 0000000000..1afe4b9475 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_executor_multi_devices.py @@ -0,0 +1,47 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import unittest
+import os
+import paddle
+import paddle.fluid as fluid
+
+paddle.enable_static()
+
+
+class TestFleetExecutor(unittest.TestCase):
+    def run_fleet_executor(self, place):
+        exe = paddle.static.Executor(place)
+        empty_program = paddle.static.Program()
+        with fluid.program_guard(empty_program, empty_program):
+            x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32)
+        empty_program._pipeline_opt = {
+            "fleet_opt": True,
+            "section_program": empty_program
+        }
+        exe.run(empty_program, feed={'x': [1]})
+
+    def test_dist_executor_on_multi_devices(self):
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+        os.environ[
+            "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"
+        places = [fluid.CPUPlace()]
+        if fluid.is_compiled_with_cuda():
+            places.append(fluid.CUDAPlace(0))
+        for place in places:
+            self.run_fleet_executor(place)
+
+
+if __name__ == "__main__":
+    unittest.main()
-- 
GitLab
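
For reference, below the patch: a minimal sketch of the single-card path that this change enables. It is an editor's illustration, not part of the commit; it mirrors run_fleet_executor from the unit tests above and assumes a Paddle 2.2 build with the fleet executor compiled in (PADDLE_WITH_DISTRIBUTE and PADDLE_WITH_PSCORE).

# Sketch only: run the fleet executor on a single card with no
# PADDLE_TRAINER_ENDPOINTS set. The C++ side then sees an empty address,
# MessageBus::ListenPort() returns early, and IsSameRank() treats all
# interceptors as local, which is exactly the case this patch adds.
import paddle
import paddle.fluid as fluid

paddle.enable_static()

place = fluid.CUDAPlace(0) if fluid.is_compiled_with_cuda() else fluid.CPUPlace()
exe = paddle.static.Executor(place)

empty_program = paddle.static.Program()
with fluid.program_guard(empty_program, empty_program):
    x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32)

# "fleet_opt" in _pipeline_opt routes Executor.run through the fleet
# executor instead of the ordinary executor path, as in the unit tests.
empty_program._pipeline_opt = {
    "fleet_opt": True,
    "section_program": empty_program
}
exe.run(empty_program, feed={'x': [1]})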