# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import paddle.distributed.fleet.base.role_maker as role_maker from paddle.distributed.ps.utils.ps_program_builder import * import paddle.distributed.fleet as fleet import argparse import time import sys import yaml, six, copy import paddle import os import warnings import ast import numpy as np import struct sys.path.append("..") from ps_dnn_model import StaticModel __dir__ = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.abspath(os.path.join(__dir__, '..'))) def is_distributed_env(): node_role = os.getenv("TRAINING_ROLE") logger.info("-- Role: {} --".format(node_role)) if node_role is None: return False else: return True class YamlHelper(object): def load_yaml(self, yaml_file, other_part=None): part_list = ["runner", "hyper_parameters"] if other_part: part_list += other_part running_config = self.get_all_inters_from_yaml(yaml_file, part_list) running_config = self.workspace_adapter(running_config) return running_config def print_yaml(self, config): print(self.pretty_print_envs(config)) def parse_yaml(self, config): vs = [int(i) for i in yaml.__version__.split(".")] if vs[0] < 5: use_full_loader = False elif vs[0] > 5: use_full_loader = True else: if vs[1] >= 1: use_full_loader = True else: use_full_loader = False if os.path.isfile(config): if six.PY2: with open(config, 'r') as rb: if use_full_loader: _config = yaml.load(rb.read(), Loader=yaml.FullLoader) else: _config = yaml.load(rb.read()) return _config else: with open(config, 'r', encoding="utf-8") as rb: if use_full_loader: _config = yaml.load(rb.read(), Loader=yaml.FullLoader) else: _config = yaml.load(rb.read()) return _config else: raise ValueError("config {} can not be supported".format(config)) def get_all_inters_from_yaml(self, file, filters): _envs = self.parse_yaml(file) all_flattens = {} def fatten_env_namespace(namespace_nests, local_envs): for k, v in local_envs.items(): if isinstance(v, dict): nests = copy.deepcopy(namespace_nests) nests.append(k) fatten_env_namespace(nests, v) else: global_k = ".".join(namespace_nests + [k]) all_flattens[global_k] = v fatten_env_namespace([], _envs) ret = {} for k, v in all_flattens.items(): for f in filters: if k.startswith(f): ret[k] = v return ret def workspace_adapter(self, config): workspace = config.get("workspace") for k, v in config.items(): if isinstance(v, str) and "{workspace}" in v: config[k] = v.replace("{workspace}", workspace) return config def pretty_print_envs(self, envs, header=None): spacing = 2 max_k = 40 max_v = 45 for k, v in envs.items(): max_k = max(max_k, len(k)) h_format = " " + "|{{:>{}s}}{}{{:^{}s}}|\n".format(max_k, " " * spacing, max_v) l_format = " " + "|{{:>{}s}}{{}}{{:^{}s}}|\n".format(max_k, max_v) length = max_k + max_v + spacing border = " +" + "".join(["="] * length) + "+" line = " +" + "".join(["-"] * length) + "+" draws = "" draws += border + "\n" if header: draws += h_format.format(header[0], header[1]) else: draws += h_format.format("Ps Benchmark Envs", "Value") draws += line + "\n" for k, v in sorted(envs.items()): if isinstance(v, str) and len(v) >= max_v: str_v = "... " + v[-41:] else: str_v = v draws += l_format.format(k, " " * spacing, str(str_v)) draws += border _str = "\n{}\n".format(draws) return _str def get_user_defined_strategy(config): if not is_distributed_env(): logger.warn( "Not Find Distributed env, Change To local train mode. If you want train with fleet, please use [fleetrun] command." ) #return None sync_mode = config.get("runner.sync_mode") assert sync_mode in ["async", "sync", "geo", "heter", "gpubox"] if sync_mode == "sync": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = False elif sync_mode == "async": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True elif sync_mode == "geo": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"k_steps": config.get("runner.geo_step")} elif sync_mode == "heter": strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"heter_worker_device_guard": "gpu"} strategy.pipeline = True strategy.pipeline_configs = { "accumulate_steps": config.get('runner.micro_num') } elif sync_mode == "gpubox": print("sync_mode = {}".format(sync_mode)) strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"use_ps_gpu": 1} strategy.trainer_desc_configs = { "dump_fields_path": config.get("runner.dump_fields_path", ""), "dump_fields": config.get("runner.dump_fields", []), "dump_param": config.get("runner.dump_param", []), "stat_var_names": config.get("stat_var_names", []) } print("strategy:", strategy.trainer_desc_configs) if config.get("runner.fs_client.uri") is not None: strategy.fs_client_param = { "uri": config.get("runner.fs_client.uri", ""), "user": config.get("runner.fs_client.user", ""), "passwd": config.get("runner.fs_client.passwd", ""), "hadoop_bin": config.get("runner.fs_client.hadoop_bin", "hadoop") } print("strategy:", strategy.fs_client_param) strategy.adam_d2sum = config.get("hyper_parameters.adam_d2sum", True) table_config = {} for x in config: if x.startswith("table_parameters"): table_name = x.split('.')[1] if table_name not in table_config: table_config[table_name] = {} table_config[table_name][x] = config[x] print("table_config:", table_config) strategy.sparse_table_configs = table_config print("strategy table config:", strategy.sparse_table_configs) a_sync_configs = strategy.a_sync_configs a_sync_configs["launch_barrier"] = False strategy.a_sync_configs = a_sync_configs print("launch_barrier: ", strategy.a_sync_configs["launch_barrier"]) return strategy def get_distributed_strategy(user_defined_strategy): from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory k_steps = user_defined_strategy.a_sync_configs["k_steps"] strategy = None if not user_defined_strategy.a_sync and k_steps == 0: strategy = StrategyFactory.create_sync_strategy() if user_defined_strategy.a_sync and k_steps == 0: strategy = StrategyFactory.create_async_strategy() if user_defined_strategy.a_sync and k_steps > 0: strategy = StrategyFactory.create_geo_strategy(k_steps) if not strategy: raise ValueError("k_steps must be invalid value, please check") return strategy def get_model(config): abs_dir = config['config_abs_dir'] sys.path.append(abs_dir) static_model = StaticModel(config) return static_model def parse_args(): parser = argparse.ArgumentParser("PsTest train script") parser.add_argument( '-m', '--config_yaml', type=str, required=True, help='config file path') parser.add_argument( '-bf16', '--pure_bf16', type=ast.literal_eval, default=False, help="whether use bf16") parser.add_argument( '--run_minimize', type=int, default=0, help="test single pass") parser.add_argument( '--run_single_pass', type=int, default=0, help="test single pass") parser.add_argument( '--debug_new_minimize', type=int, default=0, help="test single pass") parser.add_argument( '--debug_new_pass', type=int, default=0, help="test single pass") parser.add_argument( '--applied_pass_name', type=str, default="", help="test single pass") args = parser.parse_args() args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml)) yaml_helper = YamlHelper() config = yaml_helper.load_yaml(args.config_yaml) config["yaml_path"] = args.config_yaml config["config_abs_dir"] = args.abs_dir config["pure_bf16"] = args.pure_bf16 config['run_minimize'] = args.run_minimize config['run_single_pass'] = args.run_single_pass config['debug_new_minimize'] = args.debug_new_minimize config['debug_new_pass'] = args.debug_new_pass config['applied_pass_name'] = args.applied_pass_name yaml_helper.print_yaml(config) return config def bf16_to_fp32(val): return np.float32(struct.unpack('