Unverified commit 4dc8c44b authored by Chengmo, committed by GitHub

【Paddle.Fleet】Fix fleetrun heter (#28252)

* fix fleetrun heter ps on paddlecloud
Parent 8f83d5d8
......@@ -603,7 +603,7 @@ def cloud_ps_heter_env_set(args):
avilable_ports = os.getenv("TRAINER_PORTS", "").split(",")
assert len(
avilable_ports
) > 3, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
) >= 2, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
# hard code for paddlecloud custom-framework
trainers_num = len(paddle_pserver_endpoints.split(","))
......@@ -894,7 +894,7 @@ class ParameterServerLauncher(object):
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(self.worker_num),
"POD_IP": cur_server.endpoint.split(":")[0],
"PADDLE_WITH_GLOO": "1",
"PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
"PADDLE_GLOO_RENDEZVOUS": "3",
"PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
"PADDLE_GLOO_HTTP_ENDPOINT": self.http_port
......@@ -958,7 +958,7 @@ class ParameterServerLauncher(object):
self.heter_worker_endpoints,
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(cur_worker.rank),
"PADDLE_WITH_GLOO": "1",
"PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
"PADDLE_GLOO_RENDEZVOUS": "3",
"PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
"FLAGS_selected_gpus": "0",
......@@ -1014,7 +1014,8 @@ class ParameterServerLauncher(object):
elif fluid.core.is_compiled_with_xpu():
heter_device_num = fluid.core.get_xpu_device_count()
device_list = [str(x) for x in range(0, heter_device_num)]
assert heter_device_num != 0
if heter_device_num == 0:
return
for idx, cur_heter_worker in enumerate(pod.heter_workers):
device_id = str(device_list[idx % heter_device_num])
......@@ -1027,7 +1028,7 @@ class ParameterServerLauncher(object):
"TRAINING_ROLE": "HETER_TRAINER",
"PADDLE_TRAINERS_NUM": str(self.worker_num),
"POD_IP": cur_heter_worker.endpoint.split(":")[0],
"PADDLE_WITH_GLOO": "1",
"PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
"PADDLE_GLOO_RENDEZVOUS": "3",
"PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
"FLAGS_selected_gpus": "0",
......
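The three launcher hunks above replace the hard-coded "PADDLE_WITH_GLOO": "1" with the value of the caller's PADDLE_WITH_GLOO environment variable, falling back to "1" when unset. A minimal sketch of how a job script might use this (the variable name comes from the diff; the surrounding script is illustrative, not part of the commit):

    # illustrative sketch: turn Gloo off for the pserver / trainer / heter-trainer
    # processes spawned by the launcher, which now read this variable
    import os
    os.environ["PADDLE_WITH_GLOO"] = "0"  # launcher uses str(os.getenv("PADDLE_WITH_GLOO", "1"))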
......@@ -16,7 +16,9 @@ from .meta_optimizer_base import MetaOptimizerBase
from paddle.fluid import core
import subprocess
import re
import os
import platform
from ..base.private_helper_function import wait_server_ready
class ParameterServerOptimizer(MetaOptimizerBase):
......@@ -96,6 +98,18 @@ class ParameterServerOptimizer(MetaOptimizerBase):
compiled_config.set_origin_ps_main_program(_main)
compiled_config.set_origin_ps_startup_program(_startup)
launch_barrier = self.user_defined_strategy.a_sync_configs[
"launch_barrier"]
launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
if launch_barrier and launch_barrier_flag:
# for trainer wait server ready
wait_server_ready(self.role_maker._get_pserver_endpoints())
# for ps-heter mode, wait heter worker ready
if self.role_maker._is_heter_parameter_server_mode and self.role_maker._is_worker(
):
wait_server_ready(self.role_maker._get_heter_worker_endpoints())
return _main, _startup
def _build_pserver_programs(self, compiled_config):
......
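With this hunk the trainer-side barrier is gated twice: by the strategy's a_sync_configs["launch_barrier"] switch and by the FLAGS_LAUNCH_BARRIER environment variable (default "1"). A minimal sketch of disabling it both ways, mirroring the test changes further down (illustrative user code, not part of the commit):

    # illustrative sketch: skip wait_server_ready() during optimizer.minimize()
    import os
    import paddle.distributed.fleet as fleet

    os.environ["FLAGS_LAUNCH_BARRIER"] = "0"  # read via int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))

    strategy = fleet.DistributedStrategy()
    strategy.a_sync = True
    strategy.a_sync_configs = {"k_steps": 100, "launch_barrier": False}  # read via a_sync_configs["launch_barrier"]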
......@@ -14,6 +14,7 @@
# limitations under the License.
from __future__ import print_function
import os
import six
import collections
import warnings
......@@ -549,11 +550,10 @@ def create_heter_program(program, config, heter_program, heter_ops,
"pserver_id": config.get_role_id(),
"Fanin": config.get_trainers(),
"distributed_mode": config.get_distributed_mode(),
"rpc_get_thread_num": 12,
"rpc_send_thread_num": 12,
"rpc_prefetch_thread_num": 12
"rpc_get_thread_num": int(os.getenv("CPU_NUM", 32)),
"rpc_send_thread_num": int(os.getenv("CPU_NUM", 32)),
"rpc_prefetch_thread_num": int(os.getenv("CPU_NUM", 32))
}
# append the listen_and_serv op
heter_program.global_block().append_op(
type="listen_and_serv", inputs={'X': []}, outputs={}, attrs=attrs)
......
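The listen_and_serv RPC thread counts were previously hard-coded to 12; they now follow the CPU_NUM environment variable with a default of 32. A minimal illustrative sketch of sizing them from the job environment (the variable name is from the diff; the value chosen here is arbitrary):

    # illustrative sketch: rpc_get/send/prefetch_thread_num all become
    # int(os.getenv("CPU_NUM", 32)) in the heter listen_and_serv op
    import os
    os.environ["CPU_NUM"] = "16"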
......@@ -54,6 +54,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
os.environ["FLAGS_LAUNCH_BARRIER"] = "0"
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01)
......
......@@ -66,6 +66,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
os.environ["FLAGS_LAUNCH_BARRIER"] = "0"
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
......
......@@ -53,7 +53,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
os.environ["FLAGS_LAUNCH_BARRIER"] = "0"
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.auto = True
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
......
......@@ -56,7 +56,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 100}
strategy.a_sync_configs = {"k_steps": 100, "launch_barrier": False}
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
......@@ -99,7 +99,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 100}
strategy.a_sync_configs = {"k_steps": 100, "launch_barrier": False}
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
......