From b4cd7f3d758e4a1f9104861dfd910afdbbbb66fe Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 12 Dec 2017 21:07:53 +0800 Subject: [PATCH] wip need ut --- paddle/operators/detail/send_impl.cc | 1 + paddle/operators/recv_op.cc | 26 ++++--- paddle/operators/send_op.cc | 1 + paddle/pybind/protobuf.cc | 6 ++ python/paddle/v2/fluid/distribute_planner.py | 8 +-- python/paddle/v2/fluid/executor.py | 72 +++++++++++++------ python/paddle/v2/fluid/framework.py | 8 +++ .../book/test_recognize_digits_conv_dist.py | 3 +- 8 files changed, 87 insertions(+), 38 deletions(-) diff --git a/paddle/operators/detail/send_impl.cc b/paddle/operators/detail/send_impl.cc index da1ddf75d2a..2313255dcba 100644 --- a/paddle/operators/detail/send_impl.cc +++ b/paddle/operators/detail/send_impl.cc @@ -37,6 +37,7 @@ bool RPCClient::SendVariable(const framework::Scope& scope, msg.set_serialized(oss.str()); Status status = stub_->SendVariable(&context, msg, &out_msg); if (!status.ok()) { + LOG(ERROR) << "gRPC error: " << status.error_message(); return false; } std::istringstream iss(out_msg.serialized()); diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index b593c6e4f36..94cb39391f9 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -64,12 +64,12 @@ class RecvOp : public framework::OperatorBase { void Run(const framework::Scope &scope, const platform::DeviceContext &dev_ctx) const override { + // FIXME(typhoonzero): no new scopes for every run. framework::Scope &recv_scope = scope.NewScope(); // blocking get one var from client. const detail::TensorWithName &v = rpc_service_->Get(); auto grad_var_name = v.first; - // framework::Scope &recv_scope = scope.NewScope(); auto param_list = Attr>("ParamList"); auto grad_list = Attr>("GradList"); auto it = std::find(grad_list.begin(), grad_list.end(), grad_var_name); @@ -77,16 +77,23 @@ class RecvOp : public framework::OperatorBase { if (it != grad_list.end()) { param_var_name = param_list[it - grad_list.begin()]; } - // set graph input var - auto input_grad = Input("RX"); + // find input by "grad_var_name" + // auto inputs = Inputs("RX"); // FIXME(typhoonzero): Find the parameter name from input grad name // rename X -> Param // rename RX -> Grad - auto *var = recv_scope.FindVar(input_grad); + + LOG(ERROR) << "recved grad: " << grad_var_name + << " param: " << param_var_name; + auto *var = recv_scope.Var(grad_var_name); auto *tensor = var->GetMutable(); - recv_scope.Rename(param_var_name, "Param"); - recv_scope.Rename("RX", "Grad"); + + // Param is in parent scope, put it in current scope. + auto *param_var = recv_scope.FindVar(param_var_name); + auto param_scope = recv_scope.FindScope(param_var); + param_scope->Rename(param_var_name, "Param"); + recv_scope.Rename(grad_var_name, "Grad"); // FIXME(typhoonzero): do not copy framework::CopyFrom(v.second, dev_ctx.GetPlace(), dev_ctx, tensor); @@ -100,14 +107,14 @@ class RecvOp : public framework::OperatorBase { executor.Run(program, &recv_scope, 0, /*global_block*/ false /*create_local_scope*/); - auto *out_var = recv_scope.FindVar("Param"); + auto *out_var = recv_scope.FindVar("ParamOut"); detail::TensorWithName out; out.first = param_var_name; out.second = out_var->Get(); rpc_service_->Push(out); // rename back the params - recv_scope.Rename("Param", param_var_name); - recv_scope.Rename("Grad", "RX"); + param_scope.Rename("Param", param_var_name); + recv_scope.Rename("Grad", grad_var_name); } protected: @@ -117,7 +124,6 @@ class RecvOp : public framework::OperatorBase { // grpc send/recv service implement to register. std::shared_ptr rpc_service_; std::shared_ptr server_thread_; - framework::Scope const *recv_scope_{nullptr}; }; class RecvOpMaker : public framework::OpProtoAndCheckerMaker { diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index 7cbc45e69af..648905743c8 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -47,6 +47,7 @@ class SendOp : public framework::OperatorBase { // TODO(typhoonzero): currently it's non-blocking, // should block until server responds. for (auto in : ins) { + LOG(ERROR) << "sending grad: " << in; bool ret = client_->SendVariable(scope, in, in); if (!ret) { LOG(ERROR) << "send variable error"; diff --git a/paddle/pybind/protobuf.cc b/paddle/pybind/protobuf.cc index 6c8f06cccb9..6e6cafafb9c 100644 --- a/paddle/pybind/protobuf.cc +++ b/paddle/pybind/protobuf.cc @@ -250,6 +250,12 @@ void BindOpDesc(py::module &m) { .def("set_attr", &OpDescBind::SetAttr) .def("attr", &OpDescBind::GetAttr) .def("set_block_attr", &OpDescBind::SetBlockAttr) + .def("set_serialized_attr", + [](OpDescBind &self, const std::string &name, + const py::bytes &seriralized) { + std::string ser(seriralized); + self.SetAttr(name, ser); + }) .def("block_attr", &OpDescBind::GetBlockAttr) .def("check_attrs", &OpDescBind::CheckAttrs) .def("infer_shape", &OpDescBind::InferShape) diff --git a/python/paddle/v2/fluid/distribute_planner.py b/python/paddle/v2/fluid/distribute_planner.py index 3d8df4b3c86..c3430b3b68a 100644 --- a/python/paddle/v2/fluid/distribute_planner.py +++ b/python/paddle/v2/fluid/distribute_planner.py @@ -29,19 +29,19 @@ def hash_name_to_server(params_grads, pserver_endpoints): return param_grad_map -def round_robin(parameters, pserver_endpoints): - assert (len(parameters) > len(pserver_endpoints)) +def round_robin(params_grads, pserver_endpoints): + assert (len(params_grads) > len(pserver_endpoints)) param_grad_map = dict() pserver_idx = 0 - for param in parameters: + for param, grad in params_grads: if param.trainable is True: server_for_param = pserver_endpoints[pserver_idx] if not param_grad_map.has_key(server_for_param): param_grad_map[server_for_param] = {"params": [], "grads": []} param_grad_map[server_for_param]["params"].append(param) - param_grad_map[server_for_param]["grads"].append(param) + param_grad_map[server_for_param]["grads"].append(grad) pserver_idx += 1 if pserver_idx >= len(pserver_endpoints): diff --git a/python/paddle/v2/fluid/executor.py b/python/paddle/v2/fluid/executor.py index b6cfec3983c..ba699442ce6 100644 --- a/python/paddle/v2/fluid/executor.py +++ b/python/paddle/v2/fluid/executor.py @@ -70,6 +70,31 @@ class Executor(object): return self._optimize_distributed(optimize_ops, program, params_grads, **kwargs) + def _clone_param(self, block, v): + assert isinstance(v, Parameter) + new_p = Parameter( + block=block, + shape=v.shape, + dtype=v.dtype, + type=v.type, + lod_level=v.lod_level, + stop_gradient=v.stop_gradient, + trainable=v.trainable, + optimize_attr=v.optimize_attr, + regularizer=v.regularizer, + name=v.name) + block.vars[new_p.name] = new_p + + def _clone_var(self, block, var): + assert isinstance(var, Variable) + return block.create_var( + name=var.name, + shape=var.shape, + dtype=var.dtype, + type=var.type, + lod_level=var.lod_level, + persistable=True) + def _optimize_distributed(self, optimize_ops, program, params_and_grads, **kwargs): # remove optimize ops and add a send op to main_program @@ -84,8 +109,7 @@ class Executor(object): assert (callable(split_method)) pserver_endpoints = kwargs["pservers"].split(",") - params = program.global_block().all_parameters() - self.param_grad_map = split_method(params, pserver_endpoints) + self.param_grad_map = split_method(params_and_grads, pserver_endpoints) for ep in pserver_endpoints: # FIXME(typhoonzero): send to different servers can run in parrallel. @@ -95,27 +119,26 @@ class Executor(object): }, # inputs is a list of tensors to be send outputs={}, attrs={"endpoint": ep}) - # -------------- generate optimize sub program -------------- - self.optimize_sub_program = Program() - for opt_op in optimize_ops: - self.optimize_sub_program.global_block().ops.append(opt_op) - def get_pserver_program(self, endpoint): + def get_pserver_program(self, endpoint, optimize_ops): pserver_program = Program() for v in self.param_grad_map[endpoint]["params"]: - assert isinstance(v, Parameter) - new_p = Parameter( - block=pserver_program.global_block(), - shape=v.shape, - dtype=v.dtype, - type=v.type, - lod_level=v.lod_level, - stop_gradient=v.stop_gradient, - trainable=v.trainable, - optimize_attr=v.optimize_attr, - regularizer=v.regularizer, - name=v.name) - pserver_program.global_block().vars[new_p.name] = new_p + self._clone_param(pserver_program.global_block(), v) + + optimize_sub_program = Program() + for opt_op in optimize_ops: + for varname, var in opt_op.inputs.iteritems(): + optimize_sub_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + optimize_sub_program.global_block().append_op( + type=opt_op.type, + inputs=opt_op.inputs, + outputs=opt_op.outputs, + attrs=opt_op.attrs) + print("optimize program: ", optimize_sub_program) pserver_program.global_block().append_op( type="recv", @@ -123,11 +146,14 @@ class Executor(object): self.param_grad_map[endpoint]["grads"]}, # grads to recv outputs={}, attrs={ - "OptimizeProgram": self.optimize_sub_program.to_string(True), + "OptimizeProgram": optimize_sub_program.desc, "endpoint": endpoint, - "ParamList": self.param_grad_map[endpoint]["params"], - "GradList": self.param_grad_map[endpoint]["grads"] + "ParamList": + [p.name for p in self.param_grad_map[endpoint]["params"]], + "GradList": + [p.name for p in self.param_grad_map[endpoint]["grads"]] }) + pserver_program.sync_with_cpp() return pserver_program def aslodtensor(self, data): diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py index 18d414c579d..274565b28f3 100644 --- a/python/paddle/v2/fluid/framework.py +++ b/python/paddle/v2/fluid/framework.py @@ -227,6 +227,10 @@ class Operator(object): attrs=None): self.block = block self.desc = desc + # for clone a new operator + self.inputs = inputs + self.outputs = outputs + self.attrs = attrs if len(self.desc.type()) != 0: return if type is None: @@ -298,6 +302,10 @@ class Operator(object): continue if isinstance(attrs[attr_name], Block): self.desc.set_block_attr(attr_name, attrs[attr_name].desc) + elif isinstance(attrs[attr_name], core.BlockDesc) or \ + isinstance(attrs[attr_name], core.ProgramDesc): + self.desc.set_serialized_attr( + attr_name, attrs[attr_name].serialize_to_string()) else: self.desc.set_attr(attr_name, attrs[attr_name]) diff --git a/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py b/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py index 1add8e40206..208002c8d6c 100644 --- a/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py +++ b/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py @@ -43,10 +43,11 @@ exe.optimize(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1) pserver_endpoint = os.getenv("PSERVER") if pserver_endpoint: - pserver_prog = exe.get_pserver_program(pserver_endpoint) + pserver_prog = exe.get_pserver_program(pserver_endpoint, optimize_ops) exe.run(fluid.default_startup_program()) while True: exe.run(pserver_prog) + print("Run pserver once end...") else: feeder = fluid.DataFeeder(feed_list=[images, label], place=place) exe.run(fluid.default_startup_program()) -- GitLab