From 36c0410223dd6608f44707738decd1288f19ec4d Mon Sep 17 00:00:00 2001 From: lilong12 Date: Mon, 28 Sep 2020 15:51:55 +0800 Subject: [PATCH] Revert "Initialize gloo for low level collective apis (#27356)", test=document_fix (#27665) --- paddle/fluid/platform/gloo_context.cc | 4 +- paddle/fluid/platform/gloo_context.h | 7 +- paddle/fluid/pybind/gloo_context_py.cc | 32 +++++--- .../distributed/fleet/base/role_maker.py | 3 +- .../distributed/fleet/utils/http_server.py | 2 +- python/paddle/distributed/parallel.py | 79 +------------------ python/paddle/distributed/spawn.py | 2 +- .../fluid/incubate/fleet/base/role_maker.py | 3 +- .../fluid/incubate/fleet/utils/http_server.py | 2 +- .../fluid/tests/unittests/CMakeLists.txt | 12 +-- .../unittests/test_collective_api_base.py | 55 ++++++++++--- .../fluid/tests/unittests/test_dist_base.py | 6 +- .../tests/unittests/test_fleet_rolemaker_4.py | 2 +- 13 files changed, 93 insertions(+), 116 deletions(-) diff --git a/paddle/fluid/platform/gloo_context.cc b/paddle/fluid/platform/gloo_context.cc index efc1491336..32e7299d31 100644 --- a/paddle/fluid/platform/gloo_context.cc +++ b/paddle/fluid/platform/gloo_context.cc @@ -21,10 +21,10 @@ void GlooParallelContext::Init() { auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance(); gloo_ptr->SetRank(strategy_.rank); gloo_ptr->SetSize(strategy_.rank_num); + gloo_ptr->SetPrefix(strategy_.prefix); gloo_ptr->SetIface(strategy_.iface); gloo_ptr->SetTimeoutSeconds(strategy_.init_seconds, strategy_.run_seconds); - gloo_ptr->SetHttpStore(strategy_.ip_address, strategy_.ip_port, - strategy_.scope); + gloo_ptr->SetHdfsStore(strategy_.path, strategy_.fs_name, strategy_.fs_ugi); gloo_ptr->Init(); } #endif diff --git a/paddle/fluid/platform/gloo_context.h b/paddle/fluid/platform/gloo_context.h index 399e87995e..a7dcf288a2 100644 --- a/paddle/fluid/platform/gloo_context.h +++ b/paddle/fluid/platform/gloo_context.h @@ -25,11 +25,12 @@ struct GlooParallelStrategy { int rank{0}; int rank_num{1}; std::string iface; + std::string prefix; int init_seconds{9999999}; int run_seconds{9999999}; - std::string ip_address; - int ip_port; - std::string scope{"worker"}; + std::string path; + std::string fs_name; + std::string fs_ugi; }; class GlooParallelContext { diff --git a/paddle/fluid/pybind/gloo_context_py.cc b/paddle/fluid/pybind/gloo_context_py.cc index 029d38a89e..1a9c77b0c3 100644 --- a/paddle/fluid/pybind/gloo_context_py.cc +++ b/paddle/fluid/pybind/gloo_context_py.cc @@ -62,6 +62,12 @@ void BindGlooContext(py::module *m) { [](platform::GlooParallelStrategy &self, const std::string &iface) { self.iface = iface; }) + .def_property("prefix", + [](const platform::GlooParallelStrategy &self) { + return self.prefix; + }, + [](platform::GlooParallelStrategy &self, + const std::string &prefix) { self.prefix = prefix; }) .def_property("init_seconds", [](const platform::GlooParallelStrategy &self) { return self.init_seconds; @@ -77,19 +83,23 @@ void BindGlooContext(py::module *m) { self.run_seconds = run_seconds; }) .def_property( - "ip_address", - [](const platform::GlooParallelStrategy &self) { - return self.ip_address; - }, - [](platform::GlooParallelStrategy &self, - const std::string &ip_address) { self.ip_address = ip_address; }) - .def_property("ip_port", + "path", + [](const platform::GlooParallelStrategy &self) { return self.path; }, + [](platform::GlooParallelStrategy &self, const std::string &path) { + self.path = path; + }) + .def_property("fs_name", + [](const platform::GlooParallelStrategy &self) { + return self.fs_name; + }, + [](platform::GlooParallelStrategy &self, + const std::string &fs_name) { self.fs_name = fs_name; }) + .def_property("fs_ugi", [](const platform::GlooParallelStrategy &self) { - return self.ip_port; + return self.fs_ugi; }, - [](platform::GlooParallelStrategy &self, int ip_port) { - self.ip_port = ip_port; - }); + [](platform::GlooParallelStrategy &self, + const std::string &fs_ugi) { self.fs_ugi = fs_ugi; }); py::class_ gloo_ctx(*m, "GlooParallelContext"); gloo_ctx.def(py::init()) diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index 94ef61ef91..1b8988f11a 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -169,7 +169,8 @@ class Gloo(object): http_server = KVServer(port, size_d) http_server.start() wait_seconds = 5 - while http_server_d.get("running", False): + while http_server_d.get("running", + False) and not http_server.shoud_stop(): time.sleep(wait_seconds) http_server.stop() diff --git a/python/paddle/distributed/fleet/utils/http_server.py b/python/paddle/distributed/fleet/utils/http_server.py index 4f42ffc0b3..78e310b0a5 100644 --- a/python/paddle/distributed/fleet/utils/http_server.py +++ b/python/paddle/distributed/fleet/utils/http_server.py @@ -181,7 +181,7 @@ class KVServer: self.listen_thread.join() self.http_server.server_close() - def should_stop(self): + def shoud_stop(self): """ return whether the server should stop. diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index a0dd8cbdfd..d35bc09634 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -15,10 +15,6 @@ import os import six import warnings -from multiprocessing import Process, Manager -import netifaces -import time -import sys from paddle import compat as cpt @@ -33,39 +29,6 @@ __all__ = ["init_parallel_env"] ParallelStrategy = core.ParallelStrategy -def _start_kv_server(port, http_server_d): - from paddle.distributed.fleet.utils.http_server import KVServer - http_server = KVServer(int(port)) - http_server.start() - wait_seconds = 5 - while http_server_d.get("running", False): - time.sleep(wait_seconds) - http_server.stop() - - -def _get_iface_by_ip(ip_address): - """ - Get network interface name by its ip address. - - Args: - ip_address (string): ip address - - Returns: - Name of the network interface which has the ip address 'ip_address', - or None if the ip_address is not found. - """ - nicfaces = netifaces.interfaces() - for nicface in nicfaces: - message = netifaces.ifaddresses(nicface) - iface_info = message.get(netifaces.AF_INET) - if iface_info: - iface_dict = iface_info[0] - ipaddr = iface_dict.get('addr') - if ipaddr == ip_address: - return nicface - return None - - def init_parallel_env(): """ Initialize parallel training environment in dynamic graph mode. @@ -147,44 +110,7 @@ def init_parallel_env(): _check_var_exists("PADDLE_TRAINERS_NUM") _check_var_exists("PADDLE_TRAINER_ENDPOINTS") - if ParallelEnv().world_size < 2: - return - - # 3: init gloo context - ep_rank_0 = ParallelEnv().trainer_endpoints[0].split(":") - ep_rank = ParallelEnv().trainer_endpoints[ParallelEnv().rank].split(":") - manager = Manager() - # glboal dict to store status - http_server_d = manager.dict() - http_server_d["running"] = False - if ParallelEnv().rank == 0: - http_server = Process( - target=_start_kv_server, args=(int(ep_rank_0[1]), http_server_d)) - http_server.daemon = True - http_server_d["running"] = True - http_server.start() - - iface = _get_iface_by_ip(ep_rank[0]) - if iface is None: - raise ValueError("No network interface associated with the " - "ip address: {}.".format(ep_rank_0[0])) - gloo_strategy = core.GlooParallelStrategy() - gloo_strategy.rank = ParallelEnv().rank - gloo_strategy.rank_num = ParallelEnv().world_size - gloo_strategy.ip_address = ep_rank_0[0] - gloo_strategy.ip_port = int(ep_rank_0[1]) - gloo_strategy.iface = iface - default_init_timeout_seconds = 3600 - default_run_timeout_seconds = 9999999 - gloo_strategy.init_seconds = default_init_timeout_seconds - gloo_strategy.run_seconds = default_run_timeout_seconds - gloo = core.GlooParallelContext(gloo_strategy) - gloo.init() - if ParallelEnv().rank == 0: - http_server_d["running"] = False - http_server.join() - - # 4. init NCCL ParallelStrategy + # 3. init NCCL ParallelStrategy strategy = ParallelStrategy() if parallel_helper._is_parallel_ctx_initialized(): warnings.warn("The parallel environment has been initialized.") @@ -192,7 +118,8 @@ def init_parallel_env(): strategy.local_rank = ParallelEnv().rank strategy.trainer_endpoints = ParallelEnv().trainer_endpoints strategy.current_endpoint = ParallelEnv().current_endpoint - + if strategy.nranks < 2: + return # NOTE(chenweihang): [ why config global place here? ] # the dygraph mode will be set to default mode, # users will not call `dygraph.guard` or `enable_dygraph` diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index ca35a3c925..6f1dcd15df 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -366,7 +366,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): device = get_device() if device == 'cpu': # TODO: not supports cpu parallel now - nprocs = _cpu_num() + nprocs = _cpu_num else: nprocs = core.get_cuda_device_count() diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 6db2e65bcf..8fdfc773ce 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -989,7 +989,8 @@ class GeneralRoleMaker(RoleMakerBase): http_server = KVServer(int(self._http_ip_port[1]), size_d) http_server.start() wait_seconds = 5 - while http_server_d.get("running", False): + while http_server_d.get("running", + False) and not http_server.shoud_stop(): time.sleep(wait_seconds) http_server.stop() diff --git a/python/paddle/fluid/incubate/fleet/utils/http_server.py b/python/paddle/fluid/incubate/fleet/utils/http_server.py index 50933ce5d1..3573f417f3 100644 --- a/python/paddle/fluid/incubate/fleet/utils/http_server.py +++ b/python/paddle/fluid/incubate/fleet/utils/http_server.py @@ -173,7 +173,7 @@ class KVServer: self.listen_thread.join() self.http_server.server_close() - def should_stop(self): + def shoud_stop(self): """ return whether the server should stop. diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 23aaa90d68..97a3ebc213 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,12 +15,6 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding) list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer) list(APPEND DIST_TEST_OPS test_listen_and_serv_op) list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) -list(APPEND DIST_TEST_OPS test_collective_reduce_api) -list(APPEND DIST_TEST_OPS test_collective_scatter_api) -list(APPEND DIST_TEST_OPS test_collective_barrier_api) -list(APPEND DIST_TEST_OPS test_collective_allreduce_api) -list(APPEND DIST_TEST_OPS test_collective_broadcast_api) -list(APPEND DIST_TEST_OPS test_collective_allgather_api) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) @@ -68,6 +62,12 @@ if(NOT WITH_GPU OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_broadcast) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter) + LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api) + LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api) + LIST(REMOVE_ITEM TEST_OPS test_collective_barrier_api) + LIST(REMOVE_ITEM TEST_OPS test_collective_allreduce_api) + LIST(REMOVE_ITEM TEST_OPS test_collective_broadcast_api) + LIST(REMOVE_ITEM TEST_OPS test_collective_allgather_api) LIST(REMOVE_ITEM TEST_OPS test_reducescatter) LIST(REMOVE_ITEM TEST_OPS test_reducescatter_api) endif() diff --git a/python/paddle/fluid/tests/unittests/test_collective_api_base.py b/python/paddle/fluid/tests/unittests/test_collective_api_base.py index b1e87a3061..437b8b7bef 100644 --- a/python/paddle/fluid/tests/unittests/test_collective_api_base.py +++ b/python/paddle/fluid/tests/unittests/test_collective_api_base.py @@ -26,7 +26,6 @@ import functools import pickle from contextlib import closing from six import string_types -import paddle import paddle.fluid as fluid import paddle.fluid.unique_name as nameGen from paddle.fluid import core @@ -61,6 +60,38 @@ class TestCollectiveAPIRunnerBase(object): else: break + def initCommunicator(self, program, rank, nranks, wait_port, + current_endpoint, endpoints): + other_endpoints = endpoints[:] + other_endpoints.remove(current_endpoint) + if rank == 0 and wait_port: + self.wait_server_ready(other_endpoints) + block = program.global_block() + nccl_id_var = block.create_var( + name=nameGen.generate('nccl_id'), + persistable=True, + type=core.VarDesc.VarType.RAW) + + block.append_op( + type='c_gen_nccl_id', + inputs={}, + outputs={'Out': nccl_id_var}, + attrs={ + 'rank': rank, + 'endpoint': current_endpoint, + 'other_endpoints': other_endpoints + }) + + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': self.global_ring_id + }) + def run_trainer(self, args): train_prog = fluid.Program() startup_prog = fluid.Program() @@ -69,12 +100,23 @@ class TestCollectiveAPIRunnerBase(object): current_endpoint = args["currentendpoint"] nranks = 2 result = self.get_model(train_prog, startup_prog, rank) - paddle.distributed.init_parallel_env() if args['backend'] == 'nccl': + self.initCommunicator(startup_prog, rank, nranks, True, + current_endpoint, endpoints) device_id = int(os.getenv("FLAGS_selected_gpus", "0")) place = fluid.CUDAPlace( device_id) #if args.use_gpu else fluid.CPUPlace() else: + strategy = fluid.core.GlooParallelStrategy() + strategy.rank = rank + strategy.rank_num = nranks + strategy.prefix = "" + strategy.iface = "lo" + strategy.init_seconds = 999999 + strategy.run_seconds = 999999 + strategy.path = "/tmp/tmp%d" % args['path_id'] + gloo = fluid.core.GlooParallelContext(strategy) + gloo.init() place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) @@ -157,8 +199,8 @@ class TestDistBase(unittest.TestCase): tr_cmd = "%s %s" tr0_cmd = tr_cmd % (self._python_interp, model_file) tr1_cmd = tr_cmd % (self._python_interp, model_file) - tr0_pipe = open("/tmp/tr0_err.log", "w") - tr1_pipe = open("/tmp/tr1_err.log", "w") + tr0_pipe = open("/tmp/tr0_err.log", "wb") + tr1_pipe = open("/tmp/tr1_err.log", "wb") #print(tr0_cmd) tr0_proc = subprocess.Popen( tr0_cmd.strip().split(), @@ -179,10 +221,6 @@ class TestDistBase(unittest.TestCase): # close trainer file tr0_pipe.close() tr1_pipe.close() - with open("/tmp/tr0_err.log", "r") as f: - sys.stderr.write('trainer 0 stderr file: %s\n' % f.read()) - with open("/tmp/tr1_err.log", "r") as f: - sys.stderr.write('trainer 1 stderr file: %s\n' % f.read()) return pickle.loads(tr0_out), pickle.loads( tr1_out), tr0_proc.pid, tr1_proc.pid @@ -209,7 +247,6 @@ class TestDistBase(unittest.TestCase): if check_error_log: required_envs["GLOG_v"] = "3" required_envs["GLOG_logtostderr"] = "1" - required_envs["GLOO_LOG_LEVEL"] = "TRACE" tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file, required_envs) np.random.seed(pid0) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 166a44fb2d..f4d368b6b6 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -406,7 +406,7 @@ class TestParallelDyGraphRunnerBase(object): fluid.default_main_program().random_seed = seed np.random.seed(seed) import random - random.seed(seed) + random.seed = seed model, train_reader, opt = self.get_model() nranks = len(args.endpoints.split(",")) if args.endpoints else 1 @@ -456,7 +456,7 @@ class TestParallelDyGraphRunnerBase(object): paddle.static.default_startup_program().random_seed = seed paddle.static.default_main_program().random_seed = seed np.random.seed(seed) - random.seed(seed) + random.seed = seed # get trainer id args.trainer_id = paddle.distributed.get_rank() @@ -499,7 +499,7 @@ class TestParallelDyGraphRunnerBase(object): paddle.static.default_startup_program().random_seed = seed paddle.static.default_main_program().random_seed = seed np.random.seed(seed) - random.seed(seed) + random.seed = seed # get trainer id args.trainer_id = paddle.distributed.get_rank() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py index 6ce1eabe24..2db4fdeef4 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py @@ -182,7 +182,7 @@ class TestCloudRoleMaker(unittest.TestCase): h.log_message("666") s.get_deleted_size("haha") s1 = TmpS() - s1.should_stop() + s1.shoud_stop() if __name__ == "__main__": -- GitLab