Unverified commit abe4fc6b authored by Cheng Li, committed by GitHub

encoded ds config into command line argument when launching child processes in autotuning (#2524)

* rollback ds config changes

* fix format

* Fix error when output_file is a relative path without a prefix (#2397)
Co-authored-by: Benjamin Steenhoek <benjaminjsteenhoek@gmail.com>

* fix results and exps path to use absolute path

* use base64 encoded ds config as cmd arg

* fix format

* remove assert

* write out optimal config after tuning

* fix format

* no need to update ds config path when encoding ds config

* update

* do not use abs path for result and expr dir

* fix conflicts

* fix run mode

* fix format

* fix format
Co-authored-by: NBenjamin Steenhoek <benjaminjsteenhoek@gmail.com>
Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Parent 340fc0cf
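The core of this change, as a minimal standalone sketch (not the DeepSpeed code itself; the config values are made up): the autotuner JSON-serializes each experiment's ds_config, base64-encodes it, and passes the resulting string on the child process's command line, where it is decoded back into a dict.

```python
import base64
import json

# A toy DeepSpeed-style config; the real autotuner builds one per experiment.
ds_config = {"train_batch_size": 16, "zero_optimization": {"stage": 2}}

# Parent side: serialize and base64-encode so the whole config travels as a
# single command-line argument instead of a file path.
encoded = base64.urlsafe_b64encode(json.dumps(ds_config).encode("utf-8")).decode("utf-8")

# Child side: decode the argument back into a dict.
decoded = json.loads(base64.urlsafe_b64decode(encoded).decode("utf-8"))
assert decoded == ds_config
```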
@@ -87,6 +87,8 @@ class Autotuner:
self.rm.nodes), "num_nodes in the autotuning configuration must not be less than the --num_nodes value in the train script if any"
self.records = {}
self.optimal_cmd = None
self.optmal_ds_config = None
def print_tuning_results(self):
"""Print the autotuning results in tabular format.
@@ -1125,9 +1127,6 @@ class Autotuner:
ds_config_path = os.path.join(self.results_dir, "ds_config_optimal.json")
json.dump(ds_config, open(ds_config_path, "w"))
idx = cmd.index(os.path.join(exp_dir, "ds_config.json"))
cmd[idx] = ds_config_path
cmd_path = os.path.join(self.results_dir, "cmd_optimal.txt")
with open(cmd_path, "w") as fd:
fd.write(" ".join(cmd))
@@ -1138,9 +1137,6 @@ class Autotuner:
logger.info(
f"Wrote the optimal DeepSpeed configuration found by autotuning to {ds_config_path}, and the corresponding DeepSpeed command to {cmd_path}"
)
else:
self.optimal_cmd = None
self.optmal_ds_config = None
def run_after_tuning(self):
""" Launches the training with the optimal DeepSpeed configuration found through the autotuning process.
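A hedged sketch of what the optimal-config write-out above amounts to; the function name is illustrative, while the file names (ds_config_optimal.json, cmd_optimal.txt) follow the diff.

```python
import json
import os

def write_optimal_artifacts(results_dir, ds_config, cmd):
    """Persist the best config and its launch command after tuning (sketch)."""
    os.makedirs(results_dir, exist_ok=True)
    ds_config_path = os.path.join(results_dir, "ds_config_optimal.json")
    with open(ds_config_path, "w") as fd:
        json.dump(ds_config, fd)
    cmd_path = os.path.join(results_dir, "cmd_optimal.txt")
    with open(cmd_path, "w") as fd:
        fd.write(" ".join(cmd))
    return ds_config_path, cmd_path
```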
@@ -38,15 +38,13 @@ class DeepSpeedAutotuningConfig(DeepSpeedConfigObject):
AUTOTUNING_FAST,
AUTOTUNING_FAST_DEFAULT)
self.results_dir = os.path.abspath(
get_scalar_param(autotuning_dict,
AUTOTUNING_RESULTS_DIR,
AUTOTUNING_RESULTS_DIR_DEFAULT))
self.results_dir = get_scalar_param(autotuning_dict,
AUTOTUNING_RESULTS_DIR,
AUTOTUNING_RESULTS_DIR_DEFAULT)
assert self.results_dir, "results_dir cannot be empty"
self.exps_dir = os.path.abspath(
get_scalar_param(autotuning_dict,
AUTOTUNING_EXPS_DIR,
AUTOTUNING_EXPS_DIR_DEFAULT))
self.exps_dir = get_scalar_param(autotuning_dict,
AUTOTUNING_EXPS_DIR,
AUTOTUNING_EXPS_DIR_DEFAULT)
assert self.exps_dir, "exps_dir cannot be empty"
self.overwrite = get_scalar_param(autotuning_dict,
AUTOTUNING_OVERWRITE,
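With the os.path.abspath calls removed, a relative results_dir or exps_dir keeps its raw value at parse time and is only resolved where it is actually used; a small illustration (the dictionary and defaults below are hypothetical, not the real constants).

```python
import os

# Hypothetical autotuning section of a user config.
autotuning_dict = {"results_dir": "autotuning_results", "exps_dir": "autotuning_exps"}

# Parse time: keep the raw, possibly relative, values.
results_dir = autotuning_dict.get("results_dir", "autotuning_results")
exps_dir = autotuning_dict.get("exps_dir", "autotuning_exps")
assert results_dir and exps_dir, "results_dir/exps_dir cannot be empty"

# Use time: resolve against the current working directory only when needed.
print(os.path.abspath(results_dir), os.path.abspath(exps_dir))
```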
@@ -6,12 +6,13 @@ import subprocess
import sys
import threading
import time
import base64
import os
import hjson
from tqdm import tqdm
from ..utils import logger
from .constants import *
from .constants import AUTOTUNING, AUTOTUNING_METRIC_PATH
from .utils import get_val_by_key, search_error, was_interruptted
"""
@@ -180,7 +181,6 @@ class ResourceManager:
logger.debug(f'Put exp_id = {exp["exp_id"]} back into the queue')
self.experiment_check(pbar)
else:
desc = ""
for reservation in reservations:
reservation.slots.sort()
@@ -336,19 +336,27 @@ def run_experiment(exp: dict, reservations, user_script, user_args):
exp["job_id"] = get_job_id()
exp_dir = exp["result_dir"]
os.makedirs(exp_dir, exist_ok=True)
exp["ds_config_path"] = os.path.join(exp_dir, "ds_config.json")
ds_config_path = os.path.join(exp_dir, "ds_config.json")
exp["ds_config_path"] = ds_config_path
ds_config = copy.deepcopy(exp["ds_config"])
ds_config_json = json.dumps(ds_config).encode('utf-8')
exp["ds_config_base64"] = base64.urlsafe_b64encode(ds_config_json).decode('utf-8')
with open(exp["ds_config_path"], "w", buffering=BUFSIZE) as fd:
json.dump(ds_config, fd)
fd.flush()
os.fsync(fd)
path = exp["ds_config_path"]
logger.info(f"Scheduler wrote ds_config to {path}, {os.path.abspath(path)}")
with open(os.path.join(exp_dir, "exp.json"), "w", buffering=BUFSIZE) as fd:
json.dump(exp, fd)
fd.flush()
os.fsync(fd)
path = os.path.join(exp_dir, "exp.json")
logger.info(f"Scheduler wrote exp to {path}, {os.path.abspath(path)}")
# remove "--deepspeed_config ds_config.json" from user_args
if user_args:
@@ -357,9 +365,10 @@ def run_experiment(exp: dict, reservations, user_script, user_args):
# "--deepspeed_config" is omitted in HF
elif "--deepspeed" in user_args:
idx = user_args.index("--deepspeed")
assert idx < len(user_args) and ".json" in user_args[idx + 1], "there is no ds_config file specified after --deepspeed_config or --deepspeed"
user_args[idx + 1] = exp["ds_config_path"]
assert idx < len(user_args), "there is no ds_config file specified after --deepspeed_config or --deepspeed"
# user_args[idx + 1] = exp["ds_config_path"]
# pass base64 serialized ds_config to launcher
user_args[idx + 1] = exp["ds_config_base64"]
exp["user_script"] = user_script
exp["user_args"] = user_args
@@ -375,7 +384,7 @@ def run_experiment(exp: dict, reservations, user_script, user_args):
os.fsync(fd)
logger.info(
f"Launching exp_id = {exp['exp_id']}, exp_name = {exp['name']}, with resource = {include_str}"
f"Launching exp_id = {exp['exp_id']}, exp_name = {exp['name']}, with resource = {include_str}, and ds_config = {os.path.abspath(ds_config_path)}"
)
with open(os.path.join(exp_dir, "stdout.log"), "wb") as out, open(
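A hedged sketch of how the scheduler swaps the config argument for the base64 payload before launching a child process; the helper is illustrative, the flag handling mirrors the diff.

```python
import base64
import json

def patch_config_arg(user_args, ds_config):
    """Replace the value after --deepspeed_config/--deepspeed with a base64-encoded config (sketch)."""
    payload = base64.urlsafe_b64encode(json.dumps(ds_config).encode("utf-8")).decode("utf-8")
    for flag in ("--deepspeed_config", "--deepspeed"):
        if flag in user_args:
            idx = user_args.index(flag)
            assert idx + 1 < len(user_args), "there is no ds_config specified after " + flag
            user_args[idx + 1] = payload
            break
    return user_args

args = ["train.py", "--deepspeed", "ds_config.json", "--epochs", "1"]
print(patch_config_arg(args, {"train_batch_size": 16}))
```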
@@ -8,6 +8,7 @@ from typing import Union
import torch
import json
import copy
import base64
from .constants import *
from .fp16.loss_scaler import (
@@ -724,9 +725,13 @@ class DeepSpeedConfig(object):
"r"),
object_pairs_hook=dict_raise_error_on_duplicate_keys)
else:
raise ValueError(
f"Expected a string path to an existing deepspeed config, or a dictionary. Received: {config}"
)
try:
config_decoded = base64.urlsafe_b64decode(config).decode('utf-8')
self._param_dict = json.loads(config_decoded)
except (UnicodeDecodeError, AttributeError):
raise ValueError(
f"Expected a string path to an existing deepspeed config, or a dictionary or a valid base64. Received: {config}"
)
try:
self.global_rank = dist.get_rank()
if mpu is None:
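On the receiving side, the config argument may now be a dict, a path to a JSON file, or a base64-encoded JSON string; a self-contained sketch of that dispatch (not the actual DeepSpeedConfig constructor).

```python
import base64
import json
import os

def load_config(config):
    """Accept a dict, a path to a JSON file, or a base64-encoded JSON string (sketch)."""
    if isinstance(config, dict):
        return config
    if isinstance(config, str) and os.path.isfile(config):
        with open(config) as fd:
            return json.load(fd)
    try:
        return json.loads(base64.urlsafe_b64decode(config).decode("utf-8"))
    except (ValueError, TypeError) as err:
        raise ValueError(
            f"Expected a string path to an existing deepspeed config, a dictionary, or valid base64. Received: {config}"
        ) from err
```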
@@ -956,12 +956,6 @@ class DeepSpeedEngine(Module):
args, "deepspeed_config") and args.deepspeed_config is not None
), "DeepSpeed requires --deepspeed_config to specify configuration file"
assert os.path.isfile(
args.deepspeed_config
), "DeepSpeed configuration file: {} is not an existing file".format(
args.deepspeed_config
)
def _is_supported_optimizer(self, optimizer_name):
return (optimizer_name in DEEPSPEED_OPTIMIZERS
or getattr(torch.optim,
@@ -2162,6 +2156,9 @@ class DeepSpeedEngine(Module):
msg["throughput"] = self.train_batch_size() * 1000 / \
msg["latency"]
print_json_dist(msg, [0], path=self.autotuning_metric_path())
log_dist(
f"Wrote metrics to {self.autotuning_metric_path()}, {os.path.abspath(self.autotuning_metric_path())}",
ranks=[0])
import atexit
atexit.register(print, "Autotuning: done with running current ds config.")
exit()
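Dropping the isfile assert is consistent with --deepspeed_config now possibly carrying a base64 payload instead of a path; a hedged sketch of an acceptance check that tolerates both forms (the helper name is hypothetical, not DeepSpeed API).

```python
import base64
import os

def is_acceptable_config_arg(value: str) -> bool:
    """True if value is an existing file or at least decodes as base64 (sketch)."""
    if os.path.isfile(value):
        return True
    try:
        base64.urlsafe_b64decode(value)
        return True
    except ValueError:
        return False
```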