未验证 提交 27a601e8 编写于 作者: 张春乔 提交者: GitHub

remove hccl in .py files (#52934)

* remove hccl in .py files

* remove ascend in setup.py.in

* remove ascend in setup.py
上级 23f87442
......@@ -63,7 +63,7 @@ _group_map_backend = {}
# Name of the default group for init_parallel_env
_default_group_name = "_default_pg"
_valid_backend_list = ['nccl', 'gloo', 'hccl', 'heter', 'xccl', 'bkcl']
_valid_backend_list = ['nccl', 'gloo', 'heter', 'xccl', 'bkcl']
_default_store = None # the default tcp store
_default_backend = None
_default_timeout = datetime.timedelta(seconds=1800)
......
......@@ -115,7 +115,7 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
"--backend",
type=str,
default=os.environ.get('PADDLE_DISTRI_BACKEND', 'auto'),
help="Specifize the backend, can be gloo|nccl|bkcl|auto|hccl|heter. "
help="Specifize the backend, can be gloo|nccl|bkcl|auto|heter. "
"Default value is auto which perfers nccl or bkcl.",
)
base_group.add_argument(
......
......@@ -1988,14 +1988,13 @@ def check_backend(backend):
'bkcl',
'cncl',
'auto',
'hccl',
'heter',
'xccl',
]:
raise ValueError(
"paddle.distributed initialize error, "
"backend argument can only be one of "
"'nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter', 'xccl' "
"'nccl', 'gloo', 'bkcl', 'auto', 'heter', 'xccl' "
"but got %s" % backend
)
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import hccl.manage.api as hccl
from paddle.distributed import fleet
from paddle.framework import core
from paddle.optimizer import Optimizer
from . import ascend_parser
HcomGroupConfig = namedtuple('HcomGroupConfig', ['name', 'nranks', 'rank_ids'])
__all__ = []
class AscendIRParser:
def __init__(self, auto_dp=False, world_rank_size=1):
self.graph_idx = 0
self.hcom_endpoints = {}
self.groups_to_create = []
self._auto_dp = auto_dp
self._world_rank_size = world_rank_size
def _construct_input_map(self, input_varlist):
ret_map = {}
ge_in_operator = []
for id, var in enumerate(input_varlist):
if var.is_data: # input data
ge_input = core.GEOperatorFactory.create_operator(
var.name, "Data"
).set_attr_int32("index", id)
ret_map[var.name] = ge_input
ge_in_operator.append(ge_input)
else: # param, learning ...
ge_input = core.GEOperatorFactory.create_operator(
var.name, "Variable"
)
ge_input.update_output_desc(
"y",
core.GETensorDesc(
core.GEShape(var.shape),
core.GEFormat.FORMAT_ND,
core.GEDataType.DT_FLOAT,
),
)
ret_map[var.name] = ge_input
return ge_in_operator, ret_map
def _endpoint_to_world_rank_id(self, endpoint):
world_endpoints = fleet.worker_endpoints()
assert (
endpoint in world_endpoints
), "endpoint ({}) not in worker_endpoints ({}) ".format(
endpoint,
fleet.world_device_ids(),
)
return world_endpoints.index(endpoint)
def parse_op(self, op):
if op.type == 'c_gen_nccl_id':
endpoint = op.attr("endpoint")
other_endpoints = op.attr("other_endpoints")
rank = op.attr("rank")
nccl_id = op.output_arg_names[0]
# c_gen_nccl_id operator splits endpoints into local endpoint and other_endpoints
# we should combine these together to produce world_rank_ids
self.hcom_endpoints[nccl_id] = other_endpoints[:]
self.hcom_endpoints[nccl_id].insert(rank, endpoint)
print(
"nccl_id (%s) registered endpoints %s"
% (nccl_id, self.hcom_endpoints[nccl_id])
)
elif op.type == 'c_comm_init':
nccl_id = op.input_arg_names[0]
nranks = op.attr("nranks")
assert nranks == len(
self.hcom_endpoints[nccl_id]
), "nranks doesn't match endpoint count"
rank = op.attr("rank")
ring_id = op.attr("ring_id")
group_name = "hcom_group_" + str(ring_id)
global_rank_ids = [
self._endpoint_to_world_rank_id(endpoint)
for endpoint in self.hcom_endpoints[nccl_id]
]
self.groups_to_create.append(
HcomGroupConfig(
name=group_name, nranks=nranks, rank_ids=global_rank_ids
)
)
print(
"append to create group: %s, with rank_ids: %s"
% (group_name, global_rank_ids)
)
elif op.type in ascend_parser.registerd_op:
op_parser = self.parser_factory.create_parse(
ascend_parser.registerd_op[op.type]
)
op_parser.apply(op)
else:
raise AssertionError(
'Op[%s] has not been registered, so we have to skip it'
% op.type
)
def _parse_program(
self, graph_name, program, input_varlist=[], fetch_list=[]
):
begin_graph_idx = self.graph_idx
ge_in_operator = []
ge_out_operator = []
self.var2geop = {}
block = program.global_block()
if len(block.ops) == 0:
print("There is no ops in program %s" % (graph_name))
return []
graph = core.GEGraph(graph_name)
ge_in_operator, self.var2geop = self._construct_input_map(input_varlist)
self.parser_factory = ascend_parser.AscendParserFactory(
graph, self.var2geop
)
for i, curop in list(enumerate(block.ops)):
self.parse_op(curop)
# Set fetch_var for GE
for e in fetch_list:
name = e
if not isinstance(e, str):
name = e.name
ge_out_operator.append(self.var2geop[name])
# (Debug) If you want to print back prop vars, append/assign the varname in ge_out_operator here, such as:
# if graph_name == "main":
# ge_out_operator.append(self.var2geop["reduce_sum_0.tmp_0@GRAD"])
# Add ops that may be input of a graph, such as const.
for varname, geop in self.var2geop.items():
if varname.startswith("geinput"):
ge_in_operator.append(geop)
graph.set_inputs(ge_in_operator).set_outputs(ge_out_operator)
# Remove ops of origin program
op_num = len(block.ops)
for i in range(op_num - 1, -1, -1):
block._remove_op(i)
input_varlist = [var for var in input_varlist if var.is_data]
block.append_op(
type="ascend_trigger",
inputs={"FeedList": input_varlist},
outputs={"FetchList": fetch_list},
attrs={'graph_idx': self.graph_idx},
)
self.graph_idx += 1
return graph
def parse_program(
self, startup_program, main_program, input_varlist, fetch_list
):
startup_graph = self._parse_program("startup", startup_program)
main_graph = self._parse_program(
"main", main_program, input_varlist, fetch_list
)
if self._auto_dp and self._world_rank_size > 1:
assert (
len(self.groups_to_create) == 0
), "can't parse program under auto_dp mode"
from paddle.distributed import fleet
self.groups_to_create.append(
HcomGroupConfig(
name="hcom_group_0",
nranks=fleet.world_size(),
rank_ids=list(range(fleet.world_size())),
)
)
return startup_graph, main_graph
# AscendOptimizer is a wrapper for basic optimizer now
# We will make it part of fleet meta_optimizer in the future
class AscendOptimizer(Optimizer):
def __init__(self, optimizer, fetch_list=[]):
self.inner_opt = optimizer
self.fetch_list = fetch_list
self.ascend_instance = None
def __del__(self):
print("begin AscendOptimizer del")
if self.ascend_instance is not None:
self.ascend_instance.destroy_global_resources()
core.ge_finalize()
print("end AscendOptimizer del")
def _can_apply(self):
if not self.user_defined_strategy.ascend:
return False
# TODO(hutuxian): other check here
return True
def _disable_strategy(self, dist_strategy):
dist_strategy.ascend = False
dist_strategy.ascend_configs = {}
def _get_input_varlist(self, program):
ret_list = []
for var in program.list_vars():
if var.is_data or var.persistable:
ret_list.append(var)
return ret_list
def _set_auxiliary_var(self, key, val):
super()._set_auxiliary_var(key, val)
self.inner_opt._set_auxiliary_var(key, val)
def minimize(
self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
auto_dp=False,
rank_table_file=None,
precision_mode="must_keep_origin_dtype",
):
minimized = None
if self.inner_opt:
minimized = self.inner_opt.minimize(
loss, startup_program=startup_program
)
self.ascend_instance = core.AscendInstance()
from paddle.distributed import fleet
if auto_dp and fleet.world_size() > 1:
from paddle.distributed.transpiler import ascend_transpiler
t = ascend_transpiler.AscendTranspiler(
startup_program, loss.block.program
)
t.transpile()
# print(loss.block.program)
# Config about Graph Engine can be found in https://support.huaweicloud.com/
config = {
"ge.exec.deviceId": str(fleet.local_device_ids()),
"ge.graphRunMode": "1",
"ge.exec.precision_mode": precision_mode,
}
# if multi trainers
if rank_table_file and fleet.world_size() > 1:
config["ge.exec.rankTableFile"] = rank_table_file
config["ge.exec.rankId"] = str(fleet.worker_index())
config["ge.exec.isUseHcom"] = "1"
config["ge.exec.deployMode"] = "0"
print("ge_initialize config:", config)
core.ge_initialize(config)
# Init Session
self.ascend_instance.init_global_resources()
main_block = loss.block
self.parser = AscendIRParser(
auto_dp=auto_dp, world_rank_size=fleet.world_size()
)
input_varlist = self._get_input_varlist(main_block.program)
startup_graph, main_graph = self.parser.parse_program(
startup_program, main_block.program, input_varlist, self.fetch_list
)
for cfg in self.parser.groups_to_create:
print(
"create group (%s), nranks: %d, rank_ids: %s"
% (cfg.name, cfg.nranks, cfg.rank_ids)
)
hccl.create_group(cfg.name, cfg.nranks, cfg.rank_ids)
self.ascend_instance.add_ascend_subgraph(0, startup_graph)
self.ascend_instance.add_ascend_subgraph(1, main_graph)
return minimized
......@@ -743,7 +743,6 @@ class ShardingOptimizer(MetaOptimizerBase):
)
def _init_npu_pipeline_comm(self, startup_block):
# NOTE(wangxi): some bug with hccl, must set pp_degree be even number
assert (self.pp_degree % 2) == 0
max_ring_id = -1
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from collections import namedtuple
import paddle
from paddle import fluid
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.ascend import ascend_optimizer
from paddle.fluid import core, unique_name
from paddle.fluid.layer_helper import LayerHelper
Block = namedtuple('Block', ['program'])
Loss = namedtuple('Loss', ['block'])
paddle.enable_static()
OpRole = core.op_proto_and_checker_maker.OpRole
OP_ROLE_KEY = core.op_proto_and_checker_maker.kOpRoleAttrName()
OP_ROLE_VAR_KEY = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
role = fleet.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
def init_communicator(
startup_program, main_program, current_endpoint, endpoints, ring_id
):
nranks = len(endpoints)
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
group_rank = endpoints.index(current_endpoint)
assert group_rank >= 0
block = startup_program.global_block()
nccl_id_var = block.create_var(
name=unique_name.generate('nccl_id'),
persistable=True,
type=core.VarDesc.VarType.RAW,
)
block.append_op(
type='c_gen_nccl_id',
inputs={},
outputs={'Out': nccl_id_var},
attrs={
'rank': group_rank,
'endpoint': current_endpoint,
'other_endpoints': other_endpoints,
OP_ROLE_KEY: OpRole.Forward,
},
)
block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': group_rank,
'ring_id': ring_id,
OP_ROLE_KEY: OpRole.Forward,
},
)
# add input op for test
fill_var_name = "tensor@Filled"
fill_var = block.create_var(
name=fill_var_name,
shape=[10, 10],
dtype='float32',
persistable=False,
stop_gradient=True,
)
block.append_op(
type="fill_constant",
outputs={"Out": fill_var_name},
attrs={
"shape": [10, 10],
"dtype": fill_var.dtype,
"value": 1.0,
"place_type": 1,
},
)
with fluid.program_guard(main_program):
op_type = "c_allreduce_sum"
data = paddle.tensor.fill_constant(
shape=[1], dtype='float32', value=2.5
)
helper = LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'X': [data]},
outputs={'Out': [data]},
attrs={'ring_id': ring_id, 'use_calc_stream': True},
)
print("startup program:", startup_program)
print("main program:", main_program)
def train(world_endpoints, world_device_ids, local_device_ids, local_rank):
startup_programs = []
main_programs = []
# trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"]
trainer_endpoints = world_endpoints
groups = [[], [], []]
groups[0] = [trainer_endpoints[0], trainer_endpoints[1]]
groups[1] = [trainer_endpoints[2], trainer_endpoints[3]]
groups[2] = [trainer_endpoints[0], trainer_endpoints[2]]
print("groups:", groups)
for i in range(len(trainer_endpoints)):
startup_programs.append(fluid.Program())
main_programs.append(fluid.Program())
for idx, group in enumerate(groups):
for te in group:
te_idx = trainer_endpoints.index(te)
startup_program = startup_programs[te_idx]
main_program = main_programs[te_idx]
init_communicator(startup_program, main_program, te, group, idx)
print(len(startup_programs))
print(startup_programs[local_rank])
print(main_programs[local_rank])
print("local rank: ", local_rank)
print("local startup program: ", startup_programs[local_rank])
startup_program = startup_programs[local_rank]
main_program = main_programs[local_rank]
loss = Loss(Block(main_program))
optimizer = ascend_optimizer.AscendOptimizer(None, fetch_list=[])
optimizer.minimize(
loss,
startup_program,
auto_dp=True,
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
)
exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup_program)
exe.run(main_program)
worker_endpoints = fleet.worker_endpoints()
world_device_ids = fleet.world_device_ids()
local_device_ids = fleet.local_device_ids()
local_rank = int(fleet.local_rank())
print("worker_endpoints:", worker_endpoints)
print("world_device_ids:", world_device_ids)
print("local_device_ids:", local_device_ids)
print("local_rank:", local_rank)
train(worker_endpoints, world_device_ids, local_device_ids, local_rank)
# -*- coding:UTF-8 -*-
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""generate hccl config file script"""
import json
import os
import socket
import sys
from argparse import ArgumentParser
def parse_args():
"""
parse args .
Args:
Returns:
args.
Examples:
>>> parse_args()
"""
parser = ArgumentParser(
description="mindspore distributed training launch "
"helper utilty that will generate hccl"
" config file"
)
parser.add_argument(
"--device_num",
type=str,
default="[0,8)",
help="The number of the Ascend accelerators used. please note that the Ascend accelerators"
"used must be continuous, such [0,4) means to use four chips "
"0,1,2,3; [0,1) means to use chip 0; The first four chips are"
"a group, and the last four chips are a group. In addition to"
"the [0,8) chips are allowed, other cross-group such as [3,6)"
"are prohibited.",
)
parser.add_argument(
"--visible_devices",
type=str,
default="0,1,2,3,4,5,6,7",
help="will use the visible devices sequentially",
)
parser.add_argument("--server_ip", type=str, default="", help="server ip")
args = parser.parse_args()
return args
def get_host_ip():
"""
get host ip
"""
ip = None
try:
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
except EOFError:
pass
return ip
def main():
print("start", __file__)
args = parse_args()
# visible_devices
visible_devices = args.visible_devices.split(',')
print(f'visible_devices:{visible_devices}')
# server_id
ip = get_host_ip()
if args.server_ip:
server_id = args.server_ip
elif ip:
server_id = ip
else:
raise ValueError("please input server ip!")
print(f'server_id:{server_id}')
# device_num
first_num = int(args.device_num[1])
last_num = int(args.device_num[3])
if first_num < 0 or last_num > 8:
raise ValueError(
f"device num {args.device_num} must be in range [0,8] !"
)
if first_num > last_num:
raise ValueError(
"First num {} of device num {} must less than last num {} !".format(
first_num, args.device_num, last_num
)
)
if first_num < 4:
if last_num > 4:
if first_num == 0 and last_num == 8:
pass
else:
raise ValueError(
"device num {} must be in the same group of [0,4] or [4,8] !".format(
args.device_num
)
)
device_num_list = list(range(first_num, last_num))
print("device_num_list:", device_num_list)
assert len(visible_devices) >= len(device_num_list)
# construct hccn_table
device_ips = {}
with open('/etc/hccn.conf', 'r') as fin:
for hccn_item in fin.readlines():
if hccn_item.strip().startswith('address_'):
device_id, device_ip = hccn_item.split('=')
device_id = device_id.split('_')[1]
device_ips[device_id] = device_ip.strip()
hccn_table = {'version': '1.0', 'server_count': '1', 'server_list': []}
device_list = []
rank_id = 0
for instance_id in device_num_list:
device_id = visible_devices[instance_id]
device_ip = device_ips[device_id]
device = {
'device_id': device_id,
'device_ip': device_ip,
'rank_id': str(rank_id),
}
print(
'rank_id:{}, device_id:{}, device_ip:{}'.format(
rank_id, device_id, device_ip
)
)
rank_id += 1
device_list.append(device)
hccn_table['server_list'].append(
{
'server_id': server_id,
'device': device_list,
'host_nic_ip': 'reserve',
}
)
hccn_table['status'] = 'completed'
# save hccn_table to file
table_path = os.getcwd()
table_fn = os.path.join(
table_path,
'hccl_{}p_{}_{}.json'.format(
len(device_num_list), "".join(map(str, device_num_list)), server_id
),
)
with open(table_fn, 'w') as table_fp:
json.dump(hccn_table, table_fp, indent=4)
sys.stdout.flush()
print("Completed: hccl file was save in :", table_fn)
if __name__ == "__main__":
main()
......@@ -700,11 +700,7 @@ class TestParallelDyGraphRunnerBase:
nranks = len(args.endpoints.split(",")) if args.endpoints else 1
# if args.update_method == "nccl2":
if (
args.update_method == "nccl2"
or args.update_method == "bkcl"
or args.update_method == "hccl"
):
if args.update_method == "nccl2" or args.update_method == "bkcl":
strategy = paddle.distributed.parallel.ParallelStrategy()
strategy.nranks = nranks
strategy.local_rank = args.trainer_id
......@@ -818,12 +814,12 @@ class TestParallelDyGraphRunnerBase:
strategy.find_unused_parameters = True
# 3. init parallel env
if args.update_method == "nccl2" or "bkcl" or "hccl":
if args.update_method == "nccl2" or "bkcl":
fleet.init(is_collective=True, strategy=strategy)
# 4. train model
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2" or "bkcl" or "hccl":
if args.update_method == "nccl2" or "bkcl":
opt = fleet.distributed_optimizer(opt)
model = fleet.distributed_model(model)
......@@ -860,7 +856,6 @@ def runtime_main(test_class):
"local",
"nccl2_reduce_layer",
"gloo",
"hccl",
],
)
parser.add_argument('--trainer_id', type=int, required=False, default=0)
......@@ -968,7 +963,6 @@ class TestDistBase(unittest.TestCase):
self._nccl2_mode = False
self._bkcl_mode = False
self._gloo_mode = False # now, support gloo backend
self._hccl_mode = False
self._pipeline_mode = False
self._mp_mode = False
self._diff_batch = False
......@@ -1767,14 +1761,6 @@ class TestDistBase(unittest.TestCase):
check_error_log=check_error_log,
log_name=log_name,
)
elif self._hccl_mode:
tr0_losses, tr1_losses = self._run_cluster_nccl2(
model_file,
required_envs,
update_method='hccl',
check_error_log=check_error_log,
log_name=log_name,
)
elif self._pipeline_mode:
tr0_losses, tr1_losses = self._run_pipeline(
model_file, required_envs, check_error_log, log_name=log_name
......
......@@ -411,7 +411,6 @@ packages=['paddle',
'paddle.distributed.fleet.elastic',
'paddle.distributed.fleet.meta_optimizers',
'paddle.distributed.fleet.meta_optimizers.sharding',
'paddle.distributed.fleet.meta_optimizers.ascend',
'paddle.distributed.fleet.meta_optimizers.dygraph_optimizer',
'paddle.distributed.fleet.runtime',
'paddle.distributed.rpc',
......
......@@ -1387,7 +1387,6 @@ def get_setup_parameters():
'paddle.distributed.fleet.elastic',
'paddle.distributed.fleet.meta_optimizers',
'paddle.distributed.fleet.meta_optimizers.sharding',
'paddle.distributed.fleet.meta_optimizers.ascend',
'paddle.distributed.fleet.meta_optimizers.dygraph_optimizer',
'paddle.distributed.fleet.runtime',
'paddle.distributed.rpc',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册