Unverified commit ec2aed2a, authored by Xin Pan, committed by GitHub

Merge pull request #13102 from typhoonzero/merge_dist_deps_fixes

Cherrypick dist fixes
@@ -55,9 +55,10 @@ paddle.fluid.Inferencer.__init__ ArgSpec(args=['self', 'infer_func', 'param_path
 paddle.fluid.Inferencer.infer ArgSpec(args=['self', 'inputs', 'return_numpy'], varargs=None, keywords=None, defaults=(True,))
 paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,))
+paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
+paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True))
+paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
 paddle.fluid.InferenceTranspiler.__init__
 paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
@@ -329,9 +330,10 @@ paddle.fluid.contrib.BeamSearchDecoder.update_array ArgSpec(args=['self', 'array
 paddle.fluid.contrib.memory_usage ArgSpec(args=['program', 'batch_size'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,))
+paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
+paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True))
+paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
 paddle.fluid.transpiler.InferenceTranspiler.__init__
 paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
......
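For reference, a minimal usage sketch of the updated trainer-side API surface above (not part of this diff; the endpoints and trainer counts are made-up examples):

```python
import paddle.fluid as fluid

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    pservers="127.0.0.1:6174,127.0.0.1:6175",  # example endpoints
    trainers=2,
    sync_mode=True,
    startup_program=fluid.default_startup_program())  # new optional argument
trainer_prog = t.get_trainer_program()
```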
@@ -736,7 +736,7 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result,
           .emplace(varname, op_dev_id);
     }
   } else {
-    PADDLE_ENFORCE(
+    PADDLE_THROW(
         "the distribute training related op should be in [split_byref, "
         "concat].");
   }
@@ -746,17 +746,26 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result,
                  node->Op()->Type());
   CreateComputationalOp(result, node, op_dev_id);
-  if (node->Op()->Type() == "concat") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(),
-              "fetch_barrier");
+}
+
+void SetOpInputsAllPlaces(ir::Graph *result, ir::Node *node, int num_places) {
+  auto *op_handle = result->Get<GraphOps>(kGraphOps).back().get();
+  for (ir::Node *input : node->inputs) {
+    VarHandle *var = nullptr;
+    for (int place_offset = 0; place_offset < num_places; ++place_offset) {
+      auto &var_holders = result->Get<GraphVars>(kGraphVars)[place_offset];
+      auto &var_holder = var_holders[input->Name()];
+      if (!var_holder.empty()) {
+        var = var_holder.rbegin()->get();
+        op_handle->AddInput(var);
+      }
+    }
   }
 }
 
 // Create RPC related op handles that connects its in ops and out ops.
 void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result,
                                           ir::Node *node) const {
-  // FIXME(typhoonzero): Cleanup this deps for both sync mode and async mode
-  // put them into transpiler.
   int op_dev_id = -1;
   if (node->Op()->Type() == "send") {
     // TODO(paddle-dev): getting the first var is not safe.
@@ -791,8 +800,6 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result,
     }
     auto recv_param_grad = boost::get<std::vector<std::string>>(
         node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName()));
-    // FIXME(typhoonzero): assume each recv op output one param
-    // Use the same place as send.
     if (recv_param_grad.size() == 2U) {
       op_dev_id = GetVarDeviceID(*result, recv_param_grad[1]);
       VLOG(10) << "recv param " << recv_param_grad[0]
@@ -806,34 +813,44 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result,
           .emplace(varname, op_dev_id);
     }
   } else {
-    // send_barrier and fetch_barrier op can be scheduled on device 0
+    // send_barrier, fetch_barrier will run on place 0;
     op_dev_id = 0;
   }
 
   PADDLE_ENFORCE(op_dev_id != -1, "can not find the right place for rpc op: %s",
                  node->Op()->Type());
 
   result->Get<GraphOps>(kGraphOps).emplace_back(new RPCOpHandle(
       result->CreateOpNode(node->Op()), *node->Op(), local_scopes_[op_dev_id],
       node->Op()->Type(), places_[op_dev_id]));
 
-  // TODO(panyx0718): This might not be needed anymore.
-  if (node->Op()->Type() == "send_barrier") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(), "send");
-  } else if (node->Op()->Type() == "recv") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(),
-              "send_barrier");
-  } else if (node->Op()->Type() == "fetch_barrier") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(), "recv");
-  } else if (node->Op()->Type() == "send") {
-    // do nothing
+  if (node->Op()->Type() == "send") {
+    CreateOpHandleIOs(result, node, op_dev_id);
   } else {
-    PADDLE_THROW(
-        "rpc op should be in ["
-        "send, send_barrier. recv, fetch_barrier]");
-  }
-
-  CreateOpHandleIOs(result, node, op_dev_id);
+    // send_barrier, recv, fetch_barrier's inputs are deps var, get them from
+    // all places
+    auto p = places_[op_dev_id];
+    auto *op_handle = result->Get<GraphOps>(kGraphOps).back().get();
+    op_handle->SetDeviceContext(p,
+                                platform::DeviceContextPool::Instance().Get(p));
+
+    SetOpInputsAllPlaces(result, node, places_.size());
+    for (ir::Node *output : node->outputs) {
+      int outvar_dev_id = op_dev_id;
+      if (node->Op()->Type() == "fetch_barrier") {
+        outvar_dev_id = GetVarDeviceID(*result, output->Name());
+        PADDLE_ENFORCE_NE(outvar_dev_id, -1);
+      }
+      p = places_[outvar_dev_id];
+      ir::Node *new_node = nullptr;
+      if (output->Var()) {
+        new_node = result->CreateVarNode(output->Var());
+      } else {
+        new_node =
+            result->CreateEmptyNode(output->Name(), ir::Node::Type::kVariable);
+      }
+      CreateOpOutput(result, op_handle, new_node, p, outvar_dev_id);
+    }
+  }
 }
 
 bool MultiDevSSAGraphBuilder::IsScaleLossOp(ir::Node *node) const {
......
@@ -132,63 +132,6 @@ Graph::Graph(const ProgramDesc &program) : program_(program) {
     }
   }
 
-  std::vector<ir::Node *> send_ops;
-  ir::Node *send_bar = nullptr;
-  std::vector<ir::Node *> recv_ops;
-  ir::Node *fetch_bar = nullptr;
-  for (ir::Node *node : Nodes()) {
-    if (node->Name() == "send") {
-      send_ops.push_back(node);
-    } else if (node->Name() == "send_barrier") {
-      PADDLE_ENFORCE(!send_bar, "only has one send barrier");
-      send_bar = node;
-    } else if (node->Name() == "recv") {
-      recv_ops.push_back(node);
-    } else if (node->Name() == "fetch_barrier") {
-      PADDLE_ENFORCE(!fetch_bar, "only has one fetch barrier");
-      fetch_bar = node;
-    }
-  }
-
-  if (send_bar) {
-    for (ir::Node *send : send_ops) {
-      ir::Node *dep_var = CreateControlDepVar();
-      send->outputs.push_back(dep_var);
-      dep_var->inputs.push_back(send);
-      send_bar->inputs.push_back(dep_var);
-      dep_var->outputs.push_back(send_bar);
-    }
-    for (ir::Node *recv : recv_ops) {
-      ir::Node *dep_var = CreateControlDepVar();
-      recv->inputs.push_back(dep_var);
-      dep_var->outputs.push_back(recv);
-      send_bar->outputs.push_back(dep_var);
-      dep_var->inputs.push_back(send_bar);
-    }
-  }
-
-  if (fetch_bar) {
-    for (ir::Node *recv : recv_ops) {
-      ir::Node *dep_var = CreateControlDepVar();
-      recv->outputs.push_back(dep_var);
-      dep_var->inputs.push_back(recv);
-      fetch_bar->inputs.push_back(dep_var);
-      dep_var->outputs.push_back(fetch_bar);
-    }
-  }
-
-  std::vector<std::string> send_vars = FindDistTrainSendVars(send_ops);
-  std::vector<std::string> recv_vars = FindDistTrainRecvVars(recv_ops);
-  for (ir::Node *node : Nodes()) {
-    if (IsDistTrainOp(node, send_vars, recv_vars)) {
-      if (fetch_bar && node->Name() == "concat") {
-        ir::Node *dep_var = CreateControlDepVar();
-        fetch_bar->outputs.push_back(dep_var);
-        dep_var->inputs.push_back(fetch_bar);
-        node->inputs.push_back(dep_var);
-        dep_var->outputs.push_back(node);
-      }
-    }
-  }
-
   /**
    * We should handle write after read(WAR) and write after write(WAW) here.
    * Because some of the operators of the program can be executed parallelly.
......
@@ -52,6 +52,8 @@ class FetchBarrierOp : public framework::OperatorBase {
 class FetchBarrierOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() {
+    AddOutput("Out", "(Any) Dummy outputs, used for control dependency")
+        .AsDuplicable();
     AddComment(R"DOC(
 SendBarrier operator
......
@@ -56,6 +56,10 @@ class SendBarrierOp : public framework::OperatorBase {
 class SendBarrierOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() {
+    AddInput("X", "(Any) Dummy inputs, used for control dependency")
+        .AsDuplicable();
+    AddOutput("Out", "(Any) Dummy outputs, used for control dependency")
+        .AsDuplicable();
     AddComment(R"DOC(
 SendBarrier operator
......
@@ -246,7 +246,11 @@ def Send(endpoints, send_vars, dummy_output=None, sync=True):
             rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC
         })
     if sync:
-        helper.append_op(type="send_barrier", attrs={"endpoints": endpoints})
+        helper.append_op(
+            type="send_barrier",
+            inputs={"X": dummy_output},
+            outputs={"Out": []},
+            attrs={"endpoints": endpoints})
 
 
 def Recv(endpoints, get_vars, dummy_input=None, sync=True):
@@ -282,7 +286,10 @@ def Recv(endpoints, get_vars, dummy_input=None, sync=True):
         attrs={"endpoints": endpoints,
                "epmap": epmap})
     if sync:
-        helper.append_op(type="fetch_barrier", attrs={"endpoints": endpoints})
+        helper.append_op(
+            type="fetch_barrier",
+            outputs={"Out": get_vars},
+            attrs={"endpoints": endpoints})
     return get_vars
......
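A minimal sketch of the pattern these changes introduce, appending a barrier op whose dummy "Out" variables carry the control dependency (not part of the diff; the program, variable names, and endpoint below are placeholders):

```python
import paddle.fluid as fluid
from paddle.fluid import core

RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC

main = fluid.Program()
block = main.global_block()
# placeholder vars standing in for the outputs of earlier recv ops
recv_targets = [block.create_var(name="param_piece_%d" % i) for i in range(2)]

block.append_op(
    type="fetch_barrier",
    inputs={},
    # listing the received vars as "Out" lets later consumers see a
    # write-after-write dependency on the barrier, as in the change above
    outputs={"Out": recv_targets},
    attrs={
        "endpoints": ["127.0.0.1:6174"],
        RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
    })
```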
@@ -130,7 +130,12 @@ class SE_ResNeXt():
             input=conv, pool_size=7, pool_type='avg', global_pooling=True)
         drop = fluid.layers.dropout(x=pool, dropout_prob=0.2)
         stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0)
-        out = fluid.layers.fc(input=drop, size=class_dim, act='softmax')
+        out = fluid.layers.fc(
+            input=drop,
+            size=class_dim,
+            act='softmax',
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.05)))
         return out
 
     def shortcut(self, input, ch_out, stride):
@@ -180,7 +185,7 @@ class SE_ResNeXt():
             act=None,
             # avoid pserver CPU init differs from GPU
             param_attr=fluid.ParamAttr(
-                initializer=fluid.initializer.Constant()),
+                initializer=fluid.initializer.Constant(value=0.05)),
             bias_attr=False)
         return fluid.layers.batch_norm(input=conv, act=act)
@@ -188,13 +193,19 @@ class SE_ResNeXt():
         pool = fluid.layers.pool2d(
             input=input, pool_size=0, pool_type='avg', global_pooling=True)
         stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
-        squeeze = fluid.layers.fc(input=pool,
-                                  size=num_channels // reduction_ratio,
-                                  act='relu')
+        squeeze = fluid.layers.fc(
+            input=pool,
+            size=num_channels // reduction_ratio,
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.05)),
+            act='relu')
         stdv = 1.0 / math.sqrt(squeeze.shape[1] * 1.0)
-        excitation = fluid.layers.fc(input=squeeze,
-                                     size=num_channels,
-                                     act='sigmoid')
+        excitation = fluid.layers.fc(
+            input=squeeze,
+            size=num_channels,
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.05)),
+            act='sigmoid')
         scale = fluid.layers.elementwise_mul(x=input, y=excitation, axis=0)
         return scale
......
@@ -49,28 +49,32 @@ class TestDistWord2vec2x2(TestDistRunnerBase):
                 dtype='float32',
                 is_sparse=IS_SPARSE,
                 param_attr=fluid.ParamAttr(
-                    name='shared_w', initializer=fluid.initializer.Constant()))
+                    name='shared_w',
+                    initializer=fluid.initializer.Constant(value=0.1)))
             embed_second = fluid.layers.embedding(
                 input=words[1],
                 size=[dict_size, EMBED_SIZE],
                 dtype='float32',
                 is_sparse=IS_SPARSE,
                 param_attr=fluid.ParamAttr(
-                    name='shared_w', initializer=fluid.initializer.Constant()))
+                    name='shared_w',
+                    initializer=fluid.initializer.Constant(value=0.1)))
             embed_third = fluid.layers.embedding(
                 input=words[2],
                 size=[dict_size, EMBED_SIZE],
                 dtype='float32',
                 is_sparse=IS_SPARSE,
                 param_attr=fluid.ParamAttr(
-                    name='shared_w', initializer=fluid.initializer.Constant()))
+                    name='shared_w',
+                    initializer=fluid.initializer.Constant(value=0.1)))
             embed_forth = fluid.layers.embedding(
                 input=words[3],
                 size=[dict_size, EMBED_SIZE],
                 dtype='float32',
                 is_sparse=IS_SPARSE,
                 param_attr=fluid.ParamAttr(
-                    name='shared_w', initializer=fluid.initializer.Constant()))
+                    name='shared_w',
+                    initializer=fluid.initializer.Constant(value=0.1)))
 
             concat_embed = fluid.layers.concat(
                 input=[embed_first, embed_second, embed_third, embed_forth],
@@ -80,13 +84,13 @@ class TestDistWord2vec2x2(TestDistRunnerBase):
                 size=HIDDEN_SIZE,
                 act='sigmoid',
                 param_attr=fluid.ParamAttr(
-                    initializer=fluid.initializer.Constant()))
+                    initializer=fluid.initializer.Constant(value=0.1)))
             predict_word = fluid.layers.fc(
                 input=hidden1,
                 size=dict_size,
                 act='softmax',
                 param_attr=fluid.ParamAttr(
-                    initializer=fluid.initializer.Constant()))
+                    initializer=fluid.initializer.Constant(value=0.1)))
             cost = fluid.layers.cross_entropy(
                 input=predict_word, label=words[4])
             avg_cost = fluid.layers.mean(cost)
......
@@ -21,7 +21,7 @@ import sys
 import six
 import signal
 import subprocess
-import six
+import argparse
 
 
 class TestDistRunnerBase(object):
@@ -30,7 +30,7 @@ class TestDistRunnerBase(object):
             "get_model should be implemented by child classes.")
 
     def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
-                       trainers):
+                       trainers, sync_mode):
         # NOTE: import fluid until runtime, or else forking processes will cause error.
         import paddle
         import paddle.fluid as fluid
@@ -39,33 +39,35 @@ class TestDistRunnerBase(object):
             trainer_id=trainer_id,
             program=main_program,
             pservers=pserver_endpoints,
-            trainers=trainers)
+            trainers=trainers,
+            sync_mode=sync_mode)
         return t
 
-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id):
+    def run_pserver(self, args):
         import paddle
         import paddle.fluid as fluid
         self.get_model(batch_size=2)
-        t = self.get_transpiler(trainer_id,
-                                fluid.default_main_program(), pserver_endpoints,
-                                trainers)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode)
+        pserver_prog = t.get_pserver_program(args.current_endpoint)
+        startup_prog = t.get_startup_program(args.current_endpoint,
+                                             pserver_prog)
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
         exe.run(pserver_prog)
 
-    def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
+    def run_trainer(self, place, args):
         import paddle
         import paddle.fluid as fluid
         test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
             self.get_model(batch_size=2)
-        if is_dist:
-            t = self.get_transpiler(trainer_id,
-                                    fluid.default_main_program(), endpoints,
-                                    trainers)
+        if args.is_dist:
+            t = self.get_transpiler(args.trainer_id,
+                                    fluid.default_main_program(),
+                                    args.endpoints, args.trainers,
+                                    args.sync_mode)
             trainer_prog = t.get_trainer_program()
         else:
             trainer_prog = fluid.default_main_program()
@@ -76,8 +78,18 @@ class TestDistRunnerBase(object):
         strategy = fluid.ExecutionStrategy()
         strategy.num_threads = 1
         strategy.allow_op_delay = False
+
+        build_stra = fluid.BuildStrategy()
+
+        if args.use_reduce:
+            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
+        else:
+            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
+
         exe = fluid.ParallelExecutor(
-            True, loss_name=avg_cost.name, exec_strategy=strategy)
+            True,
+            loss_name=avg_cost.name,
+            exec_strategy=strategy,
+            build_strategy=build_stra)
 
         feed_var_list = [
             var for var in trainer_prog.global_block().vars.values()
@@ -106,45 +118,64 @@ def runtime_main(test_class):
     import paddle.fluid as fluid
     import paddle.fluid.core as core
 
-    if len(sys.argv) != 7:
-        print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
-        )
-    role = sys.argv[1]
-    endpoints = sys.argv[2]
-    trainer_id = int(sys.argv[3])
-    current_endpoint = sys.argv[4]
-    trainers = int(sys.argv[5])
-    is_dist = True if sys.argv[6] == "TRUE" else False
+    parser = argparse.ArgumentParser(description='Run dist test.')
+    parser.add_argument(
+        '--role', type=str, required=True, choices=['pserver', 'trainer'])
+    parser.add_argument('--endpoints', type=str, required=False, default="")
+    parser.add_argument('--is_dist', action='store_true')
+    parser.add_argument('--trainer_id', type=int, required=False, default=0)
+    parser.add_argument('--trainers', type=int, required=False, default=1)
+    parser.add_argument(
+        '--current_endpoint', type=str, required=False, default="")
+    parser.add_argument('--sync_mode', action='store_true')
+    parser.add_argument('--mem_opt', action='store_true')
+    parser.add_argument('--use_reduce', action='store_true')
+    args = parser.parse_args()
 
     model = test_class()
-    if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
+    if args.role == "pserver":
+        model.run_pserver(args)
     else:
         p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
         ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
+        model.run_trainer(p, args)
 
 
 import paddle.compat as cpt
 
 
 class TestDistBase(unittest.TestCase):
+    def _setup_config(self):
+        raise NotImplementedError("tests should have _setup_config implemented")
+
     def setUp(self):
         self._trainers = 2
         self._pservers = 2
         self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
         self._python_interp = "python"
+        self._sync_mode = True
+        self._mem_opt = False
+        self._use_reduce = False
+        self._setup_config()
 
     def start_pserver(self, model_file, check_error_log):
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
+        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist"
+        ps0_cmd = ps_cmd % \
            (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
             self._trainers)
-        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
+        ps1_cmd = ps_cmd % \
            (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
             self._trainers)
 
+        if self._sync_mode:
+            ps0_cmd += " --sync_mode"
+            ps1_cmd += " --sync_mode"
+        if self._mem_opt:
+            ps0_cmd += " --mem_opt"
+            ps1_cmd += " --mem_opt"
+
         ps0_pipe = subprocess.PIPE
         ps1_pipe = subprocess.PIPE
         if check_error_log:
@@ -195,9 +226,7 @@ class TestDistBase(unittest.TestCase):
         # Run local to get a base line
         env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         env_local.update(required_envs)
-        local_cmd = "%s %s trainer %s 0 %s %d FLASE" % \
-            (self._python_interp, model_file,
-             "127.0.0.1:1234", "127.0.0.1:1234", 1)
+        local_cmd = "%s %s --role trainer" % (self._python_interp, model_file)
         if not check_error_log:
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
@@ -226,12 +255,23 @@ class TestDistBase(unittest.TestCase):
         self._wait_ps_ready(ps1.pid)
 
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE" % \
-            (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers)
-        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE" % \
-            (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers)
+        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist"
+        tr0_cmd = tr_cmd % \
+            (self._python_interp, model_file, self._ps_endpoints,
+             0, ps0_ep, self._trainers)
+        tr1_cmd = tr_cmd % \
+            (self._python_interp, model_file, self._ps_endpoints,
+             1, ps1_ep, self._trainers)
+
+        if self._sync_mode:
+            tr0_cmd += " --sync_mode"
+            tr1_cmd += " --sync_mode"
+        if self._mem_opt:
+            tr0_cmd += " --mem_opt"
+            tr1_cmd += " --mem_opt"
+        if self._use_reduce:
+            tr0_cmd += " --use_reduce"
+            tr1_cmd += " --use_reduce"
 
         env0 = {"CUDA_VISIBLE_DEVICES": "0"}
         env1 = {"CUDA_VISIBLE_DEVICES": "1"}
@@ -282,6 +322,10 @@ class TestDistBase(unittest.TestCase):
         # FIXME: use terminate() instead of sigkill.
         os.kill(ps0.pid, signal.SIGKILL)
         os.kill(ps1.pid, signal.SIGKILL)
+        ps0.terminate()
+        ps1.terminate()
+        ps0.wait()
+        ps1.wait()
         FNULL.close()
 
         self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
......
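For illustration only, the kind of command line the updated harness now builds from the ps_cmd/tr_cmd templates above (values are example placeholders, not taken from a real run):

```python
interp, model = "python", "dist_mnist.py"
endpoints = "127.0.0.1:9123,127.0.0.1:9124"

ps0_cmd = ("%s %s --role pserver --endpoints %s --trainer_id 0 "
           "--current_endpoint %s --trainers %d --is_dist --sync_mode"
           % (interp, model, endpoints, "127.0.0.1:9123", 2))
tr0_cmd = ("%s %s --role trainer --endpoints %s --trainer_id %d "
           "--current_endpoint %s --trainers %d --is_dist --sync_mode"
           % (interp, model, endpoints, 0, "127.0.0.1:9123", 2))
print(ps0_cmd)
print(tr0_cmd)
```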
@@ -17,10 +17,51 @@ import unittest
 from test_dist_base import TestDistBase
 
 
-class TestDistSeResneXt2x2(TestDistBase):
+class TestDistMnist2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._use_reduce = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_mnist.py", delta=1e-7)
+
+
+class TestDistMnist2x2WithMemopt(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._mem_opt = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_mnist.py", delta=1e-7)
 
 
+class TestDistMnistAsync(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+        self._use_reduce = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_mnist.py", delta=200)
+
+
+# FIXME(typhoonzero): enable these tests once we have 4
+# 4 GPUs on CI machine, and the base class should be updated.
+#
+# class TestDistMnist2x2ReduceMode(TestDistBase):
+#     def _setup_config(self):
+#         self._sync_mode = True
+#         self._use_reduce = True
+
+#     def test_se_resnext(self):
+#         self.check_with_place("dist_mnist.py", delta=1e-7)
+
+
+# class TestDistMnistAsyncReduceMode(TestDistBase):
+#     def _setup_config(self):
+#         self._sync_mode = False
+#         self._use_reduce = True
+
+#     def test_se_resnext(self):
+#         self.check_with_place("dist_mnist.py", delta=200)
+
+
 if __name__ == "__main__":
     unittest.main()
@@ -18,9 +18,20 @@ from test_dist_base import TestDistBase
 
 
 class TestDistSeResneXt2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_se_resnext.py", delta=1e-7)
 
 
+class TestDistSeResneXt2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_se_resnext.py", delta=100)
+
+
 if __name__ == "__main__":
     unittest.main()
@@ -100,7 +100,7 @@ class TestSendOp(unittest.TestCase):
         main.global_block().append_op(
             type="fetch_barrier",
             inputs={},
-            outputs={},
+            outputs={"Out": []},
             attrs={
                 "endpoints": ["127.0.0.1:{0}".format(port)],
                 RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
......
@@ -15,14 +15,55 @@
 from __future__ import print_function
 
 import unittest
+import paddle
 from test_dist_base import TestDistBase
 
 
-class TestDistTransformer2x2(TestDistBase):
+def download_files():
+    url_prefix = 'http://paddle-unittest-data.cdn.bcebos.com/dist_transformer/'
+    vocab_url = url_prefix + 'vocab.bpe.32000'
+    vocab_md5 = 'a86d345ca6e27f6591d0dccb1b9be853'
+    paddle.dataset.common.download(vocab_url, 'test_dist_transformer',
+                                   vocab_md5)
+
+    local_train_url = url_prefix + 'train.tok.clean.bpe.32000.en-de'
+    local_train_md5 = '033eb02b9449e6dd823f050782ac8914'
+    paddle.dataset.common.download(local_train_url, 'test_dist_transformer',
+                                   local_train_md5)
+
+    train0_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_0'
+    train0_md5 = 'ddce7f602f352a0405267285379a38b1'
+    paddle.dataset.common.download(train0_url, 'test_dist_transformer',
+                                   train0_md5)
+
+    train1_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_1'
+    train1_md5 = '8757798200180285b1a619cd7f408747'
+    paddle.dataset.common.download(train1_url, 'test_dist_transformer',
+                                   train1_md5)
+
+    test_url = url_prefix + 'newstest2013.tok.bpe.32000.en-de'
+    test_md5 = '9dd74a266dbdb25314183899f269b4a2'
+    paddle.dataset.common.download(test_url, 'test_dist_transformer', test_md5)
+
+
+class TestDistTransformer2x2Sync(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
+    def test_transformer(self):
+        download_files()
+        #Note: loss on test dataset of the first 5 batch are:
+        # 10.518872, 10.518871, 10.518868, 10.518862, 10.518855
+        self.check_with_place("dist_transformer.py", delta=1e-7)
+
+
+class TestDistTransformer2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
     def test_transformer(self):
-        # TODO(paddle-dev): check if the delta is OK.
-        # Usually start around ~8000 and converge to ~5000
-        self.check_with_place("dist_transformer.py", delta=400)
+        download_files()
+        self.check_with_place("dist_transformer.py", delta=1.0)
 
 
 if __name__ == "__main__":
......
@@ -18,8 +18,19 @@ from test_dist_base import TestDistBase
 
 
 class TestDistSeResneXt2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_word2vec.py", delta=1e-4)
+
+
+class TestDistSeResneXt2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
     def test_se_resnext(self):
-        self.check_with_place("dist_word2vec.py", delta=1e-7)
+        self.check_with_place("dist_word2vec.py", delta=1)
 
 
 if __name__ == "__main__":
......
@@ -39,3 +39,136 @@ def find_op_by_output_arg(block, arg_name):
         if arg_name in op.output_arg_names:
             return index
     return -1
+
+
+def get_indent_space(indent, space_num=4):
+    ret = ""
+    for i in range(0, indent * space_num):
+        ret += " "
+
+    return ret
+
+
+def variable_to_code(var):
+    """
+    Get readable codes of fluid variable.
+
+    Args:
+        var: A fluid operator.
+
+    Returns:
+        string: The formatted string.
+    """
+
+    if var.type == core.VarDesc.VarType.SELECTED_ROWS or var.type == core.VarDesc.VarType.LOD_TENSOR:
+        var_str = "{name} : fluid.{type}.shape{shape}.astype({dtype})".\
+            format(i="{", e="}", name=var.name, type=var.type, shape=var.shape, dtype=var.dtype)
+    else:
+        var_str = "{name} : fluid.{type})".\
+            format(i="{", e="}", name=var.name, type=var.type)
+
+    if type(var) == paddle.fluid.framework.Parameter:
+        if var.trainable:
+            var_str = "trainable parameter " + var_str
+        else:
+            var_str = "parameter " + var_str
+    else:
+        var_str = "var " + var_str
+
+    if var.persistable:
+        var_str = "persist " + var_str
+
+    return var_str
+
+
+def op_to_code(op):
+    """
+    Get readable codes of fluid operator.
+
+    Args:
+        op: A fluid operator.
+
+    Returns:
+        string: The foramtted string.
+    """
+
+    outputs_str = "{"
+    for i in range(0, len(op.output_names)):
+        outputs_str += "{name}=".format(name=op.output_names[i])
+        o = op.output(op.output_names[i])
+        outputs_str += "{value}".format(value=o)
+        if i != len(op.output_names) - 1:
+            outputs_str += ", "
+    outputs_str += "}"
+
+    inputs_str = "{"
+    for i in range(0, len(op.input_names)):
+        inputs_str += "{name}=".format(name=op.input_names[i])
+        o = op.input(op.input_names[i])
+        inputs_str += "{value}".format(value=o)
+        if i != len(op.input_names) - 1:
+            inputs_str += ", "
+    inputs_str += "}"
+
+    attrs_str = ""
+    for i in range(0, len(op.attr_names)):
+        name = op.attr_names[i]
+
+        attr_type = op.desc.attr_type(name)
+        if attr_type == core.AttrType.BLOCK:
+            a = "{name} = block[{value}]".format(
+                name=name, type=attr_type, value=op.block_attr_id(name))
+            attrs_str += a
+            continue
+
+        if attr_type == core.AttrType.BLOCKS:
+            a = "{name} = blocks{value}".format(
+                name=name, type=attr_type, value=op.blocks_attr_ids(name))
+            attrs_str += a
+            continue
+
+        a = "{name} = {value}".format(
+            name=name, type=attr_type, value=op.desc.attr(name))
+        attrs_str += a
+        if i != len(op.attr_names) - 1:
+            attrs_str += ", "
+
+    if outputs_str != "{}":
+        op_str = "{outputs} = {op_type}(inputs={inputs}, {attrs})".\
+            format(outputs = outputs_str, op_type=op.type, inputs=inputs_str, attrs=attrs_str)
+    else:
+        op_str = "{op_type}(inputs={inputs}, {attrs})".\
+            format(op_type=op.type, inputs=inputs_str, attrs=attrs_str)
+    return op_str
+
+
+def block_to_code(block, block_idx):
+    indent = 0
+
+    print("{0}{1} // block {2}".format(
+        get_indent_space(indent), '{', block_idx))
+
+    indent += 1
+    # sort all vars
+    all_vars = sorted(block.vars.iteritems(), key=lambda x: x[0])
+    for var in all_vars:
+        print("{}{}".format(get_indent_space(indent), variable_to_code(var[1])))
+
+    if len(all_vars) > 0:
+        print("")
+
+    for op in block.ops:
+        print("{}{}".format(get_indent_space(indent), op_to_code(op)))
+
+    indent -= 1
+    print("{0}{1}".format(get_indent_space(indent), '}'))
+
+
+def program_to_code(prog):
+    """
+    Print readable codes of fluid program.
+
+    Args:
+        prog : A fluid program.
+
+    An example result like bellow:
+    https://github.com/PaddlePaddle/Paddle/pull/12673
+    """
+    block_idx = 0
+    for block in prog.blocks:
+        block_to_code(block, block_idx)
+        block_idx += 1
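A usage sketch for the pretty-printer added above; the import path is an assumption, and the toy program is made up for illustration:

```python
# from paddle.fluid.transpiler.details import program_to_code  # assumed import path
import paddle.fluid as fluid

main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.fc(input=x, size=1)

# prints each block as a "{ ... }" section: its vars first, then its ops
program_to_code(main)
```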
@@ -31,9 +31,10 @@ Steps to transpile pserver:
 """
 
 import math
-import random
+import sys
 import numpy as np
 import collections
+import random
 
 from .ps_dispatcher import RoundRobin, HashName, PSDispatcher
 from .. import core, framework
@@ -181,7 +182,8 @@ class DistributeTranspiler(object):
                  program=None,
                  pservers="127.0.0.1:6174",
                  trainers=1,
-                 sync_mode=True):
+                 sync_mode=True,
+                 startup_program=None):
         """
         Run the transpiler.
@@ -194,13 +196,17 @@ class DistributeTranspiler(object):
                 list.
             trainers (int): number of trainers in the distributed job.
             sync_mode (bool): Do sync training or not, default is True.
+            startup_program (Program|None): startup_program to transpile,
+                default is fluid.default_main_program().
         """
         if program is None:
             program = default_main_program()
+        if startup_program is None:
+            startup_program = default_startup_program()
         self.origin_program = program
-        self.origin_startup_program = default_startup_program().clone()
-        self.startup_program = default_startup_program()
+        self.startup_program = startup_program
+        self.origin_startup_program = self.startup_program.clone()
+
         self.trainer_num = trainers
         self.sync_mode = sync_mode
         self.trainer_id = trainer_id
@@ -260,6 +266,10 @@ class DistributeTranspiler(object):
                 name=framework.generate_control_dev_var_name())
             grad_name_to_send_dummy_out[grad_varname] = dummy_output
 
+            # get send op_role_var, if not splited, the grad should have .trainer suffix
+            # if splited, grad should be the original grad var name (split_by_ref and send
+            # will be on the same place). ParallelExecutor
+            # will use op_role_var to get expected device place to run this op.
             program.global_block()._insert_op(
                 index=index + 1,
                 type="send",
@@ -268,18 +278,23 @@ class DistributeTranspiler(object):
                 attrs={
                     "epmap": eplist,
                     RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
-                    OP_ROLE_VAR_ATTR_NAME:
-                    [self.grad_name_to_param_name[grad_varname], grad_varname],
+                    OP_ROLE_VAR_ATTR_NAME: [
+                        self.grad_name_to_param_name[grad_varname],
+                        splited_grad_varname
+                    ],
                     "sync_mode": not self.sync_mode,
                 })
             for _, var in enumerate(splited_vars):
                 send_vars.append(var)
 
         if self.sync_mode:
+            send_barrier_out = program.global_block().create_var(
+                name=framework.generate_control_dev_var_name())
+            input_deps = grad_name_to_send_dummy_out.values()
             program.global_block().append_op(
                 type="send_barrier",
-                inputs={},
-                outputs={},
+                inputs={"X": input_deps},
+                outputs={"Out": send_barrier_out},
                 attrs={
                     "endpoints": pserver_endpoints,
                     RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
@@ -297,32 +312,46 @@ class DistributeTranspiler(object):
             self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i])
 
         # step4: Concat the parameters splits together after recv.
+        all_recv_outputs = []
         for param_varname, splited_var in six.iteritems(self.param_var_mapping):
             eps = []
             for var in splited_var:
                 index = [v.name for v in recv_vars].index(var.name)
                 eps.append(eplist[index])
-            grad_send_dummy_out = grad_name_to_send_dummy_out[
-                self.param_name_to_grad_name[param_varname]]
+            if self.sync_mode:
+                recv_dep_in = send_barrier_out
+            else:
+                # connect deps to send op in async mode
+                recv_dep_in = grad_name_to_send_dummy_out[
+                    self.param_name_to_grad_name[param_varname]]
+            all_recv_outputs.extend(splited_var)
+            # get recv op_role_var, if not splited, the grad should have .trainer suffix
+            # if splited, grad should be the original grad var name. ParallelExecutor
+            # will use op_role_var to get expected device place to run this op.
+            orig_grad_name = self.param_name_to_grad_name[param_varname]
+            recv_op_role_var_name = orig_grad_name
+            splited_trainer_grad = self.grad_var_mapping[orig_grad_name]
+            if len(splited_trainer_grad) == 1:
+                recv_op_role_var_name = splited_trainer_grad[0].name
+
             program.global_block().append_op(
                 type="recv",
-                inputs={"X": [grad_send_dummy_out]},
+                inputs={"X": [recv_dep_in]},
                 outputs={"Out": splited_var},
                 attrs={
                     "epmap": eps,
                     RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
-                    OP_ROLE_VAR_ATTR_NAME: [
-                        param_varname,
-                        self.param_name_to_grad_name[param_varname]
-                    ],
+                    OP_ROLE_VAR_ATTR_NAME:
+                    [param_varname, recv_op_role_var_name],
                     "sync_mode": not self.sync_mode
                 })
 
         if self.sync_mode:
+            # form a WAW dependency
             program.global_block().append_op(
                 type="fetch_barrier",
                 inputs={},
-                outputs={},
+                outputs={"Out": all_recv_outputs},
                 attrs={
                     "endpoints": pserver_endpoints,
                     RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
@@ -359,21 +388,18 @@ class DistributeTranspiler(object):
         return self.origin_program
 
-    def _get_trainer_startup_program(self,
-                                     recv_vars,
-                                     eplist,
-                                     startup_program=None):
+    def _get_trainer_startup_program(self, recv_vars, eplist):
         """
         Get transpiled trainer side startup program.
 
         Args:
-            startup_program(Program): Startup program.
+            recv_vars (list): Variable list to recv for current trainer_id
+            eplist (list): A list of strings indicating
 
         Returns:
             Program: trainer side startup program.
         """
-        if startup_program is None:
-            startup_program = self.startup_program
+        startup_program = self.startup_program
 
         # FIXME(gongwb): delete not need ops.
         # note that: some parameter is not trainable and those ops can't be deleted.
@@ -406,10 +432,12 @@ class DistributeTranspiler(object):
                 RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
             })
 
+        fetch_barrier_out = startup_program.global_block().create_var(
+            name=framework.generate_control_dev_var_name())
         startup_program.global_block().append_op(
             type="fetch_barrier",
             inputs={},
-            outputs={},
+            outputs={"Out": fetch_barrier_out},
             attrs={
                 "endpoints": self.pserver_endpoints,
                 RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
@@ -419,7 +447,18 @@ class DistributeTranspiler(object):
             #add concat ops to merge splited parameters received from parameter servers.
             if len(splited_var) <= 1:
                 continue
-            orig_param = startup_program.global_block().vars[varname]
+            # NOTE: if enable memory optimization, origin vars maybe removed.
+            if startup_program.global_block().vars.has_key(varname):
+                orig_param = startup_program.global_block().vars[varname]
+            else:
+                origin_param_var = self.origin_program.global_block().vars[
+                    varname]
+                orig_param = startup_program.global_block().create_var(
+                    name=varname,
+                    persistable=origin_param_var.persistable,
+                    type=origin_param_var.type,
+                    dtype=origin_param_var.dtype,
+                    shape=origin_param_var.shape)
             startup_program.global_block().append_op(
                 type="concat",
                 inputs={"X": splited_var},
@@ -442,7 +481,9 @@ class DistributeTranspiler(object):
         # NOTE: assume blocks of the same variable is not distributed
         # on the same pserver, only change param/grad varnames for
         # trainers to fetch.
+        sys.stderr.write("get_pserver_program() is deprecated, call\
+            get_pserver_programs() to get pserver main and startup\
+            in a single call.")
         # step1
         pserver_program = Program()
         pserver_program.random_seed = self.origin_program.random_seed
@@ -626,32 +667,58 @@ class DistributeTranspiler(object):
                 attrs=attrs)
 
         pserver_program._sync_with_cpp()
+        # save pserver program to generate pserver side startup relatively.
+        self.pserver_program = pserver_program
         return pserver_program
 
+    def get_pserver_programs(self, endpoint):
+        """
+        Get pserver side main program and startup program for distributed training.
+
+        Args:
+            endpoint (str): current pserver endpoint.
+
+        Returns:
+            tuple: (main_program, startup_program), of type "Program"
+        """
+        pserver_prog = self.get_pserver_program(endpoint)
+        pserver_startup = self.get_startup_program(endpoint)
+        return pserver_prog, pserver_startup
+
     def get_startup_program(self,
                             endpoint,
-                            pserver_program,
+                            pserver_program=None,
                             startup_program=None):
         """
+        **Deprecated**
+
         Get startup program for current parameter server.
         Modify operator input variables if there are variables that
         were split to several blocks.
 
         Args:
             endpoint (str): current pserver endpoint.
-            pserver_program (Program): call get_pserver_program first and
-                pass the result here.
-            startup_program (Program): if pass None, will use
-                default_startup_program
+            pserver_program (Program): deprecated, call get_pserver_program first.
+            startup_program (Program): deprecated, should pass startup_program
+                when initalizing
 
         Returns:
             Program: parameter server side startup program.
         """
+        sys.stderr.write("get_startup_program() is deprecated, call\
+            get_pserver_programs() to get pserver main and startup\
+            in a single call.")
+        if pserver_program != None:
+            sys.stderr.write("passing pserver_program to get_startup_program()\
+                is deprecated, you can use new API get_pserver_programs() to\
+                get both pserver main program and startup program.")
+        if startup_program != None:
+            sys.stderr.write("passing startup_program to get_startup_program()\
+                is deprecated, use fluid.program_guard() or pass this argument\
+                to transpile() call.")
+
         s_prog = Program()
-        if not startup_program:
-            orig_s_prog = default_startup_program()
-        else:
-            orig_s_prog = startup_program
+        orig_s_prog = self.startup_program
         s_prog.random_seed = orig_s_prog.random_seed
         params = self.param_grad_ep_mapping[endpoint]["params"]
......
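For reference, a pserver-side flow using the new API above (sketch only, not part of the patch; the endpoint strings and trainer count are placeholders):

```python
import paddle.fluid as fluid

current_endpoint = "127.0.0.1:6174"
t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    pservers="127.0.0.1:6174,127.0.0.1:6175",
    trainers=2,
    sync_mode=True)

# One call now returns both the pserver main and startup programs, replacing
# get_pserver_program() followed by the deprecated get_startup_program(...).
pserver_prog, pserver_startup = t.get_pserver_programs(current_endpoint)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(pserver_startup)
exe.run(pserver_prog)
```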