From eeaf04da195e5a6b54b7a51555788271a96fd009 Mon Sep 17 00:00:00 2001 From: Chengmo Date: Thu, 17 Oct 2019 11:22:07 +0800 Subject: [PATCH] [cherry-pick]Fix communicator slow bug & fix communicator stop bug (#20366) (#20646) * Fix communicator slow bug & fix communicator stop bug (#20366) * test=develop,Fix communicator slow bug * test=develop, delete if() in stop_worker() * test=develop * fix UT, test=develop * fix bug in fetch handler, test=develop * fix bug in fetch handler, test=develop * test=develop, fix fetch barrier bug * test=develop, bug fix * test=develop, bug fix * test=develop, fix bug * test=develop,test=release/1.6 --- paddle/fluid/framework/dist_multi_trainer.cc | 8 ++++++-- paddle/fluid/framework/trainer.h | 2 +- paddle/fluid/operators/distributed/communicator.cc | 1 + paddle/fluid/operators/distributed/rpc_client.cc | 2 +- .../fluid/operators/distributed_ops/fetch_barrier_op.cc | 3 +++ .../parameter_server/distribute_transpiler/__init__.py | 3 +-- python/paddle/fluid/tests/unittests/ctr_dataset_reader.py | 2 +- python/paddle/fluid/tests/unittests/dist_fleet_ctr.py | 4 ++-- .../paddle/fluid/tests/unittests/test_dist_fleet_ctr.py | 1 - python/paddle/fluid/trainer_factory.py | 3 +++ python/paddle/fluid/transpiler/distribute_transpiler.py | 6 +++++- 11 files changed, 24 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index ba1b5c2a61..98ff53deb7 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -144,6 +144,10 @@ void DistMultiTrainer::Run() { } } +Scope *DistMultiTrainer::GetWorkerScope(int thread_id) { + return workers_[thread_id]->GetThreadScope(); +} + void DistMultiTrainer::Finalize() { for (auto &th : threads_) { th.join(); @@ -199,5 +203,5 @@ void DistMultiTrainer::MergeToRootScope(LoDTensor *root_tensor, root_data[i] += data[i]; } } -} // end namespace framework -} // end namespace paddle +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index e0c59caa89..10018439ed 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -93,8 +93,8 @@ class DistMultiTrainer : public MultiTrainer { void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor); virtual void FinalizeDumpEnv(); virtual void InitDumpEnv(); + virtual Scope* GetWorkerScope(int thread_id); virtual void DumpWork(int tid); - virtual Scope* GetWorkerScope(int thread_id) { return root_scope_; } protected: std::shared_ptr pull_dense_worker_; diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc index 58b262100e..3467511221 100644 --- a/paddle/fluid/operators/distributed/communicator.cc +++ b/paddle/fluid/operators/distributed/communicator.cc @@ -923,6 +923,7 @@ void GeoSgdCommunicator::RpcSend(const std::string &origin_var_name, auto &cpu_ctx_send = *pool.Get(platform::CPUPlace()); distributed::RPCClient *rpc_client = distributed::RPCClient::GetInstance(trainer_id); + rpc_client->AsyncSendVar(endpoint, cpu_ctx_send, *delta_scope_.get(), splited_var_name); } diff --git a/paddle/fluid/operators/distributed/rpc_client.cc b/paddle/fluid/operators/distributed/rpc_client.cc index 57ce54870d..ca48c22aa7 100644 --- a/paddle/fluid/operators/distributed/rpc_client.cc +++ b/paddle/fluid/operators/distributed/rpc_client.cc @@ -17,7 +17,7 @@ // default to 3min to avoid temprary network failures. DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc"); -DEFINE_int32(rpc_retry_times, 3, "retry times for rpc"); +DEFINE_int32(rpc_retry_times, 0, "retry times for rpc"); namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc index ae4b687ffc..8a75269085 100644 --- a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc +++ b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc @@ -55,6 +55,9 @@ class FetchBarrierOp : public framework::OperatorBase { class FetchBarrierOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() { + AddInput("X", "(Any) Dummy inputs, used for control dependency") + .AsDispensable() + .AsDuplicable(); AddOutput("Out", "(Any) Dummy outputs, used for control dependency") .AsDuplicable(); AddComment(R"DOC( diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py index 9969251c94..f2c67c65b0 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py @@ -129,8 +129,7 @@ class DistributedTranspiler(Fleet): Returns: None """ - if not self._transpile_config.sync_mode and self._communicator.is_running( - ): + if not self._transpile_config.sync_mode: self._communicator.stop() self._executor.close() if isinstance(self._role_maker, MPISymetricRoleMaker): diff --git a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py index 6c1672a708..1c4d9703fa 100644 --- a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py +++ b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py @@ -67,7 +67,7 @@ class DatasetCtrReader(data_generator.MultiSlotDataGenerator): return random.random() def iter(): - if get_rand() < 0.1: + if get_rand() < 0.05: fs = line.strip().split('\t') dnn_input = load_dnn_input_record(fs[0]) lr_input = load_lr_input_record(fs[1]) diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index 5edaa71700..c27f23a2d8 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -139,7 +139,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): dataset.set_filelist(filelist) dataset.set_thread(thread_num) - for epoch_id in range(2): + for epoch_id in range(1): pass_start = time.time() dataset.set_filelist(filelist) exe.train_from_dataset( @@ -157,7 +157,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): print("{}: \n {}\n".format(self.fetch_target_names[0], fetch_target_vars[0])) - for epoch_id in range(2): + for epoch_id in range(1): pass_start = time.time() dataset.set_filelist(filelist) exe.train_from_dataset( diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py index 9bad641a8c..acefd65b56 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py @@ -30,7 +30,6 @@ def skip_ci(func): return __func__ -@skip_ci class TestDistMnist2x2(TestFleetBase): def _setup_config(self): self._sync_mode = False diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index c05ed6f835..daea2deba6 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -84,6 +84,9 @@ class FetchHandlerMonitor(object): for varname in fetch_target_names ] + if None in fetch_vars: + continue + fetch_tensors = [var.get_tensor() for var in fetch_vars] if self.fetch_instance.return_np: diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 2f809c9f1c..c1eff2f5f7 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -701,6 +701,7 @@ class DistributeTranspiler(object): send_vars.append(var) if self.sync_mode: + fetch_barrier_input = [] send_barrier_out = program.global_block().create_var( name=framework.generate_control_dev_var_name()) if self.has_distributed_lookup_table: @@ -718,6 +719,7 @@ class DistributeTranspiler(object): "trainer_id": self.trainer_id, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE }) + fetch_barrier_input.append(send_barrier_out) # step 3: insert recv op to receive parameters from parameter server recv_vars = [] @@ -788,12 +790,14 @@ class DistributeTranspiler(object): OP_ROLE_VAR_ATTR_NAME: [param_varname, recv_op_role_var_name] }) + if self.sync_mode: + fetch_barrier_input.extend(splited_var) if self.sync_mode: # form a WAW dependency program.global_block().append_op( type="fetch_barrier", - inputs={}, + inputs={"X": fetch_barrier_input}, outputs={"Out": all_recv_outputs}, attrs={ "endpoints": pserver_endpoints, -- GitLab