From 99626502f747cc85d518d87267cec821ffbf69a3 Mon Sep 17 00:00:00 2001
From: tangwei12
Date: Fri, 18 Sep 2020 22:32:28 +0800
Subject: [PATCH] [paddle.fleet] gloo and util (#27213)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix worker endpoints
* fix gloo wrapper for HDFS
* GPU fleetrun supports gloo
* parameter-server fleetrun supports gloo
* fix getting server endpoints
---
 paddle/fluid/framework/fleet/gloo_wrapper.cc  |  25 +-
 python/paddle/distributed/fleet/__init__.py   |   1 +
 .../distributed/fleet/base/fleet_base.py      |  22 +-
 .../distributed/fleet/base/role_maker.py      | 592 ++++++++++++------
 .../distributed/fleet/base/util_factory.py    |  39 +-
 python/paddle/distributed/fleet/launch.py     |  26 +-
 .../paddle/distributed/fleet/launch_utils.py  |  10 +-
 .../fluid/tests/unittests/test_fleet_base.py  |  19 +-
 .../unittests/test_fleet_rolemaker_new.py     | 283 ++++++++-
 .../fluid/tests/unittests/test_fleet_util.py  |  97 +--
 10 files changed, 749 insertions(+), 365 deletions(-)

diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.cc b/paddle/fluid/framework/fleet/gloo_wrapper.cc
index bb958f1ac01..f195dde4084 100644
--- a/paddle/fluid/framework/fleet/gloo_wrapper.cc
+++ b/paddle/fluid/framework/fleet/gloo_wrapper.cc
@@ -19,6 +19,8 @@ limitations under the License. */
 namespace gloo {
 namespace rendezvous {
 
+constexpr int kNodeSize = 136;
+
 HdfsStore::HdfsStore(const std::string& path) {
   path_ = path;
   wait_sleep_ms_ = 10000;
@@ -213,12 +215,14 @@ void ParallelConnectContext::connectFullMesh(
   storeKey << rank;
   store.set(storeKey.str(), allBytes);
 
+  auto total_add_size = kNodeSize * (size - 1);
+
   std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
   // Connect every pair
   for (uint32_t i = 0; i < connect_threads.size(); ++i) {
     connect_threads[i].reset(new std::thread(
-        [&store, &transportContext, this](size_t thread_idx,
-                                          size_t thread_num) -> void {
+        [&store, &transportContext, total_add_size, this](
+            size_t thread_idx, size_t thread_num) -> void {
           for (int i = thread_idx; i < size; i += thread_num) {
             if (i == rank) {
               continue;
@@ -226,8 +230,23 @@ void ParallelConnectContext::connectFullMesh(
             // Wait for address of other side of this pair to become available
             std::string key = std::to_string(i);
             store.wait({key}, getTimeout());
+
+            std::vector<char> allAddrs;
+            auto max_retry_times = 5;
             // Connect to other side of this pair
-            auto allAddrs = store.get(key);
+
+            while (max_retry_times > 0) {
+              allAddrs = store.get(key);
+
+              VLOG(3) << "store get all address size: " << allAddrs.size()
+                      << " expect: " << total_add_size;
+              if (allAddrs.size() == static_cast<size_t>(total_add_size)) {
+                break;
+              }
+
+              --max_retry_times;
+            }
+
             auto addr = extractAddress(allAddrs, i);
             transportContext->getPair(i)->connect(addr);
           }
diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py
index 2539fa57a34..f3ee09a6d9e 100644
--- a/python/paddle/distributed/fleet/__init__.py
+++ b/python/paddle/distributed/fleet/__init__.py
@@ -39,6 +39,7 @@ server_num = fleet.server_num
 server_index = fleet.server_index
 server_endpoints = fleet.server_endpoints
 is_server = fleet.is_server
+set_util = fleet.set_util
 util = fleet.util
 barrier_worker = fleet.barrier_worker
 init_worker = fleet.init_worker
diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index 805c2d1fc73..aeb8cac98e2 100644
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ 
b/python/paddle/distributed/fleet/base/fleet_base.py @@ -180,6 +180,8 @@ class Fleet(object): raise ValueError( "`role_maker` should be subclass of `RoleMakerBase`, but got {}". format(type(role_maker))) + self._role_maker.generate_role() + self.strategy_compiler = StrategyCompiler() if paddle.fluid.framework.in_dygraph_mode(): if parallel_helper._is_parallel_ctx_initialized(): @@ -187,7 +189,6 @@ class Fleet(object): "The dygraph parallel environment has been initialized.") else: paddle.distributed.init_parallel_env() - return None def is_first_worker(self): """ @@ -275,13 +276,10 @@ class Fleet(object): fleet.worker_endpoints() """ - ''' if to_string: return ",".join(self._role_maker.get_trainer_endpoints()) else: return self._role_maker.get_trainer_endpoints() - ''' - return ["127.0.0.1:1001", "127.0.0.1:1002"] def server_num(self): """ @@ -355,7 +353,9 @@ class Fleet(object): return self._role_maker.is_server( ) or self._role_maker._is_heter_worker() - @property + def set_util(self, util): + self._util = util + def util(self): """ Utility functions that can be used under certain runtime @@ -376,16 +376,6 @@ class Fleet(object): """ return self._util - @util.setter - def util(self, util): - """ - Set Utility functions for userd-defined runtime - - Returns: - None - """ - self._util = util - def barrier_worker(self): """ barrier all workers @@ -393,7 +383,7 @@ class Fleet(object): Returns: None """ - self._role_maker.barrier_worker() + self._role_maker._barrier("worker") @is_non_distributed_check @inited_runtime_handler diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index a3a809ee375..d36c06047f5 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -13,18 +13,332 @@ # limitations under the License. 
"""Defination of Role Makers.""" import os +import time import numpy as np import warnings from multiprocessing import Process, Manager -import paddle.fluid as fluid -#__all__ = ['UserDefinedRoleMaker', 'PaddleCloudRoleMaker'] +import paddle.fluid as fluid class Role: WORKER = 1 SERVER = 2 HETER_WORKER = 3 + ALL = 4 + + +class Gloo(object): + """ + Gloo is a universal class for barrier and collective communication + """ + + class RENDEZVOUS: + HDFS = 1 + FILE = 2 + HTTP = 3 + + def __init__(self): + self._worker_comm = None + self._server_comm = None + self._nodes_comm = None + + self._comm_world = ["worker", "server", "all"] + self._err_init = "gloo is not initialized, will not communicator with other nodes" + self._err_type = "gloo initialized error, please check arguments" + self._err_world = "argument error, comm_world must in {}".format( + self._comm_world) + + self._is_initialized = False + self._init_timeout_seconds = 3600 + self._run_timeout_seconds = 9999999 + + self._rendezvous = None + self._role = None + self._iface = None + + self._role_id = -1 + self._worker_num = -1 + self._server_num = -1 + self._need_init_all = False + + def init(self, + rendezvous, + role, + role_id, + worker_num, + server_num, + need_init_all=False, + kwargs=None): + + self._rendezvous = rendezvous + self._role = role + self._role_id = role_id + self._worker_num = worker_num + self._server_num = server_num + self._need_init_all = need_init_all + self._iface = self.__get_default_iface() + self._prefix = kwargs.get("store.prefix", "") + + if self._rendezvous == Gloo.RENDEZVOUS.HDFS: + dfs_name = kwargs.get("dfs.name", "") + dfs_ugi = kwargs.get("dfs.ugi", "") + dfs_path = kwargs.get("dfs.path", "") + + if not dfs_name or not dfs_ugi or not dfs_path: + raise ValueError(self._err_type) + self._init_dfs(dfs_name, dfs_ugi, dfs_path, self._prefix) + + elif self._rendezvous == Gloo.RENDEZVOUS.FILE: + fs_path = kwargs.get("dfs.path", "") + + if not fs_path: + raise ValueError(self._err_type) + self._init_fs(fs_path, self._prefix) + + elif self._rendezvous == Gloo.RENDEZVOUS.HTTP: + ip = kwargs.get("http.host", "") + port = kwargs.get("http.port", "") + + if not ip or not port: + raise ValueError(self._err_type) + self._init_http(ip, port, self._prefix) + + else: + raise ValueError(self._err_type) + + self._is_initialized = True + + def _init_fs(self, fs_path, prefix): + def init(rank, nodes, role): + gloo = fluid.core.Gloo() + gloo.set_rank(rank) + gloo.set_size(nodes) + gloo.set_prefix(prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + gloo.set_hdfs_store(os.path.join(fs_path, role), "", "") + gloo.init() + return gloo + + if self._role == Role.WORKER: + rank, nodes = self._get_rank_nodes(Role.WORKER) + gloo = init(rank, nodes, "WORKER") + self._worker_comm = gloo + else: + rank, nodes = self._get_rank_nodes(Role.SERVER) + gloo = init(rank, nodes, "SERVER") + self._server_comm = gloo + + if self._need_init_all: + rank, nodes = self._get_rank_nodes(Role.ALL) + gloo = init(rank, nodes, "ALL") + self._nodes_comm = gloo + + def _init_dfs(self, dfs_name, dfs_ugi, dfs_path, prefix): + def init(rank, nodes, role): + gloo = fluid.core.Gloo() + gloo.set_rank(rank) + gloo.set_size(nodes) + gloo.set_prefix(prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + gloo.set_hdfs_store(os.path.join(dfs_path, role), dfs_name, dfs_ugi) + gloo.init() + return gloo + + if self._role == 
Role.WORKER: + rank, nodes = self._get_rank_nodes(Role.WORKER) + gloo = init(rank, nodes, "WORKER") + self._worker_comm = gloo + else: + rank, nodes = self._get_rank_nodes(Role.SERVER) + gloo = init(rank, nodes, "SERVER") + self._server_comm = gloo + + if self._need_init_all: + rank, nodes = self._get_rank_nodes(Role.ALL) + gloo = init(rank, nodes, "ALL") + self._nodes_comm = gloo + + def _init_http(self, ip, port, prefix): + def __start_kv_server(http_server_d, size_d): + from paddle.distributed.fleet.utils.http_server import KVServer + http_server = KVServer(port, size_d) + http_server.start() + wait_seconds = 5 + while http_server_d.get("running", + False) and not http_server.shoud_stop(): + time.sleep(wait_seconds) + http_server.stop() + + def init_kv_server(): + size_d = { + "trainer": self._worker_num, + "pserver": self._server_num, + "all": self._worker_num + self._server_num + } + + _http_server_d = {"running": True} + # child process for http server + _http_server = Process( + target=__start_kv_server, args=(_http_server_d, size_d)) + _http_server.daemon = True + # set running status to True + # start child process + _http_server.start() + + def init(rank, nodes, role): + gloo = fluid.core.Gloo() + gloo.set_rank(rank) + gloo.set_size(nodes) + gloo.set_prefix(prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + gloo.set_http_store(ip, port, role) + return gloo + + port = int(port) + + if self._role == Role.SERVER and self._role_id == 0: + init_kv_server() + + if self._role == Role.WORKER: + rank, nodes = self._get_rank_nodes(Role.WORKER) + gloo = init(rank, nodes, "WORKER") + self._worker_comm = gloo + else: + rank, nodes = self._get_rank_nodes(Role.SERVER) + gloo = init(rank, nodes, "SERVER") + self._server_comm = gloo + + if self._need_init_all: + rank, nodes = self._get_rank_nodes(Role.ALL) + gloo = init(rank, nodes, "ALL") + self._nodes_comm = gloo + + def _get_rank_nodes(self, role): + nodes = 0 + rank = -1 + + if role == Role.WORKER: + nodes = self._worker_num + rank = self._role_id + elif role == Role.SERVER: + nodes = self._server_num + rank = self._role_id + elif role == Role.ALL: + nodes = self._worker_num + self._server_num + + if self._role == Role.WORKER: + rank = self._role_id + else: + rank = self._worker_num + self._role_id + else: + ValueError(self._err_type) + + return rank, nodes + + def __get_default_iface(self): + """ + get default physical interface + """ + default1 = self.__get_default_iface_from_gateway() + default2 = self.__get_default_iface_from_interfaces() + return default2 if default1 == "lo" else default1 + + def __get_default_iface_from_gateway(self): + """ + get default physical interface + """ + import netifaces + gateways = netifaces.gateways() + if gateways.get(netifaces.AF_INET) != None: + gateway = gateways[netifaces.AF_INET] + if len(gateway) > 0 and len(gateway[0]) > 1: + return gateway[0][1] + return "lo" + + def __get_default_iface_from_interfaces(self): + """ + get default physical interface + """ + import netifaces + for intf_name in netifaces.interfaces(): + addresses = netifaces.ifaddresses(intf_name) + if netifaces.AF_INET in addresses: + ipv4_addresses = addresses[netifaces.AF_INET] + for ipv4_address in ipv4_addresses: + if 'broadcast' in ipv4_address: + return intf_name + return "lo" + + def barrier(self, comm_world): + """ + dummy barrier, do nothing + """ + if not self._is_initialized: + warnings.warn(self._err_init) + return + + if comm_world not in 
self._comm_world: + raise ValueError(self._err_world) + + if comm_world == "worker": + self._worker_comm.barrier() + elif comm_world == "server": + self._server_comm.barrier() + else: + self._nodes_comm.barrier() + + def all_reduce(self, input, mode="sum", comm_world="worker"): + if not self._is_initialized: + warnings.warn(self._err_init) + return input + + if comm_world not in self._comm_world: + raise ValueError(self._err_world) + + input = np.array(input) + input_shape = input.shape + input_list = input.reshape(-1).tolist() + + self.barrier(comm_world) + + if comm_world == "worker": + ans = self._worker_comm.all_reduce(input_list, mode) + elif comm_world == "server": + ans = self._server_comm.all_reduce(input_list, mode) + else: + ans = self._nodes_comm.all_reduce(input_list, mode) + + output = np.array(ans).reshape(input_shape) + return output + + def all_gather(self, input, comm_world="worker"): + """ + dummy all gather, do nothing + Args: + obj(any): obj to do all gather + """ + if not self._is_initialized: + warnings.warn(self._err_init) + return input + + if comm_world not in self._comm_world: + raise ValueError(self._err_world) + + if comm_world == "worker": + output = self._worker_comm.all_gather(input) + elif comm_world == "server": + output = self._server_comm.all_gather(input) + else: + output = self._nodes_comm.all_gather(input) + + return output class RoleMakerBase(object): @@ -47,10 +361,6 @@ class RoleMakerBase(object): self._heter_trainer_device = "CPU" self._is_heter_parameter_server_mode = False - self._node_type = None - self._node_type_comm = None - self._all_comm = None - def is_worker(self): """ return is_worker() of current process @@ -142,19 +452,11 @@ class RoleMakerBase(object): self._role, self._current_id, self._worker_endpoints, self._server_endpoints) - def _all_gather(self, comm_world, input): - """ - - Args: - input(int|float): input value - - Returns: - return a list of values - """ - print("warning: RoleMakerBase does not have all gather.") + def _all_gather(self, input, comm_world="worker"): + print("warning: RoleMakerBase does not have all gather worker.") return None - def _all_reduce(self, comm_world, input, mode="sum"): + def _all_reduce(self, input, mode="sum", comm_world="worker"): """ Args: input(list/numpy.array): array of one dim @@ -221,73 +523,25 @@ class PaddleCloudRoleMaker(RoleMakerBase): def __init__(self, is_collective=False, **kwargs): super(PaddleCloudRoleMaker, self).__init__() self._is_collective = is_collective - self._init_gloo = False # default no init gloo - self._kwargs = kwargs + self._non_distributed = False + + self._kwargs = kwargs self._role_is_generated = False self._server_endpoints = None self._worker_endpoints = None - self._node_type_comm = None - self._all_comm = None - - self._non_distributed = False - - if not self._is_collective: - self._hdfs_name = kwargs.get("hdfs_name", "") - self._hdfs_ugi = kwargs.get("hdfs_ugi", "") - self._hdfs_path = kwargs.get("path", "").rstrip("/") - self._init_timeout_seconds = kwargs.get("init_timeout_seconds", - 3600) - self._run_timeout_seconds = kwargs.get("run_timeout_seconds", - 9999999) - ip_port = kwargs.get("http_ip_port", "") - self._http_ip_port = [] - self._http_server = None - # if ip_port is not empty, it will use http instead of hdfs - if ip_port != "": - self._http_ip_port = ip_port.split(":") - # it's for communication between processes - self._manager = Manager() - # global dict to store status - self._http_server_d = self._manager.dict() - # set running status of 
http server - self._http_server_d["running"] = False - self._iface = self.__get_default_iface() - # this environment variable can be empty - self._prefix = os.getenv("SYS_JOB_ID", "") + self._gloo = Gloo() # gloo instance def _barrier(self, comm_world): - if isinstance(comm_world, fluid.core.Gloo): - comm_world.barrier() - else: - print("warning: must init Gloo before using _barrier() function") - - def _all_gather(self, comm_world, input): - if isinstance(comm_world, fluid.core.Gloo): - self._barrier(comm_world) - output = comm_world.all_gather(input) - return output - else: - print("warning: must init Gloo before using _all_gather() function") - return None - - def _all_reduce(self, comm_world, input, mode="sum"): - if isinstance(comm_world, fluid.core.Gloo): - - input = np.array(input) + self._gloo.barrier(comm_world) - input_shape = input.shape - input_list = input.reshape(-1).tolist() + def _all_gather(self, input, comm_world="worker"): + return self._gloo.all_gather(input, comm_world) - self._barrier(comm_world) - ans = comm_world.all_reduce(input_list, mode) - output = np.array(ans).reshape(input_shape) - return output - else: - print("warning: must init Gloo before using _all_reduce() function") - return None + def _all_reduce(self, input, mode="sum", comm_world="worker"): + return self._gloo.all_reduce(input, mode, comm_world) def is_worker(self): """ @@ -349,7 +603,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): """ if not self._role_is_generated: self.generate_role() - return self._trainers_num + return len(self.get_pserver_endpoints()) def node_num(self): """ @@ -421,8 +675,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002 self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST") - self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", - "").split(",") + if self._server_endpoints is None: # back to non_distributed execution. 
self._server_endpoints = "" @@ -436,6 +689,13 @@ class PaddleCloudRoleMaker(RoleMakerBase): return self._server_endpoints = self._server_endpoints.split(",") + + self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") + if self._worker_endpoints: + self._worker_endpoints = self._worker_endpoints.split(",") + else: + self._worker_endpoints = [] + trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"]) training_role = os.environ["TRAINING_ROLE"] @@ -506,6 +766,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER") assert (self._training_role == "TRAINER") + self._role = Role.WORKER self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") self._cur_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") if self._worker_endpoints is None: @@ -518,74 +779,64 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._node_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) - def _init_gloo_env(self): - def init_gloo_instance(role="trainer"): - role = role.lower() - assert role in ["trainer", "pserver", "all"] - if role == "trainer": - all_list = self._worker_endpoints - rank = self._current_id - elif role == "pserver": - all_list = self._server_endpoints - rank = self._current_id - else: - all_list = self._worker_endpoints + self._server_endpoints - rank = all_list.index(self._cur_endpoint) - gloo = fluid.core.Gloo() - gloo.set_rank(rank) - gloo.set_size(len(all_list)) - gloo.set_prefix(self._prefix) - gloo.set_iface(self._iface) - gloo.set_timeout_seconds(self._init_timeout_seconds, - self._run_timeout_seconds) - if len(self._http_ip_port) != 0: - gloo.set_http_store(self._http_ip_port[0], - int(self._http_ip_port[1]), role) - else: - gloo.set_hdfs_store(self._hdfs_path + "/" + role, - self._hdfs_name, self._hdfs_ugi) - gloo.init() - return gloo - - # paddlecloud support gloo - if self._role == Role.WORKER: - if self._current_id == 0 and len(self._http_ip_port) != 0: - size_d = { - "trainer": len(self._worker_endpoints), - "pserver": len(self._server_endpoints), - "all": - len(self._worker_endpoints) + len(self._server_endpoints) - } - # child process for http server - self._http_server = Process( - target=self.__start_kv_server, - args=(self._http_server_d, size_d)) - self._http_server.daemon = True - # set running status to True - self._http_server_d["running"] = True - # start child process - self._http_server.start() - self._node_type = 1 - gloo = init_gloo_instance("trainer") - self._node_type_comm = gloo + def _gloo_init(self): + # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier + use_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0")) + if use_gloo not in [1, 2]: + return + + # PADDLE_GLOO_RENDEZVOUS 1: HDFS 2: FILE 3: HTTP + rendezvous_type = int(os.getenv("PADDLE_GLOO_RENDEZVOUS", "0")) + prefix = os.getenv("SYS_JOB_ID", "") + if rendezvous_type not in [ + Gloo.RENDEZVOUS.HDFS, Gloo.RENDEZVOUS.HTTP, Gloo.RENDEZVOUS.FILE + ]: + raise ValueError(self._gloo._err_type) + + need_init_all = True if use_gloo == 2 else False + + if rendezvous_type == Gloo.RENDEZVOUS.HDFS: + dfs_name = os.getenv("PADDLE_GLOO_FS_NAME", "") + dfs_ugi = os.getenv("PADDLE_GLOO_FS_UGI", "") + dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") + kwargs = { + "dfs.name": dfs_name, + "dfs.ugi": dfs_ugi, + "dfs.path": dfs_path, + "store.prefix": prefix, + } + elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: + ip = os.getenv("PADDLE_GLOO_HTTP_HOST", "") + port = os.getenv("PADDLE_GLOO_HTTP_PORT", 
"") + kwargs = { + "http.host": ip, + "http.port": port, + "store.prefix": prefix, + } else: - assert self._role == Role.SERVER - self._node_type = 0 - gloo = init_gloo_instance("pserver") - self._node_type_comm = gloo - - all_list = self._worker_endpoints + self._server_endpoints - self._rank = all_list.index(self._cur_endpoint) - self._size = len(all_list) - - gloo = init_gloo_instance("all") - self._all_comm = gloo - - if self._http_server is not None: - # set running status to False - self._http_server_d["running"] = False - # wait until child process exits - self._http_server.join() + dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") + kwargs = { + "dfs.path": dfs_path, + "store.prefix": prefix, + } + + if rendezvous_type == Gloo.RENDEZVOUS.HDFS: + type = "HDFS" + elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: + type = "HTTP" + else: + type = "FILE" + print("Gloo init with {}: need_init_all: {}, args: {}".format( + type, need_init_all, kwargs)) + + self._gloo.init( + rendezvous=rendezvous_type, + role=self._role, + role_id=self.role_id(), + worker_num=self.worker_num(), + server_num=self.server_num(), + need_init_all=need_init_all, + kwargs=kwargs) def generate_role(self): """ @@ -594,57 +845,10 @@ class PaddleCloudRoleMaker(RoleMakerBase): if not self._role_is_generated: if not self._is_collective: self._ps_env() - if "PADDLE_WITH_GLOO" in os.environ: - self._init_gloo = bool(os.environ["PADDLE_WITH_GLOO"]) - if self._init_gloo: - self._init_gloo_env() else: self._collective_env() self._role_is_generated = True - - def __get_default_iface(self): - """ - get default physical interface - """ - default1 = self.__get_default_iface_from_gateway() - default2 = self.__get_default_iface_from_interfaces() - return default2 if default1 == "lo" else default1 - - def __get_default_iface_from_gateway(self): - """ - get default physical interface - """ - import netifaces - gateways = netifaces.gateways() - if gateways.get(netifaces.AF_INET) != None: - gateway = gateways[netifaces.AF_INET] - if len(gateway) > 0 and len(gateway[0]) > 1: - return gateway[0][1] - return "lo" - - def __get_default_iface_from_interfaces(self): - """ - get default physical interface - """ - import netifaces - for intf_name in netifaces.interfaces(): - addresses = netifaces.ifaddresses(intf_name) - if netifaces.AF_INET in addresses: - ipv4_addresses = addresses[netifaces.AF_INET] - for ipv4_address in ipv4_addresses: - if 'broadcast' in ipv4_address: - return intf_name - return "lo" - - def __start_kv_server(self, http_server_d, size_d): - from paddle.distributed.fleet.utils.http_server import KVServer - http_server = KVServer(int(self._http_ip_port[1]), size_d) - http_server.start() - wait_seconds = 5 - while http_server_d.get("running", - False) and not http_server.shoud_stop(): - time.sleep(wait_seconds) - http_server.stop() + self._gloo_init() class UserDefinedRoleMaker(PaddleCloudRoleMaker): @@ -677,7 +881,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): self._worker_endpoints = self._kwargs.get("worker_endpoints") self._current_id = self._kwargs.get("current_id") self._trainers_num = len(self._worker_endpoints) - self._training_role = Role.Worker + self._training_role = Role.WORKER self._node_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) @@ -688,8 +892,6 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): if not self._role_is_generated: if not self._is_collective: self._user_defined_ps_env() - if self._init_gloo: - self._init_gloo_env() else: self._user_defined_collective_env() 
self._role_is_generated = True diff --git a/python/paddle/distributed/fleet/base/util_factory.py b/python/paddle/distributed/fleet/base/util_factory.py index 4fa247c3196..e822c3c92f4 100644 --- a/python/paddle/distributed/fleet/base/util_factory.py +++ b/python/paddle/distributed/fleet/base/util_factory.py @@ -57,34 +57,7 @@ class UtilBase(object): ), "fs_client must be the instance of paddle.distributed.fleet.utils.FS" self.fs_client = fs_client - def __check_comm_world(self, comm_world="worker"): - if not self.role_maker._role_is_generated: - self.role_maker.generate_role() - - _comm_world = None - comm_world_upper = comm_world.upper() - if comm_world_upper == "WORKER": - if not self.role_maker.is_worker(): - print( - "warning: current role is not worker in collective_func(comm_world=\"worker\")" - ) - _comm_world = self.role_maker._node_type_comm - elif comm_world_upper == "SERVER": - if not self.role_maker.is_server(): - print( - "warning: current role is not server in collective_func(comm_world=\"server\")" - ) - _comm_world = self.role_maker._node_type_comm - elif comm_world_upper == "ALL": - _comm_world = self.role_maker._all_comm - else: - raise ValueError( - "not support comm_world, please choose one from [worker, server, all]" - ) - - return _comm_world - - def all_reduce(self, input, mode, comm_world="worker"): + def all_reduce(self, input, mode="sum", comm_world="worker"): """ All reduce `input` between specified collection. This is a distributed API. @@ -130,8 +103,7 @@ class UtilBase(object): if __name__ == "__main__": train() """ - _comm_world = self.__check_comm_world(comm_world) - return self.role_maker._all_reduce(_comm_world, input, mode) + return self.role_maker._all_reduce(input, mode, comm_world) def barrier(self, comm_world="worker"): """ @@ -170,8 +142,7 @@ class UtilBase(object): if __name__ == "__main__": train() """ - _comm_world = self.__check_comm_world(comm_world) - self.role_maker._barrier(_comm_world) + self.role_maker._barrier(comm_world) def all_gather(self, input, comm_world="worker"): """ @@ -219,8 +190,8 @@ class UtilBase(object): if __name__ == "__main__": train() """ - _comm_world = self.__check_comm_world(comm_world) - return self.role_maker._all_gather(_comm_world, input) + + return self.role_maker._all_gather(input, comm_world) def _broadcast(self): pass diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index a527393f602..4b629bc35ce 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -55,7 +55,10 @@ launch a process on each of the given gpu card or cpu machine. 
""" from __future__ import print_function + +import shutil import sys +import tempfile from sys import version import subprocess import os @@ -213,12 +216,20 @@ def launch_collective(args): cluster, pod = get_cluster_from_args(args, gpus) logger.debug("get cluster from args:{}".format(cluster)) + global_envs = copy.copy(os.environ.copy()) + gloo_rendezvous_dir = tempfile.mkdtemp() + # add gloo env + global_envs["PADDLE_WITH_GLOO"] = "1" + global_envs["PADDLE_GLOO_RENDEZVOUS"] = "2" + global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir + procs = start_local_trainers( cluster, pod, training_script=args.training_script, training_script_args=args.training_script_args, - log_dir=args.log_dir) + log_dir=args.log_dir, + envs=global_envs) while True: alive = watch_local_trainers(procs, cluster.trainers_nranks()) @@ -230,6 +241,9 @@ def launch_collective(args): time.sleep(3) + if os.path.exists(gloo_rendezvous_dir): + shutil.rmtree(gloo_rendezvous_dir) + def launch_ps(args): ports = None @@ -315,6 +329,13 @@ def launch_ps(args): default_env = os.environ.copy() current_env = copy.copy(default_env) + + gloo_rendezvous_dir = tempfile.mkdtemp() + # add gloo env + current_env["PADDLE_WITH_GLOO"] = "1" + current_env["PADDLE_GLOO_RENDEZVOUS"] = "2" + current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir + current_env.pop("http_proxy", None) current_env.pop("https_proxy", None) procs = [] @@ -419,6 +440,9 @@ def launch_ps(args): procs[i].proc.terminate() print("all parameter server are killed", file=sys.stderr) + if os.path.exists(gloo_rendezvous_dir): + shutil.rmtree(gloo_rendezvous_dir) + def launch(): args = _parse_args() diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index b6f4c75a276..17d3b96cf44 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -398,8 +398,14 @@ def start_local_trainers(cluster, pod, training_script, training_script_args, - log_dir=None): - current_env = copy.copy(os.environ.copy()) + log_dir=None, + envs=None): + + if envs is None: + current_env = copy.copy(os.environ.copy()) + else: + current_env = copy.copy(envs) + #paddle broadcast ncclUniqueId use socket, and #proxy maybe make trainers unreachable, so delete them. 
#if we set them to "", grpc will log error message "bad uri" diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py index 4ced9841ee4..3a90b363f27 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py @@ -27,7 +27,7 @@ class TestFleetBase(unittest.TestCase): os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" os.environ["PADDLE_TRAINERS_NUM"] = "2" os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ - "127.0.0.1:36001,127.0.0.2:36001" + "127.0.0.1:36001,127.0.0.2:36001" def test_init(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) @@ -88,7 +88,7 @@ class TestFleetBase(unittest.TestCase): def test_util(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - self.assertEqual(fleet.util, None) + self.assertEqual(fleet.util(), None) def test_barrier_worker(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) @@ -99,20 +99,17 @@ class TestFleetBase(unittest.TestCase): def test_init_worker(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - if fleet.is_worker(): - fleet.init_worker() - def test_run_server(self): - role = role_maker.PaddleCloudRoleMaker(is_collective=True) - fleet.init(role) - if fleet.is_worker(): - fleet.run_worker() + with self.assertRaises(ValueError): + if fleet.is_worker(): + fleet.init_worker() def test_stop_worker(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - if fleet.is_worker(): - fleet.stop_worker() + with self.assertRaises(ValueError): + if fleet.is_worker(): + fleet.stop_worker() def test_distributed_optimizer(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py index cf9b3e1e9a1..d786fa1eba8 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py @@ -15,7 +15,11 @@ from __future__ import print_function import os +import platform +import shutil +import tempfile import unittest +import paddle import paddle.distributed.fleet.base.role_maker as role_maker @@ -42,9 +46,9 @@ class TestRoleMakerBase(unittest.TestCase): self.assertTrue(len(pserver_endpoints) == 0) print(role.to_string()) - self.assertTrue(role._all_gather(role._node_type_comm, 1) is None) - self.assertTrue(role._all_reduce(role._node_type_comm, 1) is None) - role._barrier(role._node_type_comm) + self.assertTrue(role._all_gather(1, "worker") is None) + self.assertTrue(role._all_reduce(1, "sum", "worker") is None) + role._barrier("worker") class TestCloudRoleMaker(unittest.TestCase): @@ -72,8 +76,8 @@ class TestCloudRoleMaker(unittest.TestCase): print("warning: no netifaces, skip test_tr_rolemaker") return - ro = role_maker.PaddleCloudRoleMaker( - is_collective=False, init_gloo=False) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertTrue(ro.is_worker()) self.assertFalse(ro.is_server()) self.assertEqual(ro.worker_num(), 2) @@ -108,8 +112,9 @@ class TestCloudRoleMaker(unittest.TestCase): self.assertEqual(ro.server_num(), 2) pserver_endpoints = ro.get_pserver_endpoints() self.assertEqual(pserver_endpoints[0], '127.0.0.1:36001') - self.assertTrue(ro._all_gather(ro._all_comm, 1) is None) - self.assertTrue(ro._all_reduce(ro._all_comm, 1) is None) + + 
self.assertEqual(ro._all_gather(1, "worker"), 1) + self.assertEqual(ro._all_reduce(1, "sum", "worker"), 1) def test_traing_role(self): """Test training role.""" @@ -142,7 +147,7 @@ class TestUserDefinedRoleMaker(unittest.TestCase): ro = role_maker.UserDefinedRoleMaker( is_collective=False, init_gloo=False, - server_endpoints="127.0.0.1:36001,127.0.0.1:36001", + server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"], role=role_maker.Role.SERVER, current_id=0, worker_num=2) @@ -161,14 +166,274 @@ class TestUserDefinedRoleMaker(unittest.TestCase): ro = role_maker.UserDefinedRoleMaker( is_collective=False, init_gloo=False, - server_endpoints="127.0.0.1:36001,127.0.0.1:36001", + server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"], role=role_maker.Role.WORKER, current_id=0, worker_num=2) + self.assertIn("127.0.0.1:36001", ro.get_pserver_endpoints()) self.assertTrue(ro.is_worker()) self.assertEqual(ro.role_id(), 0) +class TestGlooWithCloudRoleMaker(unittest.TestCase): + def setUp(self): + os.environ["PADDLE_TRAINERS_NUM"] = "1" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_TRAINER_ID"] = "0" + + def case(self, role, comm_world): + role._barrier(comm_world) + + gather = role._all_gather(1, comm_world) + self.assertEqual(gather[0], 1) + + all_reduce = role._all_reduce(1, "sum", comm_world) + self.assertEqual(1, all_reduce) + + def mkdir(self): + tmp = tempfile.mkdtemp() + return tmp + + def clean(self, tmp): + shutil.rmtree(tmp) + + def test_hdfs_gloo(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "worker") + self.clean(tmp) + + def test_fs_gloo(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "worker") + self.clean(tmp) + + def test_fs_gloo2(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.clean(tmp) + + def test_fs_gloo3(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + 
os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.clean(tmp) + + def test_fs_gloo4(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3" + os.environ["PADDLE_GLOO_HTTP_HOST"] = "127.0.0.1" + os.environ["PADDLE_GLOO_HTTP_PORT"] = "30019" + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + import time + time.sleep(3) + + def test_fs_gloo5(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.case(role, "all") + self.clean(tmp) + + def test_fs_gloo6(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.case(role, "all") + self.clean(tmp) + + def test_fs_gloo7(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "5" + + role = role_maker.PaddleCloudRoleMaker() + self.assertRaises(ValueError, role.generate_role) + + def test_fs_gloo8(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" 
+ + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + def net(): + x = paddle.fluid.layers.data(name='x', shape=[13], dtype='float32') + y_predict = paddle.fluid.layers.fc(input=x, size=1, act=None) + y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = paddle.fluid.layers.square_error_cost( + input=y_predict, label=y) + avg_cost = paddle.fluid.layers.mean(cost) + return avg_cost + + from paddle.distributed import fleet + + role = role_maker.PaddleCloudRoleMaker() + fleet.init(role) + avg_cost = net() + + strategy = paddle.distributed.fleet.DistributedStrategy() + strategy.a_sync = False + + optimizer = paddle.optimizer.SGD(0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(avg_cost) + + comm_world = "server" + fleet.util().barrier(comm_world) + + gather = fleet.util().all_gather(1, comm_world) + self.assertEqual(gather[0], 1) + + all_reduce = fleet.util().all_reduce(1, "sum", comm_world) + self.assertEqual(1, all_reduce) + + self.clean(tmp) + + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_util.py b/python/paddle/fluid/tests/unittests/test_fleet_util.py index d506088fde0..1570912e740 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_util.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_util.py @@ -59,7 +59,7 @@ class TestFleetUtil(unittest.TestCase): import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - default_util = fleet.util + default_util = fleet.util() self.assertEqual(default_util, None) def test_set_user_defined_util(self): @@ -76,8 +76,8 @@ class TestFleetUtil(unittest.TestCase): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) my_util = UserDefinedUtil() - fleet.util = my_util - user_id = fleet.util.get_user_id() + fleet.set_util(my_util) + user_id = fleet.util().get_user_id() self.assertEqual(user_id, 10) def test_fs(self): @@ -88,97 +88,6 @@ class TestFleetUtil(unittest.TestCase): self.assertFalse(fs.need_upload_download()) fleet_util._set_file_system(fs) - def test_barrier(self): - try: - import netifaces - except: - print("warning: no netifaces, skip test_barrier") - return - - gloo = fluid.core.Gloo() - gloo.set_rank(0) - gloo.set_size(1) - gloo.set_prefix("123") - gloo.set_iface("lo") - gloo.set_hdfs_store("./tmp_test_fleet_barrier", "", "") - gloo.init() - - role = role_maker.UserDefinedRoleMaker( - is_collective=False, - init_gloo=False, - current_id=0, - role=role_maker.Role.SERVER, - worker_endpoints=["127.0.0.1:6003"], - server_endpoints=["127.0.0.1:6001"]) - role._node_type_comm = gloo - role._role_is_generated = True - fleet_util._set_role_maker(role) - - fleet_util.barrier("worker") - - def test_all_reduce(self): - try: - import netifaces - except: - print("warning: no netifaces, skip test_all_reduce") - return - - gloo = fluid.core.Gloo() - gloo.set_rank(0) - gloo.set_size(1) - gloo.set_prefix("123") - gloo.set_iface("lo") - gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "") - gloo.init() - - role = role_maker.UserDefinedRoleMaker( - is_collective=False, - init_gloo=False, - current_id=0, - role=role_maker.Role.WORKER, - worker_endpoints=["127.0.0.1:6003"], - server_endpoints=["127.0.0.1:6001"]) - 
role._node_type_comm = gloo - role._role_is_generated = True - fleet_util._set_role_maker(role) - - output = fleet_util.all_reduce(1, "sum", comm_world="server") - print(output) - - # self.assertEqual(output, 1) - - def test_all_gather(self): - try: - import netifaces - except: - print("warning: no netifaces, skip test_all_gather") - return - - gloo = fluid.core.Gloo() - gloo.set_rank(0) - gloo.set_size(1) - gloo.set_prefix("123") - gloo.set_iface("lo") - gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "") - gloo.init() - - role = role_maker.UserDefinedRoleMaker( - is_collective=False, - init_gloo=False, - current_id=0, - role=role_maker.Role.SERVER, - worker_endpoints=["127.0.0.1:6003"], - server_endpoints=["127.0.0.1:6001"]) - role._node_type_comm = gloo - role._all_comm = gloo - role._role_is_generated = True - fleet_util._set_role_maker(role) - - output = fleet_util.all_gather(1, comm_world="all") - print(output) - # self.assertTrue(len(output) == 1 and output[0] == 1) - self.assertRaises(Exception, fleet_util.all_gather, 1, "test") - def download_files(self): path = download(self.proto_data_url, self.module_name, self.proto_data_md5) -- GitLab
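
After this change, gloo bootstrap is configured through environment variables (PADDLE_WITH_GLOO, PADDLE_GLOO_RENDEZVOUS, and the PADDLE_GLOO_FS_* / PADDLE_GLOO_HTTP_* settings) rather than through RoleMaker constructor arguments, and fleet.util becomes a plain method paired with fleet.set_util(). Below is a minimal single-trainer sketch of the FILE-rendezvous path, modelled on the TestGlooWithCloudRoleMaker cases added above; the endpoint addresses and the temporary directory are illustrative only, and it assumes a Linux host with netifaces available (the unit tests skip gloo on macOS/Windows).

    import os
    import tempfile

    import paddle.distributed.fleet.base.role_maker as role_maker

    # Single-trainer environment, mirroring the unit-test setUp; the
    # endpoint addresses and ids below are illustrative values.
    os.environ["TRAINING_ROLE"] = "TRAINER"
    os.environ["PADDLE_TRAINERS_NUM"] = "1"
    os.environ["PADDLE_TRAINER_ID"] = "0"
    os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
    os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
    os.environ["POD_IP"] = "127.0.0.1"

    # Gloo configuration:
    #   PADDLE_WITH_GLOO       1: worker/server communicator, 2: also the "all" communicator
    #   PADDLE_GLOO_RENDEZVOUS 1: HDFS, 2: FILE, 3: HTTP
    os.environ["PADDLE_WITH_GLOO"] = "1"
    os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"     # FILE rendezvous
    os.environ["PADDLE_GLOO_FS_PATH"] = tempfile.mkdtemp()

    role = role_maker.PaddleCloudRoleMaker(is_collective=False)
    role.generate_role()   # runs _gloo_init() from the environment

    # The collective helpers now take the comm_world name as a string
    # ("worker", "server" or "all") instead of a Gloo instance.
    role._barrier("worker")
    gathered = role._all_gather(1, "worker")        # gathered[0] == 1
    reduced = role._all_reduce(1, "sum", "worker")  # == 1

In user code the same helpers are reached through fleet.util() once a runtime has been set up (see test_fs_gloo8 above), or through a custom util object registered with fleet.set_util().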