From f8e1f452c4569a051ec96695495d79a134bbea18 Mon Sep 17 00:00:00 2001 From: An Improved PeleeNet Algorithm with Feature Pyramid Networks for Image Detection <39549453+Baibaifan@users.noreply.github.com> Date: Thu, 18 Mar 2021 17:11:32 +0800 Subject: [PATCH] ascend_communicate (#31708) --- .../fleet/meta_optimizers/common.py | 71 ++++++++++++------- .../meta_optimizers/pipeline_optimizer.py | 71 ++++++++++++------- .../meta_optimizers/sharding_optimizer.py | 2 +- python/paddle/fluid/framework.py | 2 +- python/paddle/fluid/transpiler/collective.py | 71 ++++++++++++------- python/paddle/hapi/model.py | 68 +++++++++++------- 6 files changed, 176 insertions(+), 109 deletions(-) diff --git a/python/paddle/distributed/fleet/meta_optimizers/common.py b/python/paddle/distributed/fleet/meta_optimizers/common.py index 0f7ca4f4294..9befdfff04b 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/common.py +++ b/python/paddle/distributed/fleet/meta_optimizers/common.py @@ -13,6 +13,7 @@ # limitations under the License. 
from __future__ import print_function +import os import paddle.fluid as fluid from paddle.fluid import core, unique_name @@ -70,34 +71,50 @@ class CollectiveHelper(object): nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) - if rank == 0 and wait_port: - wait_server_ready(other_endpoints) - block = program.global_block() - nccl_id_var = block.create_var( - name=unique_name.generate('nccl_id'), - persistable=True, - type=core.VarDesc.VarType.RAW) - block.append_op( - type='c_gen_nccl_id', - inputs={}, - outputs={'Out': nccl_id_var}, - attrs={ - 'rank': rank, - 'endpoint': current_endpoint, - 'other_endpoints': other_endpoints, - OP_ROLE_KEY: OpRole.Forward - }) - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': ring_id, - OP_ROLE_KEY: OpRole.Forward - }) + if core.is_compiled_with_cuda(): + if rank == 0 and wait_port: + wait_server_ready(other_endpoints) + nccl_id_var = block.create_var( + name=unique_name.generate('nccl_id'), + persistable=True, + type=core.VarDesc.VarType.RAW) + block.append_op( + type='c_gen_nccl_id', + inputs={}, + outputs={'Out': nccl_id_var}, + attrs={ + 'rank': rank, + 'endpoint': current_endpoint, + 'other_endpoints': other_endpoints, + OP_ROLE_KEY: OpRole.Forward + }) + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': ring_id, + OP_ROLE_KEY: OpRole.Forward + }) + elif core.is_compiled_with_npu(): + endpoint_to_index_map = { + e: idx for idx, e in enumerate(endpoints) + } + block.append_op( + type='c_comm_init_hcom', + inputs={}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': ring_id, + 'device_id': int(os.getenv("FLAGS_selected_npus")), + 'rank_ids': [endpoint_to_index_map[e] for e in endpoints], + OP_ROLE_KEY: OpRole.Forward + }) def _wait(self, current_endpoint, endpoints): assert 
(self.wait_port) diff --git a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py index 67a3312552c..da8adf47b85 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py @@ -13,6 +13,7 @@ from __future__ import print_function from __future__ import division +import os import paddle.fluid as fluid from paddle.fluid import core, unique_name @@ -78,34 +79,50 @@ class PipelineHelper(object): nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) - if rank == 0 and wait_port: - wait_server_ready(other_endpoints) - block = program.global_block() - nccl_id_var = block.create_var( - name=unique_name.generate('nccl_id'), - persistable=True, - type=core.VarDesc.VarType.RAW) - block.append_op( - type='c_gen_nccl_id', - inputs={}, - outputs={'Out': nccl_id_var}, - attrs={ - 'rank': rank, - 'endpoint': current_endpoint, - 'other_endpoints': other_endpoints, - OP_ROLE_KEY: OpRole.Forward, - }) - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': ring_id, - OP_ROLE_KEY: OpRole.Forward, - }) + if core.is_compiled_with_cuda(): + if rank == 0 and wait_port: + wait_server_ready(other_endpoints) + nccl_id_var = block.create_var( + name=unique_name.generate('nccl_id'), + persistable=True, + type=core.VarDesc.VarType.RAW) + block.append_op( + type='c_gen_nccl_id', + inputs={}, + outputs={'Out': nccl_id_var}, + attrs={ + 'rank': rank, + 'endpoint': current_endpoint, + 'other_endpoints': other_endpoints, + OP_ROLE_KEY: OpRole.Forward, + }) + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': ring_id, + OP_ROLE_KEY: OpRole.Forward, + }) + elif core.is_compiled_with_npu(): + 
endpoint_to_index_map = { + e: idx for idx, e in enumerate(endpoints) + } + block.append_op( + type='c_comm_init_hcom', + inputs={}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': ring_id, + 'device_id': int(os.getenv("FLAGS_selected_npus")), + 'rank_ids': [endpoint_to_index_map[e] for e in endpoints], + OP_ROLE_KEY: OpRole.Forward + }) def _broadcast_params(self, ring_id): block = self.startup_program.global_block() diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index a7f704361d3..7fed227cd99 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -265,7 +265,7 @@ class ShardingOptimizer(MetaOptimizerBase): for idx, op in reversed(list(enumerate(block.ops))): if op.type in [ "c_allreduce_sum", "c_sync_comm_stream", - "c_calc_comm_stream", "c_gen_nccl_id", "c_comm_init" + "c_calc_comm_stream", "c_gen_nccl_id", "c_comm_init", "c_comm_init_hcom" ]: pass elif op.type == "conditional_block": diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index e17527a3293..dd74196d37b 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -2053,7 +2053,7 @@ class Operator(object): 'feed', 'fetch', 'recurrent', 'go', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv', 'listen_and_serv', 'fl_listen_and_serv', 'ncclInit', 'select', 'checkpoint_notify', - 'gen_nccl_id', 'c_gen_nccl_id', 'c_comm_init', 'c_sync_calc_stream', + 'gen_nccl_id', 'c_gen_nccl_id', 'c_comm_init', 'c_comm_init_hcom', 'c_sync_calc_stream', 'c_sync_comm_stream', 'queue_generator', 'dequeue', 'enqueue', 'heter_listen_and_serv' } diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/fluid/transpiler/collective.py index ae4befa004c..5d64ddd2ee5 100644 --- 
a/python/paddle/fluid/transpiler/collective.py +++ b/python/paddle/fluid/transpiler/collective.py @@ -17,6 +17,7 @@ from __future__ import print_function import sys import math from functools import reduce +import os import collections import six @@ -101,34 +102,50 @@ class Collective(object): nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) - if rank == 0 and wait_port: - wait_server_ready(other_endpoints) - block = program.global_block() - nccl_id_var = block.create_var( - name=unique_name.generate('nccl_id'), - persistable=True, - type=core.VarDesc.VarType.RAW) - block.append_op( - type='c_gen_nccl_id', - inputs={}, - outputs={'Out': nccl_id_var}, - attrs={ - 'rank': rank, - 'endpoint': current_endpoint, - 'other_endpoints': other_endpoints, - self.op_role_key: OpRole.Forward - }) - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': ring_id, - self.op_role_key: OpRole.Forward - }) + if core.is_compiled_with_cuda(): + if rank == 0 and wait_port: + wait_server_ready(other_endpoints) + nccl_id_var = block.create_var( + name=unique_name.generate('nccl_id'), + persistable=True, + type=core.VarDesc.VarType.RAW) + block.append_op( + type='c_gen_nccl_id', + inputs={}, + outputs={'Out': nccl_id_var}, + attrs={ + 'rank': rank, + 'endpoint': current_endpoint, + 'other_endpoints': other_endpoints, + self.op_role_key: OpRole.Forward + }) + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': ring_id, + self.op_role_key: OpRole.Forward + }) + elif core.is_compiled_with_npu(): + endpoint_to_index_map = { + e: idx for idx, e in enumerate(endpoints) + } + block.append_op( + type='c_comm_init_hcom', + inputs={}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': ring_id, + 'device_id': int(os.getenv("FLAGS_selected_npus")), + 'rank_ids': 
[endpoint_to_index_map[e] for e in endpoints], + self.op_role_key: OpRole.Forward + }) def _broadcast_params(self): block = self.startup_program.global_block() diff --git a/python/paddle/hapi/model.py b/python/paddle/hapi/model.py index 137ca186d79..f1f8dec6401 100644 --- a/python/paddle/hapi/model.py +++ b/python/paddle/hapi/model.py @@ -133,33 +133,49 @@ def init_communicator(program, rank, nranks, wait_port, current_endpoint, return other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) - if rank == 0 and wait_port: - wait_server_ready(other_endpoints) block = program.global_block() - nccl_id_var = block.create_var( - name=fluid.unique_name.generate('nccl_id'), - persistable=True, - type=fluid.core.VarDesc.VarType.RAW) - - block.append_op( - type='c_gen_nccl_id', - inputs={}, - outputs={'Out': nccl_id_var}, - attrs={ - 'rank': rank, - 'endpoint': current_endpoint, - 'other_endpoints': other_endpoints - }) - - block.append_op( - type='c_comm_init', - inputs={'X': nccl_id_var}, - outputs={}, - attrs={ - 'nranks': nranks, - 'rank': rank, - 'ring_id': 0, - }) + if core.is_compiled_with_cuda(): + if rank == 0 and wait_port: + wait_server_ready(other_endpoints) + nccl_id_var = block.create_var( + name=fluid.unique_name.generate('nccl_id'), + persistable=True, + type=fluid.core.VarDesc.VarType.RAW) + + block.append_op( + type='c_gen_nccl_id', + inputs={}, + outputs={'Out': nccl_id_var}, + attrs={ + 'rank': rank, + 'endpoint': current_endpoint, + 'other_endpoints': other_endpoints + }) + + block.append_op( + type='c_comm_init', + inputs={'X': nccl_id_var}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': 0, + }) + elif core.is_compiled_with_npu(): + endpoint_to_index_map = { + e: idx for idx, e in enumerate(endpoints) + } + block.append_op( + type='c_comm_init_hcom', + inputs={}, + outputs={}, + attrs={ + 'nranks': nranks, + 'rank': rank, + 'ring_id': 0, + 'device_id': int(os.getenv("FLAGS_selected_npus")), + 'rank_ids': 
[endpoint_to_index_map[e] for e in endpoints], + }) def prepare_distributed_context(place=None): -- GitLab