From fa73e4a28426a1ccc430b703e4989a01156128c9 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Mon, 28 Sep 2020 11:19:54 +0800 Subject: [PATCH] Initialize gloo for low level collective apis (#27356) * add gloo initializer, test=develop --- paddle/fluid/platform/gloo_context.cc | 4 +- paddle/fluid/platform/gloo_context.h | 7 +- paddle/fluid/pybind/gloo_context_py.cc | 32 +++----- .../distributed/fleet/base/role_maker.py | 3 +- .../distributed/fleet/utils/http_server.py | 2 +- python/paddle/distributed/parallel.py | 79 ++++++++++++++++++- python/paddle/distributed/spawn.py | 2 +- .../fluid/incubate/fleet/base/role_maker.py | 3 +- .../fluid/incubate/fleet/utils/http_server.py | 2 +- .../fluid/tests/unittests/CMakeLists.txt | 12 +-- .../unittests/test_collective_api_base.py | 55 +++---------- .../fluid/tests/unittests/test_dist_base.py | 6 +- .../tests/unittests/test_fleet_rolemaker_4.py | 2 +- 13 files changed, 116 insertions(+), 93 deletions(-) diff --git a/paddle/fluid/platform/gloo_context.cc b/paddle/fluid/platform/gloo_context.cc index 32e7299d31..efc1491336 100644 --- a/paddle/fluid/platform/gloo_context.cc +++ b/paddle/fluid/platform/gloo_context.cc @@ -21,10 +21,10 @@ void GlooParallelContext::Init() { auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance(); gloo_ptr->SetRank(strategy_.rank); gloo_ptr->SetSize(strategy_.rank_num); - gloo_ptr->SetPrefix(strategy_.prefix); gloo_ptr->SetIface(strategy_.iface); gloo_ptr->SetTimeoutSeconds(strategy_.init_seconds, strategy_.run_seconds); - gloo_ptr->SetHdfsStore(strategy_.path, strategy_.fs_name, strategy_.fs_ugi); + gloo_ptr->SetHttpStore(strategy_.ip_address, strategy_.ip_port, + strategy_.scope); gloo_ptr->Init(); } #endif diff --git a/paddle/fluid/platform/gloo_context.h b/paddle/fluid/platform/gloo_context.h index a7dcf288a2..399e87995e 100644 --- a/paddle/fluid/platform/gloo_context.h +++ b/paddle/fluid/platform/gloo_context.h @@ -25,12 +25,11 @@ struct GlooParallelStrategy { int rank{0}; int rank_num{1}; std::string iface; - std::string prefix; int init_seconds{9999999}; int run_seconds{9999999}; - std::string path; - std::string fs_name; - std::string fs_ugi; + std::string ip_address; + int ip_port; + std::string scope{"worker"}; }; class GlooParallelContext { diff --git a/paddle/fluid/pybind/gloo_context_py.cc b/paddle/fluid/pybind/gloo_context_py.cc index 1a9c77b0c3..029d38a89e 100644 --- a/paddle/fluid/pybind/gloo_context_py.cc +++ b/paddle/fluid/pybind/gloo_context_py.cc @@ -62,12 +62,6 @@ void BindGlooContext(py::module *m) { [](platform::GlooParallelStrategy &self, const std::string &iface) { self.iface = iface; }) - .def_property("prefix", - [](const platform::GlooParallelStrategy &self) { - return self.prefix; - }, - [](platform::GlooParallelStrategy &self, - const std::string &prefix) { self.prefix = prefix; }) .def_property("init_seconds", [](const platform::GlooParallelStrategy &self) { return self.init_seconds; @@ -83,23 +77,19 @@ void BindGlooContext(py::module *m) { self.run_seconds = run_seconds; }) .def_property( - "path", - [](const platform::GlooParallelStrategy &self) { return self.path; }, - [](platform::GlooParallelStrategy &self, const std::string &path) { - self.path = path; - }) - .def_property("fs_name", - [](const platform::GlooParallelStrategy &self) { - return self.fs_name; - }, - [](platform::GlooParallelStrategy &self, - const std::string &fs_name) { self.fs_name = fs_name; }) - .def_property("fs_ugi", + "ip_address", + [](const platform::GlooParallelStrategy &self) { + return self.ip_address; + }, + [](platform::GlooParallelStrategy &self, + const std::string &ip_address) { self.ip_address = ip_address; }) + .def_property("ip_port", [](const platform::GlooParallelStrategy &self) { - return self.fs_ugi; + return self.ip_port; }, - [](platform::GlooParallelStrategy &self, - const std::string &fs_ugi) { self.fs_ugi = fs_ugi; }); + [](platform::GlooParallelStrategy &self, int ip_port) { + self.ip_port = ip_port; + }); py::class_ gloo_ctx(*m, "GlooParallelContext"); gloo_ctx.def(py::init()) diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index 36da7264ef..f78b0b4e06 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -169,8 +169,7 @@ class Gloo(object): http_server = KVServer(port, size_d) http_server.start() wait_seconds = 5 - while http_server_d.get("running", - False) and not http_server.shoud_stop(): + while http_server_d.get("running", False): time.sleep(wait_seconds) http_server.stop() diff --git a/python/paddle/distributed/fleet/utils/http_server.py b/python/paddle/distributed/fleet/utils/http_server.py index 78e310b0a5..4f42ffc0b3 100644 --- a/python/paddle/distributed/fleet/utils/http_server.py +++ b/python/paddle/distributed/fleet/utils/http_server.py @@ -181,7 +181,7 @@ class KVServer: self.listen_thread.join() self.http_server.server_close() - def shoud_stop(self): + def should_stop(self): """ return whether the server should stop. diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index d35bc09634..a0dd8cbdfd 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -15,6 +15,10 @@ import os import six import warnings +from multiprocessing import Process, Manager +import netifaces +import time +import sys from paddle import compat as cpt @@ -29,6 +33,39 @@ __all__ = ["init_parallel_env"] ParallelStrategy = core.ParallelStrategy +def _start_kv_server(port, http_server_d): + from paddle.distributed.fleet.utils.http_server import KVServer + http_server = KVServer(int(port)) + http_server.start() + wait_seconds = 5 + while http_server_d.get("running", False): + time.sleep(wait_seconds) + http_server.stop() + + +def _get_iface_by_ip(ip_address): + """ + Get network interface name by its ip address. + + Args: + ip_address (string): ip address + + Returns: + Name of the network interface which has the ip address 'ip_address', + or None if the ip_address is not found. + """ + nicfaces = netifaces.interfaces() + for nicface in nicfaces: + message = netifaces.ifaddresses(nicface) + iface_info = message.get(netifaces.AF_INET) + if iface_info: + iface_dict = iface_info[0] + ipaddr = iface_dict.get('addr') + if ipaddr == ip_address: + return nicface + return None + + def init_parallel_env(): """ Initialize parallel training environment in dynamic graph mode. @@ -110,7 +147,44 @@ def init_parallel_env(): _check_var_exists("PADDLE_TRAINERS_NUM") _check_var_exists("PADDLE_TRAINER_ENDPOINTS") - # 3. init NCCL ParallelStrategy + if ParallelEnv().world_size < 2: + return + + # 3: init gloo context + ep_rank_0 = ParallelEnv().trainer_endpoints[0].split(":") + ep_rank = ParallelEnv().trainer_endpoints[ParallelEnv().rank].split(":") + manager = Manager() + # glboal dict to store status + http_server_d = manager.dict() + http_server_d["running"] = False + if ParallelEnv().rank == 0: + http_server = Process( + target=_start_kv_server, args=(int(ep_rank_0[1]), http_server_d)) + http_server.daemon = True + http_server_d["running"] = True + http_server.start() + + iface = _get_iface_by_ip(ep_rank[0]) + if iface is None: + raise ValueError("No network interface associated with the " + "ip address: {}.".format(ep_rank_0[0])) + gloo_strategy = core.GlooParallelStrategy() + gloo_strategy.rank = ParallelEnv().rank + gloo_strategy.rank_num = ParallelEnv().world_size + gloo_strategy.ip_address = ep_rank_0[0] + gloo_strategy.ip_port = int(ep_rank_0[1]) + gloo_strategy.iface = iface + default_init_timeout_seconds = 3600 + default_run_timeout_seconds = 9999999 + gloo_strategy.init_seconds = default_init_timeout_seconds + gloo_strategy.run_seconds = default_run_timeout_seconds + gloo = core.GlooParallelContext(gloo_strategy) + gloo.init() + if ParallelEnv().rank == 0: + http_server_d["running"] = False + http_server.join() + + # 4. init NCCL ParallelStrategy strategy = ParallelStrategy() if parallel_helper._is_parallel_ctx_initialized(): warnings.warn("The parallel environment has been initialized.") @@ -118,8 +192,7 @@ def init_parallel_env(): strategy.local_rank = ParallelEnv().rank strategy.trainer_endpoints = ParallelEnv().trainer_endpoints strategy.current_endpoint = ParallelEnv().current_endpoint - if strategy.nranks < 2: - return + # NOTE(chenweihang): [ why config global place here? ] # the dygraph mode will be set to default mode, # users will not call `dygraph.guard` or `enable_dygraph` diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index 6f1dcd15df..ca35a3c925 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -366,7 +366,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): device = get_device() if device == 'cpu': # TODO: not supports cpu parallel now - nprocs = _cpu_num + nprocs = _cpu_num() else: nprocs = core.get_cuda_device_count() diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index be27a7c521..250ee165bb 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -982,8 +982,7 @@ class GeneralRoleMaker(RoleMakerBase): http_server = KVServer(int(self._http_ip_port[1]), size_d) http_server.start() wait_seconds = 5 - while http_server_d.get("running", - False) and not http_server.shoud_stop(): + while http_server_d.get("running", False): time.sleep(wait_seconds) http_server.stop() diff --git a/python/paddle/fluid/incubate/fleet/utils/http_server.py b/python/paddle/fluid/incubate/fleet/utils/http_server.py index 3573f417f3..50933ce5d1 100644 --- a/python/paddle/fluid/incubate/fleet/utils/http_server.py +++ b/python/paddle/fluid/incubate/fleet/utils/http_server.py @@ -173,7 +173,7 @@ class KVServer: self.listen_thread.join() self.http_server.server_close() - def shoud_stop(self): + def should_stop(self): """ return whether the server should stop. diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 5df32c5df4..0fa79f02ab 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,6 +15,12 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding) list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer) list(APPEND DIST_TEST_OPS test_listen_and_serv_op) list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) +list(APPEND DIST_TEST_OPS test_collective_reduce_api) +list(APPEND DIST_TEST_OPS test_collective_scatter_api) +list(APPEND DIST_TEST_OPS test_collective_barrier_api) +list(APPEND DIST_TEST_OPS test_collective_allreduce_api) +list(APPEND DIST_TEST_OPS test_collective_broadcast_api) +list(APPEND DIST_TEST_OPS test_collective_allgather_api) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) @@ -62,12 +68,6 @@ if(NOT WITH_GPU OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_broadcast) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter) - LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_barrier_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_allreduce_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_broadcast_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_allgather_api) LIST(REMOVE_ITEM TEST_OPS test_reducescatter) LIST(REMOVE_ITEM TEST_OPS test_reducescatter_api) endif() diff --git a/python/paddle/fluid/tests/unittests/test_collective_api_base.py b/python/paddle/fluid/tests/unittests/test_collective_api_base.py index 437b8b7bef..b1e87a3061 100644 --- a/python/paddle/fluid/tests/unittests/test_collective_api_base.py +++ b/python/paddle/fluid/tests/unittests/test_collective_api_base.py @@ -26,6 +26,7 @@ import functools import pickle from contextlib import closing from six import string_types +import paddle import paddle.fluid as fluid import paddle.fluid.unique_name as nameGen from paddle.fluid import core @@ -60,38 +61,6 @@ class TestCollectiveAPIRunnerBase(object): else: break - def initCommunicator(self, program, rank, nranks, wait_port, - current_endpoint, endpoints): - other_endpoints = endpoints[:] - other_endpoints.remove(current_endpoint) - if rank == 0 and wait_port: - self.wait_server_ready(other_endpoints) - block = program.global_block() - nccl_id_var = block.create_var( - name=nameGen.generate('nccl_id'), - persistable=True, - type=core.VarDesc.VarType.RAW) - - block.append_op( - type='c_gen_nccl_id', - inputs={}, - outputs={'Out': nccl_id_var}, - attrs={ - 'rank': rank, - 'endpoint': current_endpoint, - 'other_endpoints': other_endpoints - }) - - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': self.global_ring_id - }) - def run_trainer(self, args): train_prog = fluid.Program() startup_prog = fluid.Program() @@ -100,23 +69,12 @@ class TestCollectiveAPIRunnerBase(object): current_endpoint = args["currentendpoint"] nranks = 2 result = self.get_model(train_prog, startup_prog, rank) + paddle.distributed.init_parallel_env() if args['backend'] == 'nccl': - self.initCommunicator(startup_prog, rank, nranks, True, - current_endpoint, endpoints) device_id = int(os.getenv("FLAGS_selected_gpus", "0")) place = fluid.CUDAPlace( device_id) #if args.use_gpu else fluid.CPUPlace() else: - strategy = fluid.core.GlooParallelStrategy() - strategy.rank = rank - strategy.rank_num = nranks - strategy.prefix = "" - strategy.iface = "lo" - strategy.init_seconds = 999999 - strategy.run_seconds = 999999 - strategy.path = "/tmp/tmp%d" % args['path_id'] - gloo = fluid.core.GlooParallelContext(strategy) - gloo.init() place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) @@ -199,8 +157,8 @@ class TestDistBase(unittest.TestCase): tr_cmd = "%s %s" tr0_cmd = tr_cmd % (self._python_interp, model_file) tr1_cmd = tr_cmd % (self._python_interp, model_file) - tr0_pipe = open("/tmp/tr0_err.log", "wb") - tr1_pipe = open("/tmp/tr1_err.log", "wb") + tr0_pipe = open("/tmp/tr0_err.log", "w") + tr1_pipe = open("/tmp/tr1_err.log", "w") #print(tr0_cmd) tr0_proc = subprocess.Popen( tr0_cmd.strip().split(), @@ -221,6 +179,10 @@ class TestDistBase(unittest.TestCase): # close trainer file tr0_pipe.close() tr1_pipe.close() + with open("/tmp/tr0_err.log", "r") as f: + sys.stderr.write('trainer 0 stderr file: %s\n' % f.read()) + with open("/tmp/tr1_err.log", "r") as f: + sys.stderr.write('trainer 1 stderr file: %s\n' % f.read()) return pickle.loads(tr0_out), pickle.loads( tr1_out), tr0_proc.pid, tr1_proc.pid @@ -247,6 +209,7 @@ class TestDistBase(unittest.TestCase): if check_error_log: required_envs["GLOG_v"] = "3" required_envs["GLOG_logtostderr"] = "1" + required_envs["GLOO_LOG_LEVEL"] = "TRACE" tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file, required_envs) np.random.seed(pid0) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index f4d368b6b6..166a44fb2d 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -406,7 +406,7 @@ class TestParallelDyGraphRunnerBase(object): fluid.default_main_program().random_seed = seed np.random.seed(seed) import random - random.seed = seed + random.seed(seed) model, train_reader, opt = self.get_model() nranks = len(args.endpoints.split(",")) if args.endpoints else 1 @@ -456,7 +456,7 @@ class TestParallelDyGraphRunnerBase(object): paddle.static.default_startup_program().random_seed = seed paddle.static.default_main_program().random_seed = seed np.random.seed(seed) - random.seed = seed + random.seed(seed) # get trainer id args.trainer_id = paddle.distributed.get_rank() @@ -499,7 +499,7 @@ class TestParallelDyGraphRunnerBase(object): paddle.static.default_startup_program().random_seed = seed paddle.static.default_main_program().random_seed = seed np.random.seed(seed) - random.seed = seed + random.seed(seed) # get trainer id args.trainer_id = paddle.distributed.get_rank() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py index 6cb40eef27..7d8a71a236 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py @@ -188,7 +188,7 @@ class TestCloudRoleMaker(unittest.TestCase): h.log_message("666") s.get_deleted_size("haha") s1 = TmpS() - s1.shoud_stop() + s1.should_stop() if __name__ == "__main__": -- GitLab