From 1c57d554561cb5f5be9286ebc4fa44db7e1139db Mon Sep 17 00:00:00 2001
From: MrChengmo
Date: Wed, 23 Sep 2020 13:57:25 +0800
Subject: [PATCH] ps_graph support ps-gpu

---
 .../fleet/base/distributed_strategy.py        | 26 +++++++++----------
 .../distributed/fleet/base/role_maker.py      | 24 ++++++++++++-----
 .../parameter_server_graph_optimizer.py       |  4 +++
 3 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py
index f1c836468d..e86006469f 100755
--- a/python/paddle/distributed/fleet/base/distributed_strategy.py
+++ b/python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -107,7 +107,7 @@ class DistributedStrategy(object):
     All of the distributed training configurations can be configured in DistributedStrategy,
     such as automatic mixed precision (AMP), Layer-wise Adaptive Rate Scaling (LARS),
     asynchronous update parameter server(ASGD), etc.
-    
+
     DistributedStrategy can be serialized into protobuf file or deserialized from protobuf file

     Users who run local training usually configure BuildStrategy and ExecutionStrategy, and
@@ -129,7 +129,7 @@ class DistributedStrategy(object):

     Examples:
       .. code-block:: python
-        
+
         import paddle.distributed.fleet as fleet
         strategy = fleet.DistributedStrategy()
         strategy.dgc = True
@@ -207,7 +207,7 @@ class DistributedStrategy(object):
             build_strategy.fuse_broadcast_ops = True
             build_strategy.fuse_all_optimizer_ops = True
             build_strategy.enable_inplace = True
-            
+
             strategy = paddle.distributed.fleet.DistributedStrategy()
             strategy.build_strategy = build_strategy
         """
@@ -248,7 +248,7 @@ class DistributedStrategy(object):

            strategy = fleet.DistributedStrategy()
            strategy.a_sync = True  # by default this is True
-            
+
            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
@@ -259,7 +259,7 @@ class DistributedStrategy(object):
     def a_sync(self, flag):
         if isinstance(flag, bool):
             self.strategy.a_sync = flag
-            self.a_sync_configs = {"k_steps": 0}
+            self.a_sync_configs = {"k_steps": 0, "worker_device": 'cpu'}
         else:
             raise ValueError(
                 "The type of `flag` is invalid, expected type is bool, but received %s".
@@ -472,7 +472,7 @@ class DistributedStrategy(object):
     def sync_batch_norm(self):
         """
         Indicating whether we are using sync_batch_norm to do synchronous batch normalization among all training nodes.
-        
+
         Default value: False

         Examples:
@@ -525,7 +525,7 @@ class DistributedStrategy(object):

         Examples:
           .. code-block:: python
-            
+
             import paddle.distributed.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.fuse_grad_size_in_MB = 50
@@ -563,7 +563,7 @@ class DistributedStrategy(object):

         Examples:
           .. code-block:: python
-            
+
             import paddle.distributed.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.nccl_comm_num = 2
@@ -595,7 +595,7 @@ class DistributedStrategy(object):

         Examples:
           .. code-block:: python
-            
+
             import paddle.distributed.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.recompute = True
@@ -621,7 +621,7 @@ class DistributedStrategy(object):

         Examples:
           .. code-block:: python
-            
+
             import paddle.distributed.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.pipeline = True
@@ -656,7 +656,7 @@ class DistributedStrategy(object):

         Examples:
           .. code-block:: python
-            
+
             import paddle.distributed.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.pipeline = True
@@ -971,7 +971,7 @@ class DistributedStrategy(object):
         [Large Batch Optimization for Deep Learning: Training BERT in 76 minutes](https://arxiv.org/abs/1904.00962).

         Default Value: False
-        
+
         Examples:
           .. code-block:: python
@@ -1114,7 +1114,7 @@ class DistributedStrategy(object):

             optimizer = paddle.optimizer.SGD(learning_rate=0.01)
             optimizer = fleet.distributed_optimizer(optimizer, strategy)
-            
+
         """
         return self.strategy.conv_workspace_size_limit

diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py
index 07d9cbbee3..a7aad92fff 100644
--- a/python/paddle/distributed/fleet/base/role_maker.py
+++ b/python/paddle/distributed/fleet/base/role_maker.py
@@ -681,8 +681,12 @@ class PaddleCloudRoleMaker(RoleMakerBase):
         else:
             self._worker_endpoints = []

-        trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
-        training_role = os.environ["TRAINING_ROLE"]
+        trainers_num = os.getenv("PADDLE_TRAINERS_NUM", None)
+        assert trainers_num != None
+        trainers_num = int(trainers_num)
+
+        training_role = os.getenv("TRAINING_ROLE", None)
+        assert training_role != None

         if training_role not in ["TRAINER", "PSERVER", "HETER_TRAINER"]:
             raise ValueError(
@@ -716,19 +720,25 @@ class PaddleCloudRoleMaker(RoleMakerBase):

         if training_role == "TRAINER":
             role = Role.WORKER
-            current_id = int(os.environ["PADDLE_TRAINER_ID"])
+            current_id = os.getenv("PADDLE_TRAINER_ID", None)
+            assert current_id != None
+            current_id = int(current_id)
             if len(self._worker_endpoints) > 0:
                 self._cur_endpoint = self._worker_endpoints[current_id]
         elif training_role == "PSERVER":
             role = Role.SERVER
-            port = os.environ["PADDLE_PORT"]
-            ip = os.environ["POD_IP"]
+            port = os.getenv("PADDLE_PORT", None)
+            assert port != None
+            ip = os.getenv("POD_IP", None)
+            assert ip != None
             self._cur_endpoint = ip + ":" + port
             current_id = self._server_endpoints.index(self._cur_endpoint)
         elif training_role == "HETER_TRAINER":
             role = Role.HETER_WORKER
-            cur_ip = os.environ["POD_IP"]
-            cur_port = os.environ["PADDLE_PORT"]
+            cur_port = os.getenv("PADDLE_PORT", None)
+            assert cur_port != None
+            cur_ip = os.getenv("POD_IP", None)
+            assert cur_ip != None
             curr_endpoint = ":".join([cur_ip, cur_port])
             current_id = heter_trainer_eplist.index(curr_endpoint)
         else:
diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py
index dfa765364f..62224d35fe 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_graph_optimizer.py
@@ -31,6 +31,10 @@ class ParameterServerGraphOptimizer(ParameterServerOptimizer):
         if k_steps < 0:
             return False

+        device = self.user_defined_strategy.a_sync_configs["worker_device"]
+        if device.upper() != 'CPU':
+            return False
+
         if self.role_maker._is_server():
             return False

-- 
GitLab
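
Usage sketch (not part of the patch): after this change, enabling a_sync seeds the configs with {"k_steps": 0, "worker_device": 'cpu'}, and ParameterServerGraphOptimizer._can_apply() now declines to apply whenever worker_device is not CPU. A minimal, hypothetical configuration that exercises the new switch could look like the following; the "gpu" value and the commented optimizer setup are assumptions for illustration, mirroring the docstring examples above rather than anything prescribed by the patch.

    import paddle.distributed.fleet as fleet

    strategy = fleet.DistributedStrategy()
    strategy.a_sync = True  # sets {"k_steps": 0, "worker_device": 'cpu'} by default

    # Hypothetical: declare the workers as GPU devices so the CPU-only
    # graph optimizer (ParameterServerGraphOptimizer) skips this job.
    configs = strategy.a_sync_configs
    configs["worker_device"] = "gpu"
    strategy.a_sync_configs = configs

    # code block for defining loss and local optimizer
    # sgd = fleet.distributed_optimizer(optimizer, strategy)
    # sgd.minimize(loss)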