diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto
index 2a7d97f3530298a96f58c92af154ddc6c98da5f1..a24cc4cb55c0491ed9be0298e4fbac4f2434b6d0 100644
--- a/paddle/fluid/framework/distributed_strategy.proto
+++ b/paddle/fluid/framework/distributed_strategy.proto
@@ -97,7 +97,7 @@ message AsyncConfig {
   optional int32 thread_pool_size = 6 [ default = 1 ];
   optional int32 send_wait_times = 7 [ default = 1 ];
   optional bool runtime_split_send_recv = 8 [ default = false ];
-  optional string worker_device = 9 [ default = 'cpu' ];
+  optional string heter_worker_device = 9 [ default = 'cpu' ];
 }
 
 message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; }
diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py
index dd76d14284107cf75113c9ec24f8747304a41a2b..aab0a6f3cd3f3e31dcbedd30711ba8f62824021b 100644
--- a/python/paddle/distributed/fleet/base/role_maker.py
+++ b/python/paddle/distributed/fleet/base/role_maker.py
@@ -511,13 +511,6 @@ class RoleMakerBase(object):
         return self._heter_trainer_endpoints[(self._current_id) %
                                              self._heter_worker_num()]
 
-    def _get_heter_worker_device(self):
-        """
-        Returns:
-            string: heter_trainer's device of current node, e.g: CPU/GPU/XPU
-        """
-        return self._heter_trainer_device.upper()
-
 
 class PaddleCloudRoleMaker(RoleMakerBase):
     def __init__(self, is_collective=False, **kwargs):
@@ -696,8 +689,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
         # For heter parameter server env setting
         heter_trainer_eplist = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST",
                                          "")
-        heter_trainer_device = os.getenv("PADDLE_HETER_TRAINER_DEVICE", "")
-        if heter_trainer_eplist != "" and heter_trainer_device != "":
+        if heter_trainer_eplist != "":
             try:
                 heter_trainer_eplist = os.environ[
                     "PADDLE_HETER_TRAINER_IP_PORT_LIST"].split(",")
@@ -708,12 +700,6 @@ class PaddleCloudRoleMaker(RoleMakerBase):
 
                 self._is_heter_parameter_server_mode = True
                 heter_trainers_num = len(heter_trainer_eplist)
-                current_node_device = heter_trainer_device.upper()
-                if current_node_device not in ["CPU", "GPU", "XPU"]:
-                    raise ValueError(
-                        "Heter Trainer doesn't support {} device now, please use CPU / GPU / XPU(KunLun)".
-                        format(heter_trainer_device))
-                self._heter_trainer_device = current_node_device
             else:
                 self._is_heter_parameter_server_mode = False
                 heter_trainers_num = 0
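With the `PADDLE_HETER_TRAINER_DEVICE` environment variable and the role-maker accessor removed, the heter device now travels through `DistributedStrategy` via the renamed `heter_worker_device` field of `AsyncConfig`. A minimal sketch of how a user script would configure it after this change; the dict-style `a_sync_configs` setter is the usual fleet API, but treat the exact snippet as an assumption rather than code from this patch:

```python
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.a_sync = True  # parameter-server (async) training mode
# Renamed field: 'worker_device' -> 'heter_worker_device'. The proto
# default is 'cpu'; 'gpu' / 'xpu' select the heterogeneous device.
strategy.a_sync_configs = {"heter_worker_device": "gpu"}
```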
diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 1a87fc2c300896db972b39e0294df0689c3ede6e..519a80c30dee90e3643ec3151fc990614d1f21cd 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -91,14 +91,6 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
 ''')
 
     base_group = parser.add_argument_group("Base Parameters")
-    base_group.add_argument(
-        "-d",
-        "--distributed_mode",
-        type=str,
-        choices=["collective", "ps", "ps_heter", "ps_gpu", ""],
-        default="",
-        help="Distributed running mode: collective/ps/ps_gpu/ps_heter")
-
     base_group.add_argument(
         "--log_dir",
         type=str,
@@ -150,13 +142,6 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
     ps_group.add_argument(
         "--heter_worker_num", type=int, help="number of heter_workers")
-    ps_group.add_argument(
-        "--heter_worker_device",
-        type=str,
-        default="gpu",
-        choices=["gpu", "xpu"],
-        help="heter worker device")
-
     return parser.parse_args()
@@ -244,34 +229,37 @@ def launch_collective(args):
         shutil.rmtree(gloo_rendezvous_dir)
 
 
-def launch_ps(args):
+def launch_ps(args, distribute_mode):
    cloud_flag = cloud_utils.use_paddlecloud()
 
     # for ps-cpu on paddlecloud
-    direct_start_mode = ["ps", ""]
-    if cloud_flag and (args.distributed_mode in direct_start_mode):
+    if cloud_flag and distribute_mode == DistributeMode.PS:
         direct_start(args)
         return
-    elif cloud_flag and args.distributed_mode == "ps_heter":
+    elif cloud_flag and distribute_mode == DistributeMode.PS_HETER:
         cloud_ps_heter_env_set(args)
         args.trainers = os.getenv("PADDLE_TRAINER_ENDPOINTS")
         args.workers = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST")
         args.heter_workers = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST")
 
-    ps_launcher = ParameterServerLauncher(args)
-    ps_launcher.start_ps(args)
+    ps_launcher = ParameterServerLauncher(args, distribute_mode)
+    ps_launcher.start_ps()
     return
 
 
-def launch():
-    args = _parse_args()
-    logger = get_logger()
-    _print_arguments(args)
+def which_distributed_mode(args):
     ps_args = [
-        '--worker_num', '--server_num', '--heter_worker_num', '--servers',
-        '--workers', '--heter_worrkers', 'heter_worker_device'
+        '--worker_num',
+        '--server_num',
+        '--heter_worker_num',
+        '--servers',
+        '--workers',
+        '--heter_workers',
     ]
-    collective_args = ['--ips', '--gpus']
+    collective_args = ['--ips']
+
+    ps_heter_args = ["--heter_worker_num", "--heter_workers"]
+
     has_ps_args = [
         ps_arg for ps_arg in ps_args if ps_arg in " ".join(sys.argv[1:-1])
     ]
@@ -279,25 +267,45 @@ def launch():
         co_arg for co_arg in collective_args
         if co_arg in " ".join(sys.argv[1:-1])
     ]
+
+    assert not (
+        len(has_ps_args) > 0 and len(has_collective_args) > 0
+    ), "Only one mode (Collective or Parameter-Server) can be selected at the same time, but more than one configuration was received."
+
     if fluid.core.is_compiled_with_cuda():
         cuda_device_num = fluid.core.get_cuda_device_count()
     else:
         cuda_device_num = 0
 
-    ps_mode = ['ps', 'ps_gpu', 'ps_heter']
-    if len(has_ps_args) > 0 or args.distributed_mode in ps_mode:
+    if len(has_ps_args) > 0:
         logger.info(
             "Run parameter-sever mode. pserver arguments:{}, cuda count:{}".
             format(has_ps_args, cuda_device_num))
-        launch_ps(args)
+        has_ps_heter_args = list(set(has_ps_args) & set(ps_heter_args))
+        if len(has_ps_heter_args) > 0:
+            return DistributeMode.PS_HETER
+        else:
+            return DistributeMode.PS
     elif len(has_collective_args) > 0:
         logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}".
                     format(has_collective_args, cuda_device_num))
-        launch_collective(args)
+        return DistributeMode.COLLECTIVE
     else:
         logger.warning(
             "Not found distinct arguments. Default use gpu collective mode")
+        return DistributeMode.COLLECTIVE
+
+
+def launch():
+    args = _parse_args()
+    logger = get_logger()
+    _print_arguments(args)
+
+    distribute_mode = which_distributed_mode(args)
+    if distribute_mode == DistributeMode.COLLECTIVE:
+        launch_collective(args)
+    else:
+        launch_ps(args, distribute_mode)
 
 
 if __name__ == "__main__":
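The explicit `--distributed_mode` flag is gone; the mode is now inferred from which flags appear on the command line, with any heter-specific flag upgrading plain PS to PS_HETER. A self-contained sketch of that inference rule (`detect_mode` is a hypothetical stand-in for `which_distributed_mode`, which reads `sys.argv` and returns `DistributeMode` constants):

```python
def detect_mode(argv):
    ps_args = ['--worker_num', '--server_num', '--heter_worker_num',
               '--servers', '--workers', '--heter_workers']
    ps_heter_args = ['--heter_worker_num', '--heter_workers']
    collective_args = ['--ips']

    joined = " ".join(argv)
    has_ps = [arg for arg in ps_args if arg in joined]
    has_collective = [arg for arg in collective_args if arg in joined]

    # PS and collective flags are mutually exclusive.
    assert not (has_ps and has_collective), "only one mode may be selected"
    if has_ps:
        # Any heter-specific flag upgrades plain PS to PS_HETER.
        return "PS_HETER" if set(has_ps) & set(ps_heter_args) else "PS"
    return "COLLECTIVE"  # explicit --ips, or the default fallback

print(detect_mode(['--servers', '10.0.0.1:6170', 'train.py']))        # PS
print(detect_mode(['--heter_workers', '10.0.0.2:6171', 'train.py']))  # PS_HETER
print(detect_mode(['--ips', '10.0.0.1,10.0.0.2', 'train.py']))        # COLLECTIVE
```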
diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index 708ef396930ceea2afd08244e92eff93ec1c42ef..e7204441ca584ed433182e2136e6dae2a47bcb45 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -32,6 +32,15 @@ logger = logging.getLogger("root")
 logger.propagate = False
 
 
+class DistributeMode:
+    """
+    There are various modes for fleetrun, each designed for a different distributed-training deployment.
+    """
+    COLLECTIVE = 0
+    PS = 1
+    PS_HETER = 2
+
+
 class Cluster(object):
     def __init__(self, hdfs):
         self.job_server = None
@@ -616,7 +625,9 @@ def cloud_ps_heter_env_set(args):
 
 
 class ParameterServerLauncher(object):
-    def __init__(self, args):
+    def __init__(self, args, distribute_mode):
+        self.args = args
+        self.distribute_mode = distribute_mode
         self.server_num = 0
         self.worker_num = 0
         self.heter_worker_num = 0
@@ -677,7 +688,7 @@ class ParameterServerLauncher(object):
                 self.worker_num = len(self.worker_endpoints.split(","))
 
         # get heter worker envs
-        if args.distributed_mode == "ps_heter":
+        if self.distribute_mode == DistributeMode.PS_HETER:
             if args.heter_worker_num:
                 self.heter_worker_num = args.heter_worker_num
                 if args.heter_workers:
@@ -713,7 +724,7 @@ class ParameterServerLauncher(object):
         ]
         self.node_ips = list(
             set(self.server_endpoints_ips + self.worker_endpoints_ips))
-        if args.distributed_mode == "ps_heter":
+        if self.distribute_mode == DistributeMode.PS_HETER:
             self.heter_worker_endpoints_ips = [
                 x.strip().split(":")[0]
                 for x in self.heter_worker_endpoints.split(",")
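`DistributeMode` is a plain namespace of integer constants, which is all the equality checks in `launch()` and `ParameterServerLauncher` need. A standard-library `enum.Enum` would be an alternative with stricter comparison semantics; a sketch of that variant (not what this patch uses):

```python
from enum import Enum

class DistributeMode(Enum):
    COLLECTIVE = 0
    PS = 1
    PS_HETER = 2

mode = DistributeMode.PS_HETER
assert mode is DistributeMode.PS_HETER  # identity comparison works
assert mode != DistributeMode.PS
assert mode != 2  # unlike the plain-class version, raw ints never compare equal
```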
diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
index af309fe1e36decd301e002bc1803068278d5dfca..1db9da908a22c96dccba96a0c907ab86f3595d27 100644
--- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
+++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
@@ -198,16 +198,21 @@ class ParameterServerRuntime(RuntimeBase):
             warnings.warn("communicator has been initialized, skip")
 
     def _get_executor(self):
+
         if self.role_maker._is_heter_worker():
-            if self.role_maker._get_heter_worker_device() == "GPU":
+            heter_worker_device = self.context["valid_strategy"].a_sync_configs[
+                "heter_worker_device"].upper()
+            if heter_worker_device == "GPU":
                 gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
                 executor = Executor(fluid.CUDAPlace(gpu_id))
-            elif self.role_maker._get_heter_worker_device() == "XPU":
+            elif heter_worker_device == "XPU":
                 xpu_id = int(os.getenv("FLAGS_selected_xpus", "0"))
                 executor = Executor(fluid.XPUPlace(xpu_id))
+            elif heter_worker_device == "CPU":
+                executor = Executor(fluid.CPUPlace())
             else:
-                raise ValueError("Not Support Device {}".format(
-                    self.role_maker._get_heter_worker_device()))
+                raise ValueError("Heter worker does not support device {}".
+                                 format(heter_worker_device))
         else:
             executor = fluid.Executor(fluid.CPUPlace())
         return executor
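For reference, the post-patch device dispatch in `_get_executor`, lifted into a standalone helper. This is a sketch: `make_heter_executor` is a hypothetical name, the device string is what `strategy.a_sync_configs["heter_worker_device"]` carries, and the `FLAGS_selected_*` variables are the ones the diff reads:

```python
import os
import paddle.fluid as fluid

def make_heter_executor(heter_worker_device):
    """Map a heter_worker_device string ('cpu'/'gpu'/'xpu') to an Executor."""
    device = heter_worker_device.upper()
    if device == "GPU":
        # Device id is assigned per process by the launcher.
        gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        return fluid.Executor(fluid.CUDAPlace(gpu_id))
    if device == "XPU":
        xpu_id = int(os.getenv("FLAGS_selected_xpus", "0"))
        return fluid.Executor(fluid.XPUPlace(xpu_id))
    if device == "CPU":
        return fluid.Executor(fluid.CPUPlace())
    raise ValueError("Heter worker does not support device {}".format(device))
```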