# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import paddle.distributed.fleet.base.role_maker as role_maker from paddle.distributed.ps.utils.ps_program_builder import debug_program, logger, new_pass, ps_log_root_dir import paddle.distributed.fleet as fleet import argparse import sys import yaml, six, copy import paddle import os import ast import numpy as np import struct sys.path.append("..") from ps_dnn_model import StaticModel __dir__ = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.abspath(os.path.join(__dir__, '..'))) def is_distributed_env(): node_role = os.getenv("TRAINING_ROLE") print("-- Role: {} --".format(node_role)) if node_role is None: return False else: return True class YamlHelper(object): def load_yaml(self, yaml_file, other_part=None): part_list = ["runner", "hyper_parameters"] if other_part: part_list += other_part running_config = self.get_all_inters_from_yaml(yaml_file, part_list) running_config = self.workspace_adapter(running_config) return running_config def print_yaml(self, config): print(self.pretty_print_envs(config)) def parse_yaml(self, config): vs = [int(i) for i in yaml.__version__.split(".")] if vs[0] < 5: use_full_loader = False elif vs[0] > 5: use_full_loader = True else: if vs[1] >= 1: use_full_loader = True else: use_full_loader = False if os.path.isfile(config): if six.PY2: with open(config, 'r') as rb: if use_full_loader: _config = yaml.load(rb.read(), Loader=yaml.FullLoader) else: _config = yaml.load(rb.read()) return _config else: with open(config, 'r', encoding="utf-8") as rb: if use_full_loader: _config = yaml.load(rb.read(), Loader=yaml.FullLoader) else: _config = yaml.load(rb.read()) return _config else: raise ValueError("config {} can not be supported".format(config)) def get_all_inters_from_yaml(self, file, filters): _envs = self.parse_yaml(file) all_flattens = {} def fatten_env_namespace(namespace_nests, local_envs): for k, v in local_envs.items(): if isinstance(v, dict): nests = copy.deepcopy(namespace_nests) nests.append(k) fatten_env_namespace(nests, v) else: global_k = ".".join(namespace_nests + [k]) all_flattens[global_k] = v fatten_env_namespace([], _envs) ret = {} for k, v in all_flattens.items(): for f in filters: if k.startswith(f): ret[k] = v return ret def workspace_adapter(self, config): workspace = config.get("workspace") for k, v in config.items(): if isinstance(v, str) and "{workspace}" in v: config[k] = v.replace("{workspace}", workspace) return config def pretty_print_envs(self, envs, header=None): spacing = 2 max_k = 40 max_v = 45 for k, v in envs.items(): max_k = max(max_k, len(k)) h_format = " " + "|{{:>{}s}}{}{{:^{}s}}|\n".format( max_k, " " * spacing, max_v) l_format = " " + "|{{:>{}s}}{{}}{{:^{}s}}|\n".format(max_k, max_v) length = max_k + max_v + spacing border = " +" + "".join(["="] * length) + "+" line = " +" + "".join(["-"] * length) + "+" draws = "" draws += border + "\n" if header: draws += h_format.format(header[0], header[1]) else: draws += h_format.format("Ps Benchmark Envs", "Value") draws += line + "\n" for k, v in sorted(envs.items()): if isinstance(v, str) and len(v) >= max_v: str_v = "... " + v[-41:] else: str_v = v draws += l_format.format(k, " " * spacing, str(str_v)) draws += border _str = "\n{}\n".format(draws) return _str def get_user_defined_strategy(config): if not is_distributed_env(): logger.warn( "Not Find Distributed env, Change To local train mode. If you want train with fleet, please use [fleetrun] command." ) #return None sync_mode = config.get("runner.sync_mode") assert sync_mode in ["async", "sync", "geo", "heter", "gpubox"] if sync_mode == "sync": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = False elif sync_mode == "async": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.is_fl_ps_mode = True if config.get( "runner.is_fl_ps_mode") == 1 else False if strategy.is_fl_ps_mode == True: strategy.pipeline = False micro_num = 1 strategy.pipeline_configs = { "accumulate_steps": micro_num } ## num_microbatches elif sync_mode == "geo": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"k_steps": config.get("runner.geo_step")} elif sync_mode == "heter": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"heter_worker_device_guard": "gpu"} strategy.pipeline = True strategy.pipeline_configs = { "accumulate_steps": config.get('runner.micro_num') } elif sync_mode == "gpubox": print("sync_mode = {}".format(sync_mode)) strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"use_ps_gpu": 1} strategy.trainer_desc_configs = { "dump_fields_path": config.get("runner.dump_fields_path", ""), "dump_fields": config.get("runner.dump_fields", []), "dump_param": config.get("runner.dump_param", []), "stat_var_names": config.get("stat_var_names", []), "local_sparse": config.get("runner.local_sparse", []), "remote_sparse": config.get("runner.remote_sparse", []) } print("strategy:", strategy.trainer_desc_configs) if config.get("runner.fs_client.uri") is not None: strategy.fs_client_param = { "uri": config.get("runner.fs_client.uri", ""), "user": config.get("runner.fs_client.user", ""), "passwd": config.get("runner.fs_client.passwd", ""), "hadoop_bin": config.get("runner.fs_client.hadoop_bin", "hadoop") } print("strategy:", strategy.fs_client_param) strategy.adam_d2sum = config.get("hyper_parameters.adam_d2sum", True) table_config = {} for x in config: if x.startswith("table_parameters"): table_name = x.split('.')[1] if table_name not in table_config: table_config[table_name] = {} table_config[table_name][x] = config[x] print("table_config:", table_config) strategy.sparse_table_configs = table_config print("strategy table config:", strategy.sparse_table_configs) a_sync_configs = strategy.a_sync_configs a_sync_configs["launch_barrier"] = False # a_sync_configs["launch_barrier"] = True strategy.a_sync_configs = a_sync_configs print("launch_barrier: ", strategy.a_sync_configs["launch_barrier"]) return strategy def get_distributed_strategy(user_defined_strategy): # pslib from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory k_steps = user_defined_strategy.a_sync_configs["k_steps"] strategy = None if not user_defined_strategy.a_sync and k_steps == 0: strategy = StrategyFactory.create_sync_strategy() if user_defined_strategy.a_sync and k_steps == 0: strategy = StrategyFactory.create_async_strategy() if user_defined_strategy.a_sync and k_steps > 0: strategy = StrategyFactory.create_geo_strategy(k_steps) if not strategy: raise ValueError("k_steps must be invalid value, please check") return strategy def get_model(config): abs_dir = config['config_abs_dir'] sys.path.append(abs_dir) static_model = StaticModel(config) return static_model def parse_args(): parser = argparse.ArgumentParser("PsTest train script") parser.add_argument('-m', '--config_yaml', type=str, required=True, help='config file path') parser.add_argument('-bf16', '--pure_bf16', type=ast.literal_eval, default=False, help="whether use bf16") parser.add_argument('--run_minimize', type=int, default=0, help="test single pass") parser.add_argument('--run_single_pass', type=int, default=0, help="test single pass") parser.add_argument('--run_the_one_ps', type=int, default=0, help="test the_one_ps") parser.add_argument('--debug_new_minimize', type=int, default=0, help="test single pass") parser.add_argument('--debug_new_pass', type=int, default=0, help="test single pass") parser.add_argument('--applied_pass_name', type=str, default="", help="test single pass") parser.add_argument('--debug_the_one_ps', type=int, default=0, help="test the_one_ps") args = parser.parse_args() args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml)) yaml_helper = YamlHelper() config = yaml_helper.load_yaml(args.config_yaml) config["yaml_path"] = args.config_yaml config["config_abs_dir"] = args.abs_dir config["pure_bf16"] = args.pure_bf16 config['run_minimize'] = args.run_minimize config['run_single_pass'] = args.run_single_pass config['run_the_one_ps'] = args.run_the_one_ps config['debug_new_minimize'] = args.debug_new_minimize config['debug_new_pass'] = args.debug_new_pass config['applied_pass_name'] = args.applied_pass_name config['debug_the_one_ps'] = args.debug_the_one_ps yaml_helper.print_yaml(config) return config def bf16_to_fp32(val): return np.float32(struct.unpack('>>>>>>>>> python process started") os.environ["CPU_NUM"] = str(config.get("runner.thread_num")) benchmark_main = DnnTrainer(config) if config['run_single_pass'] == 1: benchmark_main.run_single_pass() elif config['run_minimize'] == 1: benchmark_main.run_minimize() elif config['run_the_one_ps'] == 1: benchmark_main.run_the_one_ps()