diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index c9ae5a67950c8d305cedf23654a56d4f48d8dcba..21e28d7ac86d06571e49a522db13c17c0ebf33be 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -97,6 +97,7 @@ message AsyncConfig { optional int32 thread_pool_size = 6 [ default = 1 ]; optional int32 send_wait_times = 7 [ default = 1 ]; optional bool runtime_split_send_recv = 8 [ default = false ]; + optional bool launch_barrier = 9 [ default = true ]; } message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; } diff --git a/paddle/fluid/operators/distributed/parameter_recv.cc b/paddle/fluid/operators/distributed/parameter_recv.cc index a91df5b3c471e234dd1ae72771c287e21ebf7af0..51b13bc2c569d0078199e6ede42bfecb2e33148b 100644 --- a/paddle/fluid/operators/distributed/parameter_recv.cc +++ b/paddle/fluid/operators/distributed/parameter_recv.cc @@ -175,10 +175,6 @@ void RecvGeoSparseRecords(const CommContext &rpc_ctx, template void RecvLodTensor(const CommContext &rpc_ctx, const framework::Scope &scope) { - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto cpu_place = platform::CPUPlace(); - auto &cpu_ctx = *pool.Get(cpu_place); - distributed::RPCClient *rpc_client = distributed::RPCClient::GetInstance(rpc_ctx.trainer_id); @@ -188,8 +184,13 @@ void RecvLodTensor(const CommContext &rpc_ctx, const framework::Scope &scope) { if (rpc_ctx.origin_varnames.size() == 1 && rpc_ctx.splited_varnames.size() == 1) { auto varname = rpc_ctx.origin_varnames[0]; - VLOG(4) << "recv " << varname << " from " << rpc_ctx.epmap[0]; - rets.push_back(rpc_client->AsyncGetVarNoBarrier(rpc_ctx.epmap[0], cpu_ctx, + const auto place = + scope.FindVar(varname)->Get().place(); + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &ctx = *pool.Get(place); + VLOG(4) << "recv " << varname << " from " << rpc_ctx.epmap[0] << " in gpu? " + << platform::is_gpu_place(place); + rets.push_back(rpc_client->AsyncGetVarNoBarrier(rpc_ctx.epmap[0], ctx, scope, varname, varname)); for (size_t i = 0; i < rets.size(); i++) { diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index f66f013e4dbaadd534d6859b7ba6530779c82a3b..36da7264efe2e489aadbffde56b4260418f91fb2 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -495,7 +495,7 @@ class RoleMakerBase(object): Returns: string: all heter_trainers'endpoints """ - assert self._heter_trainer_endpoints != [] + assert self._heter_trainer_endpoints != [], "Heter Worker Endpoints Not initialized" return self._heter_trainer_endpoints def _get_heter_worker_endpoint(self): @@ -505,10 +505,10 @@ class RoleMakerBase(object): e.g: if we have 4 cpu-trainer(default), 2 gpu-trainer(heter) then No.0 and No.2 cpu-trainer will work with No.0 gpu-trainer - and No.1 and No.3 cpu-trainer will work with No.1 gpu-trainerr + and No.1 and No.3 cpu-trainer will work with No.1 gpu-trainer """ - assert self._heter_trainer_endpoints != [] - return self._heter_trainer_endpoints[(self._current_id + 1) % + assert self._heter_trainer_endpoints != [], "Heter Worker Endpoints Not initialized" + return self._heter_trainer_endpoints[(self._current_id) % self._heter_worker_num()] def _get_heter_worker_device(self): diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py index 6dd4661f00062f55bb834bbee50daf1924a0c87a..42be7e869d9a7c6394152167ac2cbce9b0986de0 100644 --- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py +++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py @@ -23,6 +23,7 @@ from paddle.fluid.executor import Executor from paddle.fluid.parallel_executor import ParallelExecutor from .runtime_base import RuntimeBase +from ..base.private_helper_function import wait_server_ready class ParameterServerRuntime(RuntimeBase): @@ -94,8 +95,8 @@ class ParameterServerRuntime(RuntimeBase): return False if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ - var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ - var.desc.type() == core.VarDesc.VarType.READER: + var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ + var.desc.type() == core.VarDesc.VarType.READER: return False return var.persistable @@ -161,6 +162,17 @@ class ParameterServerRuntime(RuntimeBase): trainer_config = self.async_strategy.get_trainer_runtime_config() + dist_strategy = self.context["valid_strategy"] + launch_barrier = dist_strategy.a_sync_configs["launch_barrier"] + if launch_barrier: + # for trainer wait server ready + wait_server_ready(self.role_maker._get_pserver_endpoints()) + + # for ps-heter mode, wait heter worker ready + if self.role_maker._is_heter_parameter_server_mode and self.role_maker._is_worker( + ): + wait_server_ready(self.role_maker._get_heter_worker_endpoints()) + lrs = _has_global_step(_get_lr_ops(self.origin_main_program)) if lrs: @@ -312,7 +324,7 @@ class ParameterServerRuntime(RuntimeBase): opts = _get_optimize_ops(self.origin_main_program) for op in opts: if "Param" in op.input_names and \ - "LearningRate" in op.input_names and op.input("Param")[0] == param_name: + "LearningRate" in op.input_names and op.input("Param")[0] == param_name: return op def _save_dense_params(self, executor, dirname, context, main_program): diff --git a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py index 15e98481c26b20de4e9fa493fa022380ba1fcd63..92d84b8b3f381e8607d0168e3891c2399037e456 100644 --- a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py +++ b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py @@ -153,7 +153,7 @@ def gen_fake_line(dnn_data_num=7, return line -def prepare_fake_data(file_nums=9, file_lines=1000): +def prepare_fake_data(file_nums=6, file_lines=1000): """ Create fake data with same type as avazu_ctr_data """ diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py index f62ad66e462862f4c3f04bacc58ca7aac583ef1e..fefaecd3b8979b47cf7c0c4f7aa058e9ffcaae42 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py @@ -206,13 +206,6 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase): debug=int(os.getenv("Debug", "0"))) pass_time = time.time() - pass_start print("do_dataset_training done. using time {}".format(pass_time)) - if os.getenv("SAVE_MODEL") == "1": - model_dir = tempfile.mkdtemp() - fleet.save_inference_model(exe, model_dir, - [feed.name for feed in self.feeds], - self.avg_cost) - self.check_model_right(model_dir) - shutil.rmtree(model_dir) fleet.stop_worker() print("do_dataset_training stop worker.") diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py index 5916000fba79fc0da2ef545beac634a3edfe01df..f625e1de4a3e0564037d71e2393f5914415917d9 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py @@ -113,6 +113,7 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"k_steps": 100} + strategy.a_sync_configs = {"launch_barrier": False} if training_role == "TRAINER": self.run_trainer(role, strategy) diff --git a/python/paddle/fluid/tests/unittests/test_communicator_sync.py b/python/paddle/fluid/tests/unittests/test_communicator_sync.py index 95b209b14602676a089a667b0a720056bbe1562b..78e2050d3b48edc4a2bd0195a371a6b34afb7b1e 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_sync.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_sync.py @@ -51,6 +51,7 @@ class TestCommunicator(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = False + strategy.a_sync_configs = {"launch_barrier": False} optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py index 7f55e956a94aee79dda07762e953e71807899bff..845be6eda6e0d972ed98535fc987f459e4e8d702 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py @@ -52,6 +52,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True + strategy.a_sync_configs = {"launch_barrier": False} optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(avg_cost) @@ -92,6 +93,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True + strategy.a_sync_configs = {"launch_barrier": False} optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py index db3f2afb3668bc1831286f8d13b274895e7632fd..668b4ad872f43c65c47f75a8af016a6c6af58513 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py @@ -44,6 +44,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = False + strategy.a_sync_configs = {"launch_barrier": False} optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index c46d1dc5b0f87262aee8efd4722418be433c98ea..195b3f8de0a40011e3da5ddfa6d633daaa90fc1c 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -312,9 +312,6 @@ class TestFleetBase(unittest.TestCase): "========================Error tr1_err end===========================" ) - self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check") - self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check") - # close trainer file tr0_pipe.close() tr1_pipe.close() @@ -325,6 +322,8 @@ class TestFleetBase(unittest.TestCase): ps1.terminate() shutil.rmtree(gloo_path) + self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check") + self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check") return 0, 0 def check_with_place(self, diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py index ba97c5079bde429b0b7145208926b570d04725bc..6c5a1d6e36c2549d5e3549f81f26d5ffcca3a247 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py @@ -81,7 +81,7 @@ class FleetDistHeterRunnerBase(object): def build_strategy(self, args): self.strategy = paddle.distributed.fleet.DistributedStrategy() self.strategy.a_sync = True - + self.strategy.a_sync_configs = {"launch_barrier": True} return self.strategy def build_optimizer(self, avg_cost, strategy): @@ -237,7 +237,10 @@ class TestFleetHeterBase(unittest.TestCase): return heter0_proc, heter1_proc, heter0_pipe, heter1_pipe def _run_cluster(self, model, envs): - env = {'GRAD_CLIP': str(self._grad_clip_mode)} + env = { + 'GRAD_CLIP': str(self._grad_clip_mode), + 'FLAGS_eager_delete_tensor_gb': str(-1) + } python_path = self._python_interp gloo_path = tempfile.mkdtemp() @@ -286,27 +289,6 @@ class TestFleetHeterBase(unittest.TestCase): tr0_ret = tr0.returncode tr1_ret = tr0.returncode - print("tr get returncode: {}".format(tr0_ret)) - if tr0_ret != 0: - print( - "========================Error tr0_err begin===========================" - ) - os.system("cat {}".format(tempfile.gettempdir() + "/tr0_err.log")) - print( - "========================Error tr0_err end===========================" - ) - - if tr1_ret != 0: - print( - "========================Error tr1_err begin===========================" - ) - os.system("cat {}".format(tempfile.gettempdir() + "/tr1_err.log")) - print( - "========================Error tr1_err end===========================" - ) - - self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check") - self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check") # close trainer file tr0_pipe.close() @@ -320,7 +302,8 @@ class TestFleetHeterBase(unittest.TestCase): ps1.terminate() heter0.terminate() heter1.terminate() - + self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check") + self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check") shutil.rmtree(gloo_path) return 0, 0 diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_ctr.py index b3e38a421287611c43bb82d93b4df166e23f6484..5f7d7b21d7ff8da8699c2f55adcde954c1c0156d 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_ctr.py @@ -23,38 +23,6 @@ import paddle paddle.enable_static() -class TestDistHeterDatasetAsync2x2(TestFleetHeterBase): - def _setup_config(self): - self._mode = "async" - self._reader = "dataset" - - def check_with_place(self, - model_file, - delta=1e-3, - check_error_log=False, - need_envs={}): - required_envs = { - "PATH": os.getenv("PATH", ""), - "PYTHONPATH": os.getenv("PYTHONPATH", ""), - "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), - "FLAGS_rpc_deadline": "5000", # 5sec to fail fast - "http_proxy": "", - "CPU_NUM": "3" - } - - required_envs.update(need_envs) - - if check_error_log: - required_envs["GLOG_v"] = "3" - required_envs["GLOG_logtostderr"] = "1" - - tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) - - def test_dist_train(self): - self.check_with_place( - "dist_fleet_heter_ctr.py", delta=1e-5, check_error_log=True) - - class TestDistHeterPyreaderAsync2x2(TestFleetHeterBase): def _setup_config(self): self._mode = "async"