diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py
index 69b2237f0ba7d267c3b297e5afe2fbc69b173869..f069bfbcd350133f13a7ce1ceba9bcddd37682a8 100644
--- a/python/paddle/distributed/launch/controllers/controller.py
+++ b/python/paddle/distributed/launch/controllers/controller.py
@@ -21,6 +21,7 @@ from paddle.distributed.launch.job.pod import Pod
 from paddle.distributed.launch.job.container import Container
 
 from .master import Master
+from .watcher import Watcher
 
 import time
 
@@ -39,6 +40,8 @@ class ControllerBase(object):
         self.ctx = ctx
         self.master = Master.factory(self.ctx)
 
+        self.watcher = Watcher(self.ctx)
+
         self.job = Job(nnodes=self.ctx.args.nnodes,
                        mode=self.ctx.args.run_mode,
                        jid=self.ctx.args.job_id)
@@ -114,6 +117,9 @@ class ControllerBase(object):
 
     def stop(self, sigint=None):
         self.ctx.logger.debug("Controller stop")
+
+        self.watcher.stop()
+
         self.master.stop()
         self.pod.stop(sigint)
 
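Note on lifecycle: Watcher.stop() joins a thread whose loop only exits once ctx.status.is_done() returns true, so the join can block for up to one sampling interval after the job is marked done. A condensed sketch of the wiring this hunk introduces, using only names visible in the diff (the rest of ControllerBase is elided, and this sketch is not part of the patch):

    # Sketch only: where the watcher sits in the controller lifecycle.
    class ControllerBase(object):
        def __init__(self, ctx):
            self.ctx = ctx
            self.master = Master.factory(self.ctx)
            self.watcher = Watcher(self.ctx)  # daemon thread starts sampling here

        def stop(self, sigint=None):
            self.watcher.stop()   # joins the thread; returns once watch() has
                                  # observed ctx.status.is_done() and exited
            self.master.stop()
            self.pod.stop(sigint)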
diff --git a/python/paddle/distributed/launch/controllers/watcher.py b/python/paddle/distributed/launch/controllers/watcher.py
new file mode 100644
index 0000000000000000000000000000000000000000..4d49b924f1e817a372e839a41ec80c9d3120fb03
--- /dev/null
+++ b/python/paddle/distributed/launch/controllers/watcher.py
@@ -0,0 +1,95 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ..utils.nvsmi import get_gpu_process, get_gpu_util, get_gpu_info
+import time
+import os
+
+from threading import Thread
+
+
+class Watcher(object):
+    def __init__(self, ctx):
+        self.ctx = ctx
+
+        self.interval = 10
+
+        self.gpu_util = []
+
+        # gpu log file
+        self.gpus = self.ctx.args.devices or self.ctx.node.device.labels
+        if len(self.gpus) > 0:
+            fn = os.path.join(self.ctx.args.log_dir,
+                              "{}.gpu.log".format(self.ctx.args.job_id))
+            os.makedirs(os.path.dirname(fn), exist_ok=True)
+            self.gpu_fd = open(fn, 'w')
+        else:
+            return
+
+        # start the sampling thread
+        self.proc = Thread(target=self.watch)
+        self.proc.daemon = True
+        self.proc.start()
+
+    def watch(self):
+        if not self.gpus:
+            return
+
+        self._print_gpu_info()
+
+        util_key = "index,utilization_gpu,memory_total,memory_used,memory_free,timestamp"
+        self.gpu_fd.write(util_key)
+        self.gpu_fd.write('\n')
+
+        while not self.ctx.status.is_done():
+            self._save_gpu_log(util_key)
+            time.sleep(self.interval)
+
+        if hasattr(self, "gpu_fd"):
+            self.gpu_fd.close()
+
+    def _print_gpu_info(self):
+        try:
+            info_key = "index,uuid,driver_version,name,gpu_serial,display_active,display_mode"
+            self.gpu_fd.write(info_key)
+            self.gpu_fd.write('\n')
+            for line in get_gpu_info(self.gpus):
+                self.gpu_fd.write(line.str(info_key))
+                self.gpu_fd.write('\n')
+            self.gpu_fd.write('\n')
+
+            process_key = "pid,process_name,gpu_uuid,gpu_name,used_memory"
+            self.gpu_fd.write(process_key)
+            self.gpu_fd.write('\n')
+            for line in get_gpu_process(self.gpus):
+                self.gpu_fd.write(line.str(process_key))
+                self.gpu_fd.write('\n')
+            self.gpu_fd.write('\n')
+
+            self.gpu_fd.flush()
+        except Exception:
+            self.ctx.logger.error("save gpu info failed")
+
+    def _save_gpu_log(self, util_key):
+        try:
+            for line in get_gpu_util(self.gpus):
+                self.gpu_fd.write(line.str(util_key))
+                self.gpu_fd.write('\n')
+            self.gpu_fd.flush()
+        except Exception:
+            self.ctx.logger.error("save gpu log failed")
+
+    def stop(self):
+        if hasattr(self, "proc"):
+            self.proc.join()
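The watcher touches only a narrow slice of the launch context: ctx.args.devices (or ctx.node.device.labels), ctx.args.log_dir, ctx.args.job_id, ctx.status.is_done() and ctx.logger. A minimal sketch that drives the class outside the launcher, with a hand-rolled stand-in context (every name below other than Watcher is invented for illustration):

    import logging
    import time
    from types import SimpleNamespace

    from paddle.distributed.launch.controllers.watcher import Watcher


    class FakeStatus(object):
        """Hypothetical stand-in for the launcher's status object."""

        def __init__(self):
            self.done = False

        def is_done(self):
            return self.done


    status = FakeStatus()
    ctx = SimpleNamespace(
        args=SimpleNamespace(devices="0", log_dir="log", job_id="default"),
        status=status,
        logger=logging.getLogger("watcher_demo"))

    w = Watcher(ctx)    # opens log/default.gpu.log, starts the daemon thread
    time.sleep(25)      # interval is 10s, so roughly two utilization samples
    status.done = True  # watch() exits after its current sleep
    w.stop()            # joins the thread

Because the loop sleeps between samples, stop() can block for up to one interval after the status flips to done.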
diff --git a/python/paddle/distributed/launch/utils/nvsmi.py b/python/paddle/distributed/launch/utils/nvsmi.py
new file mode 100644
index 0000000000000000000000000000000000000000..82a23189ac6af680e95296a0ec005aa1dd12af3d
--- /dev/null
+++ b/python/paddle/distributed/launch/utils/nvsmi.py
@@ -0,0 +1,117 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import subprocess
+import shlex
+import os
+import json
+import shutil
+
+
+class Info(object):
+    def __repr__(self):
+        return str(self.__dict__)
+
+    def json(self):
+        return json.dumps(self.__dict__)
+
+    def dict(self):
+        return self.__dict__
+
+    def str(self, keys=None):
+        if keys is None:
+            keys = self.__dict__.keys()
+
+        if isinstance(keys, str):
+            keys = keys.split(',')
+
+        values = [str(self.__dict__.get(k, '')) for k in keys]
+        return ",".join(values)
+
+
+def query_smi(query=None, query_type="gpu", index=None, dtype=None):
+    """
+    query_type: gpu/compute
+    """
+
+    if not has_nvidia_smi():
+        return []
+
+    cmd = ["nvidia-smi", "--format=csv,noheader,nounits"]
+    if isinstance(query, list) and query_type == "gpu":
+        cmd.extend(["--query-gpu={}".format(",".join(query))])
+    elif isinstance(query, list) and query_type.startswith("compute"):
+        cmd.extend(["--query-compute-apps={}".format(",".join(query))])
+    else:
+        return []
+
+    if isinstance(index, list) and len(index) > 0:
+        cmd.extend(["--id={}".format(",".join(index))])
+    if not isinstance(dtype, list) or len(dtype) != len(query):
+        dtype = [str] * len(query)
+
+    output = subprocess.check_output(cmd, timeout=3)
+    lines = output.decode("utf-8").split(os.linesep)
+    ret = []
+    for line in lines:
+        if not line:
+            continue
+        info = Info()
+        for k, v, d in zip(query, line.split(", "), dtype):
+            setattr(info, k.replace(".", "_"), d(v))
+        ret.append(info)
+    return ret
+
+
+def get_gpu_info(index=None):
+    q = "index,uuid,driver_version,name,gpu_serial,display_active,display_mode".split(
+        ",")
+    d = [int, str, str, str, str, str, str]
+    index = index if index is None or isinstance(
+        index, list) else str(index).split(",")
+
+    return query_smi(q, index=index, dtype=d)
+
+
+def get_gpu_util(index=None):
+    q = "index,utilization.gpu,memory.total,memory.used,memory.free,timestamp".split(
+        ",")
+    d = [int, int, int, int, int, str]
+    index = index if index is None or isinstance(
+        index, list) else str(index).split(",")
+
+    return query_smi(q, index=index, dtype=d)
+
+
+def get_gpu_process(index=None):
+    q = "pid,process_name,gpu_uuid,gpu_name,used_memory".split(",")
+    d = [int, str, str, str, int]
+    index = index if index is None or isinstance(
+        index, list) else str(index).split(",")
+
+    return query_smi(q, index=index, query_type="compute", dtype=d)
+
+
+def has_nvidia_smi():
+    return shutil.which("nvidia-smi")
+
+
+if __name__ == '__main__':
+    print(get_gpu_info(0))
+    print(get_gpu_util(0))
+    print(get_gpu_process(0))
+
+    u = get_gpu_util()
+    for i in u:
+        print(i.str())
diff --git a/python/paddle/fluid/tests/unittests/test_run.py b/python/paddle/fluid/tests/unittests/test_run.py
index 365d3f931c27c180eebd9d3b72c80dac5f9227e5..28bcc379fb9a06445786823169ce355b5ec80d19 100644
--- a/python/paddle/fluid/tests/unittests/test_run.py
+++ b/python/paddle/fluid/tests/unittests/test_run.py
@@ -51,7 +51,9 @@ def write_file(name, ct):
 
 
 def get_files(pth, prefix):
     return [
-        f for f in listdir(pth) if isfile(join(pth, f)) and f.startswith(prefix)
+        f for f in listdir(pth)
+        if isfile(join(pth, f)) and f.startswith(prefix) and f !=
+        f"{prefix}.gpu.log"
     ]
 
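For reference, the command query_smi assembles for get_gpu_util(["0"]) is equivalent to nvidia-smi --format=csv,noheader,nounits --query-gpu=index,utilization.gpu,memory.total,memory.used,memory.free,timestamp --id=0. Each CSV row is parsed into an Info whose attribute names have dots replaced by underscores (utilization.gpu becomes utilization_gpu), which is why the util_key in watcher.py uses the underscore spellings. A small consumption sketch (values depend on the local driver; guarded so it degrades to a no-op on machines without nvidia-smi):

    from paddle.distributed.launch.utils.nvsmi import get_gpu_util, has_nvidia_smi

    if has_nvidia_smi():
        for info in get_gpu_util(["0"]):
            # Info.str() reorders fields by the given key list; unknown keys
            # come back as empty strings rather than raising.
            print(info.str("index,utilization_gpu,memory_used"))
            print(info.json())  # the same record as a JSON object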