From 1cb963594736e01c3eab05a88bbf8cbd6d958b1a Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 27 Dec 2017 14:09:14 +0800 Subject: [PATCH] fix dist train trainspiler bugs --- paddle/operators/send_op.cc | 3 +++ .../paddle/v2/fluid/distribute_transpiler.py | 8 +++--- python/paddle/v2/fluid/framework.py | 2 +- .../notest_recognize_digits_conv_dist.py | 26 ++++++++++++------- 4 files changed, 26 insertions(+), 13 deletions(-) rename python/paddle/v2/fluid/tests/{book => book_distribute}/notest_recognize_digits_conv_dist.py (76%) diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index 6e8293868..317db0867 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -49,14 +49,17 @@ class SendOp : public framework::OperatorBase { std::vector epmap = Attr>("epmap"); // TODO(typhoonzero): use async calls to send multiple variable asyncly. for (size_t i = 0; i < ins.size(); ++i) { + VLOG(3) << "sending " << ins[i]; bool ret = client_map_[epmap[i]]->SendVariable(scope, ins[i]); if (!ret) { LOG(ERROR) << "send variable error: " << ins[i]; } } + VLOG(3) << "waiting batch "; // TODO(typhoonzero): support async optimization client_map_[epmap[0]]->Wait(); for (size_t i = 0; i < outs.size(); ++i) { + VLOG(3) << "getting " << outs[i]; bool ret = client_map_[epmap[i]]->GetVariable(scope, outs[i]); if (!ret) { LOG(ERROR) << "GetVariable error: " << outs[i]; diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 111937f59..49ece7b72 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -95,7 +95,9 @@ class DistributeTranspiler: """ if program is None: program = default_main_program() + self.program = program self.trainers = trainers + self.optimize_ops = optimize_ops self._optimize_distributed( optimize_ops, program, @@ -156,9 +158,10 @@ class DistributeTranspiler: attrs={"endpoints": pserver_endpoints, "epmap": epmap}) - def get_trainer_program(optimize_ops, program): + def get_trainer_program(self): # remove optimize ops and add a send op to main_program - program.global_block().delete_ops(optimize_ops) + self.program.global_block().delete_ops(self.optimize_ops) + return self.program def _create_var_for_trainers(self, block, var, trainers): var_list = [] @@ -210,7 +213,6 @@ class DistributeTranspiler: if opt_op.inputs.has_key("Grad"): if opt_op.inputs["Grad"].name in grad_var_names: - print "appending ", opt_op.type, opt_op.inputs optimize_sub_program.global_block().append_op( type=opt_op.type, inputs=opt_op.inputs, diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py index add854306..dbdf9a043 100644 --- a/python/paddle/v2/fluid/framework.py +++ b/python/paddle/v2/fluid/framework.py @@ -663,7 +663,7 @@ class Block(object): end = list(self.ops).index(ops[-1]) except Exception, e: raise e - self.desc.remove_op(start, end) + self.desc.remove_op(start, end + 1) def prepend_op(self, *args, **kwargs): op_desc = self.desc.prepend_op() diff --git a/python/paddle/v2/fluid/tests/book/notest_recognize_digits_conv_dist.py b/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py similarity index 76% rename from python/paddle/v2/fluid/tests/book/notest_recognize_digits_conv_dist.py rename to python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py index 2680502ef..20b4a8b34 100644 --- a/python/paddle/v2/fluid/tests/book/notest_recognize_digits_conv_dist.py +++ b/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py @@ -38,35 +38,43 @@ train_reader = paddle.batch( place = fluid.CPUPlace() exe = fluid.Executor(place) + t = fluid.DistributeTranspiler() +# all parameter server endpoints list for spliting parameters pserver_endpoints = os.getenv("PSERVERS") +# server endpoint for current node +current_endpoint = os.getenv("SERVER_ENDPOINT") +# run as trainer or parameter server training_role = os.getenv("TRAINING_ROLE", "TRAINER") # get the training role: trainer/pserver -t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=1) +t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2) if training_role == "PSERVER": - pserver_prog = t.get_pserver_program(pserver_endpoints, optimize_ops) + if not current_endpoint: + print("need env SERVER_ENDPOINT") + exit(1) + pserver_prog = t.get_pserver_program(current_endpoint, optimize_ops) exe.run(fluid.default_startup_program()) exe.run(pserver_prog) elif training_role == "TRAINER": + trainer_prog = t.get_trainer_program() feeder = fluid.DataFeeder(feed_list=[images, label], place=place) exe.run(fluid.default_startup_program()) for pass_id in range(PASS_NUM): accuracy.reset(exe) + batch_id = 0 for data in train_reader(): - loss, acc = exe.run(fluid.default_main_program(), + loss, acc = exe.run(trainer_prog, feed=feeder.feed(data), fetch_list=[avg_cost] + accuracy.metrics) pass_acc = accuracy.eval(exe) - # print loss, acc - if loss < 10.0 and pass_acc > 0.9: - # if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good. - exit(0) + if batch_id % 100 == 0: + print("batch_id %d, loss: %f, acc: %f" % + (batch_id, loss, pass_acc)) + batch_id += 1 pass_acc = accuracy.eval(exe) print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc)) else: print("environment var TRAINER_ROLE should be TRAINER os PSERVER") - -exit(1) -- GitLab