From 7bb4a4e86540a422520ebe05acc4bc504bcdacae Mon Sep 17 00:00:00 2001 From: seiriosPlus Date: Mon, 31 Aug 2020 14:51:14 +0800 Subject: [PATCH] rectification init_worker and exe.run startup program --- .../operators/distributed_ops/recv_op.cc | 11 ++----- .../fleet/runtime/parameter_server_runtime.py | 6 ++-- .../distribute_transpiler/__init__.py | 4 ++- .../fleet/parameter_server/ir/trainer_pass.py | 32 +++++++++---------- .../incubate/fleet/tests/fleet_deep_ctr.py | 2 +- .../fluid/tests/unittests/dist_fleet_ctr.py | 6 ++-- .../tests/unittests/dist_fleet_ctr_ps_gpu.py | 5 +-- .../tests/unittests/dist_fleet_heter_ctr.py | 5 +-- .../dist_fleet_sparse_embedding_ctr.py | 3 +- .../tests/unittests/test_communicator_geo.py | 2 +- .../unittests/test_communicator_half_async.py | 2 +- 11 files changed, 39 insertions(+), 39 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc index 15b36baeada..2547ba3acb1 100644 --- a/paddle/fluid/operators/distributed_ops/recv_op.cc +++ b/paddle/fluid/operators/distributed_ops/recv_op.cc @@ -37,12 +37,6 @@ class RecvOp : public framework::OperatorBase { void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { - int do_not_run = Attr("do_not_run"); - if (do_not_run) { - VLOG(3) << "recv do not run!"; - return; - } - std::vector epmap = Attr>("epmap"); std::vector varnames = Attr>("varnames"); @@ -63,11 +57,10 @@ class RecvOp : public framework::OperatorBase { if (recv_varnames.size() > 0) { auto *communicator = distributed::Communicator::GetInstance(); - if (communicator == nullptr) { + if (communicator != nullptr) { PADDLE_THROW(platform::errors::InvalidArgument( - "need run fleet.init_worker first")); + "execute startup program must before fleet.init_worker")); } - communicator->RecvNoBarrier(); } else { std::vector rets; if (with_barrier) { diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py index 1741f10ccb1..54885505b28 100644 --- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py +++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py @@ -216,12 +216,12 @@ class ParameterServerRuntime(RuntimeBase): else: model_dirname = None - if self.role_maker._is_heter_worker(): - self._init_worker() - executor = self._get_executor() executor.run(fluid.default_startup_program()) + if self.role_maker._is_heter_worker(): + self._init_worker() + if self.role_maker._is_heter_worker(): return diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py index 236cb458be4..e556a98ed75 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py @@ -191,12 +191,14 @@ class FleetTranspiler(Fleet): self._communicator = Communicator( trainer_config.mode, kwargs, trainer_config.get_communicator_flags()) + self._communicator.init_with_ctx(send_ctx, recv_ctx) if not self._communicator.is_running(): self._communicator.start() else: - warnings.warn("communicator has been initialized, skip") + raise ValueError( + "Communicator can only be inited once, please check") def init_worker(self): """ diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py index 82e626dace1..201b3863a4b 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py @@ -222,22 +222,22 @@ def append_send_ops_pass(program, config): def init_from_server_pass(program, config): fetch_barrier_out = program.global_block().create_var( name=framework.generate_control_dev_var_name()) - # - # recv_ctx = config.get_communicator_recv_context(recv_type=1) - # recv_varnames = [] - # - # for name, ctxs in recv_ctx.items(): - # recv_varnames.extend(ctxs.origin_varnames()) - # - # program.global_block().append_op( - # type="recv", - # inputs={"X": []}, - # outputs={"Out": []}, - # attrs={ - # "recv_varnames": recv_varnames, - # "trainer_id": config.get_role_id(), - # RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE - # }) + + recv_ctx = config.get_communicator_recv_context(recv_type=1) + recv_varnames = [] + + for name, ctxs in recv_ctx.items(): + recv_varnames.extend(ctxs.origin_varnames()) + + program.global_block().append_op( + type="recv", + inputs={"X": []}, + outputs={"Out": []}, + attrs={ + "recv_varnames": recv_varnames, + "trainer_id": config.get_role_id(), + RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + }) program.global_block().append_op( type="fetch_barrier", diff --git a/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py b/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py index 60378aa9827..06a90b78fd2 100644 --- a/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py +++ b/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py @@ -164,8 +164,8 @@ def train(args): elif fleet.is_worker(): logger.info("run trainer") - fleet.init_worker() exe.run(fleet.startup_program) + fleet.init_worker() thread_num = 2 filelist = [] diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index 73b546b95cf..abe4377b782 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -161,8 +161,10 @@ class TestDistCTR2x2(FleetDistRunnerBase): """ exe = fluid.Executor(fluid.CPUPlace()) - fleet.init_worker() + exe.run(fluid.default_startup_program()) + fleet.init_worker() + batch_size = 4 train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size) self.reader.decorate_sample_list_generator(train_reader) @@ -201,8 +203,8 @@ class TestDistCTR2x2(FleetDistRunnerBase): exe = fluid.Executor(fluid.CPUPlace()) - fleet.init_worker() exe.run(fluid.default_startup_program()) + fleet.init_worker() thread_num = 2 batch_size = 128 diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py index 03d0fa447da..887693be3d5 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py @@ -60,8 +60,9 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2): device_id = int(os.getenv("FLAGS_selected_gpus", "0")) place = fluid.CUDAPlace(device_id) exe = fluid.Executor(place) - fleet.init_worker() + exe.run(fleet.startup_program) + fleet.init_worker() batch_size = 4 train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size) @@ -104,8 +105,8 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2): place = fluid.CUDAPlace(device_id) exe = fluid.Executor(place) - fleet.init_worker() exe.run(fleet.startup_program) + fleet.init_worker() thread_num = 2 batch_size = 128 diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py index 0de898d6dde..f37dff060cd 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py @@ -150,8 +150,9 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase): """ exe = fluid.Executor(fluid.CPUPlace()) - fleet.init_worker() exe.run(fluid.default_startup_program()) + fleet.init_worker() + batch_size = 4 train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size) self.reader.decorate_sample_list_generator(train_reader) @@ -174,8 +175,8 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase): exe = fluid.Executor(fluid.CPUPlace()) - fleet.init_worker() exe.run(fluid.default_startup_program()) + fleet.init_worker() thread_num = 1 batch_size = 128 diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py index 77697896b4d..81530573a60 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py @@ -151,8 +151,9 @@ class TestDistCTR2x2(FleetDistRunnerBase): """ exe = fluid.Executor(fluid.CPUPlace()) - fleet.init_worker() + exe.run(fluid.default_startup_program()) + fleet.init_worker() batch_size = 4 diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py index 30207340a27..f5b1350065e 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py @@ -81,8 +81,8 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase): optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) - fleet.init_worker() exe.run(fluid.default_startup_program()) + fleet.init_worker() train_reader = paddle.batch(self.fake_reader(), batch_size=24) feeder = fluid.DataFeeder(place=place, feed_list=[x, z, y]) diff --git a/python/paddle/fluid/tests/unittests/test_communicator_half_async.py b/python/paddle/fluid/tests/unittests/test_communicator_half_async.py index 542d1874179..991d34e42ae 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_half_async.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_half_async.py @@ -69,8 +69,8 @@ class TestCommunicatorHalfAsyncEnd2End(unittest.TestCase): optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) - fleet.init_worker() exe.run(fleet.startup_program) + fleet.init_worker() train_reader = paddle.batch(self.fake_reader(), batch_size=24) feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) -- GitLab