ascend_communicate (#31708)

Parent faf40da5
@@ -13,6 +13,7 @@
 # limitations under the License.
 from __future__ import print_function
+import os
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
@@ -70,34 +71,50 @@ class CollectiveHelper(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
-        if rank == 0 and wait_port:
-            wait_server_ready(other_endpoints)
         block = program.global_block()
-        nccl_id_var = block.create_var(
-            name=unique_name.generate('nccl_id'),
-            persistable=True,
-            type=core.VarDesc.VarType.RAW)
-        block.append_op(
-            type='c_gen_nccl_id',
-            inputs={},
-            outputs={'Out': nccl_id_var},
-            attrs={
-                'rank': rank,
-                'endpoint': current_endpoint,
-                'other_endpoints': other_endpoints,
-                OP_ROLE_KEY: OpRole.Forward
-            })
-        block.append_op(
-            type='c_comm_init',
-            inputs={'X': nccl_id_var},
-            outputs={},
-            attrs={
-                'nranks': nranks,
-                'rank': rank,
-                'ring_id': ring_id,
-                OP_ROLE_KEY: OpRole.Forward
-            })
+        if core.is_compiled_with_cuda():
+            if rank == 0 and wait_port:
+                wait_server_ready(other_endpoints)
+            nccl_id_var = block.create_var(
+                name=unique_name.generate('nccl_id'),
+                persistable=True,
+                type=core.VarDesc.VarType.RAW)
+            block.append_op(
+                type='c_gen_nccl_id',
+                inputs={},
+                outputs={'Out': nccl_id_var},
+                attrs={
+                    'rank': rank,
+                    'endpoint': current_endpoint,
+                    'other_endpoints': other_endpoints,
+                    OP_ROLE_KEY: OpRole.Forward
+                })
+            block.append_op(
+                type='c_comm_init',
+                inputs={'X': nccl_id_var},
+                outputs={},
+                attrs={
+                    'nranks': nranks,
+                    'rank': rank,
+                    'ring_id': ring_id,
+                    OP_ROLE_KEY: OpRole.Forward
+                })
+        elif core.is_compiled_with_npu():
+            endpoint_to_index_map = {
+                e: idx for idx, e in enumerate(endpoints)
+            }
+            block.append_op(
+                type='c_comm_init_hcom',
+                inputs={},
+                outputs={},
+                attrs={
+                    'nranks': nranks,
+                    'rank': rank,
+                    'ring_id': ring_id,
+                    'device_id': int(os.getenv("FLAGS_selected_npus")),
+                    'rank_ids': [endpoint_to_index_map[e] for e in endpoints],
+                    OP_ROLE_KEY: OpRole.Forward
+                })

     def _wait(self, current_endpoint, endpoints):
         assert (self.wait_port)
......
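For orientation, the new NPU branch derives its attributes from just the endpoint list and one environment variable: rank_ids is the index of each endpoint in the list, and device_id comes from FLAGS_selected_npus. A standalone sketch of that mapping with made-up endpoint values (no Paddle calls involved):

    import os

    # Illustrative endpoints for a two-worker job; real values come from the
    # fleet/launch configuration.
    endpoints = ["192.168.0.1:6170", "192.168.0.2:6170"]
    current_endpoint = "192.168.0.2:6170"

    # Each worker's position in the endpoint list is its index in the ring.
    endpoint_to_index_map = {e: idx for idx, e in enumerate(endpoints)}
    rank = endpoint_to_index_map[current_endpoint]            # -> 1
    rank_ids = [endpoint_to_index_map[e] for e in endpoints]  # -> [0, 1]

    # The local Ascend card is selected through an environment variable,
    # analogous to FLAGS_selected_gpus on the CUDA side; the default here is
    # only for the sketch, the diff itself expects the variable to be set.
    device_id = int(os.getenv("FLAGS_selected_npus", "0"))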
@@ -13,6 +13,7 @@
 from __future__ import print_function
 from __future__ import division
+import os
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
@@ -78,34 +79,50 @@ class PipelineHelper(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
-        if rank == 0 and wait_port:
-            wait_server_ready(other_endpoints)
         block = program.global_block()
-        nccl_id_var = block.create_var(
-            name=unique_name.generate('nccl_id'),
-            persistable=True,
-            type=core.VarDesc.VarType.RAW)
-        block.append_op(
-            type='c_gen_nccl_id',
-            inputs={},
-            outputs={'Out': nccl_id_var},
-            attrs={
-                'rank': rank,
-                'endpoint': current_endpoint,
-                'other_endpoints': other_endpoints,
-                OP_ROLE_KEY: OpRole.Forward,
-            })
-        block.append_op(
-            type='c_comm_init',
-            inputs={'X': nccl_id_var},
-            outputs={},
-            attrs={
-                'nranks': nranks,
-                'rank': rank,
-                'ring_id': ring_id,
-                OP_ROLE_KEY: OpRole.Forward,
-            })
+        if core.is_compiled_with_cuda():
+            if rank == 0 and wait_port:
+                wait_server_ready(other_endpoints)
+            nccl_id_var = block.create_var(
+                name=unique_name.generate('nccl_id'),
+                persistable=True,
+                type=core.VarDesc.VarType.RAW)
+            block.append_op(
+                type='c_gen_nccl_id',
+                inputs={},
+                outputs={'Out': nccl_id_var},
+                attrs={
+                    'rank': rank,
+                    'endpoint': current_endpoint,
+                    'other_endpoints': other_endpoints,
+                    OP_ROLE_KEY: OpRole.Forward,
+                })
+            block.append_op(
+                type='c_comm_init',
+                inputs={'X': nccl_id_var},
+                outputs={},
+                attrs={
+                    'nranks': nranks,
+                    'rank': rank,
+                    'ring_id': ring_id,
+                    OP_ROLE_KEY: OpRole.Forward,
+                })
+        elif core.is_compiled_with_npu():
+            endpoint_to_index_map = {
+                e: idx for idx, e in enumerate(endpoints)
+            }
+            block.append_op(
+                type='c_comm_init_hcom',
+                inputs={},
+                outputs={},
+                attrs={
+                    'nranks': nranks,
+                    'rank': rank,
+                    'ring_id': ring_id,
+                    'device_id': int(os.getenv("FLAGS_selected_npus")),
+                    'rank_ids': [endpoint_to_index_map[e] for e in endpoints],
+                    OP_ROLE_KEY: OpRole.Forward
+                })

     def _broadcast_params(self, ring_id):
         block = self.startup_program.global_block()
......
@@ -265,7 +265,7 @@ class ShardingOptimizer(MetaOptimizerBase):
         for idx, op in reversed(list(enumerate(block.ops))):
             if op.type in [
                     "c_allreduce_sum", "c_sync_comm_stream",
-                    "c_calc_comm_stream", "c_gen_nccl_id", "c_comm_init"
+                    "c_calc_comm_stream", "c_gen_nccl_id", "c_comm_init", "c_comm_init_hcom"
             ]:
                 pass
             elif op.type == "conditional_block":
......
@@ -2053,7 +2053,7 @@ class Operator(object):
         'feed', 'fetch', 'recurrent', 'go', 'rnn_memory_helper_grad',
         'conditional_block', 'while', 'send', 'recv', 'listen_and_serv',
         'fl_listen_and_serv', 'ncclInit', 'select', 'checkpoint_notify',
-        'gen_nccl_id', 'c_gen_nccl_id', 'c_comm_init', 'c_sync_calc_stream',
+        'gen_nccl_id', 'c_gen_nccl_id', 'c_comm_init', 'c_comm_init_hcom', 'c_sync_calc_stream',
         'c_sync_comm_stream', 'queue_generator', 'dequeue', 'enqueue',
         'heter_listen_and_serv'
     }
......
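A tiny sanity check one could run against a build containing this hunk. It assumes the set shown above is bound to the Operator class attribute OP_WITHOUT_KERNEL_SET; the binding itself sits outside the displayed context, so treat the name as an assumption:

    # Hypothetical check; the attribute name is an assumption, since the hunk
    # only shows the set's contents, not the name it is assigned to.
    from paddle.fluid.framework import Operator

    assert 'c_comm_init' in Operator.OP_WITHOUT_KERNEL_SET
    assert 'c_comm_init_hcom' in Operator.OP_WITHOUT_KERNEL_SET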
@@ -17,6 +17,7 @@ from __future__ import print_function
 import sys
 import math
 from functools import reduce
+import os
 import collections
 import six
@@ -101,34 +102,50 @@ class Collective(object):
         nranks = len(endpoints)
         other_endpoints = endpoints[:]
         other_endpoints.remove(current_endpoint)
-        if rank == 0 and wait_port:
-            wait_server_ready(other_endpoints)
         block = program.global_block()
-        nccl_id_var = block.create_var(
-            name=unique_name.generate('nccl_id'),
-            persistable=True,
-            type=core.VarDesc.VarType.RAW)
-        block.append_op(
-            type='c_gen_nccl_id',
-            inputs={},
-            outputs={'Out': nccl_id_var},
-            attrs={
-                'rank': rank,
-                'endpoint': current_endpoint,
-                'other_endpoints': other_endpoints,
-                self.op_role_key: OpRole.Forward
-            })
-        block.append_op(
-            type='c_comm_init',
-            inputs={'X': nccl_id_var},
-            outputs={},
-            attrs={
-                'nranks': nranks,
-                'rank': rank,
-                'ring_id': ring_id,
-                self.op_role_key: OpRole.Forward
-            })
+        if core.is_compiled_with_cuda():
+            if rank == 0 and wait_port:
+                wait_server_ready(other_endpoints)
+            nccl_id_var = block.create_var(
+                name=unique_name.generate('nccl_id'),
+                persistable=True,
+                type=core.VarDesc.VarType.RAW)
+            block.append_op(
+                type='c_gen_nccl_id',
+                inputs={},
+                outputs={'Out': nccl_id_var},
+                attrs={
+                    'rank': rank,
+                    'endpoint': current_endpoint,
+                    'other_endpoints': other_endpoints,
+                    self.op_role_key: OpRole.Forward
+                })
+            block.append_op(
+                type='c_comm_init',
+                inputs={'X': nccl_id_var},
+                outputs={},
+                attrs={
+                    'nranks': nranks,
+                    'rank': rank,
+                    'ring_id': ring_id,
+                    self.op_role_key: OpRole.Forward
+                })
+        elif core.is_compiled_with_npu():
+            endpoint_to_index_map = {
+                e: idx for idx, e in enumerate(endpoints)
+            }
+            block.append_op(
+                type='c_comm_init_hcom',
+                inputs={},
+                outputs={},
+                attrs={
+                    'nranks': nranks,
+                    'rank': rank,
+                    'ring_id': ring_id,
+                    'device_id': int(os.getenv("FLAGS_selected_npus")),
+                    'rank_ids': [endpoint_to_index_map[e] for e in endpoints],
+                    self.op_role_key: OpRole.Forward
+                })

     def _broadcast_params(self):
         block = self.startup_program.global_block()
......
@@ -133,33 +133,49 @@ def init_communicator(program, rank, nranks, wait_port, current_endpoint,
         return
     other_endpoints = endpoints[:]
     other_endpoints.remove(current_endpoint)
-    if rank == 0 and wait_port:
-        wait_server_ready(other_endpoints)
     block = program.global_block()
-    nccl_id_var = block.create_var(
-        name=fluid.unique_name.generate('nccl_id'),
-        persistable=True,
-        type=fluid.core.VarDesc.VarType.RAW)
-
-    block.append_op(
-        type='c_gen_nccl_id',
-        inputs={},
-        outputs={'Out': nccl_id_var},
-        attrs={
-            'rank': rank,
-            'endpoint': current_endpoint,
-            'other_endpoints': other_endpoints
-        })
-
-    block.append_op(
-        type='c_comm_init',
-        inputs={'X': nccl_id_var},
-        outputs={},
-        attrs={
-            'nranks': nranks,
-            'rank': rank,
-            'ring_id': 0,
-        })
+    if core.is_compiled_with_cuda():
+        if rank == 0 and wait_port:
+            wait_server_ready(other_endpoints)
+        nccl_id_var = block.create_var(
+            name=fluid.unique_name.generate('nccl_id'),
+            persistable=True,
+            type=fluid.core.VarDesc.VarType.RAW)
+
+        block.append_op(
+            type='c_gen_nccl_id',
+            inputs={},
+            outputs={'Out': nccl_id_var},
+            attrs={
+                'rank': rank,
+                'endpoint': current_endpoint,
+                'other_endpoints': other_endpoints
+            })
+
+        block.append_op(
+            type='c_comm_init',
+            inputs={'X': nccl_id_var},
+            outputs={},
+            attrs={
+                'nranks': nranks,
+                'rank': rank,
+                'ring_id': 0,
+            })
+    elif core.is_compiled_with_npu():
+        endpoint_to_index_map = {
+            e: idx for idx, e in enumerate(endpoints)
+        }
+        block.append_op(
+            type='c_comm_init_hcom',
+            inputs={},
+            outputs={},
+            attrs={
+                'nranks': nranks,
+                'rank': rank,
+                'ring_id': 0,
+                'device_id': int(os.getenv("FLAGS_selected_npus")),
+                'rank_ids': [endpoint_to_index_map[e] for e in endpoints],
+            })

 def prepare_distributed_context(place=None):
......
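Taken together, the hunks make every communicator-init path branch on the build type: NCCL setup under core.is_compiled_with_cuda(), HCCL setup under core.is_compiled_with_npu(). A hedged end-to-end sketch of how a worker could exercise the new NPU path; the helper name and endpoint values are illustrative, and actually calling it requires an Ascend build of Paddle at this commit, since only such a build registers the c_comm_init_hcom op and reports is_compiled_with_npu() as True:

    import os
    import paddle.fluid as fluid
    from paddle.fluid import core

    def append_hccl_init(startup_program, rank, endpoints, ring_id=0):
        # Hypothetical wrapper mirroring the NPU branch added by this PR. It
        # assumes the launcher exported FLAGS_selected_npus for this process
        # and that core.is_compiled_with_npu() is True on this build.
        block = startup_program.global_block()
        endpoint_to_index_map = {e: idx for idx, e in enumerate(endpoints)}
        block.append_op(
            type='c_comm_init_hcom',
            inputs={},
            outputs={},
            attrs={
                'nranks': len(endpoints),
                'rank': rank,
                'ring_id': ring_id,
                'device_id': int(os.getenv("FLAGS_selected_npus")),
                'rank_ids': [endpoint_to_index_map[e] for e in endpoints],
            })

    # Illustrative wiring for worker 0 of a two-worker job (normally the
    # launcher sets the environment variable, not user code):
    # os.environ["FLAGS_selected_npus"] = "0"
    # startup = fluid.Program()
    # append_hccl_init(startup, rank=0,
    #                  endpoints=["127.0.0.1:6170", "127.0.0.1:6171"])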