diff --git a/tools/CrossStackProfiler/CspChromeTraceFormatter.py b/tools/CrossStackProfiler/CspChromeTraceFormatter.py new file mode 100755 index 0000000000000000000000000000000000000000..a8030988aacf1a922c41257a409c27274a5aba0a --- /dev/null +++ b/tools/CrossStackProfiler/CspChromeTraceFormatter.py @@ -0,0 +1,129 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import json +import six +import sys +import re +import os +import glob +import unittest +import pandas +import tempfile +import platform +import pandas as pd + + +class ChromeTraceFormatter(object): + def __init__(self): + self._events = [] + self._metadata = [] + + def _create_event(self, ph, category, name, pid, tid, timestamp): + """Creates a new Chrome Trace event. + + For details of the file format, see: + https://github.com/catapult-project/catapult/blob/master/tracing/README.md + + Args: + ph: The type of event - usually a single character. + category: The event category as a string. + name: The event name as a string. + pid: Identifier of the process generating this event as an integer. + tid: Identifier of the thread generating this event as an integer. + timestamp: The timestamp of this event as a long integer. + + Returns: + A JSON compatible event object. + """ + event = {} + event['ph'] = ph + event['cat'] = category + event['name'] = name + event['pid'] = pid + event['tid'] = tid + event['ts'] = timestamp + return event + + def emit_pid(self, name, pid): + """Adds a process metadata event to the trace. + + Args: + name: The process name as a string. + pid: Identifier of the process as an integer. + """ + event = {} + event['name'] = 'process_name' + event['ph'] = 'M' + event['pid'] = pid + event['args'] = {'name': name} + self._metadata.append(event) + + def emit_region(self, timestamp, duration, pid, tid, category, name, args): + """Adds a region event to the trace. + + Args: + timestamp: The start timestamp of this region as a long integer. + duration: The duration of this region as a long integer. + pid: Identifier of the process generating this event as an integer. + tid: Identifier of the thread generating this event as an integer. + category: The event category as a string. + name: The event name as a string. + args: A JSON-compatible dictionary of event arguments. + """ + event = self._create_event('X', category, name, pid, tid, timestamp) + event['dur'] = duration + event['args'] = args + self._events.append(event) + + def emit_counter(self, category, name, pid, timestamp, counter, value): + """Emits a record for a single counter. 
+
+        Args:
+            category: The event category as a string.
+            name: The event name as a string.
+            pid: Identifier of the process generating this event as an integer.
+            timestamp: The timestamp of this event as a long integer.
+            counter: Name of the counter as a string.
+            value: Value of the counter as an integer.
+        """
+        event = self._create_event('C', category, name, pid, 0, timestamp)
+        event['args'] = {counter: value}
+        self._events.append(event)
+
+    def format_to_string(self, pretty=False):
+        """Formats the chrome trace to a string.
+
+        Args:
+            pretty: (Optional.) If True, produce human-readable JSON output.
+
+        Returns:
+            A JSON-formatted string in Chrome Trace format.
+        """
+        trace = {}
+        trace['traceEvents'] = self._metadata + self._events
+        if pretty:
+            return json.dumps(trace, indent=4, separators=(',', ': '))
+        else:
+            return json.dumps(trace, separators=(',', ':'))
+
+    def clear(self):
+        self._events = []
+        self._metadata = []
+
+
+if __name__ == "__main__":
+    pass
diff --git a/tools/CrossStackProfiler/CspFileReader.py b/tools/CrossStackProfiler/CspFileReader.py
new file mode 100755
index 0000000000000000000000000000000000000000..12de488aa693ebbdd0443bec7a2c7a25f35adffa
--- /dev/null
+++ b/tools/CrossStackProfiler/CspFileReader.py
@@ -0,0 +1,400 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+import json
+import glob
+import logging
+import pandas as pd
+from multiprocessing import Process, Lock
+""" Some terms to clarify the code
+    In most cases, one or more parameters are passed to a class or a function
+    as input args, either as a single variable or as a k-v dict.
+
+    1. trainerId
+    2. gpuId
+    3. rankId
+    4. gpuPerTrainer
+    5. groupSize
+    6. groupId
+    7. groupNum
+    8. displaySize
+    9. dataPath
+    10. resultPath
+    11.
fileOrganizeForm -- "byRank" OR "byTrainer" or "other" + +""" + +PIPELINEINFO_TRACE_NUM = 1 + +dcgmMetricParameterMap = { + "02_gpuUtility": [("GPUTL", "GPUTL"), ("GRACT", "GRACT")], + "03_smUtility": [("SMACT", "SMACT"), ("SMOCC", "SMOCC")], + "04_memUtility": [("FB_USED_RATIO", "FB_USED_RATIO"), ("DRAMA", "DRAMA")], + "05_txUtility": [("NVLTX", "NVLTX"), ("NVLRX", "NVLRX"), ("PCITX", "PCITX"), + ("PCIRX", "PCIRX")], + "06_calUtility": + [("FP32A", "FP32A"), ("FP16A", "FP16A"), ("TENSO", "TENSO")] +} +DCGMINFO_TRACE_NUM = len(dcgmMetricParameterMap.keys()) +NETINFO_TRACE_NUM = 2 + +DCGM_PATH = "dcgm" +NET_PATH = "net" +TIME_PATH = "time" +PROFILE_PATH = "profile" + +FILEORGANIZEFORM_BYRANK = "byRank" +FILEORGANIZEFORM_BYTRAINER = "byTrainer" +FILEORGANIZEFORM_BYOTHER = "other" +FILEORGANIZEFORM = [ + FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, + FILEORGANIZEFORM_BYOTHER +] + + +class FileReader(object): + def __init__(self, logger, args): + self._logger = logger + self._args = args + + self._fileList = [] + self._fileNum = 0 + + self._dataPath = "" + self._groupSize = 0 + self._displaySize = 0 + self._organizeForm = FILEORGANIZEFORM_BYOTHER + self._gpuPerTrainer = 0 + + self._checkArgs() + self._getFileList() + + self._lock = Lock() + + def printArgs(self): + self._logger.info("dataPath:") + self._logger.info(self._dataPath) + self._logger.info("groupSize:") + self._logger.info(self._groupSize) + self._logger.info("displaySize:") + self._logger.info(self._displaySize) + self._logger.info("organizeForm:") + self._logger.info(self._organizeForm) + self._logger.info("gpuPerTrainer:") + self._logger.info(self._gpuPerTrainer) + self._logger.info("minTimeStamp:") + self._logger.info(self._minTimeStamp) + + def _checkArgsKey(self, key, type): + if not self._args.has_key(key): + raise KeyError("args should has key [%s]!" % key) + + if not isinstance(self._args[key], type): + raise TypeError( + "Invalid type of key [%s] in args dict, it should be a %s!" % + (key, type)) + + exec("self._%s = self._args[\"%s\"]" % (key, key)) + + def _align_ts(self, ts): + return ts - self._minTimeStamp + + def _checkArgs(self): + if not isinstance(self._args, dict): + raise TypeError("Invalid type of args, it should be a dict!") + + self._checkArgsKey("organizeForm", str) + if self._organizeForm not in FILEORGANIZEFORM or \ + self._organizeForm == FILEORGANIZEFORM_BYOTHER: + raise NotImplementedError( + "we have not known how to process this form of file [%s]!" % + self._organizeForm) + + self._checkArgsKey("gpuPerTrainer", int) + + self._checkArgsKey("dataPath", str) + if not os.path.exists(self._dataPath): + raise IOError("input data path [%s] not existed!" 
% (self._dataPath))
+
+        self._checkArgsKey("groupSize", int)
+        self._checkArgsKey("displaySize", int)
+        self._checkArgsKey("minTimeStamp", int)
+
+    def getFileListByGroup(self, groupId):
+        lIndex = 0
+        rIndex = 0
+
+        if self._organizeForm == FILEORGANIZEFORM_BYTRAINER:
+            lIndex = groupId * self._groupSize
+            rIndex = (groupId + 1) * self._groupSize
+        elif self._organizeForm == FILEORGANIZEFORM_BYRANK:
+            lIndex = groupId * self._groupSize * self._gpuPerTrainer
+            rIndex = (groupId + 1) * self._groupSize * self._gpuPerTrainer
+
+        try:
+            return self._fileList[lIndex:rIndex]
+        except IndexError:
+            raise IndexError("invalid index of file list")
+
+    def getFileList(self):
+        # return the file list collected by _getFileList(), not the method itself
+        return self._fileList
+
+    def _cmp(self, x, y):
+        return self._getId(x, self._organizeForm) - self._getId(
+            y, self._organizeForm)
+
+    def _getFileList(self):
+        self._fileList = glob.glob(os.path.join(self._dataPath, "*.*"))
+
+        # check unique
+        idList = []
+        newFileList = []
+        for file in self._fileList:
+            id = self._getId(file, self._organizeForm)
+            if id not in idList:
+                idList.append(id)
+                newFileList.append(file)
+            else:
+                raise NotImplementedError(
+                    "[%s] is repeated by id, we do not know how to process it!"
+                    % file)
+
+        # sort by the numeric file suffix
+        def _sortBySuffix(elem):
+            return int(elem.split(".")[-1])
+
+        self._fileList.sort(key=_sortBySuffix)
+
+        if self._fileList:
+            if (self._getId(self._fileList[-1], self._organizeForm) -
+                    self._getId(self._fileList[0], self._organizeForm)
+                ) != len(self._fileList) - 1:
+                raise Exception("The file ids should be contiguous!")
+
+        if not self._fileList:
+            self._logger.warning("we can not find any file in dir [%s]!" %
+                                 self._dataPath)
+        else:
+            self._logger.info("file list in dir [%s] is : %s !" %
+                              (self._dataPath, ', '.join(self._fileList)))
+
+        return self._fileList
+
+    def _getId(self, fileName, organizeForm, sed="."):
+        if self._organizeForm != organizeForm:
+            raise TypeError("Can not get rank id when organize form is not %s!"
+                            % organizeForm)
+
+        if not os.path.isfile(fileName):
+            raise IOError("[%s] is not a valid file!" % (fileName))
+
+        try:
+            suffix_str = fileName.split(sed)[-1]
+            try:
+                return int(suffix_str)
+            except ValueError as e:
+                print(e)
+                raise TypeError("invalid fileName [%s]" % fileName)
+
+        except IndexError as e:
+            print(e)
+            raise TypeError(
+                "invalid fileName [%s], the suffix should be a number!"
% + fileName) + + def getRankId(self, fileName, sed="."): + return self._getId(fileName, FILEORGANIZEFORM_BYRANK, sed) + + def getRankNum(self): + if self._organizeForm == FILEORGANIZEFORM_BYRANK: + return len(self._fileList) + + elif self._organizeForm == FILEORGANIZEFORM_BYTRAINER: + return len(self._fileList) * self._gpuPerTrainer + + def getTrainerNum(self): + if self._organizeForm == FILEORGANIZEFORM_BYRANK: + return len(self._fileList) / self._gpuPerTrainer + + elif self._organizeForm == FILEORGANIZEFORM_BYTRAINER: + return len(self._fileList) + + def getTrainerId(self, fileName, sed="."): + return self._getId(fileName, FILEORGANIZEFORM_BYTRAINER, sed) + + def _splitTaskListForMultiProcess(self, ls, n): + if not isinstance(ls, list) or not isinstance(n, int): + return [] + ls_len = len(ls) + if n <= 0 or 0 == ls_len: + return [] + if n >= ls_len: + return [[i] for i in ls] + else: + j = int((ls_len + n - 1) / n) + k = ls_len % n + ls_return = [] + end = 0 + for i in range(0, (n) * j, j): + if i < len(ls) and (i + j) < len(ls): + ls_return.append(ls[i:i + j]) + end = i + j + ls_return.append(ls[end:]) + return ls_return + + def getOpInfoFileName(self, groupId, gpuId, tmpPath="./tmp"): + return self.getFileName("opinfo", groupId, gpuId, tmpPath) + + def getPipeLineInfoFileName(self, groupId, gpuId, tmpPath="./tmp"): + return self.getFileName("pipilineinfo", groupId, gpuId, tmpPath) + + def getDCGMInfoFileName(self, groupId, gpuId, tmpPath="./tmp"): + return self.getFileName("dcgm", groupId, gpuId, tmpPath) + + def getFileName(self, name, groupId, gpuId, tmpPath="./tmp"): + return os.path.join(tmpPath, "%s_%d_%d.json" % (name, groupId, gpuId)) + + def getOpInfoDict(self, groupId, gpuId, tmpPath="./tmp"): + return self.getDict("opinfo", groupId, gpuId, tmpPath) + + def getDcgmInfoDict(self, groupId, gpuId, tmpPath="./tmp"): + return self.getDict("dcgm", groupId, gpuId, tmpPath) + + def getDict(self, name, groupId, gpuId, tmpPath="./tmp"): + fileName = self.getFileName(name, groupId, gpuId, tmpPath) + if not os.path.isfile(fileName): + raise IOError("[%s] is not existed!" % fileName) + + data = {} + with open(fileName, "r") as rf: + try: + data = json.load(rf) + except Exception: + self._logger.error("read [%s] error. not a json file!" % + (fileName)) + raise TypeError("read [%s] error. not a json file!" % + (fileName)) + return data + + def dumpOpInfoDict(self, + data, + groupId, + gpuId, + pretty=False, + tmpPath="./tmp"): + return self.dumpDict( + data, "opinfo", groupId, gpuId, pretty=False, tmpPath="./tmp") + + def dumpDCGMDict(self, data, groupId, gpuId, pretty=False, tmpPath="./tmp"): + return self.dumpDict( + data, "dcgm", groupId, gpuId, pretty=False, tmpPath="./tmp") + + def dumpDict(self, + data, + name, + groupId, + gpuId, + pretty=False, + tmpPath="./tmp"): + self._lock.acquire() + if not os.path.exists(tmpPath): + os.makedirs(tmpPath) + self._lock.release() + if pretty: + jsObj = json.dumps(data, indent=4, separators=(',', ': ')) + else: + jsObj = json.dumps(data, separators=(',', ':')) + + fileName = self.getFileName(name, groupId, gpuId, tmpPath) + if os.path.isfile(fileName): + os.remove(fileName) + + fileObject = open(fileName, 'w') + fileObject.write(jsObj) + fileObject.close() + self._logger.info("dump [%s] sucessfully!" 
% fileName)
+
+
+def getLogger():
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+
+    rq = time.strftime('%Y%m%d%H%M.%S', time.localtime(time.time()))
+    log_path = os.path.dirname(os.getcwd()) + '/Logs/'
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+
+    log_name = log_path + rq + '.log'
+    logfile = log_name
+    fh = logging.FileHandler(logfile, mode='w')
+    fh.setLevel(logging.DEBUG)
+
+    formatter = logging.Formatter(
+        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(process)d - %(levelname)s: %(message)s"
+    )
+    fh.setFormatter(formatter)
+
+    logger.addHandler(fh)
+    return logger
+
+
+def test_FileReader(args):
+    try:
+        testReader = FileReader(None, args)
+    except Exception as e:
+        print(e)
+    else:
+        testReader.printArgs()
+
+
+if __name__ == "__main__":
+    args = 0
+    test_FileReader(args)
+
+    args = {
+        "dataPath": ".",
+        "groupSize": 1,
+        "displaySize": 1,
+        "gpuPerTrainer": 8,
+        "organizeForm": FILEORGANIZEFORM_BYOTHER,
+    }
+    test_FileReader(args)
+
+    args = {
+        "dataPath": ".",
+        "groupSize": 1,
+        "displaySize": 1,
+        "gpuPerTrainer": 8,
+        "organizeForm": FILEORGANIZEFORM_BYTRAINER,
+    }
+    test_FileReader(args)
+
+    args = {
+        "dataPath": "./res",
+        "groupSize": 1,
+        "displaySize": 1,
+        "gpuPerTrainer": 8,
+        "organizeForm": FILEORGANIZEFORM_BYTRAINER,
+    }
+    test_FileReader(args)
+
+    args = {
+        "dataPath": ".",
+        "groupSize": "",
+        "displaySize": 1,
+        "gpuPerTrainer": 8,
+        "organizeForm": FILEORGANIZEFORM_BYTRAINER,
+    }
+    test_FileReader(args)
diff --git a/tools/CrossStackProfiler/CspReporter.py b/tools/CrossStackProfiler/CspReporter.py
new file mode 100755
index 0000000000000000000000000000000000000000..1b8ae0e3855348441e99e61fd302742852ac0156
--- /dev/null
+++ b/tools/CrossStackProfiler/CspReporter.py
@@ -0,0 +1,237 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
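+#
+# Minimal usage sketch (an illustration, not part of the tool itself): it
+# assumes the monitor data has already been collected under
+# <profile_path>/profile, <profile_path>/dcgm, <profile_path>/net and
+# <profile_path>/time, matching PROFILE_PATH, DCGM_PATH, NET_PATH and
+# TIME_PATH in CspFileReader.py; the paths and numbers below are examples only:
+#
+#   python CspReporter.py --profile_path ./data --timeline_path ./out \
+#       --gpuPerTrainer 8 --trainerNum 4 --groupSize 4 --displaySize 2
+#
+# For every group it writes one Chrome-trace JSON per GPU into --timeline_path,
+# which can then be loaded in chrome://tracing.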
+ +import os +import glob +import logging +import argparse +import multiprocessing + +import pandas as pd +from multiprocessing import Process + +from NetFileReader import netFileReader +from DCGMFileReader import dcgmFileReader +from ProfileFileReader import profileFileReader + +from CspFileReader import getLogger +from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH +from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM +from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM + + +def get_argparse(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + '--profile_path', + type=str, + default='.', + help='Working path that store the monitor data.') + + parser.add_argument( + '--timeline_path', + type=str, + default='.', + help='Output timeline file name.') + + parser.add_argument( + '--gpuPerTrainer', type=int, default=8, help='Gpus per trainer.') + + parser.add_argument( + '--trainerNum', type=int, default=4, help='Num of trainer.') + + parser.add_argument( + '--groupSize', type=int, default=8, help='Num of trainer in a group.') + + parser.add_argument( + '--displaySize', + type=int, + default=2, + help='Num of line need to display in a group.') + + return parser.parse_args() + + +class CspReporter(object): + def __init__(self, args): + self._args = args + print(self._args) + + self._workPath = self._args.profile_path + self._saveFilePath = self._args.timeline_path + self._gpuPerTrainer = self._args.gpuPerTrainer + self._groupSize = self._args.groupSize + self._displaySize = self._args.displaySize + self._trainerNum = self._args.trainerNum + + self._checkArgs() + + self._init_logger() + self._init_timeInfo() + self._init_reader() + + def _checkArgs(self): + if self._trainerNum % self._groupSize != 0: + raise Exception( + "Input args error: trainerNum[%d] %% groupSize[%d] != 0" % + (self._trainerNum, self._groupSize)) + + def _init_logger(self): + self._logger = getLogger() + + def _init_reader(self): + self._dcgmPath = os.path.join(self._workPath, DCGM_PATH) + self._netPath = os.path.join(self._workPath, NET_PATH) + self._profilePath = os.path.join(self._workPath, PROFILE_PATH) + + self._netFileReaderArgs = { + "dataPath": self._netPath, + "groupSize": self._groupSize, + "displaySize": self._displaySize, + "gpuPerTrainer": self._gpuPerTrainer, + "minTimeStamp": self._minTimeStamp, + "organizeForm": FILEORGANIZEFORM_BYTRAINER, + } + + self._dcgmFileReaderArgs = { + "dataPath": self._dcgmPath, + "groupSize": self._groupSize, + "displaySize": self._displaySize, + "gpuPerTrainer": self._gpuPerTrainer, + "minTimeStamp": self._minTimeStamp, + "organizeForm": FILEORGANIZEFORM_BYTRAINER, + } + + self._profileFileReaderArgs = { + "dataPath": self._profilePath, + "groupSize": self._groupSize, + "displaySize": self._displaySize, + "gpuPerTrainer": self._gpuPerTrainer, + "minTimeStamp": self._minTimeStamp, + "organizeForm": FILEORGANIZEFORM_BYRANK, + } + + self._dcgmFileReader = dcgmFileReader(self._logger, + self._dcgmFileReaderArgs) + self._profileFileReader = profileFileReader(self._logger, + self._profileFileReaderArgs) + + def _init_timeInfo(self): + self._timePath = os.path.join(self._workPath, TIME_PATH) + self._timeInfo = {} + self._minTimeStamp = 0 + self._set_timeInfo() + + def _set_timeInfo(self, timeFileNamePrefix="time.txt", sed="."): + timeFileNameList = glob.glob( + os.path.join(self._timePath, timeFileNamePrefix, sed, "*")) + for 
timeFileName in timeFileNameList: + trainerId = int(timeFileName.split(sed)[-1]) + gpuId = int(timeFileName.split(sed)[-2]) + info = {} + with open(timeFileName, "r") as rf: + for line in rf: + if line.startswith("start time:"): + info["start_time"] = int( + float(line.split(":")[-1]) * 1e9) + + self._minTimeStamp = min(self._minTimeStamp, + info["start_time"]) + + if line.startswith("end time:"): + info["end_time"] = int(float(line.split(":")[-1]) * 1e9) + if not info: + self._timeInfo[gpuId * trainerId] = info + + def _generateTraceFileByGroupAndGpuId(self, pipileInfo, netInfo, groupId, + gpuId): + dcgmInfoDict = self._dcgmFileReader.getDcgmInfoDict(groupId, gpuId) + opInfoDict = self._profileFileReader.getOpInfoDict(groupId, gpuId) + + traceObj = {} + traceObj["traceEvents"] = pipileInfo[str(gpuId)] + opInfoDict[ + "traceEvents"] + dcgmInfoDict["traceEvents"] + netInfo[ + "traceEvents"] + + self._profileFileReader.dumpDict(traceObj, "traceFile", groupId, gpuId, + False, self._saveFilePath) + + def _generateTraceFileByGroup(self, groupId, processNum): + # first we need to generate pipeline info + pipileInfo = self._profileFileReader.getPipeLineInfo(groupId, + processNum) + # second we need to generate dcgm info + dcgmInfo = self._dcgmFileReader.getDCGMTraceInfo(groupId, processNum) + + # third we need to generate net info + netInfo = {} + netInfo["traceEvents"] = [] + # netInfo = self._netFileReader.parseFileByGroup(groupId, processNum) + + # forth we need to generate op info + opInfo = self._profileFileReader.getOPTraceInfo(groupId) + + # finially we need dump this information into disk + processPool = [] + pidList = [] + + for gpuId in range(self._gpuPerTrainer): + subproc = Process( + target=self._generateTraceFileByGroupAndGpuId, + args=( + pipileInfo, + netInfo, + groupId, + gpuId, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[traceFile]: process [%d] has been started, total task num is %d ..." + % (subproc.pid, 1)) + + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[traceFile]: process [%d] has exited! remained %d process!" % + (t.pid, len(pidList))) + + def generateTraceFile(self, processNum=8): + processPool = [] + pidList = [] + for groupId in range(self._trainerNum / self._groupSize): + subproc = Process( + target=self._generateTraceFileByGroup, + args=( + groupId, + processNum, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[GroupTraceFile]: process [%d] has been started, total task num is %d ..." + % (subproc.pid, 1)) + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[GroupTraceFile]: process [%d] has exited! remained %d process!" + % (t.pid, len(pidList))) + + +if __name__ == '__main__': + args = get_argparse() + tl = CspReporter(args) + tl.generateTraceFile() diff --git a/tools/CrossStackProfiler/DCGMFileReader.py b/tools/CrossStackProfiler/DCGMFileReader.py new file mode 100755 index 0000000000000000000000000000000000000000..599acb44c6556c1ecc43ad1e831355c201171e01 --- /dev/null +++ b/tools/CrossStackProfiler/DCGMFileReader.py @@ -0,0 +1,269 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import json +import glob +import logging +import tempfile +import argparse +import pandas as pd +import multiprocessing +from multiprocessing import Process + +from CspChromeTraceFormatter import ChromeTraceFormatter + +from CspFileReader import FileReader +from CspFileReader import getLogger +from CspFileReader import dcgmMetricParameterMap +from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH +from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM +from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM + + +class dcgmFileReader(FileReader): + def parseFileByGroup(self, groupId, processNum=8): + fileFist = self.getFileListByGroup(groupId) + displaySize = min(self._displaySize, len(fileFist)) + fileFist = fileFist[:displaySize] + + if processNum == 0: + return self._parseTask(fileFist) + + else: + self._logger.info("using [%d] process to do this work!" % + processNum) + processPool = [] + pidList = [] + + manager = multiprocessing.Manager() + q = manager.Queue() + + taskList = self._splitTaskListForMultiProcess(fileFist, processNum) + for task in taskList: + subproc = Process( + target=self._parseTask, args=( + task, + q, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[DCGM reader]: process [%d] has been started, total task num is %d ..." + % (subproc.pid, len(processPool))) + + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[DCGM reader]: process [%d] has exited! remained %d process!" + % (t.pid, len(pidList))) + + isFistProcess = True + for t in processPool: + if isFistProcess: + isFistProcess = False + dcgm_data = q.get() + else: + dcgm_data = pd.concat( + [dcgm_data, q.get()], axis=0, join='outer') + + return dcgm_data + + def _parseTask(self, taskList, q=None): + is_first = True + for fileName in taskList: + self._logger.info("I am processing %s!" % fileName) + tmp_data = self._parseSingleFile(fileName) + if tmp_data is None: + continue + + if is_first: + is_first = False + dcgm_data = tmp_data + else: + dcgm_data = pd.concat( + [dcgm_data, tmp_data], axis=0, join='outer') + dcgm_data = dcgm_data.dropna() + if not q is None: + q.put(dcgm_data) + self._logger.info("I finish processing %s!" 
% fileName) + return dcgm_data + + def _parseSingleFile(self, fileName): + trainerId = self.getTrainerId(fileName) + + if not os.path.exists(fileName): + logging.warning(fileName + ' not found') + return + + regex_list = [ + (re.compile(r' +'), ','), + (re.compile(r'^,'), ''), + ] + + csv_tempfile = tempfile.TemporaryFile() + with open(fileName, 'r') as fp: + has_header = False + + for line in fp: + # skip `nvidia-dcgm-dmon.sh` init and fini info lines + if 'nv-hostengine' in line or 'dmon' in line or 'Host Engine Listener Started' in line: + continue + + if not line.strip().startswith("GPU") and not line.strip( + ).startswith("# Entity"): + continue + + # skip non-needed headers (only the header in 1th line was needed) + if line.strip().startswith("# Entity"): + line = line.strip()[2:] + + if 'Entity' == line[0:len('Entity')]: + if has_header: + continue + else: + has_header = True + + if line.strip().startswith("GPU"): + line = line.strip()[3:] + + for r in regex_list: + line = r[0].sub(r[1], line) + + csv_tempfile.write(bytes(line + "\n")) + + csv_tempfile.seek(0) + + dcgm = pd.read_csv(csv_tempfile, header=0, delimiter=',') + # dcgm.info() + dcgm['FB_USED_RATIO'] = dcgm['FBUSD'] / dcgm['FBTTL'] + dcgm['GPUTL'] = dcgm['GPUTL'] / 100.0 + dcgm['ts'] = dcgm['TIMESTAMP'] * 1e9 + dcgm['trainerId'] = trainerId + + return dcgm + + def _getDCGMTraceInfoByGpuId(self, + groupId, + gpuId, + dcgm_data, + pid_map, + q=None): + self._logger.info( + "Begin to generate dcgm info, groupId = %d, gpuID = %d ..." % + (groupId, gpuId)) + + gpuDcgmData = dcgm_data[dcgm_data['Entity'].isin([gpuId])] + + traceEventList = [] + for metric, parameteList in dcgmMetricParameterMap.items(): + metaInfo = {} + metaInfo['name'] = 'process_name' + metaInfo['ph'] = 'M' + metaInfo['pid'] = pid_map[metric] + metaInfo['args'] = {'name': metric} + traceEventList.append(metaInfo) + + for index, row in gpuDcgmData.iterrows(): + for metric, parameteList in dcgmMetricParameterMap.items(): + trainerId = int(row['trainerId']) % self._groupSize + if trainerId >= self._displaySize: + continue + + di = {} + # name = "%s_%d" % (metric, trainerId) + name = "%s" % (metric) + di['name'] = name + di['pid'] = pid_map[metric] + di['ts'] = self._align_ts(int(row['ts'])) + # di['ts'] = int(row['ts']) + di['cat'] = metric + di['tid'] = "%d_%d" % (groupId, trainerId) + di['ph'] = "C" + di['id'] = trainerId + + args = {} + for p in parameteList: + args[p[0]] = row[p[1]] + di['args'] = args + + traceEventList.append(di) + trace = {} + trace['traceEvents'] = traceEventList + self.dumpDCGMDict(trace, groupId, gpuId, True) + + return trace + + def getDCGMTraceInfo(self, groupId, processNum=8): + dcgm_data = self.parseFileByGroup(groupId, processNum) + + pid_map = {} + init_pid = PIPELINEINFO_TRACE_NUM + + for metric in dcgmMetricParameterMap.keys(): + pid_map[metric] = init_pid + init_pid = init_pid + 1 + + manager = multiprocessing.Manager() + q = manager.Queue() + processPool = [] + pidList = [] + + for gpuId in range(self._gpuPerTrainer): + subproc = Process( + target=self._getDCGMTraceInfoByGpuId, + args=( + groupId, + gpuId, + dcgm_data, + pid_map, + q, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[DCGM info]: process [%d] has been started, total task num is %d ..." + % (subproc.pid, 1)) + + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[DCGM info]: process [%d] has exited! remained %d process!" 
% + (t.pid, len(pidList))) + + dcgmInfo = {} + + return dcgmInfo + + +def test_dcgmFileReader(): + args = { + "dataPath": "data/newdata/dcgm", + "groupSize": 4, + "displaySize": 8, + "gpuPerTrainer": 8, + "minTimeStamp": 0, + "organizeForm": FILEORGANIZEFORM_BYTRAINER, + } + + testReader = dcgmFileReader(getLogger(), args) + testReader.printArgs() + data = testReader.getDCGMTraceInfo(0, 8) + + +if __name__ == "__main__": + test_dcgmFileReader() diff --git a/tools/CrossStackProfiler/NetFileReader.py b/tools/CrossStackProfiler/NetFileReader.py new file mode 100755 index 0000000000000000000000000000000000000000..29c2ae85e60458f6b712e3eeadb739a2dee70d09 --- /dev/null +++ b/tools/CrossStackProfiler/NetFileReader.py @@ -0,0 +1,146 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import json +import glob +import logging +import pandas as pd + +from multiprocessing import Process + +from CspChromeTraceFormatter import ChromeTraceFormatter + +from CspFileReader import FileReader +from CspFileReader import getLogger +from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH +from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM +from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM + + +class netFileReader(FileReader): + def _parseSingleFile(self, fileNameList, tx_pid, rx_pid, q=None): + + traceInfo = {} + traceEventList = [] + + metaInfo = {} + metaInfo['name'] = 'process_name' + metaInfo['ph'] = 'M' + metaInfo['pid'] = tx_pid + metaInfo['args'] = {'name': "%02d_tx" % tx_pid} + + traceEventList.append(metaInfo) + metaInfo = {} + metaInfo['name'] = 'process_name' + metaInfo['ph'] = 'M' + metaInfo['pid'] = rx_pid + metaInfo['args'] = {'name': "%02d_rx" % rx_pid} + + traceEventList.append(metaInfo) + + trainerIdList = [] + for fileName in fileNameList: + trainerId = self.getTrainerId(fileName) + trainerIdList.append(trainerId) + with open(fileName, "r") as rf: + for line in rf: + try: + event_str = json.loads(line.strip()) + event_str["pid"] = tx_pid if event_str[ + "name"] == "tx" else rx_pid + # the unit of net is ms, we need ns + event_str["ts"] = self._align_ts(event_str["ts"] * 1e6) + event_str["id"] = trainerId + traceEventList.append(event_str) + + except Exception: + self._logger.warning( + "invalid record [%s] in [%s]. skip it!" 
% + (line[:-1], fileName)) + traceInfo["traceEvents"] = traceEventList + + if not q is None: + q.put(traceInfo) + else: + return traceInfo + + def parseFileByGroup(self, groupId, processNum=8): + fileFist = self.getFileListByGroup(groupId) + fileFist = fileFist[:min(self._displaySize, len(fileFist))] + + manager = multiprocessing.Manager() + q = manager.Queue() + + processPool = [] + pidList = [] + tx_pid = PIPELINEINFO_TRACE_NUM + rx_pid = PIPELINEINFO_TRACE_NUM + 1 + + taskList = self._splitTaskListForMultiProcess(fileFist, processNum) + for task in taskList: + subproc = Process( + target=self._parseSingleFile, args=( + task, + tx_pid, + rx_pid, + q, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[Net info]: process [%d] has been started, total task num is %d ..." + % (subproc.pid, len(processPool))) + + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[Net info]: process [%d] has exited! remained %d process!" % + (t.pid, len(pidList))) + + traceInfo = {} + isFistProcess = True + for t in processPool: + if isFistProcess: + isFistProcess = False + traceInfo["traceEvents"] = q.get()["traceEvents"] + else: + traceInfo["traceEvents"].extend(q.get()["traceEvents"]) + + return traceInfo + + +def test_netFileReader(): + args = { + "dataPath": "data/newdata/net", + "groupSize": 4, + "displaySize": 2, + "gpuPerTrainer": 8, + "minTimeStamp": 0, + "organizeForm": FILEORGANIZEFORM_BYTRAINER, + } + + testReader = netFileReader(getLogger(), args) + testReader.printArgs() + data = testReader.parseFileByGroup(0, 8) + + jsObj = json.dumps(data, indent=4, separators=(',', ': ')) + fileObject = open('jsonFile.json', 'w') + fileObject.write(jsObj) + fileObject.close() + + +if __name__ == "__main__": + test_netFileReader() diff --git a/tools/CrossStackProfiler/ProfileFileReader.py b/tools/CrossStackProfiler/ProfileFileReader.py new file mode 100755 index 0000000000000000000000000000000000000000..0f3299ef5473fad6cde3c06ae99ba7727e1a7206 --- /dev/null +++ b/tools/CrossStackProfiler/ProfileFileReader.py @@ -0,0 +1,480 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
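+#
+# Input assumption (a descriptive note, not enforced here): every profile file
+# is a binary dump of the profiler_pb2.Profile message from
+# paddle.fluid.proto.profiler, organized "byRank" with the rank id as the
+# numeric filename suffix -- e.g. a hypothetical "profile.7" is read as rank 7
+# by FileReader._getId(), which parses int(fileName.split(".")[-1]).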
+ +import os +import six +import glob +import json +import logging +import argparse +import pandas as pd +import multiprocessing +from multiprocessing import Process + +import google.protobuf.text_format as text_format +import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2 + +from CspChromeTraceFormatter import ChromeTraceFormatter + +from CspFileReader import FileReader +from CspFileReader import getLogger +from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH +from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM +from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM + + +class profileFileReader(FileReader): + def _parseSingleFile(self, profile): + with open(profile, 'rb') as f: + profile_s = f.read() + profile_pb = profiler_pb2.Profile() + profile_pb.ParseFromString(profile_s) + + return profile_pb + + def _parseTask(self, taskList, q=None): + profile_dict = {} + + for fileName in taskList: + rankId = self.getRankId(fileName) + profile_dict["trainerRank.%03d" % + (rankId)] = self._parseSingleFile(fileName) + self._logger.info("I finish processing %s!" % fileName) + + if not q is None: + q.put(profile_dict) + + return profile_dict + + def _is_forwardBackwardInfo(self, items): + if items["name"] == "marker/compute/MarkerCUDA": + if items.has_key("args"): + if isinstance(items["args"], dict): + args = items["args"] + if args.has_key("detail_info"): + if args["detail_info"] == "marker_forward_B" or \ + args["detail_info"] == "marker_forward_E" or \ + args["detail_info"] == "marker_backward_B" or \ + args["detail_info"] == "marker_backward_E": + return True + return False + + def _allocate_forwardBackwardInfo(self, restList, pid, tid): + def _cmp_ele(items): + return items["ts"] + + restList.sort(key=_cmp_ele) + newList = [] + + lastEle = {} + for items in restList: + if items["args"]["detail_info"].endswith("E"): + if not lastEle: + continue + else: + lastEle["dur"] = items["ts"] - lastEle["ts"] + name = lastEle["args"]["detail_info"] + name = name[:name.rfind('_')] + name = name.split('_')[1] + lastEle["name"] = name + lastEle["args"]["detail_info"] = name + lastEle["args"]["name"] = name + if name == "backward": + lastEle["cname"] = "good" + else: + lastEle["cname"] = "bad" + + lastEle["tid"] = tid + lastEle["pid"] = pid + + newList.append(lastEle) + else: + lastEle = items + + return newList + + def _getPipeLineInfo(self, profileList, q=None): + + res = {} + for profile in profileList: + rankId = self.getRankId(profile) + + profile_pb = self._parseSingleFile(profile) + traceEventList = [] + pid = 0 + tid = rankId + + for event in profile_pb.events: + args = {'name': event.name} + if event.memcopy.bytes > 0: + args['mem_bytes'] = event.memcopy.bytes + if hasattr(event, "detail_info") and event.detail_info: + args['detail_info'] = event.detail_info + + traceEvent = {} + traceEvent['ph'] = 'X' + traceEvent['cat'] = 'Op' + traceEvent['name'] = event.name + traceEvent['pid'] = pid + traceEvent['tid'] = tid + traceEvent['ts'] = self._align_ts(event.start_ns) + traceEvent['dur'] = (event.end_ns - event.start_ns) / 1.0 + traceEvent['args'] = args + + if self._is_forwardBackwardInfo(traceEvent): + traceEventList.append(traceEvent) + + pipeLineList = self._allocate_forwardBackwardInfo(traceEventList, + pid, tid) + + res[str(rankId)] = pipeLineList + + if not q is None: + q.put(res) + + return res + + def getPipeLineInfo(self, groupId, processNum=8): + fileFist = 
self.getFileListByGroup(groupId) + + self._logger.info( + "using [%d] process to do this work, total task num is %d!" % + (processNum, len(fileFist))) + processPool = [] + pidList = [] + + manager = multiprocessing.Manager() + q = manager.Queue() + + taskList = self._splitTaskListForMultiProcess(fileFist, processNum) + for task in taskList: + subproc = Process( + target=self._getPipeLineInfo, args=( + task, + q, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[pipeline info]: process [%d] has been started, total task num is %d ..." + % (subproc.pid, len(task))) + + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[pipeline info]: process [%d] has exited! remained %d process!" + % (t.pid, len(pidList))) + + pipeLineInfo = {} + + metaInfo = {} + metaInfo['name'] = 'process_name' + metaInfo['ph'] = 'M' + metaInfo['pid'] = 0 + metaInfo['args'] = { + 'name': "%02d_pipeLineInfo" % PIPELINEINFO_TRACE_NUM + } + + for t in processPool: + for k, v in q.get().items(): + rankId = int(k) + gpuId = rankId % self._gpuPerTrainer + if str(gpuId) not in pipeLineInfo.keys(): + pipeLineInfo[str(gpuId)] = [metaInfo] + pipeLineInfo[str(gpuId)].extend(v) + + return pipeLineInfo + + def _allocate_pids(self, profile_dict, gpuId, initPid): + chrome_trace = ChromeTraceFormatter() + devices = dict() + mem_devices = dict() + + initLineNum = initPid + 1 + lineDelta = len(profile_dict.keys()) + i = 0 + for k, profile_pb in six.iteritems(profile_dict): + lineNum = initLineNum + for event in profile_pb.events: + if event.type == profiler_pb2.Event.CPU: + if (k, event.device_id, "CPU") not in devices: + pid = initPid + initPid = initPid + 1 + devices[(k, event.device_id, "CPU")] = pid + # -1 device id represents CUDA API(RunTime) call.(e.g. 
cudaLaunch, cudaMemcpy) + if event.device_id == -1: + chrome_trace.emit_pid("%02d_%s:cuda_api" % + (lineNum, k), pid) + lineNum = lineNum + 1 + else: + chrome_trace.emit_pid("%02d_%s:cpu:block:%d" % + (lineNum, k, event.device_id), + pid) + lineNum = lineNum + 1 + elif event.type == profiler_pb2.Event.GPUKernel: + if (k, event.device_id, "GPUKernel") not in devices: + if gpuId == event.device_id: + pid = initPid + initPid = initPid + 1 + + devices[(k, event.device_id, "GPUKernel")] = pid + chrome_trace.emit_pid("%02d_%s:gpu:%d" % + (lineNum, k, event.device_id), + pid) + lineNum = lineNum + 1 + + if not hasattr(profile_pb, "mem_events"): + continue + for mevent in profile_pb.mem_events: + if mevent.place == profiler_pb2.MemEvent.CUDAPlace: + if (k, mevent.device_id, "GPU") not in mem_devices: + if gpuId == mevent.device_id: + pid = initPid + initPid = initPid + 1 + + mem_devices[(k, mevent.device_id, "GPU")] = pid + chrome_trace.emit_pid( + "%02d_memory usage on %s:gpu:%d" % + (lineNum, k, mevent.device_id), pid) + lineNum = lineNum + 1 + elif mevent.place == profiler_pb2.MemEvent.CPUPlace: + if (k, mevent.device_id, "CPU") not in mem_devices: + pid = initPid + initPid = initPid + 1 + + mem_devices[(k, mevent.device_id, "CPU")] = pid + chrome_trace.emit_pid("%02d_memory usage on %s:cpu:%d" % + (lineNum, k, mevent.device_id), + pid) + lineNum = lineNum + 1 + elif mevent.place == profiler_pb2.MemEvent.CUDAPinnedPlace: + if (k, mevent.device_id, "CUDAPinnedPlace" + ) not in mem_devices: + if gpuId == mevent.device_id: + pid = initPid + initPid = initPid + 1 + + mem_devices[(k, mevent.device_id, + "CUDAPinnedPlace")] = pid + chrome_trace.emit_pid( + "%02d_memory usage on %s:cudapinnedplace:%d" % + (lineNum, k, mevent.device_id), pid) + lineNum = lineNum + 1 + if (k, 0, "CPU") not in mem_devices: + pid = initPid + initPid = initPid + 1 + + mem_devices[(k, 0, "CPU")] = pid + chrome_trace.emit_pid("%02d_memory usage on %s:cpu:%d" % + (lineNum, k, 0), pid) + lineNum = lineNum + 1 + if (k, 0, "GPU") not in mem_devices: + # if gpuId == mevent.device_id: + pid = initPid + initPid = initPid + 1 + + mem_devices[(k, 0, "GPU")] = pid + chrome_trace.emit_pid("%02d_memory usage on %s:gpu:%d" % + (lineNum, k, 0), pid) + lineNum = lineNum + 1 + if (k, 0, "CUDAPinnedPlace") not in mem_devices: + pid = initPid + initPid = initPid + 1 + + mem_devices[(k, 0, "CUDAPinnedPlace")] = pid + chrome_trace.emit_pid( + "%02d_memory usage on %s:cudapinnedplace:%d" % + (lineNum, k, 0), pid) + lineNum = lineNum + 1 + i = i + 1 + return chrome_trace, devices, mem_devices + + def _allocate_events(self, profile_dict, devices, gpuId): + chrome_trace = ChromeTraceFormatter() + for k, profile_pb in six.iteritems(profile_dict): + + rankId = int(k.split(".")[-1]) + + for event in profile_pb.events: + if event.type == profiler_pb2.Event.CPU: + type = "CPU" + elif event.type == profiler_pb2.Event.GPUKernel: + type = "GPUKernel" + + if event.type == profiler_pb2.Event.GPUKernel and event.device_id != gpuId and rankId % self._gpuPerTrainer != gpuId: + continue + + pid = devices[(k, event.device_id, type)] + args = {'name': event.name} + if event.memcopy.bytes > 0: + args['mem_bytes'] = event.memcopy.bytes + if hasattr(event, "detail_info") and event.detail_info: + args['detail_info'] = event.detail_info + # TODO(panyx0718): Chrome tracing only handles ms. However, some + # ops takes micro-seconds. Hence, we keep the ns here. 
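+                # Each selected op becomes one complete ('X') duration event:
+                # its start is shifted by _align_ts() so every rank shares the
+                # same time origin, and event.sub_device_id picks the thread
+                # lane within the pid assigned by _allocate_pids().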
+ chrome_trace.emit_region( + self._align_ts(event.start_ns), + (event.end_ns - event.start_ns) / 1.0, pid, + event.sub_device_id, 'Op', event.name, args) + return chrome_trace + + def _allocate_memory_event(self, profile_dict, mem_devices, gpuId): + chrome_trace = ChromeTraceFormatter() + if not hasattr(profiler_pb2, "MemEvent"): + return + place_to_str = { + profiler_pb2.MemEvent.CPUPlace: "CPU", + profiler_pb2.MemEvent.CUDAPlace: "GPU", + profiler_pb2.MemEvent.CUDAPinnedPlace: "CUDAPinnedPlace" + } + for k, profile_pb in six.iteritems(profile_dict): + rankId = int(k.split(".")[-1]) + + trainerId = rankId / self._gpuPerTrainer + + if trainerId >= self._displaySize: + continue + + mem_list = [] + end_profiler = 0 + for mevent in profile_pb.mem_events: + crt_info = dict() + crt_info['time'] = mevent.start_ns + crt_info['size'] = mevent.bytes + if mevent.place in place_to_str: + place = place_to_str[mevent.place] + else: + place = "UnDefine" + + if (mevent.place == profiler_pb2.MemEvent.CUDAPlace or + mevent.place == profiler_pb2.MemEvent.CUDAPinnedPlace + ) and mevent.device_id != gpuId: + continue + + crt_info['place'] = place + pid = mem_devices[(k, mevent.device_id, place)] + crt_info['pid'] = pid + crt_info['thread_id'] = mevent.thread_id + crt_info['device_id'] = mevent.device_id + mem_list.append(crt_info) + crt_info = dict() + crt_info['place'] = place + crt_info['pid'] = pid + crt_info['thread_id'] = mevent.thread_id + crt_info['device_id'] = mevent.device_id + crt_info['time'] = mevent.end_ns + crt_info['size'] = -mevent.bytes + mem_list.append(crt_info) + end_profiler = max(end_profiler, crt_info['time']) + mem_list.sort(key=lambda tmp: (tmp.get('time', 0))) + i = 0 + total_size = 0 + while i < len(mem_list): + total_size += mem_list[i]['size'] + while i < len(mem_list) - 1 and mem_list[i]['time'] == mem_list[ + i + 1]['time']: + total_size += mem_list[i + 1]['size'] + i += 1 + + chrome_trace.emit_counter( + "Memory", "Memory", mem_list[i]['pid'], + self._align_ts(mem_list[i]['time']), 0, total_size) + i += 1 + return chrome_trace + + def _getOPTraceInfoByGpuId(self, groupId, gpuId): + fileFist = self.getFileListByGroup(groupId) + newFileList = [] + for file in fileFist: + rankId = self.getRankId(file) + localRank = rankId % self._gpuPerTrainer + if localRank == gpuId and (rankId / self._gpuPerTrainer + ) % self._groupSize < self._displaySize: + newFileList.append(file) + + profile_dict = self._parseTask(newFileList) + initPid = PIPELINEINFO_TRACE_NUM + DCGMINFO_TRACE_NUM + NETINFO_TRACE_NUM + metaTrace, devicesPid, mem_devicesPid = self._allocate_pids( + profile_dict, gpuId, initPid) + eventsTrace = self._allocate_events(profile_dict, devicesPid, gpuId) + memEventsTrace = self._allocate_memory_event(profile_dict, + mem_devicesPid, gpuId) + + trace = {} + trace[ + 'traceEvents'] = metaTrace._metadata + eventsTrace._events + memEventsTrace._events + self.dumpOpInfoDict(trace, groupId, gpuId, True) + + return trace + + def getOPTraceInfo(self, groupId): + manager = multiprocessing.Manager() + q = manager.Queue() + processPool = [] + pidList = [] + + for gpuId in range(self._gpuPerTrainer): + subproc = Process( + target=self._getOPTraceInfoByGpuId, args=( + groupId, + gpuId, )) + processPool.append(subproc) + subproc.start() + pidList.append(subproc.pid) + self._logger.info( + "[op info]: process [%d] has been started, total task num is %d ..." 
+ % (subproc.pid, 1)) + + for t in processPool: + t.join() + pidList.remove(t.pid) + self._logger.info( + "[op info]: process [%d] has exited! remained %d process!" % + (t.pid, len(pidList))) + + opInfo = {} + + return opInfo + + def parseFileByGroup(self, groupId, processNum=8): + fileFist = self.getFileListByGroup(groupId) + if processNum == 0: + return self._parseTask(fileFist) + else: + return self._parseTask(fileFist) + + +def test_profileFileReader(): + args = { + "dataPath": "data/newdata/profile", + "groupSize": 4, + "displaySize": 8, + "gpuPerTrainer": 8, + "minTimeStamp": 0, + "organizeForm": FILEORGANIZEFORM_BYRANK, + } + + testReader = profileFileReader(getLogger(), args) + testReader.printArgs() + data = testReader.getOPTraceInfo(0) + + jsObj = json.dumps(data) + fileObject = open('jsonFile.json', 'w') + fileObject.write(jsObj) + fileObject.close() + + +if __name__ == "__main__": + test_profileFileReader() diff --git a/tools/CrossStackProfiler/__init__.py b/tools/CrossStackProfiler/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..6f0ea85344b7e0c679730356928c8749cf71cd66 --- /dev/null +++ b/tools/CrossStackProfiler/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
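+#
+# Package note: CrossStackProfiler bundles three readers -- profileFileReader
+# (Paddle profiler protobuf dumps), dcgmFileReader (DCGM GPU metric logs) and
+# netFileReader (network monitor logs) -- plus CspReporter.py, which merges
+# pipeline, op, DCGM and (currently empty) net events into one Chrome-trace
+# file per group and GPU, written as traceFile_<groupId>_<gpuId>.json under
+# --timeline_path.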