From bbc2add70318f76513b4493ce7ba446aea88ea1c Mon Sep 17 00:00:00 2001 From: lilong12 Date: Tue, 29 Sep 2020 10:56:28 +0800 Subject: [PATCH] Initialize gloo for low level collective apis (#27672) * add gloo initializer, test=develop --- paddle/fluid/platform/gloo_context.cc | 4 +- paddle/fluid/platform/gloo_context.h | 7 +-- paddle/fluid/pybind/gloo_context_py.cc | 32 ++++------- .../distributed/fleet/base/role_maker.py | 32 +++++++---- python/paddle/distributed/fleet/launch.py | 4 +- .../distributed/fleet/utils/http_server.py | 2 +- python/paddle/distributed/parallel.py | 52 +++++++++++++++++- python/paddle/distributed/spawn.py | 2 +- .../fluid/incubate/fleet/base/role_maker.py | 3 +- .../fluid/incubate/fleet/utils/http_server.py | 2 +- .../fluid/tests/unittests/CMakeLists.txt | 12 ++-- .../unittests/test_collective_api_base.py | 55 +++---------------- .../fluid/tests/unittests/test_dist_base.py | 6 +- .../tests/unittests/test_fleet_rolemaker_4.py | 2 +- .../unittests/test_fleet_rolemaker_new.py | 1 - 15 files changed, 112 insertions(+), 104 deletions(-) diff --git a/paddle/fluid/platform/gloo_context.cc b/paddle/fluid/platform/gloo_context.cc index 32e7299d319..efc1491336b 100644 --- a/paddle/fluid/platform/gloo_context.cc +++ b/paddle/fluid/platform/gloo_context.cc @@ -21,10 +21,10 @@ void GlooParallelContext::Init() { auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance(); gloo_ptr->SetRank(strategy_.rank); gloo_ptr->SetSize(strategy_.rank_num); - gloo_ptr->SetPrefix(strategy_.prefix); gloo_ptr->SetIface(strategy_.iface); gloo_ptr->SetTimeoutSeconds(strategy_.init_seconds, strategy_.run_seconds); - gloo_ptr->SetHdfsStore(strategy_.path, strategy_.fs_name, strategy_.fs_ugi); + gloo_ptr->SetHttpStore(strategy_.ip_address, strategy_.ip_port, + strategy_.scope); gloo_ptr->Init(); } #endif diff --git a/paddle/fluid/platform/gloo_context.h b/paddle/fluid/platform/gloo_context.h index a7dcf288a22..399e87995ea 100644 --- a/paddle/fluid/platform/gloo_context.h +++ b/paddle/fluid/platform/gloo_context.h @@ -25,12 +25,11 @@ struct GlooParallelStrategy { int rank{0}; int rank_num{1}; std::string iface; - std::string prefix; int init_seconds{9999999}; int run_seconds{9999999}; - std::string path; - std::string fs_name; - std::string fs_ugi; + std::string ip_address; + int ip_port; + std::string scope{"worker"}; }; class GlooParallelContext { diff --git a/paddle/fluid/pybind/gloo_context_py.cc b/paddle/fluid/pybind/gloo_context_py.cc index 1a9c77b0c3a..029d38a89ea 100644 --- a/paddle/fluid/pybind/gloo_context_py.cc +++ b/paddle/fluid/pybind/gloo_context_py.cc @@ -62,12 +62,6 @@ void BindGlooContext(py::module *m) { [](platform::GlooParallelStrategy &self, const std::string &iface) { self.iface = iface; }) - .def_property("prefix", - [](const platform::GlooParallelStrategy &self) { - return self.prefix; - }, - [](platform::GlooParallelStrategy &self, - const std::string &prefix) { self.prefix = prefix; }) .def_property("init_seconds", [](const platform::GlooParallelStrategy &self) { return self.init_seconds; @@ -83,23 +77,19 @@ void BindGlooContext(py::module *m) { self.run_seconds = run_seconds; }) .def_property( - "path", - [](const platform::GlooParallelStrategy &self) { return self.path; }, - [](platform::GlooParallelStrategy &self, const std::string &path) { - self.path = path; - }) - .def_property("fs_name", - [](const platform::GlooParallelStrategy &self) { - return self.fs_name; - }, - [](platform::GlooParallelStrategy &self, - const std::string &fs_name) { self.fs_name = fs_name; }) - 
.def_property("fs_ugi", + "ip_address", + [](const platform::GlooParallelStrategy &self) { + return self.ip_address; + }, + [](platform::GlooParallelStrategy &self, + const std::string &ip_address) { self.ip_address = ip_address; }) + .def_property("ip_port", [](const platform::GlooParallelStrategy &self) { - return self.fs_ugi; + return self.ip_port; }, - [](platform::GlooParallelStrategy &self, - const std::string &fs_ugi) { self.fs_ugi = fs_ugi; }); + [](platform::GlooParallelStrategy &self, int ip_port) { + self.ip_port = ip_port; + }); py::class_ gloo_ctx(*m, "GlooParallelContext"); gloo_ctx.def(py::init()) diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index 1b8988f11a1..70e2b9209a6 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -19,6 +19,7 @@ import warnings from multiprocessing import Process, Manager import paddle.fluid as fluid +from paddle.distributed.fleet.base.private_helper_function import wait_server_ready class Role: @@ -77,7 +78,8 @@ class Gloo(object): self._worker_num = worker_num self._server_num = server_num self._need_init_all = need_init_all - self._iface = self.__get_default_iface() + self._start_http_server = kwargs.get("start_http_server", False) + self._iface = "" self._prefix = kwargs.get("store.prefix", "") if self._rendezvous == Gloo.RENDEZVOUS.HDFS: @@ -102,7 +104,9 @@ class Gloo(object): if not ip or not port: raise ValueError(self._err_type) - self._init_http(ip, port, self._prefix) + self._init_http(ip, port, self._prefix, self._start_http_server) + ep = ":".join([ip, port]) + wait_server_ready([ep]) else: raise ValueError(self._err_type) @@ -163,14 +167,13 @@ class Gloo(object): gloo = init(rank, nodes, "ALL") self._nodes_comm = gloo - def _init_http(self, ip, port, prefix): + def _init_http(self, ip, port, prefix, start_http_server): def __start_kv_server(http_server_d, size_d): from paddle.distributed.fleet.utils.http_server import KVServer http_server = KVServer(port, size_d) http_server.start() wait_seconds = 5 - while http_server_d.get("running", - False) and not http_server.shoud_stop(): + while http_server_d.get("running", False): time.sleep(wait_seconds) http_server.stop() @@ -203,7 +206,7 @@ class Gloo(object): port = int(port) - if self._role == Role.SERVER and self._role_id == 0: + if start_http_server: init_kv_server() if self._role == Role.WORKER: @@ -536,8 +539,8 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._kwargs = kwargs self._role_is_generated = False - self._server_endpoints = None - self._worker_endpoints = None + self._server_endpoints = [] + self._worker_endpoints = [] self._gloo = Gloo() # gloo instance @@ -800,12 +803,21 @@ class PaddleCloudRoleMaker(RoleMakerBase): "store.prefix": prefix, } elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: - ip = os.getenv("PADDLE_GLOO_HTTP_HOST", "") - port = os.getenv("PADDLE_GLOO_HTTP_PORT", "") + start_http_server = False + if self._is_collective: + ep_rank_0 = self._worker_endpoints[0] + if self._is_first_worker(): + start_http_server = True + else: + ep_rank_0 = self._server_endpoints[0] + if self._server_index() == 0: + start_http_server = True + ip, port = ep_rank_0.split(':') kwargs = { "http.host": ip, "http.port": port, "store.prefix": prefix, + 'start_http_server': start_http_server, } else: dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py 
index c589e4f26a0..015d59b516e 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -220,7 +220,7 @@ def launch_collective(args): gloo_rendezvous_dir = tempfile.mkdtemp() # add gloo env global_envs["PADDLE_WITH_GLOO"] = "1" - global_envs["PADDLE_GLOO_RENDEZVOUS"] = "2" + global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir procs = start_local_trainers( @@ -333,7 +333,7 @@ def launch_ps(args): gloo_rendezvous_dir = tempfile.mkdtemp() # add gloo env current_env["PADDLE_WITH_GLOO"] = "1" - current_env["PADDLE_GLOO_RENDEZVOUS"] = "2" + current_env["PADDLE_GLOO_RENDEZVOUS"] = "3" current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir current_env.pop("http_proxy", None) diff --git a/python/paddle/distributed/fleet/utils/http_server.py b/python/paddle/distributed/fleet/utils/http_server.py index 78e310b0a5a..4f42ffc0b3d 100644 --- a/python/paddle/distributed/fleet/utils/http_server.py +++ b/python/paddle/distributed/fleet/utils/http_server.py @@ -181,7 +181,7 @@ class KVServer: self.listen_thread.join() self.http_server.server_close() - def shoud_stop(self): + def should_stop(self): """ return whether the server should stop. diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index d35bc096343..fab391e9fdf 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -15,6 +15,9 @@ import os import six import warnings +from multiprocessing import Process, Manager +import time +import sys from paddle import compat as cpt @@ -23,12 +26,23 @@ from paddle.fluid import core from paddle.fluid.framework import _set_expected_place from paddle.fluid.dygraph import parallel_helper from paddle.fluid.dygraph.parallel import ParallelEnv +from paddle.distributed.fleet.base.private_helper_function import wait_server_ready __all__ = ["init_parallel_env"] ParallelStrategy = core.ParallelStrategy +def _start_kv_server(port, http_server_d): + from paddle.distributed.fleet.utils.http_server import KVServer + http_server = KVServer(int(port)) + http_server.start() + wait_seconds = 5 + while http_server_d.get("running", False): + time.sleep(wait_seconds) + http_server.stop() + + def init_parallel_env(): """ Initialize parallel training environment in dynamic graph mode. @@ -110,7 +124,40 @@ def init_parallel_env(): _check_var_exists("PADDLE_TRAINERS_NUM") _check_var_exists("PADDLE_TRAINER_ENDPOINTS") - # 3. 
init NCCL ParallelStrategy + if ParallelEnv().world_size < 2: + return + + # 3: init gloo context + ep_rank_0 = ParallelEnv().trainer_endpoints[0].split(":") + ep_rank = ParallelEnv().trainer_endpoints[ParallelEnv().rank].split(":") + manager = Manager() + # glboal dict to store status + http_server_d = manager.dict() + http_server_d["running"] = False + if ParallelEnv().rank == 0: + http_server = Process( + target=_start_kv_server, args=(int(ep_rank_0[1]), http_server_d)) + http_server.daemon = True + http_server_d["running"] = True + http_server.start() + wait_server_ready([ParallelEnv().trainer_endpoints[0]]) + + gloo_strategy = core.GlooParallelStrategy() + gloo_strategy.rank = ParallelEnv().rank + gloo_strategy.rank_num = ParallelEnv().world_size + gloo_strategy.ip_address = ep_rank_0[0] + gloo_strategy.ip_port = int(ep_rank_0[1]) + default_init_timeout_seconds = 3600 + default_run_timeout_seconds = 9999999 + gloo_strategy.init_seconds = default_init_timeout_seconds + gloo_strategy.run_seconds = default_run_timeout_seconds + gloo = core.GlooParallelContext(gloo_strategy) + gloo.init() + if ParallelEnv().rank == 0: + http_server_d["running"] = False + http_server.join() + + # 4. init NCCL ParallelStrategy strategy = ParallelStrategy() if parallel_helper._is_parallel_ctx_initialized(): warnings.warn("The parallel environment has been initialized.") @@ -118,8 +165,7 @@ def init_parallel_env(): strategy.local_rank = ParallelEnv().rank strategy.trainer_endpoints = ParallelEnv().trainer_endpoints strategy.current_endpoint = ParallelEnv().current_endpoint - if strategy.nranks < 2: - return + # NOTE(chenweihang): [ why config global place here? ] # the dygraph mode will be set to default mode, # users will not call `dygraph.guard` or `enable_dygraph` diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index 6f1dcd15df3..ca35a3c9259 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -366,7 +366,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): device = get_device() if device == 'cpu': # TODO: not supports cpu parallel now - nprocs = _cpu_num + nprocs = _cpu_num() else: nprocs = core.get_cuda_device_count() diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 8fdfc773ce7..6db2e65bcff 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -989,8 +989,7 @@ class GeneralRoleMaker(RoleMakerBase): http_server = KVServer(int(self._http_ip_port[1]), size_d) http_server.start() wait_seconds = 5 - while http_server_d.get("running", - False) and not http_server.shoud_stop(): + while http_server_d.get("running", False): time.sleep(wait_seconds) http_server.stop() diff --git a/python/paddle/fluid/incubate/fleet/utils/http_server.py b/python/paddle/fluid/incubate/fleet/utils/http_server.py index 3573f417f35..50933ce5d1b 100644 --- a/python/paddle/fluid/incubate/fleet/utils/http_server.py +++ b/python/paddle/fluid/incubate/fleet/utils/http_server.py @@ -173,7 +173,7 @@ class KVServer: self.listen_thread.join() self.http_server.server_close() - def shoud_stop(self): + def should_stop(self): """ return whether the server should stop. 
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 43c54c2fc1f..0fd1d749be6 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,6 +15,12 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding) list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer) list(APPEND DIST_TEST_OPS test_listen_and_serv_op) list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) +list(APPEND DIST_TEST_OPS test_collective_reduce_api) +list(APPEND DIST_TEST_OPS test_collective_scatter_api) +list(APPEND DIST_TEST_OPS test_collective_barrier_api) +list(APPEND DIST_TEST_OPS test_collective_allreduce_api) +list(APPEND DIST_TEST_OPS test_collective_broadcast_api) +list(APPEND DIST_TEST_OPS test_collective_allgather_api) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) @@ -62,12 +68,6 @@ if(NOT WITH_GPU OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_broadcast) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter) - LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_barrier_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_allreduce_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_broadcast_api) - LIST(REMOVE_ITEM TEST_OPS test_collective_allgather_api) LIST(REMOVE_ITEM TEST_OPS test_reducescatter) LIST(REMOVE_ITEM TEST_OPS test_reducescatter_api) endif() diff --git a/python/paddle/fluid/tests/unittests/test_collective_api_base.py b/python/paddle/fluid/tests/unittests/test_collective_api_base.py index 437b8b7befa..b1e87a30614 100644 --- a/python/paddle/fluid/tests/unittests/test_collective_api_base.py +++ b/python/paddle/fluid/tests/unittests/test_collective_api_base.py @@ -26,6 +26,7 @@ import functools import pickle from contextlib import closing from six import string_types +import paddle import paddle.fluid as fluid import paddle.fluid.unique_name as nameGen from paddle.fluid import core @@ -60,38 +61,6 @@ class TestCollectiveAPIRunnerBase(object): else: break - def initCommunicator(self, program, rank, nranks, wait_port, - current_endpoint, endpoints): - other_endpoints = endpoints[:] - other_endpoints.remove(current_endpoint) - if rank == 0 and wait_port: - self.wait_server_ready(other_endpoints) - block = program.global_block() - nccl_id_var = block.create_var( - name=nameGen.generate('nccl_id'), - persistable=True, - type=core.VarDesc.VarType.RAW) - - block.append_op( - type='c_gen_nccl_id', - inputs={}, - outputs={'Out': nccl_id_var}, - attrs={ - 'rank': rank, - 'endpoint': current_endpoint, - 'other_endpoints': other_endpoints - }) - - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': self.global_ring_id - }) - def run_trainer(self, args): train_prog = fluid.Program() startup_prog = fluid.Program() @@ -100,23 +69,12 @@ class TestCollectiveAPIRunnerBase(object): current_endpoint = args["currentendpoint"] nranks = 2 result = self.get_model(train_prog, startup_prog, rank) + paddle.distributed.init_parallel_env() if args['backend'] == 'nccl': - self.initCommunicator(startup_prog, rank, nranks, True, - current_endpoint, endpoints) device_id = int(os.getenv("FLAGS_selected_gpus", "0")) place = fluid.CUDAPlace( device_id) #if args.use_gpu else 
fluid.CPUPlace() else: - strategy = fluid.core.GlooParallelStrategy() - strategy.rank = rank - strategy.rank_num = nranks - strategy.prefix = "" - strategy.iface = "lo" - strategy.init_seconds = 999999 - strategy.run_seconds = 999999 - strategy.path = "/tmp/tmp%d" % args['path_id'] - gloo = fluid.core.GlooParallelContext(strategy) - gloo.init() place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) @@ -199,8 +157,8 @@ class TestDistBase(unittest.TestCase): tr_cmd = "%s %s" tr0_cmd = tr_cmd % (self._python_interp, model_file) tr1_cmd = tr_cmd % (self._python_interp, model_file) - tr0_pipe = open("/tmp/tr0_err.log", "wb") - tr1_pipe = open("/tmp/tr1_err.log", "wb") + tr0_pipe = open("/tmp/tr0_err.log", "w") + tr1_pipe = open("/tmp/tr1_err.log", "w") #print(tr0_cmd) tr0_proc = subprocess.Popen( tr0_cmd.strip().split(), @@ -221,6 +179,10 @@ class TestDistBase(unittest.TestCase): # close trainer file tr0_pipe.close() tr1_pipe.close() + with open("/tmp/tr0_err.log", "r") as f: + sys.stderr.write('trainer 0 stderr file: %s\n' % f.read()) + with open("/tmp/tr1_err.log", "r") as f: + sys.stderr.write('trainer 1 stderr file: %s\n' % f.read()) return pickle.loads(tr0_out), pickle.loads( tr1_out), tr0_proc.pid, tr1_proc.pid @@ -247,6 +209,7 @@ class TestDistBase(unittest.TestCase): if check_error_log: required_envs["GLOG_v"] = "3" required_envs["GLOG_logtostderr"] = "1" + required_envs["GLOO_LOG_LEVEL"] = "TRACE" tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file, required_envs) np.random.seed(pid0) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index f4d368b6b6f..166a44fb2d4 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -406,7 +406,7 @@ class TestParallelDyGraphRunnerBase(object): fluid.default_main_program().random_seed = seed np.random.seed(seed) import random - random.seed = seed + random.seed(seed) model, train_reader, opt = self.get_model() nranks = len(args.endpoints.split(",")) if args.endpoints else 1 @@ -456,7 +456,7 @@ class TestParallelDyGraphRunnerBase(object): paddle.static.default_startup_program().random_seed = seed paddle.static.default_main_program().random_seed = seed np.random.seed(seed) - random.seed = seed + random.seed(seed) # get trainer id args.trainer_id = paddle.distributed.get_rank() @@ -499,7 +499,7 @@ class TestParallelDyGraphRunnerBase(object): paddle.static.default_startup_program().random_seed = seed paddle.static.default_main_program().random_seed = seed np.random.seed(seed) - random.seed = seed + random.seed(seed) # get trainer id args.trainer_id = paddle.distributed.get_rank() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py index 2db4fdeef4d..6ce1eabe24f 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py @@ -182,7 +182,7 @@ class TestCloudRoleMaker(unittest.TestCase): h.log_message("666") s.get_deleted_size("haha") s1 = TmpS() - s1.shoud_stop() + s1.should_stop() if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py index 26a56fb08b4..cf32c803ff8 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py +++ 
b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py @@ -677,7 +677,6 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_HTTP_PORT"] = "" role = role_maker.PaddleCloudRoleMaker() - self.assertRaises(ValueError, role._generate_role) def test_fs_gloo8(self): plats = platform.platform() -- GitLab
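
For reference, a minimal sketch (not part of the patch) of the rendezvous flow that the new code in python/paddle/distributed/parallel.py performs: rank 0 hosts a small HTTP key-value store, every rank waits until that endpoint is reachable, builds a GlooParallelStrategy pointing at it, and initializes a GlooParallelContext. The helper names init_gloo and _serve_kv and the endpoint value are made up for illustration; only the core.GlooParallelStrategy, core.GlooParallelContext, KVServer, and wait_server_ready calls come from the patch itself.

import time
from multiprocessing import Manager, Process

from paddle.fluid import core
from paddle.distributed.fleet.base.private_helper_function import wait_server_ready


def _serve_kv(port, status):
    # Rank 0 hosts the HTTP key-value store that the other ranks rendezvous on.
    from paddle.distributed.fleet.utils.http_server import KVServer
    server = KVServer(int(port))
    server.start()
    while status.get("running", False):
        time.sleep(5)
    server.stop()


def init_gloo(rank, world_size, ep_rank_0="127.0.0.1:6170"):
    # Hypothetical helper mirroring what init_parallel_env does when world_size >= 2.
    ip, port = ep_rank_0.split(":")
    manager = Manager()
    status = manager.dict()
    status["running"] = False

    server = None
    if rank == 0:
        status["running"] = True
        server = Process(target=_serve_kv, args=(port, status))
        server.daemon = True
        server.start()
    # Block until rank 0's store is reachable before initializing gloo.
    wait_server_ready([ep_rank_0])

    strategy = core.GlooParallelStrategy()
    strategy.rank = rank
    strategy.rank_num = world_size
    strategy.ip_address = ip
    strategy.ip_port = int(port)
    strategy.init_seconds = 3600      # generous init timeout, as in the patch
    strategy.run_seconds = 9999999
    gloo = core.GlooParallelContext(strategy)
    gloo.init()

    if rank == 0:
        # The store is only needed for rendezvous; shut it down once gloo is up.
        status["running"] = False
        server.join()

After this change the CPU/gloo path of the collective unit tests only needs paddle.distributed.init_parallel_env(), which runs this same sequence internally; rank 0 tears the HTTP store down as soon as every rank has finished gloo initialization.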