Unverified commit c0a82748, authored by gongweibao, committed by GitHub

Polish backward optimizer dependency code and use more default values. (#18255)

Parent d3003a16
......@@ -204,7 +204,9 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
AppendPass("all_reduce_deps_pass");
}
if (strategy_.enable_backward_optimizer_op_deps_) {
if (strategy_.num_trainers_ > 1 && !strategy_.async_mode_ &&
!strategy_.is_distribution_ &&
strategy_.enable_backward_optimizer_op_deps_) {
VLOG(1) << "Add backward_op_deps_pass";
AppendPass("backward_optimizer_op_deps_pass");
}
......@@ -351,6 +353,12 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
} else if (pass->Type() == "mkldnn_placement_pass") {
pass->Set("mkldnn_enabled_op_types",
new std::unordered_set<std::string>(mkldnn_enabled_op_types_));
} else if (pass->Type() == "backward_optimizer_op_deps_pass") {
if (!use_cuda) {
VLOG(1) << "backward_optimizer_op_deps_pass is only supported on "
"GPU, skipped.";
continue;
}
}
VLOG(3) << "Start Apply Pass " << pass->Type();
graph = pass->Apply(graph);
......
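Taken together, the two build_strategy.cc hunks above gate backward_optimizer_op_deps_pass on multi-trainer synchronous (non-distribution) training and then skip it entirely on non-CUDA builds. A minimal plain-Python sketch of that decision logic (function and argument names are illustrative, not Paddle API):

```python
def should_add_backward_optimizer_op_deps(num_trainers, async_mode,
                                          is_distribution, flag_enabled,
                                          use_cuda):
    """Mirror of the C++ gating above: the pass is only appended for
    multi-trainer, synchronous, non-distribution builds, and is skipped
    later in Apply() when the build does not use CUDA."""
    wants_pass = (num_trainers > 1 and not async_mode and
                  not is_distribution and flag_enabled)
    return wants_pass and use_cuda


# e.g. two trainers, sync mode, GPU build -> the pass is applied
assert should_add_backward_optimizer_op_deps(2, False, False, True, True)
```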
......@@ -72,7 +72,7 @@ struct BuildStrategy {
// Add dependency between backward ops and optimization ops, make sure that
// all the backward ops are finished before running the optimization ops.
// It might make the training speed of data parallelism faster.
bool enable_backward_optimizer_op_deps_{false};
bool enable_backward_optimizer_op_deps_{true};
// TODO(dev-paddle): enable_sequential_execution depends on
// kStaleProgramOpDescs, it is not appropriate, because kStaleProgramOpDescs
// will be removed in the near future.
......
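With the header default flipped to true, the dependency pass is now on by default and users who want the previous behaviour have to opt out explicitly. A hedged usage sketch, assuming the flag is exposed to Python as enable_backward_optimizer_op_deps (the C++ name without the trailing underscore, matching the other BuildStrategy properties in this patch):

```python
import paddle.fluid as fluid

build_strategy = fluid.BuildStrategy()
# The C++ default is now true; opt out explicitly if the extra
# backward->optimizer dependencies are not wanted.
build_strategy.enable_backward_optimizer_op_deps = False  # assumed property name
```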
......@@ -59,7 +59,8 @@ class NCCLOpHandleBase : public OpHandleBase {
VLOG(10) << "SetRunEnv "
<< " run_order:" << run_order
<< ", use_hierarchical_allreduce:" << use_hierarchical_allreduce;
<< ", use_hierarchical_allreduce:" << use_hierarchical_allreduce
<< ", nccl_ctx_:" << nccl_ctxs_;
if (nccl_ctxs_ == nullptr) {
return;
......
......@@ -113,9 +113,12 @@ class ParallelExecutorPrivate {
auto nccl_id_var = scope->FindVar(var_name);
if (nccl_id_var) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
VLOG(10) << "find nccl_id_var:" << var_name << ", nccl_id:" << nccl_id;
} else {
nccl_id = new ncclUniqueId();
PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id));
VLOG(10) << "can't find nccl_id_var:" << var_name
<< ", nccl_id:" << nccl_id;
}
flat_nccl_ids.push_back(nccl_id);
......@@ -170,8 +173,7 @@ class ParallelExecutorPrivate {
}
}
void InitOrGetNCCLCommunicator(framework::Scope *scope,
const BuildStrategy &bst) {
void InitOrGetNCCLCommunicator(framework::Scope *scope, BuildStrategy *bst) {
const std::string var_name = "NCCLCommunicator";
auto var = scope->FindVar(var_name);
if (var != nullptr) {
......@@ -183,9 +185,24 @@ class ParallelExecutorPrivate {
return;
}
if (bst->use_hierarchical_allreduce_) {
PADDLE_ENFORCE(bst->num_trainers_ > 1, "num_trainers:%llu <= 1",
bst->num_trainers_);
PADDLE_ENFORCE(bst->hierarchical_allreduce_inter_nranks_ > 1,
"inter_nranks:%d <= 1",
bst->hierarchical_allreduce_inter_nranks_);
PADDLE_ENFORCE(
(bst->num_trainers_ % bst->hierarchical_allreduce_inter_nranks_ == 0),
"num_trainers:%llu mod inter_nranks:%d != 0", bst->num_trainers_,
bst->hierarchical_allreduce_inter_nranks_);
bst->hierarchical_allreduce_exter_nranks_ =
bst->num_trainers_ / bst->hierarchical_allreduce_inter_nranks_;
}
VLOG(1) << "not find " << var_name << " in scope, so recreate it!";
nccl_ctxs_ = scope->Var(var_name)->GetMutable<platform::NCCLCommunicator>();
InitNCCLCtxs(scope, bst);
InitNCCLCtxs(scope, *bst);
}
#endif
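The new checks in InitOrGetNCCLCommunicator mean hierarchical_allreduce_exter_nranks_ is no longer user-configured; it is derived from num_trainers_ and hierarchical_allreduce_inter_nranks_ after validation. A small standalone sketch of that derivation (the helper name is made up for illustration, not part of Paddle):

```python
def derive_hierarchical_exter_nranks(num_trainers, inter_nranks):
    """Derivation now performed inside InitOrGetNCCLCommunicator."""
    assert num_trainers > 1, "hierarchical allreduce needs more than one trainer"
    assert inter_nranks > 1, "hierarchical_allreduce_inter_nranks must be > 1"
    assert num_trainers % inter_nranks == 0, (
        "num_trainers:%d mod inter_nranks:%d != 0" % (num_trainers, inter_nranks))
    return num_trainers // inter_nranks


# e.g. 4 nodes x 8 GPUs: 32 trainers with inter_nranks=8 -> exter_nranks=4
assert derive_hierarchical_exter_nranks(32, 8) == 4
```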
......@@ -383,7 +400,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
if (member_->use_cuda_ && member_->nranks_ > 1) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
member_->InitOrGetNCCLCommunicator(scope, member_->build_strategy_);
member_->InitOrGetNCCLCommunicator(scope, &member_->build_strategy_);
// Initialize device context's nccl comm, will be used by normal
// Operators like sync_batch_norm, and collective ops.
......
......@@ -286,10 +286,14 @@ class NCCLCommunicator {
bool NeedExterAllReduce() const { return h_exter_ctxs_.size() > 0; }
NCCLContextMap *GetHierarchicalInterCtx(size_t run_order) const {
PADDLE_ENFORCE(h_inter_ctxs_.size() > 0,
"must init hierarchical ctxs first!");
return h_inter_ctxs_[run_order % h_inter_ctxs_.size()].get();
}
NCCLContextMap *GetHierarchicalExterCtx(size_t run_order) const {
PADDLE_ENFORCE(h_exter_ctxs_.size() > 0,
"must init hierarchical ctxs first!");
return h_exter_ctxs_[run_order % h_exter_ctxs_.size()].get();
}
......
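GetHierarchicalInterCtx and GetHierarchicalExterCtx pick a context map round-robin by run_order; the added PADDLE_ENFORCE turns a modulo-by-zero on an uninitialized context list into a readable error. A minimal sketch of the selection rule (plain Python, not the Paddle API):

```python
def pick_hierarchical_ctx(ctxs, run_order):
    """Round-robin selection with the new emptiness check (sketch only)."""
    assert len(ctxs) > 0, "must init hierarchical ctxs first!"
    return ctxs[run_order % len(ctxs)]


assert pick_hierarchical_ctx(["ctx0", "ctx1"], 3) == "ctx1"
```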
......@@ -1408,27 +1408,20 @@ All parameter, weight, gradient are variables in Paddle.
[](BuildStrategy &self, int nccl_comm_num) {
self.nccl_comm_num_ = nccl_comm_num;
})
.def_property("use_hierarchical_allreduce_",
.def_property("use_hierarchical_allreduce",
[](const BuildStrategy &self) {
return self.use_hierarchical_allreduce_;
},
[](BuildStrategy &self, bool use) {
self.use_hierarchical_allreduce_ = use;
})
.def_property("hierarchical_allreduce_inter_nranks_",
.def_property("hierarchical_allreduce_inter_nranks",
[](const BuildStrategy &self) {
return self.hierarchical_allreduce_inter_nranks_;
},
[](BuildStrategy &self, int nranks) {
self.hierarchical_allreduce_inter_nranks_ = nranks;
})
.def_property("hierarchical_allreduce_exter_nranks_",
[](const BuildStrategy &self) {
return self.hierarchical_allreduce_exter_nranks_;
},
[](BuildStrategy &self, int nranks) {
self.hierarchical_allreduce_exter_nranks_ = nranks;
})
.def_property(
"fuse_elewise_add_act_ops",
......
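After the pybind changes, the Python-visible property names drop the C++ trailing underscore and the exter_nranks property is removed. A hedged usage sketch, assuming a paddle.fluid build that includes this patch:

```python
import paddle.fluid as fluid

build_strategy = fluid.BuildStrategy()
# Property names no longer carry the C++ trailing underscore.
build_strategy.use_hierarchical_allreduce = True
build_strategy.hierarchical_allreduce_inter_nranks = 8
# hierarchical_allreduce_exter_nranks is gone from the Python surface;
# ParallelExecutor now derives it as num_trainers / inter_nranks.
```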
......@@ -154,7 +154,7 @@ def start_procs(args):
for i in range(selected_gpus_num):
if trainers_endpoints != "":
trainers_endpoints += ","
trainers_endpoints += "%s:617%d" % (ip, i)
trainers_endpoints += "%s:%d" % (ip, args.started_port + i)
nranks = num_nodes * selected_gpus_num
......
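The launch script now builds trainer endpoints from args.started_port instead of the hard-coded 617x ports. A standalone sketch with illustrative values (the real script reads the IP, port and GPU count from its arguments and environment):

```python
ip = "127.0.0.1"
started_port = 6170
selected_gpus_num = 4

trainers_endpoints = ",".join(
    "%s:%d" % (ip, started_port + i) for i in range(selected_gpus_num))
# -> "127.0.0.1:6170,127.0.0.1:6171,127.0.0.1:6172,127.0.0.1:6173"
```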
......@@ -288,9 +288,8 @@ class CompiledProgram(object):
if self._program:
self._build_strategy.nccl_comm_num = self._program._nccl_comm_num
self._build_strategy.use_hierarchical_allreduce_ = self._program._use_hierarchical_allreduce
self._build_strategy.hierarchical_allreduce_inter_nranks_ = self._program._hierarchical_allreduce_inter_nranks
self._build_strategy.hierarchical_allreduce_exter_nranks_ = self._program._hierarchical_allreduce_exter_nranks
self._build_strategy.use_hierarchical_allreduce = self._program._use_hierarchical_allreduce
self._build_strategy.hierarchical_allreduce_inter_nranks = self._program._hierarchical_allreduce_inter_nranks
if self._build_strategy.sync_batch_norm:
self._build_strategy.enable_sequential_execution = True
......
......@@ -2844,7 +2844,6 @@ class Program(object):
self._nccl_comm_num = 1
self._use_hierarchical_allreduce = False
self._hierarchical_allreduce_inter_nranks = 0
self._hierarchical_allreduce_exter_nranks = 0
# @deprecated(the python memory optimize transpiler is deprecated)
# whether the program is optimized by memory_optimize_transpiler
......
......@@ -129,6 +129,9 @@ class TestDistRunnerBase(object):
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
config.nccl_comm_num = args.nccl_comm_num
if args.use_hallreduce:
config.use_hierarchical_allreduce = True
config.hierarchical_allreduce_inter_nranks = args.hallreduce_inter_nranks
my_print(
type(self).__name__,
"begin to run transpile on trainer with nccl2 mode")
......@@ -198,15 +201,6 @@ class TestDistRunnerBase(object):
exec_strategy=exec_strategy)
my_print(type(self).__name__, "program compiled with data parallel")
if args.use_cuda and args.update_method == "nccl2":
# it just for test share_vars_from feature.
test_exe = fluid.ParallelExecutor(
use_cuda=True,
loss_name=avg_cost.name,
build_strategy=build_stra,
main_program=test_program,
share_vars_from=binary._executor)
feed_var_list = [
var for var in trainer_prog.global_block().vars.values()
if var.is_data
......@@ -327,8 +321,10 @@ def runtime_main(test_class):
parser.add_argument('--trainer_id', type=int, required=False, default=0)
parser.add_argument('--trainers', type=int, required=False, default=1)
parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
parser.add_argument('--enable_backward_deps', action='store_true')
parser.add_argument('--use_hallreduce', action='store_true')
parser.add_argument(
'--enable_backward_deps', type=bool, required=False, default=1)
'--hallreduce_inter_nranks', type=int, required=False, default=2)
parser.add_argument(
'--current_endpoint', type=str, required=False, default="")
parser.add_argument('--sync_mode', action='store_true')
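The test flags switch from type=bool to action='store_true' because argparse's bool() conversion treats any non-empty string, including "False", as True. A short standalone illustration:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--enable_backward_deps', action='store_true')
parser.add_argument('--use_hallreduce', action='store_true')
parser.add_argument('--hallreduce_inter_nranks', type=int, default=2)

# bool("False") is True, so the old `type=bool` flag could not be switched
# off from the command line; store_true flags default cleanly to False.
args = parser.parse_args(['--use_hallreduce'])
assert args.use_hallreduce and not args.enable_backward_deps
```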
......@@ -407,9 +403,10 @@ class TestDistBase(unittest.TestCase):
self._use_dgc = False
self._dygraph = False
self._nccl_comm_num = 1
self._enable_backward_deps = False
self._use_hallreduce = False
self._setup_config()
self._after_setup_config()
self._enable_backward_deps = False
def _find_free_port(self):
def __free_port():
......@@ -597,118 +594,97 @@ class TestDistBase(unittest.TestCase):
ps0.terminate()
ps1.terminate()
# print server log
'''
with open("/tmp/ps0_err.log", "rb") as fn:
sys.stderr.write("ps0 stderr: %s\n" % fn.read())
with open("/tmp/ps1_err.log", "rb") as fn:
sys.stderr.write("ps1 stderr: %s\n" % fn.read())
'''
# print log
'''
with open("/tmp/tr0_err.log", "rb") as fn:
sys.stderr.write('trainer 0 stderr: %s\n' % fn.read())
with open("/tmp/tr1_err.log", "rb") as fn:
sys.stderr.write('trainer 1 stderr: %s\n' % fn.read())
'''
return pickle.loads(tr0_out), pickle.loads(tr1_out)
def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
check_error_log):
# NOTE: we reuse ps_endpoints as nccl2 worker endpoints
worker_endpoints = self._ps_endpoints.split(",")
w0_ep, w1_ep = worker_endpoints
if nccl2_reduce_layer:
update_method = "nccl2_reduce_layer"
else:
update_method = "nccl2"
tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
tr0_cmd = tr_cmd % \
(self._python_interp, model, self._ps_endpoints,
0, w0_ep, update_method, self._lr)
tr1_cmd = tr_cmd % \
def _get_nccl2_trainer_cmd(self, model, ep, update_method, trainer_id,
trainer_num):
env = {}
tr_cmd = "%s -u %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f"
tr_cmd = tr_cmd % \
(self._python_interp, model, self._ps_endpoints,
1, w1_ep, update_method, self._lr)
trainer_id, ep, update_method, self._lr)
if self._mem_opt:
tr0_cmd += " --mem_opt"
tr1_cmd += " --mem_opt"
tr_cmd += " --mem_opt"
if self._use_reduce:
tr0_cmd += " --use_reduce"
tr1_cmd += " --use_reduce"
tr_cmd += " --use_reduce"
if self._use_reader_alloc:
tr0_cmd += " --use_reader_alloc"
tr1_cmd += " --use_reader_alloc"
tr_cmd += " --use_reader_alloc"
if self.__use_cuda:
tr0_cmd += " --use_cuda"
tr1_cmd += " --use_cuda"
env0 = {
"CUDA_VISIBLE_DEVICES": "0",
# for test nccl2 layer
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ID": "0"
}
env1 = {
"CUDA_VISIBLE_DEVICES": "1",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ID": "1"
}
tr_cmd += " --use_cuda"
env.update({
"CUDA_VISIBLE_DEVICES": "{}".format(trainer_id),
"PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
"PADDLE_TRAINER_ID": "{}".format(trainer_id)
})
else:
env0 = {'CPU_NUM': '1'}
env1 = {'CPU_NUM': '1'}
env.update({'CPU_NUM': '1'})
if self._use_dgc:
tr0_cmd += " --use_dgc"
tr1_cmd += " --use_dgc"
tr_cmd += " --use_dgc"
if self._mp_mode:
env = {"FLAGS_selected_gpus": "{}".format(trainer_id)}
if self._nccl_comm_num > 1:
tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
tr_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
if self._mp_mode:
env0 = {"FLAGS_selected_gpus": "0"}
env1 = {"FLAGS_selected_gpus": "1"}
if self._use_hallreduce:
tr_cmd += " --use_hallreduce --hallreduce_inter_nranks 2"
if self._enable_backward_deps:
tr0_cmd += " --enable_backward_deps 1"
tr1_cmd += " --enable_backward_deps 1"
tr_cmd += " --enable_backward_deps"
env0.update(envs)
env1.update(envs)
return tr_cmd, env
print("tr0_cmd:{}, env: {}".format(tr0_cmd, env0))
print("tr1_cmd:{}, env: {}".format(tr1_cmd, env1))
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
def _run_cluster_nccl2(self, model, envs, nccl2_reduce_layer,
check_error_log):
if self._use_hallreduce:
self._ps_endpoints = ""
for i in range(0, 4):
self._ps_endpoints += "127.0.0.1:%s," % (self._find_free_port())
self._ps_endpoints = self._ps_endpoints[:-1]
my_print(type(self).__name__, "going to start process 0 with nccl2")
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0)
my_print(type(self).__name__, "going to start process 1 with nccl2")
tr1_proc = subprocess.Popen(
tr1_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1)
# NOTE: we reuse ps_endpoints as nccl2 worker endpoints
worker_endpoints = self._ps_endpoints.split(",")
if nccl2_reduce_layer:
update_method = "nccl2_reduce_layer"
else:
update_method = "nccl2"
tr0_out, tr0_err = tr0_proc.communicate()
tr1_out, tr1_err = tr1_proc.communicate()
trainer_num = len(worker_endpoints)
# close trainer file
tr0_pipe.close()
tr1_pipe.close()
procs = []
pipes = []
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num)
tr_env.update(envs)
print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
self._use_hallreduce, tr_cmd, tr_env))
# print log
sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
tr_pipe = open("/tmp/tr{}_err.log".format(i), "wb")
return pickle.loads(tr0_out), pickle.loads(tr1_out)
my_print(
type(self).__name__,
"going to start process {} with nccl2".format(i))
tr_proc = subprocess.Popen(
tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
procs.append(tr_proc)
pipes.append(tr_pipe)
outs = []
for i in range(0, trainer_num):
tr_out, tr_err = procs[i].communicate()
outs.append(tr_out)
pipes[i].close()
sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))
return pickle.loads(outs[0]), pickle.loads(outs[1])
def check_with_place(self,
model_file,
......@@ -724,13 +700,14 @@ class TestDistBase(unittest.TestCase):
"FLAGS_rpc_deadline": "30000", # 5sec to fail fast
"FLAGS_cudnn_deterministic": "1",
"http_proxy": "",
"NCCL_P2P_DISABLE": "1"
"NCCL_P2P_DISABLE": "1",
"NCCL_SHM_DISABLE": "1"
}
required_envs.update(need_envs)
if check_error_log:
required_envs["GLOG_v"] = "3"
required_envs["GLOG_v"] = "10"
required_envs["GLOG_logtostderr"] = "1"
local_losses\
......
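The per-trainer command construction is factored into _get_nccl2_trainer_cmd, and _run_cluster_nccl2 becomes a launch-then-collect loop over trainer_num processes (four of them when hierarchical allreduce is tested). A condensed sketch of that loop pattern (names and log paths are illustrative, not the test code itself):

```python
import subprocess


def run_trainers(cmds_and_envs, log_prefix="/tmp/tr"):
    """Start every trainer first, then collect stdout in a second pass so
    the processes run concurrently (sketch of the refactored loop)."""
    procs, pipes = [], []
    for i, (cmd, env) in enumerate(cmds_and_envs):
        pipe = open("%s%d_err.log" % (log_prefix, i), "wb")
        procs.append(
            subprocess.Popen(
                cmd.strip().split(" "),
                stdout=subprocess.PIPE,
                stderr=pipe,
                env=env))
        pipes.append(pipe)

    outs = []
    for proc, pipe in zip(procs, pipes):
        out, _ = proc.communicate()
        pipe.close()
        outs.append(out)
    return outs
```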
......@@ -72,5 +72,19 @@ class TestDistMnistNCCL2BackWardDeps(TestDistBase):
self.check_with_place("dist_mnist.py", delta=1e-5)
class TestDistMnistNCCL2HAllreduce(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._use_reader_alloc = False
self._nccl2_mode = True
self._use_hallreduce = True
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place("dist_mnist.py", delta=1e-5)
if __name__ == "__main__":
unittest.main()
......@@ -172,8 +172,6 @@ class DistributeTranspilerConfig(object):
use_hierarchical_allreduce = False
# Nccl ranks in a node when using hierarchical allreduce; usually set to the number of GPU cards.
hierarchical_allreduce_inter_nranks = 0
#Nccl ranks bewteen nodes when use hierarchical allreduce, it's setted to nodes number.
hierarchical_allreduce_exter_nranks = 0
# if mode is collective
# supported modes: sgd, local_sgd
......@@ -428,10 +426,23 @@ class DistributeTranspiler(object):
self.origin_program._trainers_endpoints = trainers.split(",")
self.origin_program._nccl_comm_num = self.config.nccl_comm_num
self.origin_program._use_hierarchical_allreduce = self.config.use_hierarchical_allreduce
# check use_hierarchical_allreduce options
if self.config.use_hierarchical_allreduce:
trainers_num = len(self.origin_program._trainers_endpoints)
# selected automatically
if self.config.hierarchical_allreduce_inter_nranks <= 1:
self.config.hierarchical_allreduce_inter_nranks = fluid.core.get_cuda_device_count(
)
assert trainers_num > self.config.hierarchical_allreduce_inter_nranks, \
"trainers_num:{} < hierarchical_allreduce_inter_nranks:{}".format(trainers_num, self.config.hierarchical_allreduce_inter_nranks)
assert trainers_num % self.config.hierarchical_allreduce_inter_nranks == 0, \
"trainers_num:{} mod hierarchical_allreduce_inter_nranks:{} != 0".format(trainers_num, self.config.hierarchical_allreduce_inter_nranks)
self.origin_program._hierarchical_allreduce_inter_nranks = \
int(self.config.hierarchical_allreduce_inter_nranks)
self.origin_program._hierarchical_allreduce_exter_nranks = \
int(self.config.hierarchical_allreduce_exter_nranks)
self._transpile_nccl2(
trainer_id,
trainers,
......
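With these transpiler checks in place, a user only enables hierarchical allreduce and optionally sets inter_nranks; exter_nranks is derived and no longer exposed. A hedged usage sketch, assuming a paddle.fluid build with this patch (endpoints and trainer ids are deployment-specific placeholders, so the transpile call is left commented out):

```python
import paddle.fluid as fluid

config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
config.use_hierarchical_allreduce = True
# Leave hierarchical_allreduce_inter_nranks at its default to let the
# transpiler pick the local GPU count; exter_nranks is no longer an option.

t = fluid.DistributeTranspiler(config=config)
# t.transpile(trainer_id=0, trainers="ip1:6170,ip2:6170",
#             current_endpoint="ip1:6170")
```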