From 4dc8c44ba163733eddf9edbae36b93a4f7374501 Mon Sep 17 00:00:00 2001
From: Chengmo
Date: Wed, 28 Oct 2020 14:12:55 +0800
Subject: [PATCH] 【Paddle.Fleet】Fix fleetrun heter (#28252)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix fleetrun heter ps on paddlecloud
---
 python/paddle/distributed/fleet/launch_utils.py    | 11 ++++++-----
 .../meta_optimizers/parameter_server_optimizer.py  | 14 ++++++++++++++
 .../fleet/parameter_server/ir/trainer_pass.py      |  8 ++++----
 .../test_dist_fleet_a_sync_optimizer_auto.py       |  1 +
 .../test_dist_fleet_a_sync_optimizer_auto_async.py |  1 +
 .../test_dist_fleet_a_sync_optimizer_auto_geo.py   |  2 +-
 .../test_dist_fleet_a_sync_optimizer_geo.py        |  4 ++--
 7 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index ec4b0342f24..2ae5747af9e 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -603,7 +603,7 @@ def cloud_ps_heter_env_set(args):
     avilable_ports = os.getenv("TRAINER_PORTS", "").split(",")
     assert len(
         avilable_ports
-    ) > 3, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
+    ) >= 2, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
 
     # hard code for paddlecloud custom-framework
     trainers_num = len(paddle_pserver_endpoints.split(","))
@@ -894,7 +894,7 @@ class ParameterServerLauncher(object):
                 "TRAINING_ROLE": "PSERVER",
                 "PADDLE_TRAINERS_NUM": str(self.worker_num),
                 "POD_IP": cur_server.endpoint.split(":")[0],
-                "PADDLE_WITH_GLOO": "1",
+                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
                 "PADDLE_GLOO_RENDEZVOUS": "3",
                 "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
                 "PADDLE_GLOO_HTTP_ENDPOINT": self.http_port
@@ -958,7 +958,7 @@ class ParameterServerLauncher(object):
                 self.heter_worker_endpoints,
                 "TRAINING_ROLE": "TRAINER",
                 "PADDLE_TRAINER_ID": str(cur_worker.rank),
-                "PADDLE_WITH_GLOO": "1",
+                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
                 "PADDLE_GLOO_RENDEZVOUS": "3",
                 "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
                 "FLAGS_selected_gpus": "0",
@@ -1014,7 +1014,8 @@ class ParameterServerLauncher(object):
         elif fluid.core.is_compiled_with_xpu():
             heter_device_num = fluid.core.get_xpu_device_count()
             device_list = [str(x) for x in range(0, heter_device_num)]
-        assert heter_device_num != 0
+        if heter_device_num == 0:
+            return
 
         for idx, cur_heter_worker in enumerate(pod.heter_workers):
             device_id = str(device_list[idx % heter_device_num])
@@ -1027,7 +1028,7 @@ class ParameterServerLauncher(object):
                 "TRAINING_ROLE": "HETER_TRAINER",
                 "PADDLE_TRAINERS_NUM": str(self.worker_num),
                 "POD_IP": cur_heter_worker.endpoint.split(":")[0],
-                "PADDLE_WITH_GLOO": "1",
+                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
                 "PADDLE_GLOO_RENDEZVOUS": "3",
                 "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
                 "FLAGS_selected_gpus": "0",
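[Note] The recurring change in launch_utils.py replaces a hard-coded
"PADDLE_WITH_GLOO": "1" with a passthrough of whatever the launching
environment already exported. A minimal sketch of that pattern, assuming a
hypothetical helper name env_passthrough (not part of Paddle's API):

    import os

    def env_passthrough(name, default):
        # Keep the user's exported value if present; otherwise fall back.
        return str(os.getenv(name, default))

    proc_env = {
        "PADDLE_WITH_GLOO": env_passthrough("PADDLE_WITH_GLOO", "1"),
    }

The @@ -1014 hunk also downgrades `assert heter_device_num != 0` to an early
return, so a build with no visible GPU/XPU devices skips heter-worker startup
instead of crashing the launcher.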
diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py
index 83345cb6f62..10b0c82c0ee 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py
@@ -16,7 +16,9 @@ from .meta_optimizer_base import MetaOptimizerBase
 from paddle.fluid import core
 import subprocess
 import re
+import os
 import platform
+from ..base.private_helper_function import wait_server_ready
 
 
 class ParameterServerOptimizer(MetaOptimizerBase):
@@ -96,6 +98,18 @@ class ParameterServerOptimizer(MetaOptimizerBase):
         compiled_config.set_origin_ps_main_program(_main)
         compiled_config.set_origin_ps_startup_program(_startup)
 
+        launch_barrier = self.user_defined_strategy.a_sync_configs[
+            "launch_barrier"]
+        launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
+        if launch_barrier and launch_barrier_flag:
+            # for trainer wait server ready
+            wait_server_ready(self.role_maker._get_pserver_endpoints())
+
+            # for ps-heter mode, wait heter worker ready
+            if self.role_maker._is_heter_parameter_server_mode and self.role_maker._is_worker(
+            ):
+                wait_server_ready(self.role_maker._get_heter_worker_endpoints())
+
         return _main, _startup
 
     def _build_pserver_programs(self, compiled_config):
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index 3f826da3ae2..8749b939de2 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from __future__ import print_function
+import os
 import six
 import collections
 import warnings
@@ -549,11 +550,10 @@ def create_heter_program(program, config, heter_program, heter_ops,
         "pserver_id": config.get_role_id(),
         "Fanin": config.get_trainers(),
         "distributed_mode": config.get_distributed_mode(),
-        "rpc_get_thread_num": 12,
-        "rpc_send_thread_num": 12,
-        "rpc_prefetch_thread_num": 12
+        "rpc_get_thread_num": int(os.getenv("CPU_NUM", 32)),
+        "rpc_send_thread_num": int(os.getenv("CPU_NUM", 32)),
+        "rpc_prefetch_thread_num": int(os.getenv("CPU_NUM", 32))
     }
-
     # append the listen_and_serv op
     heter_program.global_block().append_op(
         type="listen_and_serv", inputs={'X': []}, outputs={}, attrs=attrs)
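[Note] wait_server_ready is imported from fleet's base.private_helper_function
and blocks until every endpoint in the list is reachable. A rough sketch of
that kind of readiness barrier follows; this is illustrative only, not
Paddle's actual implementation, and the name wait_endpoints_ready is made up:

    import socket
    import time

    def wait_endpoints_ready(endpoints, retry_interval=3):
        # Poll each "ip:port" until a TCP connect succeeds for all of them.
        pending = list(endpoints)
        while pending:
            still_waiting = []
            for ep in pending:
                ip, port = ep.split(":")
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(2)
                    if sock.connect_ex((ip, int(port))) != 0:
                        still_waiting.append(ep)
            pending = still_waiting
            if pending:
                time.sleep(retry_interval)

The barrier only fires when both switches are on: the launch_barrier key in
a_sync_configs and the FLAGS_LAUNCH_BARRIER environment variable (default "1").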
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto.py
index b8393f1e28a..35577c27121 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto.py
@@ -54,6 +54,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
             input=prediction, label=input_y)
         avg_cost = paddle.fluid.layers.mean(x=cost)
 
+        os.environ["FLAGS_LAUNCH_BARRIER"] = "0"
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.auto = True
         optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_async.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_async.py
index 49b34f059e8..415a8092b1b 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_async.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_async.py
@@ -66,6 +66,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
             input=prediction, label=input_y)
         avg_cost = paddle.fluid.layers.mean(x=cost)
 
+        os.environ["FLAGS_LAUNCH_BARRIER"] = "0"
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.auto = True
         optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_geo.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_geo.py
index 334a4e028b2..ec975ec1fa8 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_geo.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_auto_geo.py
@@ -53,7 +53,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
         cost = paddle.fluid.layers.cross_entropy(
             input=prediction, label=input_y)
         avg_cost = paddle.fluid.layers.mean(x=cost)
-
+        os.environ["FLAGS_LAUNCH_BARRIER"] = "0"
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.auto = True
         optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py
index db73069bf7d..71937f70ef8 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py
@@ -56,7 +56,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
 
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
-        strategy.a_sync_configs = {"k_steps": 100}
+        strategy.a_sync_configs = {"k_steps": 100, "launch_barrier": False}
         optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
@@ -99,7 +99,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
 
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
-        strategy.a_sync_configs = {"k_steps": 100}
+        strategy.a_sync_configs = {"k_steps": 100, "launch_barrier": False}
         optimizer = paddle.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
--
GitLab
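[Note] The test changes exercise both off switches for the new barrier, and
either one being falsy skips wait_server_ready. A usage sketch (the setup
around the two knobs is illustrative only):

    import os
    import paddle.distributed.fleet as fleet

    os.environ["FLAGS_LAUNCH_BARRIER"] = "0"  # process-wide switch

    strategy = fleet.DistributedStrategy()
    strategy.a_sync = True
    # per-strategy switch, as in test_dist_fleet_a_sync_optimizer_geo.py
    strategy.a_sync_configs = {"k_steps": 100, "launch_barrier": False}

Disabling the barrier in these unit tests keeps them from blocking on pserver
endpoints that are never actually started.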