# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import ast
import copy
import os
import struct
import sys

import numpy as np
import yaml

import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.base import role_maker
from paddle.distributed.ps.utils.ps_program_builder import (
    debug_program,
    logger,
    new_pass,
    ps_log_root_dir,
)

# The parent directory must be on sys.path before ps_dnn_model is imported.
sys.path.append("..")
from ps_dnn_model import StaticModel

__dir__ = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))


def is_distributed_env():
    """Return True when the ``TRAINING_ROLE`` environment variable is set.

    The role value (or ``None``) is printed as a side effect.
    """
    node_role = os.getenv("TRAINING_ROLE")
    print("-- Role: {} --".format(node_role))
    return node_role is not None


class YamlHelper:
    """Load a YAML config file into a flat, dot-separated key/value dict."""

    def load_yaml(self, yaml_file, other_part=None):
        """Load ``yaml_file`` and keep only the selected top-level sections.

        Args:
            yaml_file: Path of the YAML file.
            other_part: Optional iterable of extra key prefixes to keep in
                addition to ``"runner"`` and ``"hyper_parameters"``.

        Returns:
            The flattened, filtered config with ``{workspace}`` placeholders
            substituted (see ``workspace_adapter``).
        """
        part_list = ["runner", "hyper_parameters"]
        if other_part:
            part_list.extend(other_part)
        running_config = self.get_all_inters_from_yaml(yaml_file, part_list)
        return self.workspace_adapter(running_config)

    def print_yaml(self, config):
        """Print ``config`` as a formatted table."""
        print(self.pretty_print_envs(config))

    def parse_yaml(self, config):
        """Parse the YAML file at path ``config`` and return its content.

        Raises:
            ValueError: If ``config`` is not an existing file.
        """
        if not os.path.isfile(config):
            raise ValueError("config {} can not be supported".format(config))

        with open(config, 'r', encoding="utf-8") as rb:
            content = rb.read()

        # FullLoader only exists in PyYAML >= 5.1. Detecting it directly
        # replaces parsing ``yaml.__version__``, which fails on non-numeric
        # version components.
        # NOTE(security): yaml.load with these loaders should only be used on
        # trusted config files; switch to yaml.safe_load if configs can come
        # from untrusted sources.
        full_loader = getattr(yaml, "FullLoader", None)
        if full_loader is not None:
            return yaml.load(content, Loader=full_loader)
        return yaml.load(content)

    def get_all_inters_from_yaml(self, file, filters):
        """Flatten the YAML file and keep keys starting with any filter.

        Nested mappings are flattened into dot-joined keys, e.g.
        ``{"runner": {"sync_mode": "async"}}`` becomes
        ``{"runner.sync_mode": "async"}``.

        Args:
            file: Path of the YAML file.
            filters: Iterable of key prefixes (strings) to keep.

        Returns:
            Dict of the flattened entries whose key starts with a prefix.
        """
        # An empty YAML document parses to None; treat it as an empty mapping.
        _envs = self.parse_yaml(file) or {}
        all_flattens = {}

        def fatten_env_namespace(namespace_nests, local_envs):
            for k, v in local_envs.items():
                if isinstance(v, dict):
                    fatten_env_namespace(namespace_nests + [k], v)
                else:
                    all_flattens[".".join(namespace_nests + [k])] = v

        fatten_env_namespace([], _envs)

        prefixes = tuple(filters)
        return {
            k: v for k, v in all_flattens.items() if k.startswith(prefixes)
        }

    def workspace_adapter(self, config):
        """Replace ``{workspace}`` in string values with ``config["workspace"]``.

        ``config`` is modified in place and returned. When no string
        ``workspace`` entry exists, values are left untouched instead of
        failing inside ``str.replace``.
        """
        workspace = config.get("workspace")
        if not isinstance(workspace, str):
            return config
        for k, v in config.items():
            if isinstance(v, str) and "{workspace}" in v:
                config[k] = v.replace("{workspace}", workspace)
        return config

    def pretty_print_envs(self, envs, header=None):
        """Render ``envs`` as a two-column text table and return it.

        Args:
            envs: Mapping with string keys; values are rendered with ``str``.
            header: Optional pair of column titles; defaults to
                ``("Ps Benchmark Envs", "Value")``.

        Returns:
            The table as a string, wrapped in leading/trailing newlines.
        """
        spacing = 2
        max_v = 45
        # The key column is at least 40 wide and grows to the longest key.
        max_k = max(40, max((len(k) for k in envs), default=0))

        h_format = " " + "|{{:>{}s}}{}{{:^{}s}}|\n".format(
            max_k, " " * spacing, max_v
        )
        l_format = " " + "|{{:>{}s}}{{}}{{:^{}s}}|\n".format(max_k, max_v)
        length = max_k + max_v + spacing

        border = " +" + "=" * length + "+"
        line = " +" + "-" * length + "+"

        if header:
            title_row = h_format.format(header[0], header[1])
        else:
            title_row = h_format.format("Ps Benchmark Envs", "Value")
        parts = [border + "\n", title_row, line + "\n"]

        # Long string values keep only their tail so the cell fits max_v.
        ellipsis = "... "
        tail_len = max_v - len(ellipsis)
        for k, v in sorted(envs.items()):
            if isinstance(v, str) and len(v) >= max_v:
                str_v = ellipsis + v[-tail_len:]
            else:
                str_v = v
            parts.append(l_format.format(k, " " * spacing, str(str_v)))

        parts.append(border)
        return "\n{}\n".format("".join(parts))


def get_user_defined_strategy(config):
    """Build a fleet ``DistributedStrategy`` from the flattened ``config``.

    Args:
        config: Flattened config dict; ``runner.sync_mode`` selects the mode
            and must be one of ``async``, ``sync``, ``geo``, ``heter``,
            ``gpubox``.

    Returns:
        The configured ``paddle.distributed.fleet.DistributedStrategy``.

    Raises:
        ValueError: If ``runner.sync_mode`` is not a supported mode.
    """
    if not is_distributed_env():
        logger.warning(
            "Not Find Distributed env, Change To local train mode. If you want train with fleet, please use [fleetrun] command."
        )

    sync_mode = config.get("runner.sync_mode")
    supported_modes = ("async", "sync", "geo", "heter", "gpubox")
    # Explicit check instead of ``assert`` so validation survives ``-O``.
    if sync_mode not in supported_modes:
        raise ValueError(
            "runner.sync_mode must be one of {}, got {!r}".format(
                list(supported_modes), sync_mode
            )
        )

    strategy = paddle.distributed.fleet.DistributedStrategy()
    if sync_mode == "sync":
        strategy.a_sync = False
    elif sync_mode == "async":
        strategy.a_sync = True
        strategy.is_fl_ps_mode = config.get("runner.is_fl_ps_mode") == 1
        if strategy.is_fl_ps_mode:
            strategy.pipeline = False
            micro_num = 1
            strategy.pipeline_configs = {
                "accumulate_steps": micro_num
            }  # num_microbatches
    elif sync_mode == "geo":
        strategy.a_sync = True
        strategy.a_sync_configs = {"k_steps": config.get("runner.geo_step")}
    elif sync_mode == "heter":
        strategy.a_sync = True
        strategy.a_sync_configs = {"heter_worker_device_guard": "gpu"}
        strategy.pipeline = True
        strategy.pipeline_configs = {
            "accumulate_steps": config.get('runner.micro_num')
        }
    elif sync_mode == "gpubox":
        print("sync_mode = {}".format(sync_mode))
        strategy.a_sync = True
        strategy.a_sync_configs = {"use_ps_gpu": 1}

    # NOTE(review): "stat_var_names" is the only key here without the
    # "runner." prefix -- confirm whether "runner.stat_var_names" was meant.
    strategy.trainer_desc_configs = {
        "dump_fields_path": config.get("runner.dump_fields_path", ""),
        "dump_fields": config.get("runner.dump_fields", []),
        "dump_param": config.get("runner.dump_param", []),
        "stat_var_names": config.get("stat_var_names", []),
        "local_sparse": config.get("runner.local_sparse", []),
        "remote_sparse": config.get("runner.remote_sparse", []),
    }
    print("strategy:", strategy.trainer_desc_configs)

    if config.get("runner.fs_client.uri") is not None:
        strategy.fs_client_param = {
            "uri": config.get("runner.fs_client.uri", ""),
            "user": config.get("runner.fs_client.user", ""),
            "passwd": config.get("runner.fs_client.passwd", ""),
            "hadoop_bin": config.get("runner.fs_client.hadoop_bin", "hadoop"),
        }
    print("strategy:", strategy.fs_client_param)

    strategy.adam_d2sum = config.get("hyper_parameters.adam_d2sum", True)

    # Group "table_parameters.<table_name>..." entries per table name.
    table_config = {}
    for key in config:
        if key.startswith("table_parameters"):
            table_name = key.split('.')[1]
            table_config.setdefault(table_name, {})[key] = config[key]
    print("table_config:", table_config)
    strategy.sparse_table_configs = table_config
    print("strategy table config:", strategy.sparse_table_configs)

    # Read-modify-write: the dict is fetched, updated and assigned back.
    a_sync_configs = strategy.a_sync_configs
    a_sync_configs["launch_barrier"] = False
    strategy.a_sync_configs = a_sync_configs
    print("launch_barrier: ", strategy.a_sync_configs["launch_barrier"])

    return strategy


def get_distributed_strategy(user_defined_strategy):  # pslib
    """Create a ``StrategyFactory`` strategy matching ``user_defined_strategy``.

    The choice depends on ``user_defined_strategy.a_sync`` and on
    ``user_defined_strategy.a_sync_configs["k_steps"]``:
    sync (not a_sync, k_steps == 0), async (a_sync, k_steps == 0) or
    geo (a_sync, k_steps > 0).

    Raises:
        ValueError: If no strategy matches the a_sync / k_steps combination.
    """
    from paddle.incubate.distributed.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
        StrategyFactory,
    )

    k_steps = user_defined_strategy.a_sync_configs["k_steps"]
    a_sync = user_defined_strategy.a_sync
    strategy = None

    if not a_sync and k_steps == 0:
        strategy = StrategyFactory.create_sync_strategy()
    elif a_sync and k_steps == 0:
        strategy = StrategyFactory.create_async_strategy()
    elif a_sync and k_steps > 0:
        strategy = StrategyFactory.create_geo_strategy(k_steps)

    if strategy is None:
        raise ValueError("k_steps must be valid value, please check")
    return strategy


def get_model(config):
    """Instantiate ``StaticModel`` for ``config``.

    ``config['config_abs_dir']`` is added to ``sys.path`` first (once).
    """
    abs_dir = config['config_abs_dir']
    if abs_dir not in sys.path:
        sys.path.append(abs_dir)
    return StaticModel(config)


def parse_args():
    """Parse command-line arguments and merge them into the YAML config.

    Returns:
        The flattened config loaded from ``--config_yaml`` with the
        command-line options added under their own keys, plus ``yaml_path``
        and ``config_abs_dir``. The config is printed as a side effect.
    """
    parser = argparse.ArgumentParser("PsTest train script")
    parser.add_argument(
        '-m', '--config_yaml', type=str, required=True, help='config file path'
    )
    parser.add_argument(
        '-bf16',
        '--pure_bf16',
        type=ast.literal_eval,
        default=False,
        help="whether use bf16",
    )
    parser.add_argument(
        '--run_minimize', type=int, default=0, help="test minimize"
    )
    parser.add_argument(
        '--run_single_pass', type=int, default=0, help="test single pass"
    )
    parser.add_argument(
        '--run_the_one_ps', type=int, default=0, help="test the_one_ps"
    )
    parser.add_argument(
        '--debug_new_minimize', type=int, default=0, help="test new minimize"
    )
    parser.add_argument(
        '--debug_new_pass', type=int, default=0, help="test new pass"
    )
    parser.add_argument(
        '--applied_pass_name', type=str, default="", help="applied pass name"
    )
    parser.add_argument(
        '--debug_the_one_ps', type=int, default=0, help="test the_one_ps"
    )

    args = parser.parse_args()
    args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml))

    yaml_helper = YamlHelper()
    config = yaml_helper.load_yaml(args.config_yaml)
    config["yaml_path"] = args.config_yaml
    config["config_abs_dir"] = args.abs_dir
    # These options are copied into the config under their own names.
    for option in (
        "pure_bf16",
        "run_minimize",
        "run_single_pass",
        "run_the_one_ps",
        "debug_new_minimize",
        "debug_new_pass",
        "applied_pass_name",
        "debug_the_one_ps",
    ):
        config[option] = getattr(args, option)

    yaml_helper.print_yaml(config)
    return config


def bf16_to_fp32(val):
    """Convert a bfloat16 bit pattern to ``np.float32``.

    NOTE(review): the body of this function is truncated in the available
    source after ``struct.unpack('``. The expression below is the usual
    bf16 -> fp32 widening (bf16 bits placed in the upper 16 bits of a
    32-bit float) and assumes ``val`` is an unsigned 16-bit integer --
    confirm against the original file.
    """
    return np.float32(struct.unpack('<f', struct.pack('<I', val << 16))[0])


# NOTE(review): the source text between bf16_to_fp32 and the entry point
# below is missing from this excerpt, including the definition of DnnTrainer.
# Restore it from the original file; it is intentionally not re-invented here.

if __name__ == "__main__":
    # NOTE(review): the original entry-point prologue was lost. The guard,
    # the parse_args() call and the exact number of '>' characters in the
    # print are reconstructed -- confirm against the original file.
    config = parse_args()
    print(">>>>>>>>>> python process started")
    os.environ["CPU_NUM"] = str(config.get("runner.thread_num"))
    benchmark_main = DnnTrainer(config)
    if config['run_single_pass'] == 1:
        benchmark_main.run_single_pass()
    elif config['run_minimize'] == 1:
        benchmark_main.run_minimize()
    elif config['run_the_one_ps'] == 1:
        benchmark_main.run_the_one_ps()