def is_distributed_env():
    """Report whether the ``TRAINING_ROLE`` environment variable is set.

    The role (or ``None``) is echoed to stdout before returning.

    Returns:
        bool: ``True`` when ``TRAINING_ROLE`` is present in the environment,
        ``False`` otherwise.
    """
    node_role = os.getenv("TRAINING_ROLE")
    print(f"-- Role: {node_role} --")
    return node_role is not None


class YamlHelper:
    """Load a YAML config file into a flat ``{"a.b.c": value}`` dict and print it."""

    def load_yaml(self, yaml_file, other_part=None):
        """Load ``yaml_file`` and keep only the selected top-level sections.

        Args:
            yaml_file: Path of the YAML file to read.
            other_part: Optional iterable of extra key prefixes to keep in
                addition to ``"runner"`` and ``"hyper_parameters"``.

        Returns:
            dict: Flattened config with ``{workspace}`` placeholders expanded.
        """
        part_list = ["runner", "hyper_parameters"]
        if other_part:
            part_list += other_part
        running_config = self.get_all_inters_from_yaml(yaml_file, part_list)
        return self.workspace_adapter(running_config)

    def print_yaml(self, config):
        """Print ``config`` to stdout as a formatted two-column table."""
        print(self.pretty_print_envs(config))

    @staticmethod
    def _supports_full_loader(version):
        """Return True when a PyYAML version string is at least 5.1.

        Only the leading digits of the first two dotted components are used,
        so suffixed versions such as ``"5.4b1"`` parse instead of raising.
        """
        numbers = []
        for piece in str(version).split(".")[:2]:
            digits = ""
            for ch in piece:
                if ch not in "0123456789":
                    break
                digits += ch
            numbers.append(int(digits) if digits else 0)
        while len(numbers) < 2:
            numbers.append(0)
        return tuple(numbers) >= (5, 1)

    def parse_yaml(self, config):
        """Parse the YAML file at path ``config`` and return the loaded object.

        Raises:
            ValueError: If ``config`` is not an existing regular file.
        """
        if not os.path.isfile(config):
            raise ValueError(f"config {config} can not be supported")
        with open(config, 'r', encoding="utf-8") as stream:
            text = stream.read()
        # NOTE(review): FullLoader / legacy yaml.load are not meant for
        # untrusted input; this assumes configs are trusted local files.
        if self._supports_full_loader(yaml.__version__):
            return yaml.load(text, Loader=yaml.FullLoader)
        # PyYAML older than 5.1 has no FullLoader; use its default loader.
        return yaml.load(text)

    @staticmethod
    def _flatten_namespace(envs, prefix=()):
        """Flatten nested dicts into one dict keyed by dot-joined paths."""
        flat = {}
        for key, value in envs.items():
            # str() keeps non-string YAML keys (ints, bools) from breaking join.
            path = (*prefix, str(key))
            if isinstance(value, dict):
                flat.update(YamlHelper._flatten_namespace(value, path))
            else:
                flat[".".join(path)] = value
        return flat

    def get_all_inters_from_yaml(self, file, filters):
        """Flatten the YAML file and keep keys starting with any of ``filters``.

        Args:
            file: Path of the YAML file.
            filters: Iterable of string prefixes; a flattened key is kept when
                it starts with at least one of them.

        Returns:
            dict: Mapping of dot-joined key path to leaf value. An empty YAML
            document yields an empty dict.
        """
        envs = self.parse_yaml(file) or {}
        flat = self._flatten_namespace(envs)
        prefixes = tuple(filters)
        return {k: v for k, v in flat.items() if k.startswith(prefixes)}

    def workspace_adapter(self, config):
        """Replace ``{workspace}`` in string values with ``config["workspace"]``.

        ``config`` is modified in place and also returned. When there is no
        ``"workspace"`` entry the values are left untouched, because there is
        nothing to substitute.
        """
        workspace = config.get("workspace")
        if workspace is None:
            return config
        for key, value in config.items():
            if isinstance(value, str) and "{workspace}" in value:
                config[key] = value.replace("{workspace}", workspace)
        return config

    def pretty_print_envs(self, envs, header=None):
        """Render ``envs`` as a bordered two-column text table.

        Args:
            envs: Mapping of string keys to values; rows are sorted by item.
            header: Optional pair of column titles; defaults to
                ``("Ps Benchmark Envs", "Value")``.

        Returns:
            str: The table wrapped in a leading and trailing newline.
        """
        spacing = 2
        max_v = 45
        ellipsis = "... "
        # Key column is at least 40 wide and grows to fit the longest key.
        max_k = max([40] + [len(k) for k in envs])

        row_format = f" |{{:>{max_k}s}}{' ' * spacing}{{:^{max_v}s}}|\n"
        length = max_k + max_v + spacing
        border = " +" + "=" * length + "+"
        line = " +" + "-" * length + "+"

        titles = header if header else ("Ps Benchmark Envs", "Value")
        parts = [
            border + "\n",
            row_format.format(titles[0], titles[1]),
            line + "\n",
        ]
        for k, v in sorted(envs.items()):
            if isinstance(v, str) and len(v) >= max_v:
                # Keep the tail so that marker plus tail fills the column.
                shown = ellipsis + v[-(max_v - len(ellipsis)):]
            else:
                shown = v
            parts.append(row_format.format(k, str(shown)))
        parts.append(border)
        return "\n" + "".join(parts) + "\n"
def get_user_defined_strategy(config):
    """Build a fleet ``DistributedStrategy`` from the flattened run config.

    Args:
        config: Flat mapping with dotted keys (``"runner.sync_mode"``,
            ``"hyper_parameters.adam_d2sum"``, ``"table_parameters.*"`` ...).

    Returns:
        The configured ``paddle.distributed.fleet.DistributedStrategy``.

    Raises:
        ValueError: If ``runner.sync_mode`` is not one of ``async``, ``sync``,
            ``geo``, ``heter`` or ``gpubox``.
    """
    if not is_distributed_env():
        # Logger.warn is a deprecated alias (removed in Python 3.13).
        logger.warning(
            "Not Find Distributed env, Change To local train mode. If you want train with fleet, please use [fleetrun] command."
        )

    sync_mode = config.get("runner.sync_mode")
    supported_modes = ("async", "sync", "geo", "heter", "gpubox")
    # Explicit raise instead of assert: asserts vanish under ``python -O``,
    # which would leave ``strategy`` unbound further down.
    if sync_mode not in supported_modes:
        raise ValueError(
            f"runner.sync_mode must be one of {list(supported_modes)}, "
            f"got {sync_mode!r}"
        )

    if sync_mode == "gpubox":
        print(f"sync_mode = {sync_mode}")

    strategy = paddle.distributed.fleet.DistributedStrategy()
    # "sync" is the only mode that runs with a_sync disabled.
    strategy.a_sync = sync_mode != "sync"

    if sync_mode == "async":
        strategy.is_fl_ps_mode = config.get("runner.is_fl_ps_mode") == 1
        if strategy.is_fl_ps_mode:
            strategy.pipeline = False
            micro_num = 1
            strategy.pipeline_configs = {
                "accumulate_steps": micro_num
            }  # num_microbatches
    elif sync_mode == "geo":
        strategy.a_sync_configs = {"k_steps": config.get("runner.geo_step")}
    elif sync_mode == "heter":
        strategy.a_sync_configs = {"heter_worker_device_guard": "gpu"}
        strategy.pipeline = True
        strategy.pipeline_configs = {
            "accumulate_steps": config.get('runner.micro_num')
        }
    elif sync_mode == "gpubox":
        strategy.a_sync_configs = {"use_ps_gpu": 1}

    strategy.trainer_desc_configs = {
        "dump_fields_path": config.get("runner.dump_fields_path", ""),
        "dump_fields": config.get("runner.dump_fields", []),
        "dump_param": config.get("runner.dump_param", []),
        # NOTE(review): unlike its neighbours this key has no "runner."
        # prefix -- confirm whether that is intended.
        "stat_var_names": config.get("stat_var_names", []),
        "local_sparse": config.get("runner.local_sparse", []),
        "remote_sparse": config.get("runner.remote_sparse", []),
    }
    print("strategy:", strategy.trainer_desc_configs)

    if config.get("runner.fs_client.uri") is not None:
        strategy.fs_client_param = {
            "uri": config.get("runner.fs_client.uri", ""),
            "user": config.get("runner.fs_client.user", ""),
            "passwd": config.get("runner.fs_client.passwd", ""),
            "hadoop_bin": config.get("runner.fs_client.hadoop_bin", "hadoop"),
        }
    # NOTE(review): this echoes the fs client settings, passwd included.
    print("strategy:", strategy.fs_client_param)

    strategy.adam_d2sum = config.get("hyper_parameters.adam_d2sum", True)

    # Group every "table_parameters.<table>.<...>" entry under its table name;
    # the full dotted key is kept as the inner key.
    table_config = {}
    for key in config:
        if key.startswith("table_parameters"):
            table_name = key.split('.')[1]
            table_config.setdefault(table_name, {})[key] = config[key]
    print("table_config:", table_config)
    strategy.sparse_table_configs = table_config
    print("strategy table config:", strategy.sparse_table_configs)

    # Read-modify-write kept from the original code.
    a_sync_configs = strategy.a_sync_configs
    a_sync_configs["launch_barrier"] = False
    strategy.a_sync_configs = a_sync_configs
    print("launch_barrier: ", strategy.a_sync_configs["launch_barrier"])

    return strategy
def get_distributed_strategy(user_defined_strategy):  # pslib
    """Map a user strategy onto a transpiler strategy from ``StrategyFactory``.

    Selection rules:
        * ``k_steps == 0`` and ``a_sync`` false -> sync strategy
        * ``k_steps == 0`` and ``a_sync`` true  -> async strategy
        * ``k_steps > 0``  and ``a_sync`` true  -> geo strategy (``k_steps``)

    Args:
        user_defined_strategy: Object exposing ``a_sync`` and
            ``a_sync_configs["k_steps"]``.

    Returns:
        The strategy object created by ``StrategyFactory``.

    Raises:
        ValueError: If ``a_sync`` / ``k_steps`` match none of the rules above.
    """
    a_sync = user_defined_strategy.a_sync
    k_steps = user_defined_strategy.a_sync_configs["k_steps"]

    # Validate before the heavy paddle import so bad input fails fast.
    if k_steps == 0:
        mode = "async" if a_sync else "sync"
    elif a_sync and k_steps > 0:
        mode = "geo"
    else:
        raise ValueError(
            f"unsupported strategy: a_sync={a_sync!r}, k_steps={k_steps!r}; "
            "k_steps must be 0, or positive with a_sync enabled"
        )

    from paddle.incubate.distributed.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
        StrategyFactory,
    )

    if mode == "sync":
        return StrategyFactory.create_sync_strategy()
    if mode == "async":
        return StrategyFactory.create_async_strategy()
    return StrategyFactory.create_geo_strategy(k_steps)


def get_model(config):
    """Instantiate ``StaticModel`` for ``config``.

    ``config["config_abs_dir"]`` is appended to ``sys.path`` first; the entry
    is added only once so repeated calls do not grow ``sys.path``.
    """
    abs_dir = config['config_abs_dir']
    if abs_dir not in sys.path:
        sys.path.append(abs_dir)
    return StaticModel(config)


def parse_args(argv=None):
    """Parse command-line options and merge them into the YAML config.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        dict: The flattened YAML config extended with ``yaml_path``,
        ``config_abs_dir`` and one entry per command-line option. The final
        config is also printed as a table.
    """
    parser = argparse.ArgumentParser("PsTest train script")
    parser.add_argument(
        '-m', '--config_yaml', type=str, required=True, help='config file path'
    )
    parser.add_argument(
        '-bf16',
        '--pure_bf16',
        type=ast.literal_eval,
        default=False,
        help="whether use bf16",
    )
    # (option, type, default, help) in the order shown by --help.
    option_specs = (
        ('--run_minimize', int, 0, "test minimize"),
        ('--run_single_pass', int, 0, "test single pass"),
        ('--run_the_one_ps', int, 0, "test the_one_ps"),
        ('--debug_new_minimize', int, 0, "debug new minimize"),
        ('--debug_new_pass', int, 0, "debug new pass"),
        ('--applied_pass_name', str, "", "name of the applied pass"),
        ('--debug_the_one_ps', int, 0, "debug the_one_ps"),
    )
    for flag, kind, default, help_text in option_specs:
        parser.add_argument(flag, type=kind, default=default, help=help_text)

    args = parser.parse_args(argv)
    args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml))

    yaml_helper = YamlHelper()
    config = yaml_helper.load_yaml(args.config_yaml)
    config["yaml_path"] = args.config_yaml
    config["config_abs_dir"] = args.abs_dir
    # Copy each option into the config under its own name.
    config["pure_bf16"] = args.pure_bf16
    for flag, _kind, _default, _help in option_specs:
        dest = flag.lstrip('-')
        config[dest] = getattr(args, dest)

    yaml_helper.print_yaml(config)
    return config