diff --git a/paddle/fluid/operators/distributed_ops/recv_op.cc b/paddle/fluid/operators/distributed_ops/recv_op.cc
index 15b36baeada300e1ab472737b4e35538f9882cb7..2547ba3acb16031245ceae622e11893597bb9b9b 100644
--- a/paddle/fluid/operators/distributed_ops/recv_op.cc
+++ b/paddle/fluid/operators/distributed_ops/recv_op.cc
@@ -37,12 +37,6 @@ class RecvOp : public framework::OperatorBase {
 
   void RunImpl(const framework::Scope &scope,
                const platform::Place &place) const override {
-    int do_not_run = Attr<int>("do_not_run");
-    if (do_not_run) {
-      VLOG(3) << "recv do not run!";
-      return;
-    }
-
     std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
     std::vector<std::string> varnames =
         Attr<std::vector<std::string>>("varnames");
@@ -63,11 +57,10 @@ class RecvOp : public framework::OperatorBase {
 
     if (recv_varnames.size() > 0) {
       auto *communicator = distributed::Communicator::GetInstance();
-      if (communicator == nullptr) {
+      if (communicator != nullptr) {
         PADDLE_THROW(platform::errors::InvalidArgument(
-            "need run fleet.init_worker first"));
+            "execute startup program must before fleet.init_worker"));
       }
-      communicator->RecvNoBarrier();
     } else {
       std::vector<distributed::VarHandlePtr> rets;
       if (with_barrier) {
diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
index 1741f10ccb1c28bfe6abaa63e754568fa08e21ce..54885505b287c84cadd37d7938960586e830db14 100644
--- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
+++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
@@ -216,12 +216,12 @@ class ParameterServerRuntime(RuntimeBase):
         else:
             model_dirname = None
 
-        if self.role_maker._is_heter_worker():
-            self._init_worker()
-
         executor = self._get_executor()
         executor.run(fluid.default_startup_program())
 
+        if self.role_maker._is_heter_worker():
+            self._init_worker()
+
         if self.role_maker._is_heter_worker():
             return
 
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index 236cb458be4c6a07f768761b41464e64d4d53f77..e556a98ed7504b199624deeac10ea594efa269b4 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -191,12 +191,14 @@ class FleetTranspiler(Fleet):
 
             self._communicator = Communicator(
                 trainer_config.mode, kwargs,
                 trainer_config.get_communicator_flags())
+            self._communicator.init_with_ctx(send_ctx, recv_ctx)
 
             if not self._communicator.is_running():
                 self._communicator.start()
             else:
-                warnings.warn("communicator has been initialized, skip")
+                raise ValueError(
+                    "Communicator can only be inited once, please check")
 
     def init_worker(self):
         """
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index 82e626dace18339d30f5b482be2e1c8e340a6c78..201b3863a4b6d6d5fed036d85b2103f5defe61f0 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -222,22 +222,22 @@ def append_send_ops_pass(program, config):
 def init_from_server_pass(program, config):
     fetch_barrier_out = program.global_block().create_var(
         name=framework.generate_control_dev_var_name())
-    #
-    # recv_ctx = config.get_communicator_recv_context(recv_type=1)
-    # recv_varnames = []
-    #
-    # for name, ctxs in recv_ctx.items():
-    #     recv_varnames.extend(ctxs.origin_varnames())
-    #
-    # program.global_block().append_op(
-    #     type="recv",
-    #     inputs={"X": []},
-    #     outputs={"Out": []},
-    #     attrs={
-    #         "recv_varnames": recv_varnames,
-    #         "trainer_id": config.get_role_id(),
-    #         RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
-    #     })
+
+    recv_ctx = config.get_communicator_recv_context(recv_type=1)
+    recv_varnames = []
+
+    for name, ctxs in recv_ctx.items():
+        recv_varnames.extend(ctxs.origin_varnames())
+
+    program.global_block().append_op(
+        type="recv",
+        inputs={"X": []},
+        outputs={"Out": []},
+        attrs={
+            "recv_varnames": recv_varnames,
+            "trainer_id": config.get_role_id(),
+            RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
+        })
 
     program.global_block().append_op(
         type="fetch_barrier",
diff --git a/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py b/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py
index 60378aa98272dae32a97b33e84fc61e71193658c..06a90b78fd2e53d065f1abbaf9e95df848f9cc52 100644
--- a/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py
+++ b/python/paddle/fluid/incubate/fleet/tests/fleet_deep_ctr.py
@@ -164,8 +164,8 @@ def train(args):
     elif fleet.is_worker():
         logger.info("run trainer")
 
-        fleet.init_worker()
         exe.run(fleet.startup_program)
+        fleet.init_worker()
 
         thread_num = 2
         filelist = []
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
index 73b546b95cfeb8032c6e99eabe24c883d1f5f66c..abe4377b78263259d1071968bdc84c953922d273 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -161,8 +161,10 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         """
         exe = fluid.Executor(fluid.CPUPlace())
 
-        fleet.init_worker()
+
         exe.run(fluid.default_startup_program())
+        fleet.init_worker()
+
         batch_size = 4
         train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
         self.reader.decorate_sample_list_generator(train_reader)
@@ -201,8 +203,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
 
         exe = fluid.Executor(fluid.CPUPlace())
 
-        fleet.init_worker()
         exe.run(fluid.default_startup_program())
+        fleet.init_worker()
 
         thread_num = 2
         batch_size = 128
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py
index 03d0fa447daf3e3a502e7d77491045f92695496c..887693be3d5cc46c23f21780bed77b13e881da47 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py
@@ -60,8 +60,9 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2):
         device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
         place = fluid.CUDAPlace(device_id)
         exe = fluid.Executor(place)
-        fleet.init_worker()
+
         exe.run(fleet.startup_program)
+        fleet.init_worker()
 
         batch_size = 4
         train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
@@ -104,8 +105,8 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2):
         place = fluid.CUDAPlace(device_id)
         exe = fluid.Executor(place)
 
-        fleet.init_worker()
         exe.run(fleet.startup_program)
+        fleet.init_worker()
 
         thread_num = 2
         batch_size = 128
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py
index 0de898d6dde217ec6d5cdf53611f986f7b04863f..f37dff060cd539d400e52317dc1f3dce0c350ed9 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py
@@ -150,8 +150,9 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase):
         """
         exe = fluid.Executor(fluid.CPUPlace())
 
-        fleet.init_worker()
         exe.run(fluid.default_startup_program())
+        fleet.init_worker()
+
         batch_size = 4
         train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
         self.reader.decorate_sample_list_generator(train_reader)
@@ -174,8 +175,8 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase):
 
         exe = fluid.Executor(fluid.CPUPlace())
 
-        fleet.init_worker()
         exe.run(fluid.default_startup_program())
+        fleet.init_worker()
 
         thread_num = 1
         batch_size = 128
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py
index 77697896b4d556da8a98c17e281b3d7a6999fd64..81530573a604205f0202d088853038bbc71b92e6 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py
@@ -151,8 +151,9 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         """
         exe = fluid.Executor(fluid.CPUPlace())
 
-        fleet.init_worker()
+
         exe.run(fluid.default_startup_program())
+        fleet.init_worker()
 
         batch_size = 4
 
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py
index 30207340a27db0c1d00ab982cbac716e4b639c7e..f5b1350065ecce19299375364edb75dd48364e47 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py
@@ -81,8 +81,8 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase):
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
 
-        fleet.init_worker()
         exe.run(fluid.default_startup_program())
+        fleet.init_worker()
 
         train_reader = paddle.batch(self.fake_reader(), batch_size=24)
         feeder = fluid.DataFeeder(place=place, feed_list=[x, z, y])
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_half_async.py b/python/paddle/fluid/tests/unittests/test_communicator_half_async.py
index 542d1874179ec53b0a4701e941f9748d0bc14766..991d34e42ae5d1bfed1b0afa0a0d051d9f75e357 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_half_async.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_half_async.py
@@ -69,8 +69,8 @@ class TestCommunicatorHalfAsyncEnd2End(unittest.TestCase):
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
 
-        fleet.init_worker()
         exe.run(fleet.startup_program)
+        fleet.init_worker()
 
         train_reader = paddle.batch(self.fake_reader(), batch_size=24)
         feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
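
Taken together, these hunks enforce one ordering rule on the trainer side: the startup program must be executed before fleet.init_worker(). init_worker() now builds and starts the communicator (via init_with_ctx), the recv op throws "execute startup program must before fleet.init_worker" if it finds a live communicator while the startup program is running, and every test is updated to call exe.run(...) first. Below is a minimal trainer-side sketch of the new required order, assuming the incubate fleet API used in fleet_deep_ctr.py; the role maker, model, and optimizer lines are illustrative placeholders, not code taken from this patch.

# Sketch only: shows the call order this patch enforces, not code from the patch.
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base import role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

fleet.init(role_maker.PaddleCloudRoleMaker())

# ... build the model, then wrap the optimizer, e.g.:
# optimizer = fleet.distributed_optimizer(optimizer, strategy)
# optimizer.minimize(avg_cost)

if fleet.is_worker():
    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fleet.startup_program)  # 1) initialize parameters first
    fleet.init_worker()             # 2) then start the communicator
    # ... run the training loop ...
    fleet.stop_worker()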