diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc
index ba1b5c2a61f70cf9f849aff501e6697ace661dd4..98ff53deb7b87b9a9aab084b808569b64d07dfb9 100644
--- a/paddle/fluid/framework/dist_multi_trainer.cc
+++ b/paddle/fluid/framework/dist_multi_trainer.cc
@@ -144,6 +144,10 @@ void DistMultiTrainer::Run() {
   }
 }
 
+Scope *DistMultiTrainer::GetWorkerScope(int thread_id) {
+  return workers_[thread_id]->GetThreadScope();
+}
+
 void DistMultiTrainer::Finalize() {
   for (auto &th : threads_) {
     th.join();
@@ -199,5 +203,5 @@ void DistMultiTrainer::MergeToRootScope(LoDTensor *root_tensor,
     root_data[i] += data[i];
   }
 }
-}  // end namespace framework
-}  // end namespace paddle
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index e0c59caa89d7f25b127d33070752f446b4c50799..10018439edfd4bea860850c6c8b672a39cf420e5 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -93,8 +93,8 @@ class DistMultiTrainer : public MultiTrainer {
   void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
   virtual void FinalizeDumpEnv();
   virtual void InitDumpEnv();
+  virtual Scope* GetWorkerScope(int thread_id);
   virtual void DumpWork(int tid);
-  virtual Scope* GetWorkerScope(int thread_id) { return root_scope_; }
 
  protected:
   std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
diff --git a/paddle/fluid/operators/distributed/rpc_client.cc b/paddle/fluid/operators/distributed/rpc_client.cc
index 57ce54870decf2d56c321efbaddbc108fb113ea7..ca48c22aa7696d340845b1a18ff724eb58b39f1b 100644
--- a/paddle/fluid/operators/distributed/rpc_client.cc
+++ b/paddle/fluid/operators/distributed/rpc_client.cc
@@ -17,7 +17,7 @@
 // default to 3min to avoid temprary network failures.
 DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc");
-DEFINE_int32(rpc_retry_times, 3, "retry times for rpc");
+DEFINE_int32(rpc_retry_times, 0, "retry times for rpc");
 
 namespace paddle {
 namespace operators {
 namespace distributed {
diff --git a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc
index ae4b687ffc4c85501d9ef0325960ff8767ee5704..8a752690857d03d0ba05b41ee8d58c02d6336e56 100644
--- a/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc
+++ b/paddle/fluid/operators/distributed_ops/fetch_barrier_op.cc
@@ -55,6 +55,9 @@ class FetchBarrierOp : public framework::OperatorBase {
 class FetchBarrierOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() {
+    AddInput("X", "(Any) Dummy inputs, used for control dependency")
+        .AsDispensable()
+        .AsDuplicable();
     AddOutput("Out", "(Any) Dummy outputs, used for control dependency")
         .AsDuplicable();
     AddComment(R"DOC(
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index 9969251c9481a8a047b402ade68c7cdb1390bb81..f2c67c65b014f5720bea8e86a9d14f98a1a96984 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -129,8 +129,7 @@ class DistributedTranspiler(Fleet):
         Returns:
             None
         """
-        if not self._transpile_config.sync_mode and self._communicator.is_running(
-        ):
+        if not self._transpile_config.sync_mode:
             self._communicator.stop()
         self._executor.close()
         if isinstance(self._role_maker, MPISymetricRoleMaker):
diff --git a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
index 6c1672a708f1ff39b5cbfa0c88da546a5bfe5dec..1c4d9703fafed0d0f251be814a8664ded658e9e0 100644
--- a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
+++ b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
@@ -67,7 +67,7 @@ class DatasetCtrReader(data_generator.MultiSlotDataGenerator):
             return random.random()
 
         def iter():
-            if get_rand() < 0.1:
+            if get_rand() < 0.05:
                 fs = line.strip().split('\t')
                 dnn_input = load_dnn_input_record(fs[0])
                 lr_input = load_lr_input_record(fs[1])
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
index 5edaa71700ca38847f15f439002c079a5c542c2e..c27f23a2d8c9c226587c24622376089fd918d334 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -139,7 +139,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         dataset.set_filelist(filelist)
         dataset.set_thread(thread_num)
 
-        for epoch_id in range(2):
+        for epoch_id in range(1):
             pass_start = time.time()
             dataset.set_filelist(filelist)
             exe.train_from_dataset(
@@ -157,7 +157,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             print("{}: \n {}\n".format(self.fetch_target_names[0],
                                        fetch_target_vars[0]))
 
-        for epoch_id in range(2):
+        for epoch_id in range(1):
             pass_start = time.time()
             dataset.set_filelist(filelist)
             exe.train_from_dataset(
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
index 9bad641a8cbd867c6c64467991b00ff9d7aa3011..acefd65b56b94e6b0862d8e2676bc9cb8826981b 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
@@ -30,7 +30,6 @@ def skip_ci(func):
     return __func__
 
 
-@skip_ci
 class TestDistMnist2x2(TestFleetBase):
     def _setup_config(self):
         self._sync_mode = False
diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py
index c05ed6f83519084a64fe20562be204185cfaf685..daea2deba698a51fb316adfa5ff177a58b708424 100644
--- a/python/paddle/fluid/trainer_factory.py
+++ b/python/paddle/fluid/trainer_factory.py
@@ -84,6 +84,9 @@ class FetchHandlerMonitor(object):
                     for varname in fetch_target_names
                 ]
 
+                if None in fetch_vars:
+                    continue
+
                 fetch_tensors = [var.get_tensor() for var in fetch_vars]
 
                 if self.fetch_instance.return_np:
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 2f809c9f1c79fb52d6c7bdc82bef37d2a0398afc..c1eff2f5f7ee5d97eb784e9b4d8125ca965c20fb 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -701,6 +701,7 @@ class DistributeTranspiler(object):
             send_vars.append(var)
 
         if self.sync_mode:
+            fetch_barrier_input = []
             send_barrier_out = program.global_block().create_var(
                 name=framework.generate_control_dev_var_name())
             if self.has_distributed_lookup_table:
@@ -718,6 +719,7 @@
                     "trainer_id": self.trainer_id,
                     RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
                 })
+            fetch_barrier_input.append(send_barrier_out)
 
         # step 3: insert recv op to receive parameters from parameter server
         recv_vars = []
@@ -788,12 +790,14 @@
                     OP_ROLE_VAR_ATTR_NAME:
                     [param_varname, recv_op_role_var_name]
                 })
+            if self.sync_mode:
+                fetch_barrier_input.extend(splited_var)
 
         if self.sync_mode:
             # form a WAW dependency
             program.global_block().append_op(
                 type="fetch_barrier",
-                inputs={},
+                inputs={"X": fetch_barrier_input},
                 outputs={"Out": all_recv_outputs},
                 attrs={
                     "endpoints": pserver_endpoints,