未验证 提交 3052f36c 编写于 作者: K kuizhiqing 提交者: GitHub

[Launch] add gpu report during training (#42675)

* add nvsmi

* collect gpu info to log

* fix unitest

* rm ret_type
上级 0c6baf3c
...@@ -21,6 +21,7 @@ from paddle.distributed.launch.job.pod import Pod ...@@ -21,6 +21,7 @@ from paddle.distributed.launch.job.pod import Pod
from paddle.distributed.launch.job.container import Container from paddle.distributed.launch.job.container import Container
from .master import Master from .master import Master
from .watcher import Watcher
import time import time
...@@ -39,6 +40,8 @@ class ControllerBase(object): ...@@ -39,6 +40,8 @@ class ControllerBase(object):
self.ctx = ctx self.ctx = ctx
self.master = Master.factory(self.ctx) self.master = Master.factory(self.ctx)
self.watcher = Watcher(self.ctx)
self.job = Job(nnodes=self.ctx.args.nnodes, self.job = Job(nnodes=self.ctx.args.nnodes,
mode=self.ctx.args.run_mode, mode=self.ctx.args.run_mode,
jid=self.ctx.args.job_id) jid=self.ctx.args.job_id)
...@@ -114,6 +117,9 @@ class ControllerBase(object): ...@@ -114,6 +117,9 @@ class ControllerBase(object):
def stop(self, sigint=None): def stop(self, sigint=None):
self.ctx.logger.debug("Controller stop") self.ctx.logger.debug("Controller stop")
self.watcher.stop()
self.master.stop() self.master.stop()
self.pod.stop(sigint) self.pod.stop(sigint)
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..utils.nvsmi import get_gpu_process, get_gpu_util, get_gpu_info
import time
import os
from threading import Thread
class Watcher(object):
    """Background GPU monitor for a launch job.

    Samples GPU utilization via nvidia-smi every ``interval`` seconds on a
    daemon thread and appends the samples to ``<log_dir>/<job_id>.gpu.log``
    until the job status reports done. If no GPUs are assigned, nothing is
    started and ``stop()`` is a no-op.
    """

    def __init__(self, ctx):
        self.ctx = ctx
        self.interval = 10  # seconds between utilization samples
        self.gpu_util = []

        # watch only the devices assigned to this job (CLI flag wins over
        # the node's detected device labels)
        self.gpus = self.ctx.args.devices or self.ctx.node.device.labels
        if len(self.gpus) > 0:
            fn = os.path.join(self.ctx.args.log_dir,
                              "{}.gpu.log".format(self.ctx.args.job_id))
            os.makedirs(os.path.dirname(fn), exist_ok=True)
            self.gpu_fd = open(fn, 'w')
        else:
            # no GPUs to watch -- leave self.proc/self.gpu_fd unset
            return

        # daemon thread so a stuck sampler never blocks interpreter exit
        self.proc = Thread(target=self.watch)
        self.proc.daemon = True
        self.proc.start()

    def watch(self):
        """Thread body: write static GPU/process info once, then append one
        utilization row per GPU every ``interval`` seconds until done."""
        if not len(self.gpus) > 0:
            return

        self._print_gpu_info()

        util_key = "index,utilization_gpu,memory_total,memory_used,memory_free,timestamp"
        self.gpu_fd.write(util_key)
        self.gpu_fd.write('\n')

        while not self.ctx.status.is_done():
            self._save_gpu_log(util_key)
            time.sleep(self.interval)

        if hasattr(self, "gpu_fd"):
            self.gpu_fd.close()

    def _print_gpu_info(self):
        """Dump one-time GPU hardware info and current GPU processes."""
        # best-effort: a logging failure must never take the job down
        try:
            info_key = "index,uuid,driver_version,name,gpu_serial,display_active,display_mode"
            self.gpu_fd.write(info_key)
            self.gpu_fd.write('\n')
            for line in get_gpu_info(self.gpus):
                self.gpu_fd.write(line.str(info_key))
                self.gpu_fd.write('\n')
            self.gpu_fd.write('\n')

            process_key = "pid,process_name,gpu_uuid,gpu_name,used_memory"
            self.gpu_fd.write(process_key)
            self.gpu_fd.write('\n')
            for line in get_gpu_process(self.gpus):
                self.gpu_fd.write(line.str(process_key))
                self.gpu_fd.write('\n')
            self.gpu_fd.write('\n')
            self.gpu_fd.flush()
        except Exception:
            # fixed: ctx exposes `logger` (see ControllerBase), not `log` --
            # the original `self.ctx.log.error` would itself raise here
            self.ctx.logger.error("save gpu info failed")

    def _save_gpu_log(self, util_key):
        """Append one utilization CSV row per watched GPU."""
        try:
            for line in get_gpu_util(self.gpus):
                self.gpu_fd.write(line.str(util_key))
                self.gpu_fd.write('\n')
            self.gpu_fd.flush()
        except Exception:
            # fixed: ctx.logger, not ctx.log (same as _print_gpu_info)
            self.ctx.logger.error("save gpu log failed")

    def stop(self):
        """Wait for the sampler thread to finish (no-op if never started)."""
        if hasattr(self, "proc"):
            self.proc.join()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import shlex
import os
import json
import shutil
class Info(object):
    """Attribute bag holding one parsed row of nvidia-smi CSV output."""

    def __repr__(self):
        return str(self.__dict__)

    def json(self):
        """Serialize all attributes as a JSON object string."""
        return json.dumps(self.__dict__)

    def dict(self):
        """Expose the underlying attribute dict."""
        return self.__dict__

    def str(self, keys=None):
        """Join selected attribute values with commas.

        ``keys`` may be None (all attributes, insertion order), a
        comma-separated string, or an iterable of attribute names.
        Missing attributes render as the empty string.
        """
        if keys is None:
            keys = self.__dict__.keys()
        elif isinstance(keys, str):
            keys = keys.split(',')
        return ",".join(str(self.__dict__.get(k, '')) for k in keys)
def query_smi(query=None, query_type="gpu", index=None, dtype=None):
    """Run nvidia-smi and parse its CSV output into Info objects.

    Args:
        query: list of nvidia-smi field names to request.
        query_type: "gpu" for --query-gpu, anything starting with
            "compute" for --query-compute-apps.
        index: optional list of GPU ids passed via --id to restrict the query.
        dtype: list of converter callables, one per query field; falls back
            to str for every column when absent or mismatched in length.

    Returns:
        A list of Info objects, one per CSV row. Empty list when nvidia-smi
        is unavailable or the query arguments are invalid.
    """
    if not has_nvidia_smi():
        return []

    cmd = ["nvidia-smi", "--format=csv,noheader,nounits"]
    if isinstance(query, list) and query_type == "gpu":
        cmd.extend(["--query-gpu={}".format(",".join(query))])
    elif isinstance(query, list) and query_type.startswith("compute"):
        cmd.extend(["--query-compute-apps={}".format(",".join(query))])
    else:
        # fixed: the original returned None here while every other path
        # returns a list, breaking callers that iterate the result
        return []

    if isinstance(index, list) and len(index) > 0:
        cmd.extend(["--id={}".format(",".join(index))])
    if not isinstance(dtype, list) or len(dtype) != len(query):
        dtype = [str] * len(query)

    output = subprocess.check_output(cmd, timeout=3)
    lines = output.decode("utf-8").split(os.linesep)
    ret = []
    for line in lines:
        if not line:
            continue
        info = Info()
        for k, v, d in zip(query, line.split(", "), dtype):
            # nvidia-smi field names use '.', attribute names use '_'
            setattr(info, k.replace(".", "_"), d(v))
        ret.append(info)
    return ret
def get_gpu_info(index=None):
    """Query static per-GPU info (uuid, driver version, name, ...)."""
    fields = ("index,uuid,driver_version,name,gpu_serial,"
              "display_active,display_mode").split(",")
    converters = [int] + [str] * 6
    # accept a single index or a comma-separated string as well as a list
    if index is not None and not isinstance(index, list):
        index = str(index).split(",")
    return query_smi(fields, index=index, dtype=converters)
def get_gpu_util(index=None):
    """Query current utilization and memory stats for each GPU."""
    fields = ("index,utilization.gpu,memory.total,memory.used,"
              "memory.free,timestamp").split(",")
    converters = [int] * 5 + [str]
    # accept a single index or a comma-separated string as well as a list
    if index is not None and not isinstance(index, list):
        index = str(index).split(",")
    return query_smi(fields, index=index, dtype=converters)
def get_gpu_process(index=None):
    """Query the compute processes currently running on the GPUs."""
    fields = "pid,process_name,gpu_uuid,gpu_name,used_memory".split(",")
    converters = [int, str, str, str, int]
    # accept a single index or a comma-separated string as well as a list
    if index is not None and not isinstance(index, list):
        index = str(index).split(",")
    return query_smi(fields, index=index, query_type="compute",
                     dtype=converters)
def has_nvidia_smi():
    """Return True if the nvidia-smi binary is available on PATH.

    The original returned shutil.which's path-or-None directly; a
    ``has_*`` predicate should return a real bool. Truthiness is
    unchanged, so existing ``if not has_nvidia_smi()`` callers behave
    identically.
    """
    return shutil.which("nvidia-smi") is not None
if __name__ == '__main__':
    # manual smoke test: probe GPU 0 with each query helper, then print
    # a utilization row for every visible GPU
    print(get_gpu_info(0))
    print(get_gpu_util(0))
    print(get_gpu_process(0))

    for util in get_gpu_util():
        print(util.str())
...@@ -51,7 +51,9 @@ def write_file(name, ct): ...@@ -51,7 +51,9 @@ def write_file(name, ct):
def get_files(pth, prefix): def get_files(pth, prefix):
return [ return [
f for f in listdir(pth) if isfile(join(pth, f)) and f.startswith(prefix) f for f in listdir(pth)
if isfile(join(pth, f)) and f.startswith(prefix) and f !=
f"{prefix}.gpu.log"
] ]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册