diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc
index c69e416e10f2a9ced1f1b22c39235e4c9338e77c..45222f6b7685518c30db6c21c8d549e21fd6916e 100644
--- a/paddle/operators/recv_op.cc
+++ b/paddle/operators/recv_op.cc
@@ -72,8 +72,10 @@ class RecvOp : public framework::OperatorBase {
     // FIXME(typhoonzero): do not copy
     framework::CopyFrom(t, dev_ctx.GetPlace(), dev_ctx, tensor);
 
-    auto *block = Attr<framework::BlockDescBind *>("OptimizeBlock");
-    auto *program = block->Program();
+    std::string program_str = Attr<std::string>("OptimizeProgram");
+    framework::ProgramDesc program_desc;
+    program_desc.ParseFromString(program_str);
+    framework::ProgramDescBind program(program_desc);
     framework::Executor executor(dev_ctx);
     // Run sub graph to get optimized tensor
     executor.Run(*program, &recv_scope, block->ID(),
@@ -108,8 +110,9 @@ This operator will recv tensor from send_op
                          "IP address to listen on.")
         .SetDefault("127.0.0.1:6164")
         .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
-    AddAttr<framework::BlockDescBind *>("OptimizeBlock", "type BlockDescBind*",
-                                        "optimize network run in server");
+    AddAttr<std::string>(
+        "OptimizeProgram", "type string",
+        "Serialized ProgramDesc string for recv to run.");
   }
 };
 
diff --git a/paddle/operators/send_recv_op_test.cc b/paddle/operators/send_recv_op_test.cc
index ac03eb3752e7cd31dd80f4caa39dc0625f0409d5..c35dc8fa5087314ea12c35f2d437025870ae2c2a 100644
--- a/paddle/operators/send_recv_op_test.cc
+++ b/paddle/operators/send_recv_op_test.cc
@@ -85,7 +85,7 @@ void StartServerNet() {
 
   paddle::framework::AttributeMap attrs;
   attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
-  attrs.insert({"OptimizeBlock", block});
+  attrs.insert({"OptimizeProgram", program.Proto()->SerializeToString()});
  recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"RX"}}},
                                                    {{"Out", {"Out"}}}, attrs);
  paddle::platform::CPUDeviceContext ctx(place);
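
The key change above replaces the OptimizeBlock pointer attribute with a serialized ProgramDesc string, so the optimize program can be carried in an attribute that crosses the trainer/pserver process boundary. The mechanism is ordinary protobuf round-tripping; the sketch below illustrates it with a stock protobuf message standing in for ProgramDesc (purely illustrative, not Paddle API):

# Illustrative only: the SerializeToString()/ParseFromString() round trip
# that the new "OptimizeProgram" string attribute relies on. Timestamp is
# just a stand-in message for ProgramDesc.
from google.protobuf import timestamp_pb2

original = timestamp_pb2.Timestamp(seconds=42)
wire_bytes = original.SerializeToString()  # sender side: proto -> string attr

restored = timestamp_pb2.Timestamp()
restored.ParseFromString(wire_bytes)       # recv op side: string attr -> proto
assert restored.seconds == 42
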
diff --git a/python/paddle/v2/fluid/distribute_planner.py b/python/paddle/v2/fluid/distribute_planner.py
index 2eb32b5227e0177ec335a2647edede18b6b71096..39e9e3d9db2b3e63ba4d5af3c6ee6b2ff3476ab5 100644
--- a/python/paddle/v2/fluid/distribute_planner.py
+++ b/python/paddle/v2/fluid/distribute_planner.py
@@ -4,172 +4,46 @@ from regularizer import append_regularization_ops
 import optimizer
 from layer_helper import LayerHelper
 
-__all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad']
+def hash_name_to_server(params_grads, pserver_endpoints):
+    """
+    :param param_grads:
+    :return: a map of pserver endpoint ->
+             params -> [param list]
+             grads -> [grad list]
+    """
 
-
-def hash_name_to_server(parameters, pserver_endpoints):
     def _hash_param(param_name, total):
         return hash(param_name) % total
 
-    param_map = dict()
-    for param in parameters:
-        if param.trainable is True:
+    param_grad_map = dict()
+    for param, grad in params_grads:
+        if param.trainable is True and grad is not None:
             server_id = _hash_param(param.name, len(pserver_endpoints))
             server_for_param = pserver_endpoints[server_id]
-            if param_map.has_key(server_for_param):
-                param_map[server_for_param].append(param)
-            else:
-                param_map[server_for_param] = [param]
+            if not param_grad_map.has_key(server_for_param):
+                param_grad_map[server_for_param] = {"params": [], "grads": []}
+            param_grad_map[server_for_param]["params"].append(param)
+            param_grad_map[server_for_param]["grads"].append(grad)
 
-    return param_map
+    return param_grad_map
 
 
 def round_robin(parameters, pserver_endpoints):
     assert (len(parameters) < len(pserver_endpoints))
-    param_map = dict()
+    param_grad_map = dict()
     pserver_idx = 0
     for param in parameters:
         if param.trainable is True:
             server_for_param = pserver_endpoints[pserver_idx]
-            if param_map.has_key(server_for_param):
-                param_map[server_for_param].append(param)
-            else:
-                param_map[server_for_param] = [param]
+            if not param_grad_map.has_key(server_for_param):
+                param_grad_map[server_for_param] = {"params": [], "grads": []}
+
+            param_grad_map[server_for_param]["params"].append(param)
+            param_grad_map[server_for_param]["grads"].append(param)
 
             pserver_idx += 1
             if pserver_idx > len(pserver_endpoints):
                 pserver_idx = 0
-    return param_map
-
-
-def _append_sendop_for_trainer(loss,
-                               parameters_and_grads,
-                               pserver_endpoints,
-                               split_method=round_robin):
-    assert (callable(split_method))
-    param_map, grad_map = \
-        split_method(parameters_and_grads, pserver_endpoints)
-
-    for ep in pserver_endpoints:
-        # FIXME(typhoonzero): send to different servers can run in parrallel.
-        send_op = loss.block.append_op(
-            type="send",
-            inputs={"X": param_map[ep]},
-            outputs={"Out": param_map[ep]},
-            attrs={"endpoint": ep})
-
-    return send_op
-
-
-class DistributedPlanner(optimizer.Optimizer):
-    def __init__(self, global_step=None, parallelism_type='dp'):
-        """
-            parallelism_type:
-                dp: data parallelism
-                mp: model parallelism
-        """
-        super(DistributedPlanner).__init__(self, global_step)
-        if parallelism_type == "mp":
-            raise NotImplementedError("model parallelism not implemented")
-        elif parallelism_type == "dp":
-            self.parameter_server_program_map = dict()
-            self.worker_program = None
-        else:
-            raise NameError("parallelism_type %s not supported" %
-                            parallelism_type)
-
-    def create_optimization_pass(self,
-                                 parameters_and_grads,
-                                 program,
-                                 startup_program=None):
-        # Create any accumulators
-        self.helper = LayerHelper(
-            self.__class__.__name__,
-            main_program=program,
-            startup_program=startup_program)
-        self._create_accumulators(program.global_block(),
-                                  [p[0] for p in parameters_and_grads])
-
-        optimize_ops = []
-        for param_and_grad in parameters_and_grads:
-            if param_and_grad[0].trainable is True and param_and_grad[
-                    1] is not None:
-                optimize_op = self._append_optimize_op(program.global_block(),
-                                                       param_and_grad)
-                optimize_ops.append(optimize_op)
-
-        # Returned list of ops can include more ops in addition
-        # to optimization ops
-        return_ops = optimize_ops
-
-        # Get custom finish ops for subclasses
-        # FIXME: Need to fix this once we figure out how to handle dependencies
-        finish_ops = self._finish_update(program.global_block())
-        if finish_ops is not None:
-            return_ops += finish_ops
-
-        if self._global_step is not None:
-            return_ops.append(
-                self._increment_global_step(program.global_block()))
-        return return_ops
-
-    def minimize(self,
-                 loss,
-                 startup_program=None,
-                 parameter_list=None,
-                 no_grad_set=None,
-                 split_method=round_robin):
-        """
-        For distributed case, this call append backward ops and then
-        append sevaral send_ops at the end for each parameter server.
-
-        Then call get_pserver_program(idx/endpoint) will return the program of
-        coresponding pserver program to run.
-        """
-        params_grads = append_backward_ops(loss, parameter_list, no_grad_set)
-        # Add regularization if any
-        params_grads = append_regularization_ops(params_grads)
-        _append_sendop_for_trainer(loss, params_grads, self.pserver_endpoints,
-                                   split_method)
-        self.worker_program = loss.block.program
-
-        optimize_sub_program = framework.Program()
-        optimize_ops = self.create_optimization_pass(
-            params_grads, optimize_sub_program, startup_program)
-        param_list = []
-        for param_and_grad in params_grads:
-            if param_and_grad[0].trainable is True and param_and_grad[
-                    1] is not None:
-                param_list.append(param_and_grad[0])
-
-        param_map, grad_map = \
-            split_method(params_grads, self.pserver_endpoints)
-
-        for ep in self.pserver_endpoints:
-            pserver_program = framework.Program()
-            self.parameter_server_program_map[ep] = pserver_program
-            pserver_program.global_block().append_op(
-                type="recv",
-                inputs={"RX": param_map[ep]},
-                outputs={},
-                attrs={
-                    "OptimizeBlock": optimize_sub_program.global_block(),
-                    "endpoint": ep
-                })
-        # FIXME(typhoonzero): when to use this return value?
-        return None
-
-    def get_pserver_program(self, endpoint):
-        return self.parameter_server_program_map.get(endpoint)
-
-
-SGD = optimizer.SGDOptimizer
-Momentum = optimizer.MomentumOptimizer
-Adagrad = optimizer.AdagradOptimizer
-Adam = optimizer.AdamOptimizer
-Adamax = optimizer.AdamaxOptimizer
-DecayedAdagrad = optimizer.DecayedAdagradOptimizer
-
-for optcls in __all__:
-    eval(optcls).__base__ = DistributedPlanner
+    return param_grad_map
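
For reference, a minimal usage sketch of the reworked split helpers and the map shape they now return (FakeParam and the string gradients are stand-ins; real callers pass fluid parameters and gradient variables):

# Hedged sketch: only .name and .trainable are read from each parameter,
# so a namedtuple stand-in is enough to show the returned structure.
import collections
from paddle.v2.fluid.distribute_planner import hash_name_to_server

FakeParam = collections.namedtuple("FakeParam", ["name", "trainable"])

params_grads = [(FakeParam("fc_0.w_0", True), "fc_0.w_0@GRAD"),
                (FakeParam("fc_0.b_0", True), "fc_0.b_0@GRAD")]
endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]

param_grad_map = hash_name_to_server(params_grads, endpoints)
# Shape: {endpoint: {"params": [param, ...], "grads": [grad, ...]}},
# with each (param, grad) pair hashed to one endpoint by parameter name.
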
- """ - params_grads = append_backward_ops(loss, parameter_list, no_grad_set) - # Add regularization if any - params_grads = append_regularization_ops(params_grads) - _append_sendop_for_trainer(loss, params_grads, self.pserver_endpoints, - split_method) - self.worker_program = loss.block.program - - optimize_sub_program = framework.Program() - optimize_ops = self.create_optimization_pass( - params_grads, optimize_sub_program, startup_program) - param_list = [] - for param_and_grad in params_grads: - if param_and_grad[0].trainable is True and param_and_grad[ - 1] is not None: - param_list.append(param_and_grad[0]) - - param_map, grad_map = \ - split_method(params_grads, self.pserver_endpoints) - - for ep in self.pserver_endpoints: - pserver_program = framework.Program() - self.parameter_server_program_map[ep] = pserver_program - pserver_program.global_block().append_op( - type="recv", - inputs={"RX": param_map[ep]}, - outputs={}, - attrs={ - "OptimizeBlock": optimize_sub_program.global_block(), - "endpoint": ep - }) - # FIXME(typhoonzero): when to use this return value? - return None - - def get_pserver_program(self, endpoint): - return self.parameter_server_program_map.get(endpoint) - - -SGD = optimizer.SGDOptimizer -Momentum = optimizer.MomentumOptimizer -Adagrad = optimizer.AdagradOptimizer -Adam = optimizer.AdamOptimizer -Adamax = optimizer.AdamaxOptimizer -DecayedAdagrad = optimizer.DecayedAdagradOptimizer - -for optcls in __all__: - eval(optcls).__base__ = DistributedPlanner + return param_grad_map diff --git a/python/paddle/v2/fluid/executor.py b/python/paddle/v2/fluid/executor.py index 4a03e55ee07688ccd680f0ac57cc7e334a1d73d8..ee7497e305ca860d1739bc5195bf2593ec5d1c4d 100644 --- a/python/paddle/v2/fluid/executor.py +++ b/python/paddle/v2/fluid/executor.py @@ -69,7 +69,8 @@ class Executor(object): if kwargs.has_key("pservers"): return self._optimize_distributed(optimize_ops, program, **kwargs) - def _optimize_distributed(self, optimize_ops, program, **kwargs): + def _optimize_distributed(self, optimize_ops, program, params_and_grads, + **kwargs): # remove optimize ops and add a send op to main_program # FIXME(typhoonzero): delete_op only remove the first accurence, # need to consider about multiple same optimize op? @@ -83,43 +84,36 @@ class Executor(object): assert (callable(split_method)) pserver_endpoints = kwargs["pservers"].split(",") params = program.global_block().all_parameters() - param_map = split_method(params, pserver_endpoints) + self.param_grad_map = split_method(params, pserver_endpoints) for ep in pserver_endpoints: # FIXME(typhoonzero): send to different servers can run in parrallel. 
diff --git a/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py b/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py
index 35bf8da924dc76475df9bd5e6a4c04f4d204426a..b856526114f1053fbcb4653ddcd4e13f3fb30a7a 100644
--- a/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py
+++ b/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py
@@ -37,24 +37,33 @@ train_reader = paddle.batch(
 
 place = fluid.CPUPlace()
 exe = fluid.Executor(place)
-feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
-exe.run(fluid.default_startup_program())
-
-for pass_id in range(PASS_NUM):
-    accuracy.reset(exe)
-    for data in train_reader():
-        loss, acc = exe.run(fluid.default_main_program(),
-                            feed=feeder.feed(data),
-                            fetch_list=[avg_cost] + accuracy.metrics)
+
+exe.optimize(pservers="127.0.0.1:6174", trainers=1)
+
+pserver_endpoint = os.getenv("PSERVER")
+if is_pserver:
+    pserver_prog = exe.get_pserver_program(pserver_endpoint)
+    exe.run(fluid.default_startup_program())
+    exe.run(pserver_prog)
+else:
+    feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
+    exe.run(fluid.default_startup_program())
+
+    for pass_id in range(PASS_NUM):
+        accuracy.reset(exe)
+        for data in train_reader():
+            loss, acc = exe.run(fluid.default_main_program(),
+                                feed=feeder.feed(data),
+                                fetch_list=[avg_cost] + accuracy.metrics)
+            pass_acc = accuracy.eval(exe)
+            print("pass_id=" + str(pass_id) + " acc=" + str(acc) + " pass_acc=" +
+                  str(pass_acc))
+            # print loss, acc
+            if loss < 10.0 and pass_acc > 0.9:
+                # if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
+                exit(0)
+
         pass_acc = accuracy.eval(exe)
-        print("pass_id=" + str(pass_id) + " acc=" + str(acc) + " pass_acc=" +
-              str(pass_acc))
-        # print loss, acc
-        if loss < 10.0 and pass_acc > 0.9:
-            # if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
-            exit(0)
-
-    pass_acc = accuracy.eval(exe)
-    print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
+        print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
 
 exit(1)
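
One note on the rewritten test: is_pserver is not defined anywhere in the hunk shown, so it is presumably derived from the same PSERVER environment variable earlier in the script. A hedged sketch of that role-selection logic:

# Assumed (not shown in the hunk above): run as a parameter server only
# when PSERVER is set in the environment, otherwise act as the trainer.
import os

pserver_endpoint = os.getenv("PSERVER")
is_pserver = pserver_endpoint is not None

# Launch sketch:
#   PSERVER=127.0.0.1:6174 python test_recognize_digits_conv_dist.py   # pserver role
#   python test_recognize_digits_conv_dist.py                          # trainer role
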