From d6b54de46753827c23cabe5f3307f7493db194d0 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Sun, 20 Sep 2020 13:18:26 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90paddle.fleet=E3=80=91Fix/role=20maker?= =?UTF-8?q?=20api=20fix=20(#27326)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix fleet util and gloo * fix worker endpoints * fix * fix UT * fix gloo * fix gloo * update gloo * update gloo * update gloo * update gloo * update gloo * fix gloo wrapper for hdfs * add file gloo and UT * fix UT * fix UT * fix UT * hide public method of RoleMaker * fix UT * GPU fleetrun support gloo * parameterserver fleetrun support gloo * add UT * add UT * fix UT * fix get server endpoint * fix get server endpoint * fix UT * hide public method of rolemaker * hide public method of rolemaker * hide public method of rolemaker * Update test_fleet_rolemaker_new.py * hide public method of rolemaker * hide public method of rolemaker --- .../distributed/fleet/base/fleet_base.py | 24 +- .../distributed/fleet/base/role_maker.py | 112 +++-- .../distributed/fleet/base/util_factory.py | 6 +- .../fleet/meta_optimizers/common.py | 6 +- .../fleet/meta_optimizers/dgc_optimizer.py | 4 +- .../graph_execution_optimizer.py | 18 +- .../meta_optimizers/localsgd_optimizer.py | 10 +- .../parameter_server_graph_optimizer.py | 2 +- .../parameter_server_optimizer.py | 4 +- .../meta_optimizers/pipeline_optimizer.py | 8 +- .../fleet/runtime/parameter_server_runtime.py | 21 +- .../fleet/parameter_server/ir/public.py | 30 +- .../fluid/tests/unittests/test_fleet_base.py | 49 ++- .../tests/unittests/test_fleet_rolemaker_2.py | 2 +- .../unittests/test_fleet_rolemaker_new.py | 414 ++++++++++++++++-- 15 files changed, 531 insertions(+), 179 deletions(-) diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py index aeb8cac98e..d00faac838 100644 --- a/python/paddle/distributed/fleet/base/fleet_base.py +++ b/python/paddle/distributed/fleet/base/fleet_base.py @@ -180,7 +180,7 @@ class Fleet(object): raise ValueError( "`role_maker` should be subclass of `RoleMakerBase`, but got {}". format(type(role_maker))) - self._role_maker.generate_role() + self._role_maker._generate_role() self.strategy_compiler = StrategyCompiler() if paddle.fluid.framework.in_dygraph_mode(): @@ -207,7 +207,7 @@ class Fleet(object): fleet.is_first_worker() """ - return self._role_maker.is_first_worker() + return self._role_maker._is_first_worker() def worker_index(self): """ @@ -224,7 +224,7 @@ class Fleet(object): fleet.worker_index() """ - return self._role_maker.worker_index() + return self._role_maker._worker_index() def worker_num(self): """ @@ -241,7 +241,7 @@ class Fleet(object): fleet.worker_num() """ - return self._role_maker.worker_num() + return self._role_maker._worker_num() def is_worker(self): """ @@ -259,7 +259,7 @@ class Fleet(object): fleet.is_worker() """ - return self._role_maker.is_worker() + return self._role_maker._is_worker() def worker_endpoints(self, to_string=False): """ @@ -277,9 +277,9 @@ class Fleet(object): """ if to_string: - return ",".join(self._role_maker.get_trainer_endpoints()) + return ",".join(self._role_maker._get_trainer_endpoints()) else: - return self._role_maker.get_trainer_endpoints() + return self._role_maker._get_trainer_endpoints() def server_num(self): """ @@ -294,7 +294,7 @@ class Fleet(object): fleet.init() fleet.server_num() """ - return len(self._role_maker.get_pserver_endpoints()) + return len(self._role_maker._get_pserver_endpoints()) def server_index(self): """ @@ -311,7 +311,7 @@ class Fleet(object): fleet.server_index() """ - return self._role_maker.server_index() + return self._role_maker._server_index() def server_endpoints(self, to_string=False): """ @@ -330,9 +330,9 @@ class Fleet(object): """ if to_string: - return ",".join(self._role_maker.get_pserver_endpoints()) + return ",".join(self._role_maker._get_pserver_endpoints()) else: - return self._role_maker.get_pserver_endpoints() + return self._role_maker._get_pserver_endpoints() def is_server(self): """ @@ -350,7 +350,7 @@ class Fleet(object): fleet.is_server() """ - return self._role_maker.is_server( + return self._role_maker._is_server( ) or self._role_maker._is_heter_worker() def set_util(self, util): diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index d36c06047f..81d5908ccd 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -361,19 +361,19 @@ class RoleMakerBase(object): self._heter_trainer_device = "CPU" self._is_heter_parameter_server_mode = False - def is_worker(self): + def _is_worker(self): """ return is_worker() of current process """ raise NotImplementedError("Please implement this method in child class") - def is_server(self): + def _is_server(self): """ return is_server() of current process """ raise NotImplementedError("Please implement this method in child class") - def is_first_worker(self): + def _is_first_worker(self): """ Check whether the node is the first instance of worker. Returns: @@ -382,7 +382,7 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def worker_num(self): + def _worker_num(self): """ Get current total worker number. @@ -391,7 +391,7 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def server_num(self): + def _server_num(self): """ Get current total server number. @@ -400,7 +400,7 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def worker_index(self): + def _worker_index(self): """ Get current worker id. @@ -409,7 +409,7 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def server_index(self): + def _server_index(self): """ Get current server id. @@ -418,7 +418,7 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def role_id(self): + def _role_id(self): """ Get current id. @@ -427,7 +427,7 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def node_num(self): + def _node_num(self): """ Get the training node number Returns: @@ -435,13 +435,13 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") - def get_trainer_endpoints(self): + def _get_trainer_endpoints(self): """ return trainer endpoints """ return self._worker_endpoints - def get_pserver_endpoints(self): + def _get_pserver_endpoints(self): """ return pserver endpoints """ @@ -543,90 +543,92 @@ class PaddleCloudRoleMaker(RoleMakerBase): def _all_reduce(self, input, mode="sum", comm_world="worker"): return self._gloo.all_reduce(input, mode, comm_world) - def is_worker(self): + def _is_worker(self): """ whether current process is worker """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._role == Role.WORKER - def is_server(self): + def _is_server(self): """ whether current process is server """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._role == Role.SERVER - def is_first_worker(self): + def _is_first_worker(self): """ whether current process is worker of rank 0 """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._role == Role.WORKER and self._current_id == 0 - def worker_index(self): + def _worker_index(self): """ get index of current worker """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._current_id - def server_index(self): + def _server_index(self): """ get index of current server """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._current_id - def role_id(self): + def _role_id(self): """ get index of current node """ + if not self._role_is_generated: + self._generate_role() return self._current_id - def worker_num(self): + def _worker_num(self): """ retrun the current number of worker """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._trainers_num - def server_num(self): + def _server_num(self): """ return the current number of server """ if not self._role_is_generated: - self.generate_role() - return len(self.get_pserver_endpoints()) + self._generate_role() + return len(self._get_pserver_endpoints()) - def node_num(self): + def _node_num(self): """ return the training node number """ if not self._role_is_generated: - self.generate_role() - return self._node_num + self._generate_role() + return self._nodes_num - def get_trainer_endpoints(self): + def _get_trainer_endpoints(self): """ get endpoint of all trainers """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._worker_endpoints - def get_pserver_endpoints(self): + def _get_pserver_endpoints(self): """ get endpoint of all pservers """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._server_endpoints def _is_non_distributed(self): @@ -635,7 +637,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): (use python-run to launch fleet-code directly) """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._non_distributed def _heter_worker_num(self): @@ -643,7 +645,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): get heter worker nums """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._heter_trainers_num def _is_heter_worker(self): @@ -651,25 +653,9 @@ class PaddleCloudRoleMaker(RoleMakerBase): whether current process is heter worker """ if not self._role_is_generated: - self.generate_role() + self._generate_role() return self._role == Role.HETER_WORKER - def _get_rank(self): - """ - get current rank in all workers and pservers - """ - if not self._role_is_generated: - self.generate_role() - return self._rank - - def _get_size(self): - """ - get total num of all workers and pservers - """ - if not self._role_is_generated: - self.generate_role() - return self._size - def _ps_env(self): try: # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set @@ -682,7 +668,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._trainers_num = 1 self._role = Role.WORKER self._current_id = 0 - self._node_num = 1 + self._nodes_num = 1 self._heter_trainers_num = 0 self._heter_trainer_endpoints = None self._non_distributed = True @@ -757,7 +743,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._trainers_num = trainers_num self._role = role self._current_id = current_id - self._node_num = len( + self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) self._heter_trainers_num = heter_trainers_num self._heter_trainer_endpoints = heter_trainer_eplist @@ -776,7 +762,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._non_distributed = True self._worker_endpoints = self._worker_endpoints.split(",") self._trainers_num = len(self._worker_endpoints) - self._node_num = len( + self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) def _gloo_init(self): @@ -832,13 +818,13 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._gloo.init( rendezvous=rendezvous_type, role=self._role, - role_id=self.role_id(), - worker_num=self.worker_num(), - server_num=self.server_num(), + role_id=self._role_id(), + worker_num=self._worker_num(), + server_num=self._server_num(), need_init_all=need_init_all, kwargs=kwargs) - def generate_role(self): + def _generate_role(self): """ generate role for role maker """ @@ -874,7 +860,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): self._cur_endpoint = self._worker_endpoints[self._current_id] elif self._role == Role.SERVER: self._cur_endpoint = self._server_endpoints[self._current_id] - self._node_num = len( + self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) def _user_defined_collective_env(self): @@ -882,10 +868,10 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): self._current_id = self._kwargs.get("current_id") self._trainers_num = len(self._worker_endpoints) self._training_role = Role.WORKER - self._node_num = len( + self._nodes_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) - def generate_role(self): + def _generate_role(self): """ generate role for role maker """ diff --git a/python/paddle/distributed/fleet/base/util_factory.py b/python/paddle/distributed/fleet/base/util_factory.py index e822c3c92f..efaa854c08 100644 --- a/python/paddle/distributed/fleet/base/util_factory.py +++ b/python/paddle/distributed/fleet/base/util_factory.py @@ -237,8 +237,8 @@ class UtilBase(object): if not isinstance(files, list): raise TypeError("files should be a list of file need to be read.") - trainer_id = self.role_maker.worker_index() - trainers = self.role_maker.worker_num() + trainer_id = self.role_maker._worker_index() + trainers = self.role_maker._worker_num() remainder = len(files) % trainers blocksize = int(len(files) / trainers) @@ -280,7 +280,7 @@ class UtilBase(object): fleet_util._set_role_maker(role) fleet_util.print_on_rank("I'm worker 0", 0) """ - if self.role_maker.worker_index() != rank_id: + if self.role_maker._worker_index() != rank_id: return print(message) diff --git a/python/paddle/distributed/fleet/meta_optimizers/common.py b/python/paddle/distributed/fleet/meta_optimizers/common.py index 70b010978b..8ff4114bf8 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/common.py +++ b/python/paddle/distributed/fleet/meta_optimizers/common.py @@ -57,12 +57,12 @@ class CollectiveHelper(object): if startup_program is None: self.startup_program = fluid.default_startup_program() - endpoints = self.role_maker.get_trainer_endpoints() - current_endpoint = endpoints[self.role_maker.worker_index()] + endpoints = self.role_maker._get_trainer_endpoints() + current_endpoint = endpoints[self.role_maker._worker_index()] for ring_id in range(self.nrings): self._init_communicator( self.startup_program, current_endpoint, endpoints, - self.role_maker.worker_index(), ring_id, self.wait_port) + self.role_maker._worker_index(), ring_id, self.wait_port) self._broadcast_params() def _init_communicator(self, program, current_endpoint, endpoints, rank, diff --git a/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py index 3f6ed1ed2f..6806a479d3 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py @@ -47,7 +47,7 @@ class DGCOptimizer(MetaOptimizerBase): sparsity=configs['sparsity'], parameter_list=opt._parameter_list, use_nesterov=opt._use_nesterov, - num_trainers=self.role_maker.worker_num(), + num_trainers=self.role_maker._worker_num(), regularization=opt.regularization, grad_clip=opt._grad_clip, name=opt._name) @@ -60,7 +60,7 @@ class DGCOptimizer(MetaOptimizerBase): if not isinstance(self.inner_opt, Momentum): logging.warn("dgc only works on Momentum optimizer") return False - if self.role_maker.worker_num() <= 1: + if self.role_maker._worker_num() <= 1: logging.warn("dgc only works on multi cards") return False diff --git a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py index 6c1cc3d7a9..0ad9e5680e 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py @@ -50,12 +50,12 @@ class GraphExecutionOptimizer(MetaOptimizerBase): # should fix the variable def _setup_nccl_op(self, startup_program, main_program, build_strategy): - trainer_endpoints = self.role_maker.get_trainer_endpoints() + trainer_endpoints = self.role_maker._get_trainer_endpoints() trainers = trainer_endpoints - trainer_id = self.role_maker.worker_index() - current_endpoint = self.role_maker.get_trainer_endpoints()[trainer_id] + trainer_id = self.role_maker._worker_index() + current_endpoint = self.role_maker._get_trainer_endpoints()[trainer_id] trainer_endpoints_env = ",".join(trainer_endpoints) - trainers_num = self.role_maker.worker_num() + trainers_num = self.role_maker._worker_num() nccl_id_var = startup_program.global_block().create_var( name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW) for i in range(1, build_strategy.nccl_comm_num): @@ -127,8 +127,8 @@ class GraphExecutionOptimizer(MetaOptimizerBase): local_build_strategy.enable_sequential_execution = True exe_strategy = self.user_defined_strategy.execution_strategy - worker_num = self.role_maker.worker_num() - node_num = self.role_maker.node_num() + worker_num = self.role_maker._worker_num() + node_num = self.role_maker._node_num() if self.role_maker._is_collective: assert worker_num >= 1, "nccl2 worker_num must >= 1, now:{}" % worker_num @@ -170,9 +170,9 @@ class GraphExecutionOptimizer(MetaOptimizerBase): # TODO(guru4elephant): should be an independent optimizer self._setup_nccl_op(startup_program, main_program, local_build_strategy) - local_build_strategy.num_trainers = self.role_maker.worker_num() - local_build_strategy.trainer_id = self.role_maker.worker_index() - local_build_strategy.trainers_endpoints = self.role_maker.get_trainer_endpoints( + local_build_strategy.num_trainers = self.role_maker._worker_num() + local_build_strategy.trainer_id = self.role_maker._worker_index() + local_build_strategy.trainers_endpoints = self.role_maker._get_trainer_endpoints( ) local_build_strategy.enable_backward_optimizer_op_deps = True diff --git a/python/paddle/distributed/fleet/meta_optimizers/localsgd_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/localsgd_optimizer.py index 4ebac20888..9f094978d8 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/localsgd_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/localsgd_optimizer.py @@ -38,7 +38,7 @@ class LocalSGDOptimizer(MetaOptimizerBase): if not self.user_defined_strategy.localsgd: return False - if self.role_maker.worker_num() <= 1: + if self.role_maker._worker_num() <= 1: return False return isinstance(self.inner_opt, paddle.optimizer.momentum.Momentum) \ @@ -168,7 +168,7 @@ class LocalSGDOptimizer(MetaOptimizerBase): inputs={'X': [param]}, outputs={'Out': [param]}, attrs={ - 'scale': 1.0 / self.role_maker.worker_num(), + 'scale': 1.0 / self.role_maker._worker_num(), OP_ROLE_KEY: OpRole.Optimize }) sub_block.append_op( @@ -208,7 +208,7 @@ class AdaptiveLocalSGDOptimizer(MetaOptimizerBase): if not self.user_defined_strategy.adaptive_localsgd: return False - if self.role_maker.worker_num() <= 1: + if self.role_maker._worker_num() <= 1: return False return isinstance(self.inner_opt, paddle.optimizer.momentum.Momentum) \ @@ -275,7 +275,7 @@ class AdaptiveLocalSGDOptimizer(MetaOptimizerBase): inputs={'X': [avg_loss]}, outputs={'Out': [avg_loss]}, attrs={ - 'scale': 1.0 / self.role_maker.worker_num(), + 'scale': 1.0 / self.role_maker._worker_num(), OP_ROLE_KEY: OpRole.Optimize }) @@ -398,7 +398,7 @@ class AdaptiveLocalSGDOptimizer(MetaOptimizerBase): inputs={'X': [param]}, outputs={'Out': [param]}, attrs={ - 'scale': 1.0 / self.role_maker.worker_num(), + 'scale': 1.0 / self.role_maker._worker_num(), OP_ROLE_KEY: OpRole.Optimize }) sub_block.append_op( diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py index 7dc532c86e..dfa765364f 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py @@ -31,7 +31,7 @@ class ParameterServerGraphOptimizer(ParameterServerOptimizer): if k_steps < 0: return False - if self.role_maker.is_server(): + if self.role_maker._is_server(): return False if self.role_maker._is_heter_parameter_server_mode: diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py index 51d4d34316..38ad41f883 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py @@ -239,10 +239,10 @@ class ParameterServerOptimizer(MetaOptimizerBase): strategy, self.role_maker) compiled_config.strategy = strategy - if self.role_maker.is_worker() or self.role_maker._is_heter_worker(): + if self.role_maker._is_worker() or self.role_maker._is_heter_worker(): main_program, startup_program = self._build_trainer_programs( compiled_config) - elif self.role_maker.is_server(): + elif self.role_maker._is_server(): main_program, startup_program = self._build_pserver_programs( compiled_config) diff --git a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py index 87fa707791..889fec838e 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py @@ -126,11 +126,11 @@ class PipelineOptimizer(MetaOptimizerBase): optimize_ops, params_grads, prog_list = \ self.wrapped_opt.minimize(loss, startup_program, parameter_list, no_grad_set) - if self.role_maker.worker_num() == 1: + if self.role_maker._worker_num() == 1: return optimize_ops, params_grads - endpoints = self.role_maker.get_trainer_endpoints() - current_endpoint = endpoints[self.role_maker.worker_index()] + endpoints = self.role_maker._get_trainer_endpoints() + current_endpoint = endpoints[self.role_maker._worker_index()] self.startup_program = startup_program if startup_program is None: self.startup_program = fluid.default_startup_program() @@ -142,7 +142,7 @@ class PipelineOptimizer(MetaOptimizerBase): self.nranks = nranks self.nrings = len(self.main_program_list) - self.rank = self.role_maker.worker_index() + self.rank = self.role_maker._worker_index() self.endpoints = endpoints self.current_endpoint = current_endpoint diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py index 227f8f6021..ae5c53b8a3 100644 --- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py +++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py @@ -104,9 +104,9 @@ class ParameterServerRuntime(RuntimeBase): def _init_worker(self): def sync_strategy_envs(): kwargs = {} - kwargs["pserver_endpoints"] = self.role_maker.get_pserver_endpoints( - ) - kwargs["trainer_id"] = self.role_maker.worker_index() + kwargs[ + "pserver_endpoints"] = self.role_maker._get_pserver_endpoints() + kwargs["trainer_id"] = self.role_maker._worker_index() return kwargs def geo_strategy_envs(): @@ -150,7 +150,7 @@ class ParameterServerRuntime(RuntimeBase): return "#".join(init_attrs) kwargs = {} - kwargs["trainers"] = self.role_maker.worker_num() + kwargs["trainers"] = self.role_maker._worker_num() kwargs["sparse_attrs"] = get_sparse_attrs() return kwargs @@ -338,7 +338,7 @@ class ParameterServerRuntime(RuntimeBase): block.append_op( type='recv_save', attrs={ - "trainer_id": self.role_maker.worker_index(), + "trainer_id": self.role_maker._worker_index(), "shape": var.shape, "slice_shapes": [",".join([str(i) for i in var.shape])], @@ -378,14 +378,15 @@ class ParameterServerRuntime(RuntimeBase): block.append_op( type='recv_save', attrs={ - "trainer_id": self.role_maker.worker_index(), + "trainer_id": self.role_maker._worker_index(), "shape": var.shape, "slice_shapes": slice_shapes, "slice_varnames": var_ctx.split_varnames(), "remote_varnames": var_ctx.split_varnames(), "is_sparse": True, "endpoints": var_ctx.split_endpoints(), - "pserver_num": len(self.role_maker.get_pserver_endpoints()), + "pserver_num": + len(self.role_maker._get_pserver_endpoints()), "file_path": os.path.join(dirname, var.name) }) @@ -403,7 +404,7 @@ class ParameterServerRuntime(RuntimeBase): block.append_op( type='recv_save', attrs={ - "trainer_id": self.role_maker.worker_index(), + "trainer_id": self.role_maker._worker_index(), "shape": var.shape, "slice_shapes": slice_shapes, "slice_varnames": slice_varnames, @@ -411,7 +412,7 @@ class ParameterServerRuntime(RuntimeBase): "is_sparse": True, "endpoints": var_ctx.split_endpoints(), "pserver_num": - len(self.role_maker.get_pserver_endpoints()), + len(self.role_maker._get_pserver_endpoints()), "file_path": os.path.join(dirname, var.name) }) @@ -422,7 +423,7 @@ class ParameterServerRuntime(RuntimeBase): block.append_op( type='recv_save', attrs={ - "trainer_id": self.role_maker.worker_index(), + "trainer_id": self.role_maker._worker_index(), "shape": var.shape, "slice_shapes": [",".join([str(i) for i in var.shape])], diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py index 216478479a..e348c67ae0 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py @@ -170,22 +170,40 @@ class CompileTimeStrategy(object): return trainer.mode == DistributedMode.ASYNC def get_role_id(self): - return self.role_maker.role_id() + try: + return self.role_maker._role_id() + except Exception: + return self.role_maker.role_id() def get_trainers(self): - return self.role_maker.worker_num() + try: + return self.role_maker._worker_num() + except Exception: + return self.role_maker.worker_num() def get_ps_endpoint(self): - return self.role_maker.get_pserver_endpoints()[self.get_role_id()] + try: + return self.role_maker._get_pserver_endpoints()[self.get_role_id()] + except Exception: + return self.role_maker.get_pserver_endpoints()[self.get_role_id()] def get_ps_endpoints(self): - return self.role_maker.get_pserver_endpoints() + try: + return self.role_maker._get_pserver_endpoints() + except Exception: + return self.role_maker.get_pserver_endpoints() def get_heter_worker_endpoints(self): - return self.role_maker._get_heter_worker_endpoints() + try: + return self.role_maker._get_heter_worker_endpoints() + except Exception: + return self.role_maker.get_heter_worker_endpoints() def get_heter_worker_endpoint(self): - return self.role_maker._get_heter_worker_endpoint() + try: + return self.role_maker._get_heter_worker_endpoint() + except Exception: + return self.role_maker.get_heter_worker_endpoint() def get_origin_programs(self): return self.origin_main_program, self.origin_startup_program diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py index 3a90b363f2..45597e7253 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py @@ -24,10 +24,10 @@ import numpy as np class TestFleetBase(unittest.TestCase): def setUp(self): os.environ["POD_IP"] = "127.0.0.1" - os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36000" os.environ["PADDLE_TRAINERS_NUM"] = "2" os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ - "127.0.0.1:36001,127.0.0.2:36001" + "127.0.0.1:36001,127.0.0.2:36002" def test_init(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) @@ -58,32 +58,51 @@ class TestFleetBase(unittest.TestCase): def test_worker_endpoints(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - print(fleet.worker_endpoints(to_string=True)) + self.assertEqual( + "127.0.0.1:36000", fleet.worker_endpoints(to_string=True)) + self.assertEqual(["127.0.0.1:36000"], fleet.worker_endpoints()) def test_server_num(self): - role = role_maker.PaddleCloudRoleMaker(is_collective=True) + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PORT"] = "36001" + os.environ["POD_IP"] = "127.0.0.1" + + role = role_maker.PaddleCloudRoleMaker() fleet.init(role) - if fleet.is_server(): - print("fleet server num: {}".format(fleet.server_num())) + os.environ["PADDLE_TRAINERS_NUM"] = "2" + self.assertEqual(2, fleet.server_num()) def test_server_index(self): - role = role_maker.PaddleCloudRoleMaker(is_collective=True) + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PORT"] = "36001" + os.environ["POD_IP"] = "127.0.0.1" + + role = role_maker.PaddleCloudRoleMaker() fleet.init(role) - if fleet.is_server(): - print("fleet server index: {}".format(fleet.server_index())) + self.assertEqual(0, fleet.server_index()) def test_server_endpoints(self): - role = role_maker.PaddleCloudRoleMaker(is_collective=True) + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PORT"] = "36001" + os.environ["POD_IP"] = "127.0.0.1" + + role = role_maker.PaddleCloudRoleMaker() fleet.init(role) if fleet.is_server(): - print("fleet server index: {}".format( - fleet.server_endpoints(to_string=True))) + self.assertEqual( + "127.0.0.1:36001,127.0.0.2:36002", + fleet.server_endpoints(to_string=True)) + self.assertEqual(["127.0.0.1:36001", "127.0.0.2:36002"], + fleet.server_endpoints()) def test_is_server(self): - role = role_maker.PaddleCloudRoleMaker(is_collective=True) + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PORT"] = "36001" + os.environ["POD_IP"] = "127.0.0.1" + + role = role_maker.PaddleCloudRoleMaker() fleet.init(role) - if fleet.is_server(): - print("test fleet is server") + self.assertTrue(fleet.is_server()) def test_util(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py index a831f6e838..dae7907161 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -87,7 +87,7 @@ class TestCloudRoleMaker2(unittest.TestCase): role2._all_gather(1) role2._all_gather(1) role2._barrier_server() - role2.all_gather(1) + role2._all_gather(1) role3 = GeneralRoleMaker(path="./test_gloo_3") role3._worker_gather(1) role3._worker_gather(1) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py index d786fa1eba..4dd254af25 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py @@ -30,19 +30,19 @@ class TestRoleMakerBase(unittest.TestCase): def test_rolemaker_base(self): role = role_maker.RoleMakerBase() - self.assertRaises(Exception, role.is_worker) - self.assertRaises(Exception, role.is_server) - self.assertRaises(Exception, role.is_first_worker) - self.assertRaises(Exception, role.worker_num) - self.assertRaises(Exception, role.server_num) - self.assertRaises(Exception, role.worker_index) - self.assertRaises(Exception, role.server_index) - self.assertRaises(Exception, role.role_id) - self.assertRaises(Exception, role.node_num) - - trainer_endpoints = role.get_trainer_endpoints() + self.assertRaises(Exception, role._is_worker) + self.assertRaises(Exception, role._is_server) + self.assertRaises(Exception, role._is_first_worker) + self.assertRaises(Exception, role._worker_num) + self.assertRaises(Exception, role._server_num) + self.assertRaises(Exception, role._worker_index) + self.assertRaises(Exception, role._server_index) + self.assertRaises(Exception, role._role_id) + self.assertRaises(Exception, role._node_num) + + trainer_endpoints = role._get_trainer_endpoints() self.assertTrue(len(trainer_endpoints) == 0) - pserver_endpoints = role.get_pserver_endpoints() + pserver_endpoints = role._get_pserver_endpoints() self.assertTrue(len(pserver_endpoints) == 0) print(role.to_string()) @@ -77,20 +77,32 @@ class TestCloudRoleMaker(unittest.TestCase): return ro = role_maker.PaddleCloudRoleMaker(is_collective=False) - - self.assertTrue(ro.is_worker()) - self.assertFalse(ro.is_server()) - self.assertEqual(ro.worker_num(), 2) - self.assertTrue(ro.is_first_worker()) - worker_endpoints = ro.get_trainer_endpoints() + self.assertTrue(ro._is_worker()) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertFalse(ro._is_server()) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertEqual(ro._worker_num(), 2) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertTrue(ro._is_first_worker()) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + worker_endpoints = ro._get_trainer_endpoints() self.assertEqual(worker_endpoints[0], '127.0.0.1:36001') - self.assertEqual(ro.role_id(), 0) - self.assertEqual(ro.node_num(), 2) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertEqual(ro._role_id(), 0) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertEqual(ro._node_num(), 2) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertFalse(ro._is_non_distributed()) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertEqual(ro._heter_worker_num(), 0) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertFalse(ro._is_heter_worker()) def test_tr_rolemaker_collective(self): ro = role_maker.PaddleCloudRoleMaker(is_collective=True) - self.assertEqual(ro.worker_num(), 2) - self.assertEqual(ro.node_num(), 2) + self.assertEqual(ro._worker_num(), 2) + ro = role_maker.PaddleCloudRoleMaker(is_collective=True) + self.assertEqual(ro._node_num(), 2) def test_ps_rolemaker(self): """Test ps rolemaker.""" @@ -106,11 +118,11 @@ class TestCloudRoleMaker(unittest.TestCase): ro = role_maker.PaddleCloudRoleMaker( is_collective=False, init_gloo=False) - self.assertEqual(ro.server_index(), 0) - self.assertFalse(ro.is_worker()) - self.assertTrue(ro.is_server()) - self.assertEqual(ro.server_num(), 2) - pserver_endpoints = ro.get_pserver_endpoints() + self.assertEqual(ro._server_index(), 0) + self.assertFalse(ro._is_worker()) + self.assertTrue(ro._is_server()) + self.assertEqual(ro._server_num(), 2) + pserver_endpoints = ro._get_pserver_endpoints() self.assertEqual(pserver_endpoints[0], '127.0.0.1:36001') self.assertEqual(ro._all_gather(1, "worker"), 1) @@ -126,7 +138,7 @@ class TestCloudRoleMaker(unittest.TestCase): return ro = role_maker.PaddleCloudRoleMaker(is_collective=False) - self.assertRaises(ValueError, ro.generate_role) + self.assertRaises(ValueError, ro._generate_role) class TestUserDefinedRoleMaker(unittest.TestCase): @@ -151,10 +163,10 @@ class TestUserDefinedRoleMaker(unittest.TestCase): role=role_maker.Role.SERVER, current_id=0, worker_num=2) - self.assertEqual(ro.server_num(), 2) - ro.generate_role() - self.assertTrue(ro.is_server()) - self.assertEqual(ro.role_id(), 0) + self.assertEqual(ro._server_num(), 2) + ro._generate_role() + self.assertTrue(ro._is_server()) + self.assertEqual(ro._role_id(), 0) def test_tr_rolemaker(self): try: @@ -171,9 +183,9 @@ class TestUserDefinedRoleMaker(unittest.TestCase): current_id=0, worker_num=2) - self.assertIn("127.0.0.1:36001", ro.get_pserver_endpoints()) - self.assertTrue(ro.is_worker()) - self.assertEqual(ro.role_id(), 0) + self.assertIn("127.0.0.1:36001", ro._get_pserver_endpoints()) + self.assertTrue(ro._is_worker()) + self.assertEqual(ro._role_id(), 0) class TestGlooWithCloudRoleMaker(unittest.TestCase): @@ -216,7 +228,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_FS_PATH"] = tmp role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() self.case(role, "worker") self.clean(tmp) @@ -234,7 +246,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_FS_PATH"] = tmp role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() self.case(role, "worker") self.clean(tmp) @@ -256,7 +268,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_FS_PATH"] = tmp role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() self.case(role, "server") self.clean(tmp) @@ -280,7 +292,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_FS_PATH"] = tmp role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() self.case(role, "server") self.clean(tmp) @@ -302,7 +314,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_HTTP_PORT"] = "30019" role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() import time time.sleep(3) @@ -326,7 +338,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_FS_PATH"] = tmp role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() self.case(role, "server") self.case(role, "all") self.clean(tmp) @@ -354,7 +366,7 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_FS_PATH"] = tmp role = role_maker.PaddleCloudRoleMaker() - role.generate_role() + role._generate_role() self.case(role, "server") self.case(role, "all") self.clean(tmp) @@ -377,7 +389,323 @@ class TestGlooWithCloudRoleMaker(unittest.TestCase): os.environ["PADDLE_GLOO_RENDEZVOUS"] = "5" role = role_maker.PaddleCloudRoleMaker() - self.assertRaises(ValueError, role.generate_role) + self.assertRaises(ValueError, role._generate_role) + + def test_fs_gloo8(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + def net(): + x = paddle.fluid.layers.data(name='x', shape=[13], dtype='float32') + y_predict = paddle.fluid.layers.fc(input=x, size=1, act=None) + y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = paddle.fluid.layers.square_error_cost( + input=y_predict, label=y) + avg_cost = paddle.fluid.layers.mean(cost) + return avg_cost + + from paddle.distributed import fleet + + role = role_maker.PaddleCloudRoleMaker() + fleet.init(role) + avg_cost = net() + + strategy = paddle.distributed.fleet.DistributedStrategy() + strategy.a_sync = False + + optimizer = paddle.optimizer.SGD(0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(avg_cost) + + comm_world = "server" + fleet.util().barrier(comm_world) + + gather = fleet.util().all_gather(1, comm_world) + self.assertEqual(gather[0], 1) + + all_reduce = fleet.util().all_reduce(1, "sum", comm_world) + self.assertEqual(1, all_reduce) + + self.clean(tmp) + + +class TestGlooWithCloudRoleMaker(unittest.TestCase): + def setUp(self): + os.environ["PADDLE_TRAINERS_NUM"] = "1" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_TRAINER_ID"] = "0" + + def case(self, role, comm_world): + role._barrier(comm_world) + + gather = role._all_gather(1, comm_world) + self.assertEqual(gather[0], 1) + + all_reduce = role._all_reduce(1, "sum", comm_world) + self.assertEqual(1, all_reduce) + + def mkdir(self): + tmp = tempfile.mkdtemp() + return tmp + + def clean(self, tmp): + shutil.rmtree(tmp) + + def test_hdfs_gloo(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + self.case(role, "worker") + self.clean(tmp) + + def test_fs_gloo(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + self.case(role, "worker") + self.clean(tmp) + + def test_fs_gloo2(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + self.case(role, "server") + self.clean(tmp) + + def test_fs_gloo3(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + self.case(role, "server") + self.clean(tmp) + + def test_fs_gloo4(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3" + os.environ["PADDLE_GLOO_HTTP_HOST"] = "127.0.0.1" + os.environ["PADDLE_GLOO_HTTP_PORT"] = "30019" + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + import time + time.sleep(3) + + def test_fs_gloo5(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + self.case(role, "server") + self.case(role, "all") + self.clean(tmp) + + def test_fs_gloo6(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role._generate_role() + self.case(role, "server") + self.case(role, "all") + self.clean(tmp) + + def test_fs_gloo7(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "5" + + role = role_maker.PaddleCloudRoleMaker() + self.assertRaises(ValueError, role._generate_role) + + def test_hdfs_gloo_v2(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "" + os.environ["PADDLE_GLOO_FS_UGI"] = "" + os.environ["PADDLE_GLOO_FS_PATH"] = "" + + role = role_maker.PaddleCloudRoleMaker() + self.assertRaises(ValueError, role._generate_role) + + def test_fs_gloo_v2(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = "" + + role = role_maker.PaddleCloudRoleMaker() + self.assertRaises(ValueError, role._generate_role) + + def test_http_gloo_v2(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3" + os.environ["PADDLE_GLOO_HTTP_HOST"] = "" + os.environ["PADDLE_GLOO_HTTP_PORT"] = "" + + role = role_maker.PaddleCloudRoleMaker() + self.assertRaises(ValueError, role._generate_role) def test_fs_gloo8(self): plats = platform.platform() -- GitLab