Unverified commit 2c7870e0, authored by Yuang Liu, committed by GitHub

[fleet_executor] handle empty addr for single card train (#37150)

Parent 742378f4
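In single-card training there is no distributed cluster config, so the current rank never finds an ip_port and addr stays empty; this commit makes FleetExecutor and MessageBus treat that case as valid (rank 0, a single rank, no listening port) instead of failing the old NotFound check. Below is a minimal, self-contained sketch of the new control flow, written in plain standard C++ with a hypothetical InitMessageBus stand-in rather than Paddle's actual types and PADDLE_ENFORCE macros:

#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for FleetExecutor::InitMessageBus: if the current
// rank has no ip:port in the cluster config, accept it as single-card
// training (the config must then be empty and the rank must be 0).
void InitMessageBus(const std::unordered_map<int64_t, std::string>& rank_to_addr,
                    int64_t cur_rank) {
  std::string addr;
  auto it = rank_to_addr.find(cur_rank);
  if (it != rank_to_addr.end()) addr = it->second;

  if (addr.empty()) {
    // An empty address is only valid when there is no cluster info at all
    // and the current rank is 0, i.e. plain single-card execution.
    assert(rank_to_addr.empty() && "empty address with a non-empty cluster config");
    assert(cur_rank == 0 && "address is empty but current rank is not 0");
  }

  std::cout << "Current rank " << cur_rank << ", ip_port "
            << (addr.empty() ? "empty" : addr) << ", number of ranks "
            << (rank_to_addr.empty() ? 1 : rank_to_addr.size()) << "\n";
}

int main() {
  InitMessageBus({}, 0);                       // single card: accepted
  InitMessageBus({{0, "127.0.0.1:7000"}}, 0);  // distributed: address found
  return 0;
}

If the address is empty while the cluster config is non-empty, or the current rank is not 0, the checks still fail, so the paddle.distributed.launch path keeps its previous validation.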
@@ -43,24 +43,30 @@ void FleetExecutor::InitMessageBus() {
  std::unordered_map<int64_t, std::string> rank_to_addr;
  std::string addr;
  for (const auto& rank_info : exe_desc_.cluster_info()) {
    // init the dns map
    int64_t rank = rank_info.rank();
    std::string ip_port = rank_info.ip_port();
    ss << rank << "\t->\t" << ip_port << "\n";
    // TODO(Yuang): replace the first 'rank' with real interceptor id
    // TODO(Yuang): init interceptor_id_to_rank out of this loop
    interceptor_id_to_rank.insert(std::make_pair(rank, rank));
    rank_to_addr.insert(std::make_pair(rank, ip_port));
    if (rank == cur_rank) {
      addr = ip_port;
    }
  }
  PADDLE_ENFORCE_NE(
      addr, "",
      platform::errors::NotFound(
          "Current rank is %s, which ip_port cannot be found in the config.",
          cur_rank));
  VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is " << addr
          << ".";
  VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << ".";
  if (addr == "") {
    PADDLE_ENFORCE_EQ(
        rank_to_addr.size(), 0,
        platform::errors::NotFound("Empty address is not valid for "
                                   "paddle.distributed.launch method."));
    PADDLE_ENFORCE_EQ(
        cur_rank, 0,
        platform::errors::NotFound("Address is empty but cur rank is not 0."));
  }
  VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is "
          << (addr == "" ? "empty" : addr) << ".";
  VLOG(3) << "The number of ranks are "
          << (rank_to_addr.size() == 0 ? 1 : rank_to_addr.size()) << ".";
  VLOG(5) << ss.str();
  MessageBus& message_bus_instance = MessageBus::Instance();
  if (!message_bus_instance.IsInit()) {
@@ -85,6 +85,10 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
}

void MessageBus::ListenPort() {
  if (addr_ == "") {
    VLOG(3) << "No need listen to port since training on single card.";
    return;
  }
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
    !defined(PADDLE_WITH_ASCEND_CL)
  // function keep listen the port and handle the message
@@ -121,6 +125,10 @@ bool MessageBus::IsSameRank(int64_t src_id, int64_t dst_id) {
      dst_rank, interceptor_id_to_rank_.end(),
      platform::errors::NotFound(
          "Cannot find rank for dst interceptor id %lld. Init error.", dst_id));
  if (addr_ == "") {
    // single card training, must be same rank
    return true;
  }
  const auto& src_ip = rank_to_addr_.find(src_rank->second);
  PADDLE_ENFORCE_NE(src_ip, rank_to_addr_.end(),
                    platform::errors::NotFound(
@@ -142,6 +142,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32)
    LIST(REMOVE_ITEM TEST_OPS test_fleet_gradient_scale)
    LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler)
    LIST(REMOVE_ITEM TEST_OPS test_fleet_executor)
    LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_multi_devices)
endif()

# Temporally disable test_deprecated_decorator
@@ -13,7 +13,6 @@
# limitations under the License.

import unittest
import os
import paddle
import paddle.fluid as fluid
@@ -32,17 +31,7 @@ class TestFleetExecutor(unittest.TestCase):
        }
        exe.run(empty_program, feed={'x': [1]})

    def test_executor_on_multi_devices(self):
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))
        for place in places:
            self.run_fleet_executor(place)

    def test_dist_executor_on_multi_devices(self):
        os.environ["PADDLE_TRAINER_ID"] = "0"
        os.environ[
            "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"

    def test_executor_on_single_device(self):
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))
New test file (presumably the test_fleet_executor_multi_devices test registered in the CMake change above):
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import paddle
import paddle.fluid as fluid

paddle.enable_static()


class TestFleetExecutor(unittest.TestCase):
    def run_fleet_executor(self, place):
        exe = paddle.static.Executor(place)
        empty_program = paddle.static.Program()
        with fluid.program_guard(empty_program, empty_program):
            x = fluid.layers.data(name='x', shape=[1], dtype=paddle.float32)
        empty_program._pipeline_opt = {
            "fleet_opt": True,
            "section_program": empty_program
        }
        exe.run(empty_program, feed={'x': [1]})

    def test_dist_executor_on_multi_devices(self):
        os.environ["PADDLE_TRAINER_ID"] = "0"
        os.environ[
            "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))
        for place in places:
            self.run_fleet_executor(place)


if __name__ == "__main__":
    unittest.main()