diff --git a/paddle/fluid/distributed/ps/service/communicator/communicator.cc b/paddle/fluid/distributed/ps/service/communicator/communicator.cc index 414bc560772025d429842252ee7ff417707f0a42..80873ed14dd7df685fe61a8aed1d412be2bf7fa9 100644 --- a/paddle/fluid/distributed/ps/service/communicator/communicator.cc +++ b/paddle/fluid/distributed/ps/service/communicator/communicator.cc @@ -167,17 +167,17 @@ void Communicator::RpcSendDenseParam(const std::vector &varnames, framework::TensorCopy(*tensor, platform::CPUPlace(), temp_tensor); paddle::distributed::Region reg(temp_data, tensor->numel()); regions.emplace_back(std::move(reg)); - VLOG(1) << "AsyncCommunicator::RpcSendDenseParam Var " << t - << " table_id " << table_id << " Temp_data[0] " << temp_data[0] - << " Temp_data[-1] " << temp_data[tensor->numel() - 1]; + VLOG(1) << "rpc_send_dense_param Var " << t << " table_id " << table_id + << " Temp_data[0] " << temp_data[0] << " Temp_data[-1] " + << temp_data[tensor->numel() - 1]; #endif } else { float *w = tensor->mutable_data(place); paddle::distributed::Region reg(w, tensor->numel()); - regions.emplace_back(std::move(reg)); - VLOG(1) << "AsyncCommunicator::RpcSendDenseParam Var " << t - << " talbe_id " << table_id << " Temp_data[0] " << w[0] - << " Temp_data[-1] " << w[tensor->numel() - 1]; + regions.emplace_back(reg); + VLOG(1) << "rpc_send_dense_param Var " << t << " talbe_id " << table_id + << " Temp_data[0] " << w[0] << " Temp_data[-1] " + << w[tensor->numel() - 1]; } } auto status = @@ -1070,27 +1070,31 @@ void GeoCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx, const RecvCtxMap &recv_varname_to_ctx, Scope *recv_scope) { send_varname_to_ctx_ = std::move(send_varname_to_ctx); - recv_varname_to_ctx_ = std::move(recv_varname_to_ctx); + recv_varname_to_ctx_ = std::move( + recv_varname_to_ctx); // dense_map - key: table_id, value: params recv_scope_ = std::move(recv_scope); - PADDLE_ENFORCE_GT( - send_varname_to_ctx.size(), - 0, - platform::errors::InvalidArgument("send var contexts can not be zero")); - - for (auto &iter : send_varname_to_ctx_) { - auto &ctx = iter.second; + for (auto it = send_varname_to_ctx_.begin(); + it != send_varname_to_ctx_.end();) { + auto &ctx = it->second; if (!ctx.is_sparse) { parallel_task_nums_ += 1; + it++; continue; } auto &varnames = ctx.origin_varnames; - PADDLE_ENFORCE_EQ( - varnames.size(), - 1, - platform::errors::InvalidArgument( - "sparse variables can only be merged by one variables")); - for (auto &splited_var : ctx.splited_varnames) { + if (varnames.empty()) { + VLOG(0) << "ERROR! sparse variables num can not be zero"; + } + auto &varname = varnames[0]; // embedding_0.w_0@GRAD + auto &ids = ctx.remote_sparse_ids; + if (!ids.empty()) { + it = send_varname_to_ctx_.erase(it); + continue; + } else { + it++; + } + for (auto &splited_var : ctx.splited_varnames) { // embedding_0.w_0.block0 parallel_task_nums_ += 1; sparse_id_queues_.insert( std::pair>>(send_queue_size_))); } } - - send_threadpool_.reset(new ::ThreadPool(thread_pool_size_)); - - delta_scope_.reset(new Scope()); - old_scope_.reset(new Scope()); - pserver_scope_.reset(new Scope()); + send_threadpool_ = std::make_unique(thread_pool_size_); + delta_scope_ = std::make_shared(); + old_scope_ = std::make_shared(); + pserver_scope_ = std::make_shared(); + return; } void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) { @@ -1116,10 +1119,12 @@ void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) { for (auto &iter : recv_varname_to_ctx_) { auto &table_id = iter.first; auto &varnames = iter.second; - auto recv_task = [this, &table_id, &varnames] { InitDense(varnames, table_id); }; + if (send_threadpool_ == nullptr) { + VLOG(0) << "ERROR! send_threadpool_ is nullptr"; + } tasks.emplace_back(send_threadpool_->enqueue(std::move(recv_task))); } @@ -1129,10 +1134,13 @@ void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) { for (auto &iter : send_varname_to_ctx_) { auto &ctx = iter.second; - if (!ctx.is_sparse) continue; + if (!ctx.is_sparse) { + continue; + } auto &varname = ctx.origin_varnames[0]; auto &table_id = ctx.table_id; auto param = varname.substr(0, varname.size() - 5); + VLOG(0) << "InitSparse: " << param << ", " << table_id; InitSparse(param, table_id); } return; @@ -1140,6 +1148,7 @@ void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) { void GeoCommunicator::InitDense(std::vector &varnames, int table_id) { + VLOG(1) << "init dense table " << table_id << " begin"; if (trainer_id_ == 0) { RpcSendDenseParam(varnames, table_id, *recv_scope_); BarrierWithTable(1); @@ -1223,7 +1232,8 @@ void GeoCommunicator::RecvDense(const CommContext &send_ctx) { // 1. recv from pserver RpcRecvDense(varnames, table_id, pserver_scope_.get()); - // 2.1 pserver - old => delta; 2.2 latest + delta => latest 2.3 old => pserver + // 2.1 pserver - old => delta; 2.2 latest + delta => latest 2.3 old => + // pserver phi::CPUContext cpu_ctx; for (auto &varname : varnames) { auto *var_latest = recv_scope_->FindVar(varname); @@ -1505,8 +1515,8 @@ void FLCommunicator::InitBrpcClient( if (_worker_ptr.get() == nullptr) { VLOG(0) << "fl-ps > FLCommunicator::InitBrpcClient get _worker_ptr"; _worker_ptr = - fleet->worker_ptr_; // FleetWrapper::InitWorker must be excuted before, - // but no need for Coordinator + fleet->worker_ptr_; // FleetWrapper::InitWorker must be excuted + // before, but no need for Coordinator } if (coordinator_client_ptr_ == nullptr) { coordinator_client_ptr_.reset(new CoordinatorClient); diff --git a/paddle/fluid/distributed/ps/service/communicator/communicator.h b/paddle/fluid/distributed/ps/service/communicator/communicator.h old mode 100755 new mode 100644 index 5af035d5dcf0ee18b57b28f510e56fdf85970310..b9cee1d898c8ad5383b2cd0229da7213b55ab66a --- a/paddle/fluid/distributed/ps/service/communicator/communicator.h +++ b/paddle/fluid/distributed/ps/service/communicator/communicator.h @@ -370,7 +370,7 @@ class Communicator { return communicator_.get(); } - // Init is called by InitInstance. + // called by InitInstance. template static void InitWithRpcCtx(const RpcCtxMap &send_ctx, const RecvCtxMap &recv_ctx, @@ -378,6 +378,7 @@ class Communicator { const std::vector &host_sign_list, Scope *recv_scope, const std::map &envs) { + VLOG(0) << "Communicator type is: " << typeid(T).name(); if (communicator_.get() == nullptr) { communicator_.reset(new T(std::ref(envs))); communicator_->InitEnvs(); @@ -601,10 +602,6 @@ class GeoCommunicator : public AsyncCommunicator { explicit GeoCommunicator(const std::map &envs) : AsyncCommunicator(envs) {} - void InitImpl(const RpcCtxMap &send_varname_to_ctx, - const RecvCtxMap &recv_varname_to_ctx, - Scope *recv_scope) override; - void InitParams(const RecvCtxMap &recv_varname_to_ctx) override; void InitDense(std::vector &varnames, int table_id); // NOLINT void InitSparse(const std::string &var_name, int table_id); @@ -621,7 +618,7 @@ class GeoCommunicator : public AsyncCommunicator { void MainThread() override; - void InitEnvs() { + virtual void InitEnvs() { independent_recv_ = false; min_send_grad_num_before_recv_ = 0; send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times")); @@ -632,6 +629,10 @@ class GeoCommunicator : public AsyncCommunicator { VLOG(1) << "GeoCommunicator Initialized"; } + void InitImpl(const RpcCtxMap &send_varname_to_ctx, + const RecvCtxMap &recv_varname_to_ctx, + Scope *recv_scope) override; + void Send(const std::vector &var_names, const framework::Scope &scope) override; @@ -651,7 +652,7 @@ class GeoCommunicator : public AsyncCommunicator { return param_name; } - private: + public: // parameter for delta calc and send std::shared_ptr delta_scope_; // parameter for storage the pserver param after last recv @@ -684,7 +685,7 @@ class FLCommunicator : public GeoCommunicator { void InitImpl(const RpcCtxMap &send_varname_to_ctx, const RecvCtxMap &recv_varname_to_ctx, - Scope *recv_scope) override {} + Scope *recv_scope) {} void StartCoordinatorClient( const std::vector &trainer_endpoints); diff --git a/paddle/fluid/distributed/ps/service/communicator/communicator_common.h b/paddle/fluid/distributed/ps/service/communicator/communicator_common.h index 4f5fa6fc71bdeae984f135453966d4cb7b858ad7..7050ee20c748f7f2bc028e0e4e8440f26f173601 100644 --- a/paddle/fluid/distributed/ps/service/communicator/communicator_common.h +++ b/paddle/fluid/distributed/ps/service/communicator/communicator_common.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once +#include #include #include #include @@ -30,27 +31,29 @@ struct CommContext { const std::vector &emap, const std::vector §ions, const std::vector &origin_names, - int id, - bool merge_add_ = true, - bool is_sparse_ = true, - bool is_distributed_ = false, - int table_id_ = -1, - bool is_tensor_table_ = false, - bool is_datanorm_table_ = false, - int64_t program_id_ = -1) + int trainer_id, + bool merge_add = true, + bool is_sparse = true, + bool is_distributed = false, + int table_id = -1, + bool is_tensor_table = false, + bool is_datanorm_table = false, + int64_t program_id = -1, + const std::vector &remote_sparse_ids = {}) : var_name(name), splited_varnames(names), epmap(emap), height_sections(sections), origin_varnames(origin_names), - trainer_id(id), - merge_add(merge_add_), - is_sparse(is_sparse_), - is_distributed(is_distributed_), - table_id(table_id_), - program_id(program_id_), - is_tensor_table(is_tensor_table_), - is_datanorm_table(is_datanorm_table_) {} + trainer_id(trainer_id), + merge_add(merge_add), + is_sparse(is_sparse), + is_distributed(is_distributed), + table_id(table_id), + program_id(program_id), + is_tensor_table(is_tensor_table), + is_datanorm_table(is_datanorm_table), + remote_sparse_ids(remote_sparse_ids) {} CommContext(const CommContext &ctx) { var_name = ctx.var_name; @@ -66,6 +69,7 @@ struct CommContext { program_id = ctx.program_id; is_tensor_table = ctx.is_tensor_table; is_datanorm_table = ctx.is_datanorm_table; + remote_sparse_ids = ctx.remote_sparse_ids; } std::string print() const { @@ -74,6 +78,11 @@ struct CommContext { ss << "varname: " << var_name << " trainer_id: " << trainer_id << " "; ss << " table_id: " << table_id; + std::for_each( + remote_sparse_ids.begin(), remote_sparse_ids.end(), [&](const int &i) { + ss << "remote_sparse_id: " << i << " "; + }); + for (size_t i = 0; i < splited_varnames.size(); i++) { ss << "slice varname: " << splited_varnames[i] << " ep: " << epmap[i] << " section: " << height_sections[i] << " "; @@ -108,6 +117,7 @@ struct CommContext { int64_t program_id; bool is_tensor_table; bool is_datanorm_table; + std::vector remote_sparse_ids; }; } // namespace distributed diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index ac9220d083dae359381610794c8b871e19727f56..04a8824ce90a755c7b5f52c8a66b7c96204e0b94 100755 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -168,6 +168,8 @@ message TrainerDescConfig { repeated string stat_var_names = 4; optional string trainer = 5; optional string device_worker = 6; + repeated string local_sparse = 7; + repeated string remote_sparse = 8; } message PipelineConfig { diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc old mode 100755 new mode 100644 index b11f5832d8c8a6519518d0f0af80dc72fa9e5207..ff83c7a23bb1a228d107ee81669c4e14637e80e2 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -107,8 +107,11 @@ void BindCommunicatorContext(py::module* m) { int, bool, bool, - int64_t>()) + int64_t, + const std::vector&>()) .def("var_name", [](const CommContext& self) { return self.var_name; }) + .def("remote_sparse_ids", + [](const CommContext& self) { return self.remote_sparse_ids; }) .def("trainer_id", [](const CommContext& self) { return self.trainer_id; }) .def("table_id", [](const CommContext& self) { return self.table_id; }) diff --git a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py index fb1149dcba3bd4e7196e3a1e91e891a69d0dc05d..f274743d5d807cfc226c87d4cef4a6b163f998ba 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py @@ -77,9 +77,15 @@ class ParameterServerOptimizer(MetaOptimizerBase): "use_ps_gpu"] attrs['lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[ "lr_decay_steps"] + # FL + attrs['local_sparse'] = attrs[ + "user_defined_strategy"].trainer_desc_configs["local_sparse"] + attrs['remote_sparse'] = attrs[ + "user_defined_strategy"].trainer_desc_configs["remote_sparse"] attrs['is_fl_ps_mode'] = self.user_defined_strategy.is_fl_ps_mode attrs[ 'with_coordinator'] = self.user_defined_strategy.is_with_coordinator + attrs['k_steps'] = self.user_defined_strategy.a_sync_configs["k_steps"] attrs['launch_barrier'] = self.user_defined_strategy.a_sync_configs[ "launch_barrier"] diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py index 422e7cc88642b145fdc5f63c96b05c7ac4a2a5b0..cdb377a72be02ded3f36b5b2140821e100bc9412 100755 --- a/python/paddle/distributed/passes/ps_trainer_pass.py +++ b/python/paddle/distributed/passes/ps_trainer_pass.py @@ -81,19 +81,22 @@ class AppendSendOpsPass(PassBase): # 该 pass 被多种模式复用 def _apply_single_impl(self, main_program, startup_program, pass_ctx): attrs = pass_ctx._attrs ps_mode = attrs['ps_mode'] - if ps_mode == DistributedMode.GEO: - send_ctx = get_geo_trainer_send_context(attrs) # geo 模式 - elif attrs['is_heter_ps_mode'] == True: - print("is_heter_ps_mode in append_send_ops_pass!!") - send_ctx = get_the_one_send_context(attrs, split_dense_table=True) - else: - send_ctx = get_the_one_send_context(attrs) # async、sync 等各种模式 + #if ps_mode == DistributedMode.GEO: + # send_ctx = get_geo_trainer_send_context(attrs) # geo 模式, 没必要 + send_ctx = get_the_one_send_context( + attrs, + split_dense_table=attrs['is_heter_ps_mode']) # async、sync 等各种模式 + dummys = [] - for merged_name, send in send_ctx.items(): + for merged_name, send in send_ctx.items(): # embedding_0.w_0@GRAD if send.is_sparse() and ps_mode != DistributedMode.GEO: continue + if (not send.is_sparse()) and ps_mode == DistributedMode.GEO: + continue if send.program_id() != id(attrs['loss'].block.program): continue + if len(send.remote_sparse_ids()) > 0: + continue is_sparse = 1 if send.is_sparse() else 0 is_sparse = 2 if send.is_distributed() else is_sparse dummys.append( @@ -470,6 +473,8 @@ class DistributedOpsPass(PassBase): if attrs['is_heter_ps_mode'] and not attrs['is_fl_ps_mode']: # TODO: trick for matchnet, need to modify for heter_ps param_name += op.input("Ids")[0][0] + if param_name in attrs['local_sparse']: # for recall/ncf model + continue ops = pull_sparse_ops.get(param_name, []) ops.append(op) pull_sparse_ops[param_name] = ops @@ -514,24 +519,37 @@ class DeleteOptimizesPass(PassBase): def _check_conflict(self, other_pass): return True - def _delete_optimizer_op_and_vars(self, _program, optimize_ops): - optimize_vars = [] - optimize_op_role_vars = [] + def _delete_optimizer_op_and_vars(self, _program, remote_optimize_ops, + local_optimize_ops): + local_optimize_vars = [] + remote_optimize_vars = [] + remote_optimize_op_role_vars = [] optimize_need_delete_vars = [] - for op in optimize_ops: - optimize_vars.extend(op.input_arg_names) - optimize_op_role_vars.extend(op.attr("op_role_var")) - - optimize_vars = list(set(optimize_vars)) - optimize_op_role_vars = list(set(optimize_op_role_vars)) - - for var in optimize_vars: - if var not in optimize_op_role_vars: + for op in local_optimize_ops: + local_optimize_vars.extend(op.input_arg_names) + + for op in remote_optimize_ops: + remote_optimize_vars.extend(op.input_arg_names) + remote_optimize_op_role_vars.extend(op.attr("op_role_var")) + + remote_optimize_vars = list( + set(remote_optimize_vars + )) # param + grad + optimizer_state + learning_rate + remote_optimize_op_role_vars = list( + set(remote_optimize_op_role_vars)) # param + grad + print( + "remote_optimize_vars: {}, remote_optimize_op_role_vars: {}, local_optimize_vars: {}" + .format(remote_optimize_vars, remote_optimize_op_role_vars, + local_optimize_vars)) + for var in remote_optimize_vars: + if var in local_optimize_vars: + continue + if var not in remote_optimize_op_role_vars: optimize_need_delete_vars.append(var) need_delete_optimize_vars = list(set(optimize_need_delete_vars)) - delete_ops(_program.global_block(), optimize_ops) + delete_ops(_program.global_block(), remote_optimize_ops) for var in need_delete_optimize_vars: if _program.global_block().has_var(var): _program.global_block()._remove_var(var) @@ -549,10 +567,15 @@ class DeleteOptimizesPass(PassBase): def _apply_single_impl(self, main_program, startup_program, pass_ctx): attrs = pass_ctx._attrs - optimizer_ops = get_optimize_ops(main_program) + all_optimize_ops = get_optimize_ops(main_program) + remote_optimize_ops = get_optimize_ops(main_program, + attrs['remote_sparse']) lr_ops = get_lr_ops(main_program) - optimizer_ops.extend(lr_ops) - self._delete_optimizer_op_and_vars(main_program, optimizer_ops) + remote_optimize_ops.extend(lr_ops) + local_optimize_ops = list( + set(all_optimize_ops) - set(remote_optimize_ops)) + self._delete_optimizer_op_and_vars(main_program, remote_optimize_ops, + local_optimize_ops) if hasattr(attrs['origin_main_program'], 'lr_sheduler'): self._add_lr_var(main_program, attrs) @@ -572,18 +595,29 @@ class DeleteExtraOptimizerPass(PassBase): def _apply_single_impl(self, main_program, startup_program, pass_ctx): attrs = pass_ctx._attrs - optimize_vars = [] - optimize_op_role_vars = [] + remote_optimize_vars = [] + remote_optimize_op_role_vars = [] optimize_need_delete_vars = [] - - for op in get_optimize_ops(main_program): - optimize_vars.extend(op.input_arg_names) - optimize_op_role_vars.extend(op.attr("op_role_var")) - - optimize_vars = list(set(optimize_vars)) - optimize_op_role_vars = list(set(optimize_op_role_vars)) - for var in optimize_vars: - if var not in optimize_op_role_vars: + all_optimize_ops = get_optimize_ops(main_program) + remote_optimize_ops = get_optimize_ops(main_program, + attrs['remote_sparse']) + local_optimize_ops = list( + set(all_optimize_ops) - set(remote_optimize_ops)) + + local_optimize_vars = [] + for op in local_optimize_ops: + local_optimize_vars.extend(op.input_arg_names) + + for op in remote_optimize_ops: + remote_optimize_vars.extend(op.input_arg_names) + remote_optimize_op_role_vars.extend(op.attr("op_role_var")) + + remote_optimize_vars = list(set(remote_optimize_vars)) + remote_optimize_op_role_vars = list(set(remote_optimize_op_role_vars)) + for var in remote_optimize_vars: + if var in local_optimize_vars: + continue + if var not in remote_optimize_op_role_vars: optimize_need_delete_vars.append(var) need_delete_optimize_vars = list(set(optimize_need_delete_vars)) @@ -620,12 +654,16 @@ class FakeInitOpsPass(PassBase): False) return list(set(dist_varnames + sparse_varnames)) - def _fake_init_sparsetable(self, program, sparse_table_names): + def _fake_init_sparsetable(self, startup_program, sparse_table_names, + attrs): # delete table init op for table_name in sparse_table_names: - table_var = program.global_block().vars[table_name] + table_var = startup_program.global_block().vars[table_name] + if str(table_var).split( + ":")[0].strip().split()[-1] in attrs['local_sparse']: + continue table_param_init_op = [] - for op in program.global_block().ops: + for op in startup_program.global_block().ops: if table_name in op.output_arg_names: table_param_init_op.append(op) init_op_num = len(table_param_init_op) @@ -633,17 +671,17 @@ class FakeInitOpsPass(PassBase): raise ValueError("table init op num should be 1, now is " + str(init_op_num)) table_init_op = table_param_init_op[0] - program.global_block().append_op( + startup_program.global_block().append_op( type="fake_init", inputs={}, outputs={"Out": table_var}, attrs={"shape": table_init_op.attr('shape')}) - delete_ops(program.global_block(), table_param_init_op) + delete_ops(startup_program.global_block(), table_param_init_op) def _apply_single_impl(self, main_program, startup_program, pass_ctx): attrs = pass_ctx._attrs sparse_tables = self._get_sparse_table_names(attrs) - self._fake_init_sparsetable(startup_program, sparse_tables) + self._fake_init_sparsetable(startup_program, sparse_tables, attrs) @register_pass("ps_gpu_pass") diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index af56556db447fa07adad698815620d091040644b..cebfda738d3025dcaa41db7ac93caf76c99923aa 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -637,7 +637,7 @@ class SparseTable(Table): check_embedding_dim(table_proto.accessor, self.common.table_name, ctx.program_id(), self.context) - + print(">>> set sparse table!") self.common.parse_by_optimizer(ctx, self.context) self.common.parse_entry(self.common.table_name, ctx.program_id(), self.context) @@ -769,12 +769,9 @@ class PsDescBuilder(object): self.is_heter_ps_mode = context['is_heter_ps_mode'] self.use_ps_gpu = context['use_ps_gpu'] self.barrier_table_id = None - print("is_heter_ps_mode in the_one_ps.py? {}".format( - self.is_heter_ps_mode)) + self.send_ctx = get_the_one_send_context( - self.context, - use_origin_program=True, - split_dense_table=self.is_heter_ps_mode) + self.context, split_dense_table=self.is_heter_ps_mode) self.tensor_table_dict = {} # TODO self._server_sub_program = [] @@ -801,10 +798,14 @@ class PsDescBuilder(object): def _get_tables(self): tables = [] for idx, (name, ctx) in enumerate(self.send_ctx.items()): + print("idx, name, ctx:", idx, name, ctx) if ctx.is_sparse(): if self.ps_mode == DistributedMode.GEO: - tables.append(globals()['GeoSparseTable'](self.context, - ctx)) + if (self.context['local_sparse'] + and name[:-5] in self.context['local_sparse']) or ( + not self.context['local_sparse']): + tables.append(globals()['GeoSparseTable'](self.context, + ctx)) else: tables.append(globals()['SparseTable'](self.context, ctx)) else: @@ -812,7 +813,6 @@ class PsDescBuilder(object): self.tensor_tables = self._get_tensor_tables() tables.extend(self.tensor_tables) tables.append(globals()['BarrierTable'](self.context, len(tables))) - print("test_fl_ps: tables len: {}".format(len(tables))) return tables def _get_service(self): @@ -894,6 +894,14 @@ class TheOnePSRuntime(RuntimeBase): 'ps_mode'] == DistributedMode.SYNC else False self.context['grad_name_to_param_name'] = {} self.context['tensor_table'] = {} + # FL + self.context['local_sparse'] = context[ + "user_defined_strategy"].trainer_desc_configs["local_sparse"] + self.context['remote_sparse'] = context[ + "user_defined_strategy"].trainer_desc_configs["remote_sparse"] + print("fl-ps > local_sparse: {}, remote_sparse: {}".format( + self.context['local_sparse'], self.context['remote_sparse'])) + build_var_distributed(self.context) self.trainer_endpoints = get_trainer_endpoints(self.role_maker) @@ -998,7 +1006,6 @@ class TheOnePSRuntime(RuntimeBase): send_ctx = get_the_one_send_context( self.context, split_dense_table=self.is_heter_ps_mode, - use_origin_program=self.is_heter_ps_mode, ep_list=self.endpoints) self._send_ctx = send_ctx trainer_config = self.context['trainer'] @@ -1065,15 +1072,6 @@ class TheOnePSRuntime(RuntimeBase): is_test = bool(int(os.getenv("TEST_MODE", "0"))) - # for GEO & heter_ps - init_params = dense_map - - # if not is_test: - # self._communicator.init_params(init_params) - # fleet.util.barrier() - # self._communicator.pull_dense(init_params) - # fleet.util.barrier() - if scopes is None: if len(self.origin_main_programs) > 1: raise ValueError( @@ -1087,7 +1085,7 @@ class TheOnePSRuntime(RuntimeBase): if not is_test: if self.context[ 'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode == True: - self._communicator.init_params(init_params) + self._communicator.init_params(dense_map) else: if not self.context['use_ps_gpu']: if self.role_id == 0: @@ -1235,7 +1233,6 @@ class TheOnePSRuntime(RuntimeBase): send_ctx = get_the_one_send_context( self.context, split_dense_table=self.is_heter_ps_mode, - use_origin_program=self.is_heter_ps_mode, ep_list=self.endpoints) if program is None or len(self.origin_main_programs) == 1: program = self.origin_main_programs[0] @@ -1356,8 +1353,7 @@ class TheOnePSRuntime(RuntimeBase): sparses = get_the_one_recv_context( self.context, is_dense=False, - split_dense_table=self.is_heter_ps_mode, - use_origin_program=True) + split_dense_table=self.is_heter_ps_mode) sparse_names = self._save_sparse_params(executor, dirname, sparses, main_program, mode) @@ -1366,7 +1362,6 @@ class TheOnePSRuntime(RuntimeBase): send_ctx = get_the_one_send_context( self.context, split_dense_table=self.is_heter_ps_mode, - use_origin_program=self.is_heter_ps_mode, ep_list=self.endpoints) self._pull_dense(program, scope, send_ctx, dense_map) @@ -1444,8 +1439,7 @@ class TheOnePSRuntime(RuntimeBase): sparses = get_the_one_recv_context( self.context, is_dense=False, - split_dense_table=self.is_heter_ps_mode, - use_origin_program=True) + split_dense_table=self.is_heter_ps_mode) sparse_varnames = self._load_sparse_params(dirname, sparses, main_program, mode) @@ -1455,7 +1449,6 @@ class TheOnePSRuntime(RuntimeBase): send_ctx = get_the_one_send_context( self.context, split_dense_table=self.is_heter_ps_mode, - use_origin_program=self.is_heter_ps_mode, ep_list=self.endpoints) recv_dense_varnames = [] @@ -1527,8 +1520,7 @@ class TheOnePSRuntime(RuntimeBase): self.context, is_dense=False, split_dense_table=self.role_maker. - _is_heter_parameter_server_mode, - use_origin_program=True) + _is_heter_parameter_server_mode) for id, names in sparses.items(): self._worker.shrink_sparse_table(id, threshold) diff --git a/python/paddle/distributed/ps/utils/ps_factory.py b/python/paddle/distributed/ps/utils/ps_factory.py index d2914b0ac44a47e341c40814da3b287e95db1d39..ddf5c1e3ec0315397d52c93cfb4eb2b01c3ccb4e 100755 --- a/python/paddle/distributed/ps/utils/ps_factory.py +++ b/python/paddle/distributed/ps/utils/ps_factory.py @@ -19,7 +19,7 @@ from .public import * __all__ = [ 'PsProgramBuilder', 'GeoPsProgramBuilder', 'CpuSyncPsProgramBuilder', 'CpuAsyncPsProgramBuilder', 'GpuPsProgramBuilder', - 'HeterAsyncPsProgramBuilder', 'FlPsProgramBuilder' + 'HeterAsyncPsProgramBuilder', 'FlPsProgramBuilder', 'NuPsProgramBuilder' ] @@ -31,7 +31,10 @@ class PsProgramBuilderFactory(object): def _create_ps_program_builder(self, pass_ctx): attrs = pass_ctx._attrs if attrs['ps_mode'] == DistributedMode.GEO: - return globals()['GeoPsProgramBuilder'](pass_ctx) + if len(attrs['local_sparse']) != 0: + return globals()['NuPsProgramBuilder'](pass_ctx) + else: + return globals()['GeoPsProgramBuilder'](pass_ctx) elif attrs['use_ps_gpu']: return globals()['GpuPsProgramBuilder'](pass_ctx) elif attrs['is_heter_ps_mode'] and not attrs['is_fl_ps_mode']: diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py index 2d7246d1db9d352fd9ee81fdccf240777e8ddcf6..53771b05cbf671277ec4252d78e08527a9b36bec 100755 --- a/python/paddle/distributed/ps/utils/ps_program_builder.py +++ b/python/paddle/distributed/ps/utils/ps_program_builder.py @@ -118,6 +118,49 @@ class GeoPsProgramBuilder(PsProgramBuilder): # 仅 CPU 模式 return +class NuPsProgramBuilder(PsProgramBuilder): + + def __init__(self, pass_ctx): + super(NuPsProgramBuilder, self).__init__(pass_ctx) + if not self.attrs['local_sparse']: + raise ValueError("No local sparse params") + + def _build_trainer_programs(self): + add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass", + self.attrs) + add_lr_decay_table_pass.apply([], [], self.pass_ctx) + + distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs) + distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) + + delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs) + delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx) + + append_send_ops_pass = new_pass("append_send_ops_pass", + self.attrs) # fleet->PushDenseVarsAsync + append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) + + delete_extra_optimizer_pass = new_pass("delete_extra_optimizer_pass", + self.attrs) + delete_extra_optimizer_pass.apply([self.attrs['origin_main_program']], + [self.cloned_startup], self.pass_ctx) + + fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs) + fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx) + + append_send_ops_pass = new_pass("append_send_ops_pass", + self.attrs) # communicator->Send + append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) + + self.attrs['origin_main_program'] = self.cloned_main + self.attrs['origin_startup_program'] = self.cloned_startup + + if self.launch_barrier and self.launch_barrier_flag: + wait_server_ready(self.server_endpoints) + + return + + class CpuSyncPsProgramBuilder(PsProgramBuilder): def __init__(self, pass_ctx): diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py index 2fc3284f609181e94c8c2aa09e39a4169f59beea..2e3cb1388f81427236e2596d854eedc6f7f87911 100755 --- a/python/paddle/distributed/ps/utils/public.py +++ b/python/paddle/distributed/ps/utils/public.py @@ -87,6 +87,7 @@ class DistributedMode: HALF_ASYNC = 2 GEO = 3 FL = 4 + NU = 5 class TrainerRuntimeConfig(object): @@ -187,11 +188,15 @@ def get_lr_ops(program): return lr_ops -def get_optimize_ops(_program): +def get_optimize_ops(_program, remote_sparse=[]): block = _program.global_block() opt_ops = [] for op in block.ops: if _is_opt_role_op(op): + if len(remote_sparse) > 0 and op.input( + "Param" + )[0] not in remote_sparse: # for fl: only delete remote sparse optimize + continue # delete clip op from opt_ops when run in Parameter Server mode if OP_NAME_SCOPE in op.all_attrs() \ and CLIP_OP_NAME_SCOPE in op.attr(OP_NAME_SCOPE): @@ -348,7 +353,7 @@ def get_dense_send_context(program, dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], origin_varnames, trainer_id, aggregate, False, False, idx, False, False, - id(program)) + id(program), []) send_ctx[grad_name] = dense_ctx idx += 1 @@ -371,7 +376,7 @@ def get_dense_send_context(program, data_norm_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], origin_varnames, trainer_id, aggregate, False, False, idx, False, True, - id(program)) + id(program), []) send_ctx[grad_name] = data_norm_ctx idx += 1 else: @@ -386,45 +391,49 @@ def get_dense_send_context(program, dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], [origin_varname], trainer_id, aggregate, False, False, idx, False, False, - id(program)) + id(program), []) send_ctx[grad_name] = dense_ctx idx += 1 return idx -def get_geo_trainer_send_context(context): - if context['ps_mode'] != DistributedMode.GEO: +def get_geo_trainer_send_context(attrs): + if attrs['ps_mode'] != DistributedMode.GEO: raise ValueError("ps mode: {} not matched {}", format(ps_mode, "get_geo_trainer_send_context")) send_ctx = {} - trainer_id = get_role_id(context['role_maker']) - origin_programs = context['origin_main_programs'] - idx = 0 + trainer_id = get_role_id(attrs['role_maker']) + origin_programs = attrs['origin_main_programs'] + idx = 0 # table idx distibuted_varnames = get_sparse_tablenames(origin_programs, True) for i, program in enumerate(origin_programs): - merged_sparse_pairs = context['merged_sparse_pairs'][i] + merged_sparse_pairs = attrs['merged_sparse_pairs'][i] for merged in merged_sparse_pairs: param, grad = merged grad_name = grad.merged_var.name param_name = param.merged_var.name - is_distributed = True if param_name in distibuted_varnames else False + if param_name in attrs['remote_sparse']: # for recall/ncf model + continue + is_distributed = True if param_name in distibuted_varnames else False var = program.global_block().vars[grad.merged_var.name] var_numel = reduce(lambda x, y: x * y, var.shape[1:]) from paddle.fluid.core import CommContext + print("public get_the_geo_send_context sparse: ", grad_name, + var_numel) sparse_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], [grad_name], trainer_id, True, True, is_distributed, idx, False, False, - id(program)) + id(program), []) idx += 1 send_ctx[sparse_ctx.var_name()] = sparse_ctx if len(send_ctx) == 0: raise ValueError("GeoSGD require sparse parameters in your net.") - if len(context['tensor_table']) > 0 and context['is_worker']: - name, ctx = _step_ctx(idx, context['role_maker']) + if len(attrs['tensor_table']) > 0 and attrs['is_worker']: + name, ctx = _step_ctx(idx, attrs['role_maker']) send_ctx[name] = ctx return send_ctx @@ -438,32 +447,33 @@ def _step_ctx(idx, role_maker): names = [name] * len(endpoints) from paddle.fluid.core import CommContext ctx = CommContext(name, names, endpoints, sections, [name], trainer_id, - True, False, False, idx, True, False, -1) + True, False, False, idx, True, False, -1, []) return name, ctx -def get_the_one_send_context(context, - use_origin_program=False, - split_dense_table=False, - ep_list=None): +def get_the_one_send_context(attrs, split_dense_table=False, ep_list=None): if ep_list is None: ep_list = ["127.0.0.1:6071"] send_ctx = {} - trainer_id = get_role_id(context['role_maker']) - origin_programs = context['origin_main_programs'] + trainer_id = get_role_id(attrs['role_maker']) + origin_programs = attrs['origin_main_programs'] print("is_heter_ps_mode? {}".format(split_dense_table)) idx = 0 distibuted_varnames = get_sparse_tablenames(origin_programs, True) # print("public distibuted_varnames:", distibuted_varnames) for i, program in enumerate(origin_programs): - merged_sparse_pairs = context['merged_sparse_pairs'][i] + merged_sparse_pairs = attrs['merged_sparse_pairs'][i] for merged in merged_sparse_pairs: param, grad = merged grad_name = grad.merged_var.name param_name = param.merged_var.name - splited_varname = [] + remote_sparse_ids = [] + if param_name in attrs['remote_sparse']: # for recall/ncf model + remote_sparse_ids.append(idx) + + splited_varname = [] for i in range(len(ep_list)): splited_varname.append("{}.block{}".format(param_name, i)) @@ -474,26 +484,26 @@ def get_the_one_send_context(context, shape = list(var.shape) shape[0] = 0 if is_distributed else shape[0] - #print("public get_the_one_send_context sparse:", grad_name, - # splited_varname, shape) if grad_name in send_ctx: continue from paddle.fluid.core import CommContext + print("public get_the_one_send_context sparse: ", grad_name, + splited_varname, shape) sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape, [grad_name], trainer_id, True, True, is_distributed, idx, False, False, - id(program)) + id(program), remote_sparse_ids) idx += 1 send_ctx[sparse_ctx.var_name()] = sparse_ctx for i, program in enumerate(origin_programs): - merged_dense_pairs = context['merged_dense_pairs'][i] + merged_dense_pairs = attrs['merged_dense_pairs'][i] idx = get_dense_send_context(program, send_ctx, idx, merged_dense_pairs, trainer_id, split_dense_table) - if len(context['tensor_table']) > 0 and context['is_worker']: - name, ctx = _step_ctx(idx, context['role_maker']) + if len(attrs['tensor_table']) > 0 and attrs['is_worker']: + name, ctx = _step_ctx(idx, attrs['role_maker']) send_ctx[name] = ctx return send_ctx @@ -1165,17 +1175,12 @@ def insert_communicate_op(orign_program, return entrance_var -def get_the_one_recv_context(context, - is_dense=True, - split_dense_table=False, - use_origin_program=False): +def get_the_one_recv_context(context, is_dense=True, split_dense_table=False): recv_id_maps = {} grad_name_to_param_name = {} if is_dense: - send_ctx = get_the_one_send_context( - context, - split_dense_table=split_dense_table, - use_origin_program=use_origin_program) + send_ctx = get_the_one_send_context(context, + split_dense_table=split_dense_table) for idx, (name, ctx) in enumerate(send_ctx.items()): if ctx.is_sparse(): continue @@ -1192,7 +1197,6 @@ def get_the_one_recv_context(context, else: send_ctx = get_the_one_send_context(context, split_dense_table=False, - use_origin_program=False, ep_list=None) for idx, (name, ctx) in enumerate(send_ctx.items()): if not ctx.is_sparse(): @@ -1266,8 +1270,8 @@ def build_var_distributed(context): context["merged_variable_map"] = {} for origin_program in origin_programs: sparse_pairs, dense_pairs = get_param_grads(origin_program) - # print("public build_var_distributed sparse_pairs:", sparse_pairs) - # print("public build_var_distributed dense_pairs:", dense_pairs) + #print("public build_var_distributed sparse_pairs:", sparse_pairs) + #print("public build_var_distributed dense_pairs:", dense_pairs) origin_for_sparse = [] origin_for_dense = [] merged_sparse_pairs = [] @@ -1287,7 +1291,7 @@ def build_var_distributed(context): m_grad = MergedVariable(grad, [grad], [0]) merged_variables_pairs.append((m_param, m_grad)) merged_dense_pairs.append((m_param, m_grad)) - # print("public build_var_distributed merged_dense_pairs:", + #print("public build_var_distributed merged_dense_pairs:", # merged_dense_pairs) for sparse_pair in origin_for_sparse: @@ -1297,7 +1301,7 @@ def build_var_distributed(context): m_grad = MergedVariable(grad, [grad], [0]) merged_variables_pairs.append((m_param, m_grad)) merged_sparse_pairs.append((m_param, m_grad)) - # print("public build_var_distributed merged_sparse_pairs:", + #print("public build_var_distributed merged_sparse_pairs:", # merged_sparse_pairs) for merged in merged_variables_pairs: @@ -1322,20 +1326,20 @@ def build_var_distributed(context): context["param_name_to_grad_name"] = param_name_to_grad_name context["grad_name_to_param_name"] = grad_name_to_param_name - - -# print("public build_var_distributed origin_sparse_pairs:", -# context["origin_sparse_pairs"]) -# print("public build_var_distributed origin_for_dense:", -# context["origin_dense_pairs"]) -# print("public build_var_distributed merged_sparse_pairs:", -# context["merged_sparse_pairs"]) -# print("public build_var_distributed merged_dense_pairs:", -# context['merged_dense_pairs']) -# print("public build_var_distributed param_name_to_grad_name:", -# param_name_to_grad_name) -# print("public build_var_distributed grad_name_to_param_name:", -# grad_name_to_param_name) + ''' + print("public build_var_distributed origin_sparse_pairs:", + context["origin_sparse_pairs"]) + print("public build_var_distributed origin_for_dense:", + context["origin_dense_pairs"]) + print("public build_var_distributed merged_sparse_pairs:", + context["merged_sparse_pairs"]) + print("public build_var_distributed merged_dense_pairs:", + context['merged_dense_pairs']) + print("public build_var_distributed param_name_to_grad_name:", + param_name_to_grad_name) + print("public build_var_distributed grad_name_to_param_name:", + grad_name_to_param_name) + ''' def _is_opt_role_op(op): diff --git a/python/paddle/fluid/communicator.py b/python/paddle/fluid/communicator.py index 251247f795ab7a0aa971c88a1b120f6a6c051936..4704628f2aee8a719ea4409346bd106087a11c74 100755 --- a/python/paddle/fluid/communicator.py +++ b/python/paddle/fluid/communicator.py @@ -208,7 +208,7 @@ class Communicator(object): self.communicator_.push_sparse_param(var_name, table_id, scope) -class FLCommunicator(Communicator): +class FLCommunicator(Communicator): ## only for coordinator def __init__(self, ps_hosts, kwargs=None): mode = None diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py old mode 100644 new mode 100755 index 6fb0c85d05c517c2d5e0d74dc6910e8ea18639a2..5567fe309ec5e1e8afbed0d4edabae9b2bdd4430 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py @@ -402,7 +402,7 @@ class CompileTimeStrategy(object): trainer_id = self.get_role_id() aggregate = True ctx = CommContext(name, names, eps, sections, origin_varnames, - trainer_id, aggregate, is_sparse, is_distributed) + trainer_id, aggregate, is_sparse, is_distributed, []) return ctx def get_trainer_send_context(self): @@ -452,7 +452,7 @@ class CompileTimeStrategy(object): grad_ctx.origin_varnames(), param_ctx.trainer_id(), param_ctx.aggregate(), param_ctx.is_sparse(), - param_ctx.is_distributed()) + param_ctx.is_distributed(), []) send_ctx[ctx.var_name()] = ctx idx += 1 @@ -579,7 +579,8 @@ class CompileTimeStrategy(object): sparse_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], [grad_name], trainer_id, True, True, - is_distributed, idx, False, False, -1) + is_distributed, idx, False, False, -1, + []) idx += 1 send_ctx[sparse_ctx.var_name()] = sparse_ctx @@ -618,7 +619,7 @@ class CompileTimeStrategy(object): dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], origin_varnames, trainer_id, aggregate, False, False, idx, False, False, - -1) + -1, []) send_ctx[grad_name] = dense_ctx idx += 1 else: @@ -633,7 +634,7 @@ class CompileTimeStrategy(object): dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], [origin_varname], trainer_id, aggregate, - False, False, idx, False, False, -1) + False, False, idx, False, False, -1, []) send_ctx[grad_name] = dense_ctx idx += 1 return idx @@ -675,7 +676,7 @@ class CompileTimeStrategy(object): sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape, [grad_name], trainer_id, True, True, - is_distributed, idx, False, False, -1) + is_distributed, idx, False, False, -1, []) idx += 1 send_ctx[sparse_ctx.var_name()] = sparse_ctx @@ -753,7 +754,7 @@ class CompileTimeStrategy(object): sections = [1] * len(endpoints) names = [name] * len(endpoints) ctx = CommContext(name, names, endpoints, sections, [name], trainer_id, - True, False, False, idx, True, False, -1) + True, False, False, idx, True, False, -1, []) return name, ctx def _create_vars_from_blocklist(self, block_list): diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py index c81863e077103a80dc7849e5db52ccdeed078eac..7178b74b9d37d5471cff3377c4202a5151d268b6 100755 --- a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py @@ -149,7 +149,7 @@ class TestPsTrainerPass(PsPassTestBase): self.config['debug_new_minimize'] = '0' self.config['log_dir'] = ps_log_root_dir + "gpubox_log_old_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch("gpu-ps") + #self.ps_launch("gpu-ps") self.config['debug_new_minimize'] = '1' self.config['log_dir'] = ps_log_root_dir + "gpubox_log_new_minimize" diff --git a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py index 2d430cac648aa2ba332bcaa88591f39295e17c59..929d37da0af3192a6fc340765a4955888faf8e94 100755 --- a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py +++ b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py @@ -199,7 +199,9 @@ def get_user_defined_strategy(config): "dump_fields_path": config.get("runner.dump_fields_path", ""), "dump_fields": config.get("runner.dump_fields", []), "dump_param": config.get("runner.dump_param", []), - "stat_var_names": config.get("stat_var_names", []) + "stat_var_names": config.get("stat_var_names", []), + "local_sparse": config.get("runner.local_sparse", []), + "remote_sparse": config.get("runner.remote_sparse", []) } print("strategy:", strategy.trainer_desc_configs) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py old mode 100644 new mode 100755 index c96d6768155fd415929da52f15bde8dafb7e4ab4..23bce023c8df556bfd5b3fb1acdd4b0e06ec089a --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py @@ -13,7 +13,6 @@ # limitations under the License. import os -os.environ["WITH_DISTRIBUTE"] = "ON" import unittest import paddle import paddle.distributed.fleet.base.role_maker as role_maker @@ -64,8 +63,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) - with self.assertRaises(ValueError): - optimizer.minimize(avg_cost) + optimizer.minimize(avg_cost) def test_a_sync_optimizer_pserver(self): os.environ["TRAINING_ROLE"] = "PSERVER"