From 9c17a899d7c31a8079eb9aee4c03763f3b7f7a60 Mon Sep 17 00:00:00 2001 From: guru4elephant <35550832+guru4elephant@users.noreply.github.com> Date: Wed, 10 Jul 2019 08:27:14 +0800 Subject: [PATCH] upgrade collective fleet api (#18533) * upgrade collective fleet api --- .../fluid/incubate/fleet/base/role_maker.py | 97 ++++------- .../incubate/fleet/collective/__init__.py | 159 ++++++------------ 2 files changed, 91 insertions(+), 165 deletions(-) diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 07b4f9df23..046015503f 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -103,46 +103,6 @@ class RoleMakerBase(object): return self._server_endpoints -class MultiProcessRoleMaker(RoleMakerBase): - """ - MultiProcessRoleMaker is a default role maker for multi-process - GPU training. It works with paddle.distributed.lanuch.py by-design - """ - - def __init__(self): - super(MultiProcessRoleMaker, self).__init__() - self._role_is_generated = False - - def generate_role(self): - import os - if not self._role_is_generated: - self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) - self._num_trainers = 1 - self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER") - assert (self._training_role == "TRAINER") - self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") - self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") - if self._worker_endpoints: - self._worker_endpoints = self._worker_endpoints.split(",") - self._num_trainers = len(self._worker_endpoints) - self._role_is_generated = True - - def is_worker(self): - return True - - def is_server(self): - return False - - def is_first_worker(self): - return self._current_id == 0 - - def worker_index(self): - return self._current_id - - def worker_num(self): - return self._worker_num - - class MPIRoleMaker(RoleMakerBase): """ MPIRoleMaker is a MPI-API based role maker which is a counter-part of K8SRoleMaker @@ -361,34 +321,47 @@ class MPISymetricRoleMaker(MPIRoleMaker): class PaddleCloudRoleMaker(RoleMakerBase): - def __init__(self): + def __init__(self, is_collective=False): super(PaddleCloudRoleMaker, self).__init__() self._role_is_generated = False + self._is_collective = is_collective def generate_role(self): if not self._role_is_generated: - self.port = os.getenv("PADDLE_PORT", "6174") - self.pserver_ips = os.getenv("PADDLE_PSERVERS", "") - eplist = [] - for ip in self.pserver_ips.split(","): - eplist.append(':'.join([ip, self.port])) - self.endpoints = ",".join(eplist) - self._trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) - self.current_endpoint = os.getenv("POD_IP", - "localhost") + ":" + self.port - self.role = os.getenv("TRAINING_ROLE", "TRAINER") - self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) - self.eplist = eplist - print("PaddleCloudRoleMaker() endpoints: %s" % self.endpoints) - self.endpoints = self.endpoints.split(",") - self._server_endpoints = self.endpoints - self._worker_endpoints = self.endpoints - if self.role.upper() == "PSERVER": - self._current_id = self.endpoints.index(self.current_endpoint) - self._role = Role.SERVER + if not self._is_collective: + self.port = os.getenv("PADDLE_PORT", "6174") + self.pserver_ips = os.getenv("PADDLE_PSERVERS", "") + + eplist = [] + for ip in self.pserver_ips.split(","): + eplist.append(':'.join([ip, self.port])) + self.endpoints = ",".join(eplist) + self._trainers = 
int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + self.current_endpoint = os.getenv("POD_IP", + "localhost") + ":" + self.port + self.role = os.getenv("TRAINING_ROLE", "TRAINER") + self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + self.eplist = eplist + self.endpoints = self.endpoints.split(",") + self._server_endpoints = self.endpoints + self._worker_endpoints = self.endpoints + if self.role.upper() == "PSERVER": + self._current_id = self.endpoints.index( + self.current_endpoint) + self._role = Role.SERVER + else: + self._current_id = self.trainer_id + self._role = Role.WORKER else: - self._current_id = self.trainer_id - self._role = Role.WORKER + self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + self._training_role = os.getenv("PADDLE_TRAINING_ROLE", + "TRAINER") + assert (self._training_role == "TRAINER") + self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") + self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") + if self._worker_endpoints: + self._worker_endpoints = self._worker_endpoints.split(",") + self._num_trainers = len(self._worker_endpoints) self._role_is_generated = True def is_worker(self): diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py index b9da38fa8a..4c72c9636a 100644 --- a/python/paddle/fluid/incubate/fleet/collective/__init__.py +++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py @@ -34,6 +34,7 @@ class DistributedStrategy(object): self.h_allreduce = False def build(self): + self.strategy_map = {} # make sure we set single precision config True if self.use_fp32 and self.use_fp16: self.use_fp16 = False @@ -48,75 +49,19 @@ class DistributedStrategy(object): class DistributedOptimizerFactory(object): - def strategy_to_optimizer_map(self): - pattern = {} - pattern["fp16"] = [ - "MixedPrecisionOptimizer", "MixedPrecisionLocalSGDOptimizer" - ] - pattern["fp32"] = ["FullPrecisionOptimizer", "LocalSGDOptimizer"] - pattern["localsgd"] = [ - "MixedPrecisionLocalSGDOptimizer", "LocalSGDOptimizer" - ] - pattern["h_allreduce"] = [ - "FullPrecisionOptimizer", - "LocalSGDOptimizer", - "MixedPrecisionOptimizer", - "MixedPrecisionLocalSGDOptimizer", - ] - self.pattern = pattern - - def create_by_strategy(self, optimizer, strategy): - if strategy == None: - strategy = DistributedStrategy() - strategy.build() - strategy_list = [] - for key in strategy.strategy_map: - if strategy.strategy_map[key]: - strategy_list.append(self.pattern[key]) - classname = list(set.intersection(*map(set, strategy_list)))[0] - return globals()[classname](optimizer, strategy) - - -class DistributedStrategy(object): def __init__(self): - # precision configs - self.use_fp16 = False - self.use_fp32 = True - # algorithmic communication - self.local_sgd = False - self.dgc = False - # communication topology configs - self.h_allreduce = False + self.strategy_to_optimizer_map() - def build(self): - # make sure we set single precision config True - if self.use_fp32 and self.use_fp16: - self.use_fp16 = False - # make sure we set single algorithmic communication True - if self.local_sgd and self.dgc: - self.local_sgd = False - self.strategy_map["fp16"] = self.use_fp16 - self.strategy_map["fp32"] = self.use_fp32 - self.strategy_map["localsgd"] = self.local_sgd - self.strategy_map["dgc"] = self.dgc - self.strategy_map["h_allreduce"] = self.h_allreduce - - -class DistributedOptimizerFactory(object): def strategy_to_optimizer_map(self): pattern = {} - pattern["fp16"] = [ - 
"MixedPrecisionOptimizer", "MixedPrecisionLocalSGDOptimizer" - ] - pattern["fp32"] = ["FullPrecisionOptimizer", "LocalSGDOptimizer"] - pattern["localsgd"] = [ - "MixedPrecisionLocalSGDOptimizer", "LocalSGDOptimizer" - ] + pattern["fp16"] = ["FP16SGDOptimizer", "FP16LocalSGDOptimizer"] + pattern["fp32"] = ["FP32SGDOptimizer", "FP32LocalSGDOptimizer"] + pattern["localsgd"] = ["FP16LocalSGDOptimizer", "FP32LocalSGDOptimizer"] pattern["h_allreduce"] = [ - "FullPrecisionOptimizer", - "LocalSGDOptimizer", - "MixedPrecisionOptimizer", - "MixedPrecisionLocalSGDOptimizer", + "FP32SGDOptimizer", + "FP32LocalSGDOptimizer", + "FP16SGDOptimizer", + "FP16LocalSGDOptimizer", ] self.pattern = pattern @@ -158,8 +103,10 @@ class Collective(Fleet): "You should not call 'stop_worker' method for collective mode.") def distributed_optimizer(self, optimizer, strategy=None): + optimizer_factory = DistributedOptimizerFactory() + self._optimizer = \ - DistributedOptimizerFactory.create_by_strategy(optimizer, strategy) + optimizer_factory.create_by_strategy(optimizer, strategy) return self._optimizer def save_inference_model(self, @@ -182,29 +129,13 @@ fleet = Collective() class CollectiveOpBasedOptimizer(DistributedOptimizer): """ - TBA + Collective Operator Base Class For Distributed Optimizer + The class is invisible to a user """ def __init__(self, optimizer, strategy=None): super(CollectiveOpBasedOptimizer, self).__init__(optimizer, strategy) - def _transpile_program(self, startup_program=None): - startup_program = startup_program if startup_program else \ - fluid.framework.default_startup_program() - worker_endpoints = fleet.worker_endpoints() - trainer_id = fleet.worker_index() - current_endpoint = fleet.worker_endpoints()[trainer_id] - # call transpiler - config = dist_transpiler.DistributeTranspilerConfig() - config.mode = "collective" - config.collective_mode = "sgd" - t = dist_transpiler.DistributeTranspiler(config=config) - t.transpile( - trainer_id, - trainers=','.join(worker_endpoints), - startup_program=startup_program, - current_endpoint=current_endpoint) - def backward(self, loss, startup_program=None, @@ -218,11 +149,14 @@ class CollectiveOpBasedOptimizer(DistributedOptimizer): return self._optimizer.apply_gradients(params_grads) -class MixedPrecisionOptimizer(CollectiveOpBasedOptimizer): +class FP16SGDOptimizer(CollectiveOpBasedOptimizer): """ - TBA + do all reduce within every minibatch """ + def __init__(self, optimizer, strategy=None): + super(FP16SGDOptimizer, self).__init__(optimizer, strategy) + def minimize(self, loss, startup_program=None, @@ -231,32 +165,51 @@ class MixedPrecisionOptimizer(CollectiveOpBasedOptimizer): pass -class FullPrecisionOptimizer(CollectiveOpBasedOptimizer): - """ - TBA - """ +class FP32LocalSGDOptimizer(CollectiveOpBasedOptimizer): + def __init__(self, optimizer, strategy=None): + super(FP32LocalSGDOptimizer, self).__init__(optimizer, strategy) + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + opts, param_and_grads = self._optimizer.minimize(loss) + config = fluid.DistributeTranspilerConfig() + config.mode = 'collective' + config.collective_mode = 'local_sgd' + t = fluid.DistributeTranspiler(config=config) + t.transpile( + trainer_id=fleet.worker_index(), + trainers=fleet.worker_endpoints(), + current_endpoint=fleet.worker_endpoints()[fleet.worker_index()], + startup_program=startup_program, + program=loss.block.program) + return opts, param_and_grads + + +class FP32SGDOptimizer(CollectiveOpBasedOptimizer): def 
__init__(self, optimizer, strategy=None): - super(FullPrecisionOptimizer, self).__init__(optimizer, strategy) + super(FP32SGDOptimizer, self).__init__(optimizer, strategy) def minimize(self, loss, startup_program=None, parameter_list=None, no_grad_set=None): - self._transpile_program(startup_program) - - train_program = loss.block.program - param_grads = self.backward(loss) - train_program.global_block().append_op(type='c_sync_compute_stream') - data_parallel_param_grads = [] - for p, g in param_grads: - # NOTE: scale will be done on loss scale - # in multi_devices_graph_pass using nranks. - reduced_g = fluid.layers.collective._allreduce(g, g) - data_parallel_param_grads.append([p, reduced_g]) - train_program.global_block().append_op(type='c_sync_comm_stream') - self.apply_gradients(data_parallel_param_grads) + opts, param_and_grads = self._optimizer.minimize(loss) + config = fluid.DistributeTranspilerConfig() + config.mode = 'collective' + config.collective_mode = 'grad_allreduce' + t = fluid.DistributeTranspiler(config=config) + + t.transpile( + trainer_id=fleet.worker_index(), + trainers=fleet.worker_endpoints(), + current_endpoint=fleet.worker_endpoints()[fleet.worker_index()], + startup_program=startup_program, + program=loss.block.program) + return opts, param_and_grads class CollectiveOptimizer(DistributedOptimizer): -- GitLab
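
The role_maker.py hunk in this patch changes PaddleCloudRoleMaker so that, with the new is_collective flag, it derives the trainer id and endpoint list from the launcher environment instead of the PSERVER variables. Below is a minimal sketch of that environment contract, assuming the variables that paddle.distributed.launch normally exports per process; the concrete values and the direct attribute reads are illustrative only, since in real training the role maker is simply handed to the fleet instance.

import os
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker

# Stand-ins for what paddle.distributed.launch would export in each process;
# these values are illustrative, not part of the patch.
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6170,127.0.0.1:6171"
os.environ["PADDLE_CURRENT_ENDPOINT"] = "127.0.0.1:6170"

role = PaddleCloudRoleMaker(is_collective=True)  # flag added by this patch
role.generate_role()

# The collective branch of generate_role() fills these from the environment.
print(role._current_id)        # 0
print(role._num_trainers)      # 2
print(role._worker_endpoints)  # ['127.0.0.1:6170', '127.0.0.1:6171']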
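
End to end, the upgraded collective path is driven through the fleet singleton defined in collective/__init__.py. The sketch below is an illustration under stated assumptions, not part of the patch: it presumes the script is started with paddle.distributed.launch (one process per GPU), that the base Fleet class accepts the role maker via fleet.init(role_maker) as in the incubate fleet API of this release, and it uses a throwaway linear-regression network purely as a placeholder loss.

import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy

# Placeholder network; any fluid program that produces a scalar loss works.
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
prediction = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(
    fluid.layers.square_error_cost(input=prediction, label=y))

# is_collective=True is the flag this patch adds to PaddleCloudRoleMaker.
fleet.init(PaddleCloudRoleMaker(is_collective=True))

# Strategy flags map onto the FP16/FP32 (Local)SGD optimizer classes in the
# diff; build() must be called so strategy_map is populated before use.
strategy = DistributedStrategy()
strategy.use_fp32 = True
strategy.local_sgd = False
strategy.build()

optimizer = fluid.optimizer.SGD(learning_rate=0.01)
# distributed_optimizer() now constructs DistributedOptimizerFactory itself
# and returns the optimizer class selected from the strategy pattern table.
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)

# The usual executor setup and training loop follow, one process per device.

Note that the minimize() methods of the FP32 optimizers added in the diff first run the wrapped optimizer and then transpile the program with collective_mode set to 'grad_allreduce' or 'local_sgd', so user code no longer needs to invoke the DistributeTranspiler directly.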