Unverified commit 273f3859, authored by LiuWei, committed via GitHub

add cross stack profiler to profile super ernie (#33112)

Parent commit: 4d805e6a
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import six
import sys
import re
import os
import glob
import unittest
import pandas
import tempfile
import platform
import pandas as pd
class ChromeTraceFormatter(object):
    """Accumulates Chrome Trace ("catapult") events and renders them as JSON.

    Regular events and process-metadata events are collected separately so
    that the metadata can be emitted ahead of the events in the final trace.
    For details of the file format, see:
    https://github.com/catapult-project/catapult/blob/master/tracing/README.md
    """

    def __init__(self):
        self._events = []
        self._metadata = []

    def _create_event(self, ph, category, name, pid, tid, timestamp):
        """Create a new Chrome Trace event dict.

        Args:
            ph: The type of event - usually a single character.
            category: The event category as a string.
            name: The event name as a string.
            pid: Identifier of the process generating this event as an integer.
            tid: Identifier of the thread generating this event as an integer.
            timestamp: The timestamp of this event as a long integer.

        Returns:
            A JSON compatible event object.
        """
        return {
            'ph': ph,
            'cat': category,
            'name': name,
            'pid': pid,
            'tid': tid,
            'ts': timestamp,
        }

    def emit_pid(self, name, pid):
        """Add a process metadata ('M') event to the trace.

        Args:
            name: The process name as a string.
            pid: Identifier of the process as an integer.
        """
        self._metadata.append({
            'name': 'process_name',
            'ph': 'M',
            'pid': pid,
            'args': {'name': name},
        })

    def emit_region(self, timestamp, duration, pid, tid, category, name, args):
        """Add a complete ('X') region event to the trace.

        Args:
            timestamp: The start timestamp of this region as a long integer.
            duration: The duration of this region as a long integer.
            pid: Identifier of the process generating this event as an integer.
            tid: Identifier of the thread generating this event as an integer.
            category: The event category as a string.
            name: The event name as a string.
            args: A JSON-compatible dictionary of event arguments.
        """
        region = self._create_event('X', category, name, pid, tid, timestamp)
        region['dur'] = duration
        region['args'] = args
        self._events.append(region)

    def emit_counter(self, category, name, pid, timestamp, counter, value):
        """Emit a counter ('C') record for a single counter.

        Args:
            category: The event category as string.
            name: The event name as string.
            pid: Identifier of the process generating this event as integer.
            timestamp: The timestamp of this event as long integer.
            counter: Name of the counter as string.
            value: Value of the counter as integer.
        """
        counter_event = self._create_event('C', category, name, pid, 0,
                                           timestamp)
        counter_event['args'] = {counter: value}
        self._events.append(counter_event)

    def format_to_string(self, pretty=False):
        """Render the accumulated trace as a JSON string.

        Args:
            pretty: (Optional.) If True, produce human-readable JSON output.

        Returns:
            A JSON-formatted string in Chrome Trace format.
        """
        trace = {'traceEvents': self._metadata + self._events}
        if pretty:
            return json.dumps(trace, indent=4, separators=(',', ': '))
        return json.dumps(trace, separators=(',', ':'))

    def clear(self):
        """Discard all accumulated events and metadata."""
        self._events = []
        self._metadata = []
if __name__ == "__main__":
    # Import-only module: running it directly is intentionally a no-op.
    pass
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import json
import glob
import logging
import pandas as pd
from multiprocessing import Process, Lock
""" Some terms to clarify the code
In most cases, one or more parameters may be passed as input arguments to a class or a function,
either as individual variables or as a key-value dict
1. trainerId
2. gpuId
3. rankId
4. gpuPerTrainer
5. groupSize
6. groupId
7. groupNum
8. displaySize
9. dataPath
10. resultPath
11. fileOrganizeForm -- "byRank" OR "byTrainer" or "other"
"""
# Number of trace-viewer "process" lanes reserved for pipeline information.
PIPELINEINFO_TRACE_NUM = 1

# Maps a display-group name to the (counter label, dcgm column) pairs shown
# in that group's lane of the trace viewer.
dcgmMetricParameterMap = {
    "02_gpuUtility": [("GPUTL", "GPUTL"), ("GRACT", "GRACT")],
    "03_smUtility": [("SMACT", "SMACT"), ("SMOCC", "SMOCC")],
    "04_memUtility": [("FB_USED_RATIO", "FB_USED_RATIO"), ("DRAMA", "DRAMA")],
    "05_txUtility": [("NVLTX", "NVLTX"), ("NVLRX", "NVLRX"), ("PCITX", "PCITX"),
                     ("PCIRX", "PCIRX")],
    "06_calUtility":
    [("FP32A", "FP32A"), ("FP16A", "FP16A"), ("TENSO", "TENSO")]
}
# One trace lane per metric display group.
DCGMINFO_TRACE_NUM = len(dcgmMetricParameterMap.keys())
# Two net lanes: tx and rx.
NETINFO_TRACE_NUM = 2

# Sub-directories of the profiling work path holding each data source.
DCGM_PATH = "dcgm"
NET_PATH = "net"
TIME_PATH = "time"
PROFILE_PATH = "profile"

# Supported on-disk file organizations (see fileOrganizeForm in the module
# comment above): one file per rank, one file per trainer, or unknown.
FILEORGANIZEFORM_BYRANK = "byRank"
FILEORGANIZEFORM_BYTRAINER = "byTrainer"
FILEORGANIZEFORM_BYOTHER = "other"
FILEORGANIZEFORM = [
    FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER,
    FILEORGANIZEFORM_BYOTHER
]
class FileReader(object):
    """Base class for readers of one profiling data source.

    Validates the configuration dict, discovers the data files under
    ``dataPath`` (files are expected to end in ``.<id>``), and provides
    helpers for id extraction, task splitting, and JSON dump/load of
    intermediate results.

    Args:
        logger: a logging.Logger (may be None only if no method that logs
            is ever called).
        args: configuration dict; required keys are checked in _checkArgs.
    """

    def __init__(self, logger, args):
        self._logger = logger
        self._args = args
        self._fileList = []
        self._fileNum = 0
        self._dataPath = ""
        self._groupSize = 0
        self._displaySize = 0
        self._organizeForm = FILEORGANIZEFORM_BYOTHER
        self._gpuPerTrainer = 0
        self._checkArgs()
        self._getFileList()
        self._lock = Lock()

    def printArgs(self):
        """Log every parsed configuration value."""
        self._logger.info("dataPath:")
        self._logger.info(self._dataPath)
        self._logger.info("groupSize:")
        self._logger.info(self._groupSize)
        self._logger.info("displaySize:")
        self._logger.info(self._displaySize)
        self._logger.info("organizeForm:")
        self._logger.info(self._organizeForm)
        self._logger.info("gpuPerTrainer:")
        self._logger.info(self._gpuPerTrainer)
        self._logger.info("minTimeStamp:")
        self._logger.info(self._minTimeStamp)

    def _checkArgsKey(self, key, expected_type):
        """Check that args has `key` with type `expected_type`, then bind
        the value to ``self._<key>``.

        Raises:
            KeyError: the key is missing.
            TypeError: the value has the wrong type.
        """
        # `dict.has_key` was removed in Python 3; membership test works in both.
        if key not in self._args:
            raise KeyError("args should has key [%s]!" % key)
        if not isinstance(self._args[key], expected_type):
            raise TypeError(
                "Invalid type of key [%s] in args dict, it should be a %s!" %
                (key, expected_type))
        # setattr replaces the original exec(...), which was slower and unsafe.
        setattr(self, "_%s" % key, self._args[key])

    def _align_ts(self, ts):
        """Shift an absolute timestamp so the earliest sample starts at 0."""
        return ts - self._minTimeStamp

    def _checkArgs(self):
        """Validate the args dict and bind all required keys to attributes.

        Raises:
            TypeError/KeyError/NotImplementedError/IOError on invalid input.
        """
        if not isinstance(self._args, dict):
            raise TypeError("Invalid type of args, it should be a dict!")
        self._checkArgsKey("organizeForm", str)
        if self._organizeForm not in FILEORGANIZEFORM or \
                self._organizeForm == FILEORGANIZEFORM_BYOTHER:
            raise NotImplementedError(
                "we have not known how to process this form of file [%s]!" %
                self._organizeForm)
        self._checkArgsKey("gpuPerTrainer", int)
        self._checkArgsKey("dataPath", str)
        if not os.path.exists(self._dataPath):
            raise IOError("input data path [%s] not existed!" %
                          (self._dataPath))
        self._checkArgsKey("groupSize", int)
        self._checkArgsKey("displaySize", int)
        self._checkArgsKey("minTimeStamp", int)

    def getFileListByGroup(self, groupId):
        """Return the slice of the sorted file list belonging to `groupId`."""
        lIndext = 0
        rIndext = 0
        if self._organizeForm == FILEORGANIZEFORM_BYTRAINER:
            lIndext = groupId * self._groupSize
            rIndext = (groupId + 1) * self._groupSize
        elif self._organizeForm == FILEORGANIZEFORM_BYRANK:
            lIndext = groupId * self._groupSize * self._gpuPerTrainer
            rIndext = (groupId + 1) * self._groupSize * self._gpuPerTrainer
        try:
            return self._fileList[lIndext:rIndext]
        except IndexError:
            raise IndexError("invalid index of file list")

    def getFileList(self):
        """Return the discovered file list.

        Bug fix: the original returned the bound method ``self._getFileList``
        instead of the list itself.
        """
        return self._fileList

    def _cmp(self, x, y):
        # Python-2-style comparator kept for any external callers; the sort in
        # _getFileList uses a key function instead.
        return self._getId(x, self._organizeForm) - self._getId(
            y, self._organizeForm)

    def _getFileList(self):
        """Scan dataPath, check id uniqueness and continuity, sort by id."""
        self._fileList = glob.glob(os.path.join(self._dataPath, "*.*"))
        # check unique
        idList = []
        for file in self._fileList:
            id = self._getId(file, self._organizeForm)
            if id in idList:
                raise NotImplementedError(
                    "[%s] is repeated by id, we don not how to process it!" %
                    file)
            idList.append(id)
        # sort by the numeric suffix so neighbouring ids are adjacent
        self._fileList.sort(key=lambda elem: int(elem.split(".")[-1]))
        if self._fileList:
            # Bug fix: this check previously ran only when the list was EMPTY
            # (then crashed on the [-1] index) and called _getId without the
            # required organizeForm argument.  Checked after sorting so the
            # first/last elements really hold the min/max ids.
            if (self._getId(self._fileList[-1], self._organizeForm) -
                    self._getId(self._fileList[0], self._organizeForm)) != \
                    len(self._fileList) - 1:
                raise Exception("The file id should be countious!")
            self._logger.info("file list in dir [%s] is : %s !" %
                              (self._dataPath, ', '.join(self._fileList)))
        else:
            self._logger.warning("we can not find any file in dir [%s]!" %
                                 self._dataPath)
        return self._fileList

    def _getId(self, fileName, organizeForm, sed="."):
        """Extract the numeric id from the file name's last `sed` field.

        Raises:
            TypeError: wrong organize form or non-numeric suffix.
            IOError: fileName is not an existing file.
        """
        if self._organizeForm != organizeForm:
            raise TypeError("Can not get rank id when organizer form is not %s!"
                            % organizeForm)
        if not os.path.isfile(fileName):
            raise IOError("[%s] is not a valid file!" % (fileName))
        # str.split always yields at least one element, so [-1] cannot raise.
        suffix_str = fileName.split(sed)[-1]
        try:
            return int(suffix_str)
        except ValueError as e:  # Python 3 syntax for `except ValueError, e`
            print(e)
            raise TypeError("invalid fileName [%s]" % fileName)

    def getRankId(self, fileName, sed="."):
        """Id of a per-rank file (requires byRank organization)."""
        return self._getId(fileName, FILEORGANIZEFORM_BYRANK, sed)

    def getRankNum(self):
        """Total number of ranks covered by the discovered files."""
        if self._organizeForm == FILEORGANIZEFORM_BYRANK:
            return len(self._fileList)
        elif self._organizeForm == FILEORGANIZEFORM_BYTRAINER:
            return len(self._fileList) * self._gpuPerTrainer

    def getTrainerNum(self):
        """Total number of trainers covered by the discovered files."""
        if self._organizeForm == FILEORGANIZEFORM_BYRANK:
            # floor division: `/` yields a float on Python 3
            return len(self._fileList) // self._gpuPerTrainer
        elif self._organizeForm == FILEORGANIZEFORM_BYTRAINER:
            return len(self._fileList)

    def getTrainerId(self, fileName, sed="."):
        """Id of a per-trainer file (requires byTrainer organization)."""
        return self._getId(fileName, FILEORGANIZEFORM_BYTRAINER, sed)

    def _splitTaskListForMultiProcess(self, ls, n):
        """Split list `ls` into at most `n` contiguous chunks.

        Returns [] for invalid input (non-list, non-int, n <= 0 or empty list);
        when n >= len(ls) every element gets its own chunk.
        """
        if not isinstance(ls, list) or not isinstance(n, int):
            return []
        ls_len = len(ls)
        if n <= 0 or 0 == ls_len:
            return []
        if n >= ls_len:
            return [[i] for i in ls]
        j = (ls_len + n - 1) // n  # ceil(ls_len / n) chunk size
        ls_return = []
        end = 0
        for i in range(0, n * j, j):
            if i < ls_len and (i + j) < ls_len:
                ls_return.append(ls[i:i + j])
                end = i + j
        ls_return.append(ls[end:])
        return ls_return

    def getOpInfoFileName(self, groupId, gpuId, tmpPath="./tmp"):
        """Temp-file name for op info of (groupId, gpuId)."""
        return self.getFileName("opinfo", groupId, gpuId, tmpPath)

    def getPipeLineInfoFileName(self, groupId, gpuId, tmpPath="./tmp"):
        """Temp-file name for pipeline info of (groupId, gpuId)."""
        return self.getFileName("pipilineinfo", groupId, gpuId, tmpPath)

    def getDCGMInfoFileName(self, groupId, gpuId, tmpPath="./tmp"):
        """Temp-file name for dcgm info of (groupId, gpuId)."""
        return self.getFileName("dcgm", groupId, gpuId, tmpPath)

    def getFileName(self, name, groupId, gpuId, tmpPath="./tmp"):
        """Build the canonical temp-file path `<tmpPath>/<name>_<group>_<gpu>.json`."""
        return os.path.join(tmpPath, "%s_%d_%d.json" % (name, groupId, gpuId))

    def getOpInfoDict(self, groupId, gpuId, tmpPath="./tmp"):
        """Load the previously dumped op-info JSON for (groupId, gpuId)."""
        return self.getDict("opinfo", groupId, gpuId, tmpPath)

    def getDcgmInfoDict(self, groupId, gpuId, tmpPath="./tmp"):
        """Load the previously dumped dcgm JSON for (groupId, gpuId)."""
        return self.getDict("dcgm", groupId, gpuId, tmpPath)

    def getDict(self, name, groupId, gpuId, tmpPath="./tmp"):
        """Load a dumped JSON temp file.

        Raises:
            IOError: the file does not exist.
            TypeError: the file is not valid JSON.
        """
        fileName = self.getFileName(name, groupId, gpuId, tmpPath)
        if not os.path.isfile(fileName):
            raise IOError("[%s] is not existed!" % fileName)
        data = {}
        with open(fileName, "r") as rf:
            try:
                data = json.load(rf)
            except Exception:
                self._logger.error("read [%s] error. not a json file!" %
                                   (fileName))
                raise TypeError("read [%s] error. not a json file!" %
                                (fileName))
        return data

    def dumpOpInfoDict(self,
                       data,
                       groupId,
                       gpuId,
                       pretty=False,
                       tmpPath="./tmp"):
        # Bug fix: `pretty` and `tmpPath` used to be ignored (hard-coded
        # False / "./tmp" in the delegated call).
        return self.dumpDict(
            data, "opinfo", groupId, gpuId, pretty=pretty, tmpPath=tmpPath)

    def dumpDCGMDict(self, data, groupId, gpuId, pretty=False, tmpPath="./tmp"):
        # Bug fix: forward the caller's pretty/tmpPath (previously hard-coded).
        return self.dumpDict(
            data, "dcgm", groupId, gpuId, pretty=pretty, tmpPath=tmpPath)

    def dumpDict(self,
                 data,
                 name,
                 groupId,
                 gpuId,
                 pretty=False,
                 tmpPath="./tmp"):
        """Serialize `data` as JSON to the canonical temp file, overwriting
        any existing file.  Directory creation is serialized by a lock since
        several worker processes may dump concurrently.
        """
        self._lock.acquire()
        try:
            if not os.path.exists(tmpPath):
                os.makedirs(tmpPath)
        finally:
            # ensure the lock is released even if makedirs raises
            self._lock.release()
        if pretty:
            jsObj = json.dumps(data, indent=4, separators=(',', ': '))
        else:
            jsObj = json.dumps(data, separators=(',', ':'))
        fileName = self.getFileName(name, groupId, gpuId, tmpPath)
        if os.path.isfile(fileName):
            os.remove(fileName)
        # `with` guarantees the handle is closed (the original leaked on error)
        with open(fileName, 'w') as fileObject:
            fileObject.write(jsObj)
        self._logger.info("dump [%s] sucessfully!" % fileName)
def getLogger():
    """Configure and return the root logger with a DEBUG file handler.

    The log file is written to ``<parent-of-cwd>/Logs/<timestamp>.log``.
    Each call attaches a new handler to the root logger, so call this once
    per process.

    Returns:
        The root logging.Logger.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # Bug fix: '%s' (epoch seconds) is a non-portable GNU strftime extension
    # and fails on some platforms; '%S' (seconds 00-59) was intended.
    rq = time.strftime('%Y%m%d%H%M.%S', time.localtime(time.time()))
    log_path = os.path.join(os.path.dirname(os.getcwd()), 'Logs')
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    logfile = os.path.join(log_path, rq + '.log')
    fh = logging.FileHandler(logfile, mode='w')
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(process)d - %(levelname)s: %(message)s"
    )
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    return logger
def test_FileReader(args):
    """Smoke-test FileReader construction with `args`.

    Most inputs exercised by the driver below are intentionally invalid;
    the raised exception is printed rather than propagated so every case
    runs.
    """
    try:
        testReader = FileReader(None, args)
    except Exception as e:
        # Bug fix: `except Exception, Argument` is Python-2-only syntax.
        print(e)
    else:
        testReader.printArgs()


if __name__ == "__main__":
    # Cases, in order: non-dict args, unsupported organize form, valid-ish
    # dict, missing data path, wrong groupSize type.
    for case_args in (
            0,
            {
                "dataPath": ".",
                "groupSize": 1,
                "displaySize": 1,
                "gpuPerTrainer": 8,
                "organizeForm": FILEORGANIZEFORM_BYOTHER,
            },
            {
                "dataPath": ".",
                "groupSize": 1,
                "displaySize": 1,
                "gpuPerTrainer": 8,
                "organizeForm": FILEORGANIZEFORM_BYTRAINER,
            },
            {
                "dataPath": "./res",
                "groupSize": 1,
                "displaySize": 1,
                "gpuPerTrainer": 8,
                "organizeForm": FILEORGANIZEFORM_BYTRAINER,
            },
            {
                "dataPath": ".",
                "groupSize": "",
                "displaySize": 1,
                "gpuPerTrainer": 8,
                "organizeForm": FILEORGANIZEFORM_BYTRAINER,
            }, ):
        test_FileReader(case_args)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import glob
import logging
import argparse
import multiprocessing
import pandas as pd
from multiprocessing import Process
from NetFileReader import netFileReader
from DCGMFileReader import dcgmFileReader
from ProfileFileReader import profileFileReader
from CspFileReader import getLogger
from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH
from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM
from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM
def get_argparse():
    """Build and parse the command-line options of the CSP reporter."""
    parser = argparse.ArgumentParser(description=__doc__)
    # (flag, value type, default, help text)
    option_specs = [
        ('--profile_path', str, '.',
         'Working path that store the monitor data.'),
        ('--timeline_path', str, '.', 'Output timeline file name.'),
        ('--gpuPerTrainer', int, 8, 'Gpus per trainer.'),
        ('--trainerNum', int, 4, 'Num of trainer.'),
        ('--groupSize', int, 8, 'Num of trainer in a group.'),
        ('--displaySize', int, 2, 'Num of line need to display in a group.'),
    ]
    for flag, value_type, default, help_text in option_specs:
        parser.add_argument(flag, type=value_type, default=default,
                            help=help_text)
    return parser.parse_args()
class CspReporter(object):
    """Aggregates cross-stack profiling data (pipeline, dcgm, net, op info)
    of all trainer groups and dumps per-GPU Chrome-trace files.

    Args:
        args: parsed argparse namespace from get_argparse().
    """

    def __init__(self, args):
        self._args = args
        print(self._args)
        self._workPath = self._args.profile_path
        self._saveFilePath = self._args.timeline_path
        self._gpuPerTrainer = self._args.gpuPerTrainer
        self._groupSize = self._args.groupSize
        self._displaySize = self._args.displaySize
        self._trainerNum = self._args.trainerNum
        self._checkArgs()
        self._init_logger()
        # time info must be initialized before the readers: they consume
        # self._minTimeStamp through their args dicts
        self._init_timeInfo()
        self._init_reader()

    def _checkArgs(self):
        """Validate that trainers divide evenly into groups."""
        if self._trainerNum % self._groupSize != 0:
            raise Exception(
                "Input args error: trainerNum[%d] %% groupSize[%d] != 0" %
                (self._trainerNum, self._groupSize))

    def _init_logger(self):
        self._logger = getLogger()

    def _init_reader(self):
        """Build the per-source file readers and their arg dicts."""
        self._dcgmPath = os.path.join(self._workPath, DCGM_PATH)
        self._netPath = os.path.join(self._workPath, NET_PATH)
        self._profilePath = os.path.join(self._workPath, PROFILE_PATH)
        self._netFileReaderArgs = {
            "dataPath": self._netPath,
            "groupSize": self._groupSize,
            "displaySize": self._displaySize,
            "gpuPerTrainer": self._gpuPerTrainer,
            "minTimeStamp": self._minTimeStamp,
            "organizeForm": FILEORGANIZEFORM_BYTRAINER,
        }
        self._dcgmFileReaderArgs = {
            "dataPath": self._dcgmPath,
            "groupSize": self._groupSize,
            "displaySize": self._displaySize,
            "gpuPerTrainer": self._gpuPerTrainer,
            "minTimeStamp": self._minTimeStamp,
            "organizeForm": FILEORGANIZEFORM_BYTRAINER,
        }
        self._profileFileReaderArgs = {
            "dataPath": self._profilePath,
            "groupSize": self._groupSize,
            "displaySize": self._displaySize,
            "gpuPerTrainer": self._gpuPerTrainer,
            "minTimeStamp": self._minTimeStamp,
            "organizeForm": FILEORGANIZEFORM_BYRANK,
        }
        # NOTE(review): _netFileReaderArgs is built but no netFileReader is
        # instantiated -- net parsing is also commented out below; confirm
        # whether net support was intentionally disabled.
        self._dcgmFileReader = dcgmFileReader(self._logger,
                                              self._dcgmFileReaderArgs)
        self._profileFileReader = profileFileReader(self._logger,
                                                    self._profileFileReaderArgs)

    def _init_timeInfo(self):
        """Load per-process wall-clock info and the global minimum timestamp."""
        self._timePath = os.path.join(self._workPath, TIME_PATH)
        self._timeInfo = {}
        # NOTE(review): seeding with 0 means min() can never rise above 0 for
        # positive timestamps; an inf/first-sample seed was probably intended.
        # Left unchanged because every reader aligns against this value.
        self._minTimeStamp = 0
        self._set_timeInfo()

    def _set_timeInfo(self, timeFileNamePrefix="time.txt", sed="."):
        """Parse files named '<prefix><sed><...ids>' for start/end times.

        The trailing two `sed`-separated fields of each file name are taken
        as gpuId and trainerId (trainerId last).
        """
        # Bug fix: os.path.join(path, prefix, sed, "*") built the directory
        # path "<path>/time.txt/./*"; the intent is a glob of files whose
        # names start with the prefix.
        timeFileNameList = glob.glob(
            os.path.join(self._timePath, timeFileNamePrefix + sed + "*"))
        for timeFileName in timeFileNameList:
            trainerId = int(timeFileName.split(sed)[-1])
            gpuId = int(timeFileName.split(sed)[-2])
            info = {}
            with open(timeFileName, "r") as rf:
                for line in rf:
                    if line.startswith("start time:"):
                        info["start_time"] = int(
                            float(line.split(":")[-1]) * 1e9)
                        self._minTimeStamp = min(self._minTimeStamp,
                                                 info["start_time"])
                    if line.startswith("end time:"):
                        info["end_time"] = int(float(line.split(":")[-1]) * 1e9)
            # Bug fix: the original stored `info` only when it was EMPTY
            # (`if not info:`), discarding every populated record.
            if info:
                # NOTE(review): gpuId * trainerId collides for many pairs;
                # a tuple key or trainerId * gpuPerTrainer + gpuId looks
                # intended -- confirm against consumers of _timeInfo.
                self._timeInfo[gpuId * trainerId] = info

    def _generateTraceFileByGroupAndGpuId(self, pipileInfo, netInfo, groupId,
                                          gpuId):
        """Merge pipeline, op, dcgm and net events of one GPU and dump them."""
        dcgmInfoDict = self._dcgmFileReader.getDcgmInfoDict(groupId, gpuId)
        opInfoDict = self._profileFileReader.getOpInfoDict(groupId, gpuId)
        traceObj = {}
        traceObj["traceEvents"] = pipileInfo[str(gpuId)] + opInfoDict[
            "traceEvents"] + dcgmInfoDict["traceEvents"] + netInfo[
                "traceEvents"]
        self._profileFileReader.dumpDict(traceObj, "traceFile", groupId, gpuId,
                                         False, self._saveFilePath)

    def _generateTraceFileByGroup(self, groupId, processNum):
        """Generate and dump the trace files of every GPU in one group."""
        # first we need to generate pipeline info
        pipileInfo = self._profileFileReader.getPipeLineInfo(groupId,
                                                             processNum)
        # second we need to generate dcgm info
        dcgmInfo = self._dcgmFileReader.getDCGMTraceInfo(groupId, processNum)
        # third we need to generate net info (currently disabled)
        netInfo = {}
        netInfo["traceEvents"] = []
        # netInfo = self._netFileReader.parseFileByGroup(groupId, processNum)
        # forth we need to generate op info
        opInfo = self._profileFileReader.getOPTraceInfo(groupId)
        # finially we need dump this information into disk
        processPool = []
        pidList = []
        for gpuId in range(self._gpuPerTrainer):
            subproc = Process(
                target=self._generateTraceFileByGroupAndGpuId,
                args=(
                    pipileInfo,
                    netInfo,
                    groupId,
                    gpuId, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[traceFile]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, 1))
        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[traceFile]: process [%d] has exited! remained %d process!" %
                (t.pid, len(pidList)))

    def generateTraceFile(self, processNum=8):
        """Generate trace files for every trainer group, one worker process
        per group.

        Args:
            processNum: worker count forwarded to the per-group generators.
        """
        processPool = []
        pidList = []
        # Bug fix: `/` yields a float on Python 3 and range() requires an int.
        for groupId in range(self._trainerNum // self._groupSize):
            subproc = Process(
                target=self._generateTraceFileByGroup,
                args=(
                    groupId,
                    processNum, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[GroupTraceFile]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, 1))
        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[GroupTraceFile]: process [%d] has exited! remained %d process!"
                % (t.pid, len(pidList)))
if __name__ == '__main__':
    # Parse CLI options and emit the aggregated per-GPU trace files.
    reporter = CspReporter(get_argparse())
    reporter.generateTraceFile()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import json
import glob
import logging
import tempfile
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process
from CspChromeTraceFormatter import ChromeTraceFormatter
from CspFileReader import FileReader
from CspFileReader import getLogger
from CspFileReader import dcgmMetricParameterMap
from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH
from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM
from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM
class dcgmFileReader(FileReader):
    """Reads per-trainer DCGM monitor dumps (``nvidia-dcgm-dmon`` text
    output, organized byTrainer) and converts them into Chrome-trace
    counter events.
    """

    def parseFileByGroup(self, groupId, processNum=8):
        """Parse up to displaySize files of the group into one DataFrame.

        Args:
            groupId: group whose files are loaded.
            processNum: number of worker processes; 0 parses in-process.

        Returns:
            pandas.DataFrame with one row per DCGM sample (None if nothing
            could be parsed).
        """
        fileFist = self.getFileListByGroup(groupId)
        displaySize = min(self._displaySize, len(fileFist))
        fileFist = fileFist[:displaySize]
        if processNum == 0:
            return self._parseTask(fileFist)
        self._logger.info("using [%d] process to do this work!" % processNum)
        processPool = []
        pidList = []
        manager = multiprocessing.Manager()
        q = manager.Queue()
        taskList = self._splitTaskListForMultiProcess(fileFist, processNum)
        for task in taskList:
            subproc = Process(
                target=self._parseTask, args=(
                    task,
                    q, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[DCGM reader]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, len(processPool)))
        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[DCGM reader]: process [%d] has exited! remained %d process!"
                % (t.pid, len(pidList)))
        # merge the per-worker frames (one queue item per worker)
        isFistProcess = True
        for t in processPool:
            if isFistProcess:
                isFistProcess = False
                dcgm_data = q.get()
            else:
                dcgm_data = pd.concat(
                    [dcgm_data, q.get()], axis=0, join='outer')
        return dcgm_data

    def _parseTask(self, taskList, q=None):
        """Parse every file in taskList and concatenate the results.

        Bug fixes: the accumulated frame used to be put on the queue INSIDE
        the loop, so the consumer (one get() per worker) could observe a
        partially accumulated frame; and an empty/unparseable task list
        raised UnboundLocalError.  The result is now put once, after the
        loop, and is None when nothing was parsed.
        """
        dcgm_data = None
        for fileName in taskList:
            self._logger.info("I am processing %s!" % fileName)
            tmp_data = self._parseSingleFile(fileName)
            if tmp_data is None:
                continue
            if dcgm_data is None:
                dcgm_data = tmp_data
            else:
                dcgm_data = pd.concat(
                    [dcgm_data, tmp_data], axis=0, join='outer')
            dcgm_data = dcgm_data.dropna()
            self._logger.info("I finish processing %s!" % fileName)
        if q is not None:
            q.put(dcgm_data)
        return dcgm_data

    def _parseSingleFile(self, fileName):
        """Convert one dcgm-dmon text dump into a DataFrame.

        Returns None when the file does not exist.
        """
        trainerId = self.getTrainerId(fileName)
        if not os.path.exists(fileName):
            logging.warning(fileName + ' not found')
            return
        # collapse runs of blanks into commas and drop a leading comma -> CSV
        regex_list = [
            (re.compile(r' +'), ','),
            (re.compile(r'^,'), ''),
        ]
        csv_tempfile = tempfile.TemporaryFile()
        with open(fileName, 'r') as fp:
            has_header = False
            for line in fp:
                # skip `nvidia-dcgm-dmon.sh` init and fini info lines
                if 'nv-hostengine' in line or 'dmon' in line or 'Host Engine Listener Started' in line:
                    continue
                if not line.strip().startswith("GPU") and not line.strip(
                ).startswith("# Entity"):
                    continue
                # skip non-needed headers (only the header in 1th line was needed)
                if line.strip().startswith("# Entity"):
                    line = line.strip()[2:]
                if 'Entity' == line[0:len('Entity')]:
                    if has_header:
                        continue
                    else:
                        has_header = True
                if line.strip().startswith("GPU"):
                    line = line.strip()[3:]
                for r in regex_list:
                    line = r[0].sub(r[1], line)
                # Bug fix: bytes(str) requires an encoding on Python 3;
                # encode explicitly so the binary temp file receives bytes.
                csv_tempfile.write((line + "\n").encode('utf-8'))
        csv_tempfile.seek(0)
        dcgm = pd.read_csv(csv_tempfile, header=0, delimiter=',')
        # derived utilization ratio from used/total framebuffer columns
        dcgm['FB_USED_RATIO'] = dcgm['FBUSD'] / dcgm['FBTTL']
        dcgm['GPUTL'] = dcgm['GPUTL'] / 100.0
        # assumes TIMESTAMP is in seconds; traces use nanoseconds -- TODO confirm
        dcgm['ts'] = dcgm['TIMESTAMP'] * 1e9
        dcgm['trainerId'] = trainerId
        return dcgm

    def _getDCGMTraceInfoByGpuId(self,
                                 groupId,
                                 gpuId,
                                 dcgm_data,
                                 pid_map,
                                 q=None):
        """Convert one GPU's rows of dcgm_data into Chrome-trace counter
        events and dump them to the tmp dir via dumpDCGMDict.
        """
        self._logger.info(
            "Begin to generate dcgm info, groupId = %d, gpuID = %d ..." %
            (groupId, gpuId))
        gpuDcgmData = dcgm_data[dcgm_data['Entity'].isin([gpuId])]
        traceEventList = []
        # one process_name metadata record per metric display group
        for metric, parameteList in dcgmMetricParameterMap.items():
            metaInfo = {}
            metaInfo['name'] = 'process_name'
            metaInfo['ph'] = 'M'
            metaInfo['pid'] = pid_map[metric]
            metaInfo['args'] = {'name': metric}
            traceEventList.append(metaInfo)
        for index, row in gpuDcgmData.iterrows():
            for metric, parameteList in dcgmMetricParameterMap.items():
                # only the first displaySize trainers of the group are shown
                trainerId = int(row['trainerId']) % self._groupSize
                if trainerId >= self._displaySize:
                    continue
                di = {}
                name = "%s" % (metric)
                di['name'] = name
                di['pid'] = pid_map[metric]
                di['ts'] = self._align_ts(int(row['ts']))
                di['cat'] = metric
                di['tid'] = "%d_%d" % (groupId, trainerId)
                di['ph'] = "C"  # counter event
                di['id'] = trainerId
                args = {}
                for p in parameteList:
                    args[p[0]] = row[p[1]]
                di['args'] = args
                traceEventList.append(di)
        trace = {}
        trace['traceEvents'] = traceEventList
        self.dumpDCGMDict(trace, groupId, gpuId, True)
        return trace

    def getDCGMTraceInfo(self, groupId, processNum=8):
        """Generate dcgm trace files for every GPU of the group.

        The per-GPU traces are written to disk by worker processes and later
        reloaded via getDcgmInfoDict, hence the empty return value.
        """
        dcgm_data = self.parseFileByGroup(groupId, processNum)
        # assign one trace pid per metric group, after the pipeline lanes
        pid_map = {}
        init_pid = PIPELINEINFO_TRACE_NUM
        for metric in dcgmMetricParameterMap.keys():
            pid_map[metric] = init_pid
            init_pid = init_pid + 1
        manager = multiprocessing.Manager()
        q = manager.Queue()
        processPool = []
        pidList = []
        for gpuId in range(self._gpuPerTrainer):
            subproc = Process(
                target=self._getDCGMTraceInfoByGpuId,
                args=(
                    groupId,
                    gpuId,
                    dcgm_data,
                    pid_map,
                    q, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[DCGM info]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, 1))
        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[DCGM info]: process [%d] has exited! remained %d process!" %
                (t.pid, len(pidList)))
        dcgmInfo = {}
        return dcgmInfo
def test_dcgmFileReader():
    """Manual smoke test: parse group 0 with 8 workers and dump its traces."""
    reader_args = {
        "dataPath": "data/newdata/dcgm",
        "groupSize": 4,
        "displaySize": 8,
        "gpuPerTrainer": 8,
        "minTimeStamp": 0,
        "organizeForm": FILEORGANIZEFORM_BYTRAINER,
    }
    reader = dcgmFileReader(getLogger(), reader_args)
    reader.printArgs()
    reader.getDCGMTraceInfo(0, 8)


if __name__ == "__main__":
    test_dcgmFileReader()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import glob
import logging
import pandas as pd
from multiprocessing import Process
from CspChromeTraceFormatter import ChromeTraceFormatter
from CspFileReader import FileReader
from CspFileReader import getLogger
from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH
from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM
from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM
class netFileReader(FileReader):
    """Reads per-trainer network monitor dumps (one JSON event per line,
    named 'tx' or 'rx', organized byTrainer) and merges them into
    Chrome-trace events.
    """

    def _parseSingleFile(self, fileNameList, tx_pid, rx_pid, q=None):
        """Parse every file of one task chunk into a single trace dict.

        Args:
            fileNameList: files of this chunk.
            tx_pid: trace pid assigned to 'tx' events.
            rx_pid: trace pid assigned to 'rx' events.
            q: optional queue; when given the result is put there instead of
               being returned (multiprocessing mode).
        """
        traceInfo = {}
        traceEventList = []
        # process-name metadata for the tx lane
        metaInfo = {}
        metaInfo['name'] = 'process_name'
        metaInfo['ph'] = 'M'
        metaInfo['pid'] = tx_pid
        metaInfo['args'] = {'name': "%02d_tx" % tx_pid}
        traceEventList.append(metaInfo)
        # process-name metadata for the rx lane
        metaInfo = {}
        metaInfo['name'] = 'process_name'
        metaInfo['ph'] = 'M'
        metaInfo['pid'] = rx_pid
        metaInfo['args'] = {'name': "%02d_rx" % rx_pid}
        traceEventList.append(metaInfo)
        trainerIdList = []
        for fileName in fileNameList:
            trainerId = self.getTrainerId(fileName)
            trainerIdList.append(trainerId)
            with open(fileName, "r") as rf:
                for line in rf:
                    try:
                        event_str = json.loads(line.strip())
                        event_str["pid"] = tx_pid if event_str[
                            "name"] == "tx" else rx_pid
                        # the unit of net is ms, we need ns
                        event_str["ts"] = self._align_ts(event_str["ts"] * 1e6)
                        event_str["id"] = trainerId
                        traceEventList.append(event_str)
                    except Exception:
                        # monitoring dumps are best-effort: tolerate and log
                        # malformed lines instead of aborting the whole parse
                        self._logger.warning(
                            "invalid record [%s] in [%s]. skip it!" %
                            (line[:-1], fileName))
        traceInfo["traceEvents"] = traceEventList
        if q is not None:
            q.put(traceInfo)
        else:
            return traceInfo

    def parseFileByGroup(self, groupId, processNum=8):
        """Parse the group's files with `processNum` workers and merge the
        per-worker results into one trace dict.

        Bug fix: this module only imports `Process` from multiprocessing, so
        the original `multiprocessing.Manager()` call raised NameError; the
        package is imported locally here.
        """
        import multiprocessing
        fileFist = self.getFileListByGroup(groupId)
        fileFist = fileFist[:min(self._displaySize, len(fileFist))]
        manager = multiprocessing.Manager()
        q = manager.Queue()
        processPool = []
        pidList = []
        # trace pids: pipeline lanes come first, then tx, then rx
        tx_pid = PIPELINEINFO_TRACE_NUM
        rx_pid = PIPELINEINFO_TRACE_NUM + 1
        taskList = self._splitTaskListForMultiProcess(fileFist, processNum)
        for task in taskList:
            subproc = Process(
                target=self._parseSingleFile, args=(
                    task,
                    tx_pid,
                    rx_pid,
                    q, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[Net info]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, len(processPool)))
        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[Net info]: process [%d] has exited! remained %d process!" %
                (t.pid, len(pidList)))
        # merge one queue item per worker; starts from an empty event list so
        # an empty worker pool still yields a well-formed result
        traceInfo = {"traceEvents": []}
        for t in processPool:
            traceInfo["traceEvents"].extend(q.get()["traceEvents"])
        return traceInfo
def test_netFileReader():
    """Manual smoke test: parse group 0 and write the merged trace to disk."""
    reader_args = {
        "dataPath": "data/newdata/net",
        "groupSize": 4,
        "displaySize": 2,
        "gpuPerTrainer": 8,
        "minTimeStamp": 0,
        "organizeForm": FILEORGANIZEFORM_BYTRAINER,
    }
    reader = netFileReader(getLogger(), reader_args)
    reader.printArgs()
    trace = reader.parseFileByGroup(0, 8)
    with open('jsonFile.json', 'w') as out:
        out.write(json.dumps(trace, indent=4, separators=(',', ': ')))


if __name__ == "__main__":
    test_netFileReader()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import six
import glob
import json
import logging
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process
import google.protobuf.text_format as text_format
import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2
from CspChromeTraceFormatter import ChromeTraceFormatter
from CspFileReader import FileReader
from CspFileReader import getLogger
from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH
from CspFileReader import NETINFO_TRACE_NUM, DCGMINFO_TRACE_NUM, PIPELINEINFO_TRACE_NUM
from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER, FILEORGANIZEFORM_BYOTHER, FILEORGANIZEFORM
class profileFileReader(FileReader):
    """Reads Paddle profiler protobuf dumps (one file per rank) and turns
    them into Chrome-trace ("catapult") compatible event lists:

    * a forward/backward pipeline view (``getPipeLineInfo``),
    * per-op timelines and memory-usage counters laid out per GPU
      (``getOPTraceInfo``).

    Heavy parsing work is fanned out to worker processes; results come
    back through a ``multiprocessing.Manager().Queue()``.
    """

    def _parseSingleFile(self, profile):
        """Read one binary profile file and return the decoded
        ``profiler_pb2.Profile`` message."""
        with open(profile, 'rb') as f:
            profile_s = f.read()
            profile_pb = profiler_pb2.Profile()
            profile_pb.ParseFromString(profile_s)
        return profile_pb

    def _parseTask(self, taskList, q=None):
        """Parse every file in *taskList* into a dict keyed by
        ``"trainerRank.%03d"``.

        When a multiprocessing queue *q* is supplied (worker-process
        mode) the result dict is also pushed onto it.
        """
        profile_dict = {}
        for fileName in taskList:
            rankId = self.getRankId(fileName)
            profile_dict["trainerRank.%03d" %
                         (rankId)] = self._parseSingleFile(fileName)
            self._logger.info("I finish processing %s!" % fileName)

        if q is not None:
            q.put(profile_dict)

        return profile_dict

    def _is_forwardBackwardInfo(self, items):
        """Return True when *items* is a marker event that delimits a
        forward or backward phase (emitted as marker/compute/MarkerCUDA)."""
        if items["name"] == "marker/compute/MarkerCUDA":
            # NOTE: dict.has_key() was removed in Python 3; use `in`.
            if "args" in items and isinstance(items["args"], dict):
                # _B marks the begin, _E the end of a phase.
                return items["args"].get("detail_info") in (
                    "marker_forward_B", "marker_forward_E",
                    "marker_backward_B", "marker_backward_E")
        return False

    def _allocate_forwardBackwardInfo(self, restList, pid, tid):
        """Pair ``*_B`` (begin) with the following ``*_E`` (end) marker, in
        timestamp order, producing complete duration ('X') events named
        "forward"/"backward" on the given *pid*/*tid* lane."""

        def _cmp_ele(items):
            return items["ts"]

        restList.sort(key=_cmp_ele)
        newList = []

        lastEle = {}
        for items in restList:
            if items["args"]["detail_info"].endswith("E"):
                # An end marker without a preceding begin marker is dropped.
                if not lastEle:
                    continue
                else:
                    lastEle["dur"] = items["ts"] - lastEle["ts"]

                    # "marker_forward_B" -> "forward"
                    name = lastEle["args"]["detail_info"]
                    name = name[:name.rfind('_')]
                    name = name.split('_')[1]
                    lastEle["name"] = name
                    lastEle["args"]["detail_info"] = name
                    lastEle["args"]["name"] = name
                    # cname picks the slice color in the trace viewer.
                    if name == "backward":
                        lastEle["cname"] = "good"
                    else:
                        lastEle["cname"] = "bad"

                    lastEle["tid"] = tid
                    lastEle["pid"] = pid

                    newList.append(lastEle)
            else:
                lastEle = items

        return newList

    def _getPipeLineInfo(self, profileList, q=None):
        """Worker: extract the forward/backward pipeline events from each
        profile file in *profileList*, keyed by the rank id (as string)."""
        res = {}
        for profile in profileList:
            rankId = self.getRankId(profile)

            profile_pb = self._parseSingleFile(profile)
            traceEventList = []
            pid = 0
            tid = rankId

            for event in profile_pb.events:
                args = {'name': event.name}
                if event.memcopy.bytes > 0:
                    args['mem_bytes'] = event.memcopy.bytes
                if hasattr(event, "detail_info") and event.detail_info:
                    args['detail_info'] = event.detail_info

                traceEvent = {}
                traceEvent['ph'] = 'X'
                traceEvent['cat'] = 'Op'
                traceEvent['name'] = event.name
                traceEvent['pid'] = pid
                traceEvent['tid'] = tid
                traceEvent['ts'] = self._align_ts(event.start_ns)
                traceEvent['dur'] = (event.end_ns - event.start_ns) / 1.0
                traceEvent['args'] = args

                # Only the phase-delimiting marker events are kept.
                if self._is_forwardBackwardInfo(traceEvent):
                    traceEventList.append(traceEvent)

            pipeLineList = self._allocate_forwardBackwardInfo(traceEventList,
                                                              pid, tid)

            res[str(rankId)] = pipeLineList

        if q is not None:
            q.put(res)

        return res

    def getPipeLineInfo(self, groupId, processNum=8):
        """Build the pipeline trace for all ranks in *groupId* using up to
        *processNum* worker processes; events are grouped by local GPU id."""
        fileList = self.getFileListByGroup(groupId)

        self._logger.info(
            "using [%d] process to do this work, total task num is %d!" %
            (processNum, len(fileList)))
        processPool = []
        pidList = []

        manager = multiprocessing.Manager()
        q = manager.Queue()

        taskList = self._splitTaskListForMultiProcess(fileList, processNum)
        for task in taskList:
            subproc = Process(
                target=self._getPipeLineInfo, args=(
                    task,
                    q, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[pipeline info]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, len(task)))

        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[pipeline info]: process [%d] has exited! remained %d process!"
                % (t.pid, len(pidList)))

        pipeLineInfo = {}

        # Chrome-trace metadata row that names the pipeline process lane.
        metaInfo = {}
        metaInfo['name'] = 'process_name'
        metaInfo['ph'] = 'M'
        metaInfo['pid'] = 0
        metaInfo['args'] = {
            'name': "%02d_pipeLineInfo" % PIPELINEINFO_TRACE_NUM
        }

        for t in processPool:
            for k, v in q.get().items():
                rankId = int(k)
                # Ranks sharing the same local GPU id share one lane.
                gpuId = rankId % self._gpuPerTrainer
                if str(gpuId) not in pipeLineInfo.keys():
                    pipeLineInfo[str(gpuId)] = [metaInfo]
                pipeLineInfo[str(gpuId)].extend(v)

        return pipeLineInfo

    def _allocate_pids(self, profile_dict, gpuId, initPid):
        """Assign a Chrome-trace pid (and display line number) to every
        (rank, device, kind) lane found in *profile_dict*.

        Only GPU lanes matching *gpuId* get a pid; pids are handed out
        sequentially starting at *initPid*.

        Returns ``(chrome_trace, devices, mem_devices)`` where the two
        dicts map ``(rank_key, device_id, kind)`` to the allocated pid.
        """
        chrome_trace = ChromeTraceFormatter()
        devices = dict()
        mem_devices = dict()

        initLineNum = initPid + 1

        for k, profile_pb in profile_dict.items():
            lineNum = initLineNum
            for event in profile_pb.events:
                if event.type == profiler_pb2.Event.CPU:
                    if (k, event.device_id, "CPU") not in devices:
                        pid = initPid
                        initPid = initPid + 1
                        devices[(k, event.device_id, "CPU")] = pid
                        # -1 device id represents CUDA API(RunTime) call.(e.g. cudaLaunch, cudaMemcpy)
                        if event.device_id == -1:
                            chrome_trace.emit_pid("%02d_%s:cuda_api" %
                                                  (lineNum, k), pid)
                            lineNum = lineNum + 1
                        else:
                            chrome_trace.emit_pid("%02d_%s:cpu:block:%d" %
                                                  (lineNum, k, event.device_id),
                                                  pid)
                            lineNum = lineNum + 1
                elif event.type == profiler_pb2.Event.GPUKernel:
                    if (k, event.device_id, "GPUKernel") not in devices:
                        if gpuId == event.device_id:
                            pid = initPid
                            initPid = initPid + 1
                            devices[(k, event.device_id, "GPUKernel")] = pid
                            chrome_trace.emit_pid("%02d_%s:gpu:%d" %
                                                  (lineNum, k, event.device_id),
                                                  pid)
                            lineNum = lineNum + 1

            if not hasattr(profile_pb, "mem_events"):
                continue

            for mevent in profile_pb.mem_events:
                if mevent.place == profiler_pb2.MemEvent.CUDAPlace:
                    if (k, mevent.device_id, "GPU") not in mem_devices:
                        if gpuId == mevent.device_id:
                            pid = initPid
                            initPid = initPid + 1
                            mem_devices[(k, mevent.device_id, "GPU")] = pid
                            chrome_trace.emit_pid(
                                "%02d_memory usage on %s:gpu:%d" %
                                (lineNum, k, mevent.device_id), pid)
                            lineNum = lineNum + 1
                elif mevent.place == profiler_pb2.MemEvent.CPUPlace:
                    if (k, mevent.device_id, "CPU") not in mem_devices:
                        pid = initPid
                        initPid = initPid + 1
                        mem_devices[(k, mevent.device_id, "CPU")] = pid
                        chrome_trace.emit_pid("%02d_memory usage on %s:cpu:%d" %
                                              (lineNum, k, mevent.device_id),
                                              pid)
                        lineNum = lineNum + 1
                elif mevent.place == profiler_pb2.MemEvent.CUDAPinnedPlace:
                    if (k, mevent.device_id, "CUDAPinnedPlace"
                        ) not in mem_devices:
                        if gpuId == mevent.device_id:
                            pid = initPid
                            initPid = initPid + 1
                            mem_devices[(k, mevent.device_id,
                                         "CUDAPinnedPlace")] = pid
                            chrome_trace.emit_pid(
                                "%02d_memory usage on %s:cudapinnedplace:%d" %
                                (lineNum, k, mevent.device_id), pid)
                            lineNum = lineNum + 1

            # Fallback lanes so every rank has CPU/GPU/pinned memory rows
            # even when no matching mem_event was seen above.
            if (k, 0, "CPU") not in mem_devices:
                pid = initPid
                initPid = initPid + 1
                mem_devices[(k, 0, "CPU")] = pid
                chrome_trace.emit_pid("%02d_memory usage on %s:cpu:%d" %
                                      (lineNum, k, 0), pid)
                lineNum = lineNum + 1
            if (k, 0, "GPU") not in mem_devices:
                pid = initPid
                initPid = initPid + 1
                mem_devices[(k, 0, "GPU")] = pid
                chrome_trace.emit_pid("%02d_memory usage on %s:gpu:%d" %
                                      (lineNum, k, 0), pid)
                lineNum = lineNum + 1
            if (k, 0, "CUDAPinnedPlace") not in mem_devices:
                pid = initPid
                initPid = initPid + 1
                mem_devices[(k, 0, "CUDAPinnedPlace")] = pid
                chrome_trace.emit_pid(
                    "%02d_memory usage on %s:cudapinnedplace:%d" %
                    (lineNum, k, 0), pid)
                lineNum = lineNum + 1

        return chrome_trace, devices, mem_devices

    def _allocate_events(self, profile_dict, devices, gpuId):
        """Emit one duration event per profiler event whose lane belongs
        to *gpuId*, using the pid mapping produced by _allocate_pids."""
        chrome_trace = ChromeTraceFormatter()
        for k, profile_pb in profile_dict.items():
            rankId = int(k.split(".")[-1])
            for event in profile_pb.events:
                # NOTE: renamed from `type` to avoid shadowing the builtin.
                if event.type == profiler_pb2.Event.CPU:
                    event_kind = "CPU"
                elif event.type == profiler_pb2.Event.GPUKernel:
                    event_kind = "GPUKernel"

                if event.type == profiler_pb2.Event.GPUKernel and event.device_id != gpuId and rankId % self._gpuPerTrainer != gpuId:
                    continue

                pid = devices[(k, event.device_id, event_kind)]
                args = {'name': event.name}
                if event.memcopy.bytes > 0:
                    args['mem_bytes'] = event.memcopy.bytes
                if hasattr(event, "detail_info") and event.detail_info:
                    args['detail_info'] = event.detail_info
                # TODO(panyx0718): Chrome tracing only handles ms. However, some
                # ops takes micro-seconds. Hence, we keep the ns here.
                chrome_trace.emit_region(
                    self._align_ts(event.start_ns),
                    (event.end_ns - event.start_ns) / 1.0, pid,
                    event.sub_device_id, 'Op', event.name, args)
        return chrome_trace

    def _allocate_memory_event(self, profile_dict, mem_devices, gpuId):
        """Emit a running memory-usage counter per lane.

        Allocations add ``bytes`` at ``start_ns`` and subtract them at
        ``end_ns``; simultaneous events are merged into one sample.
        """
        chrome_trace = ChromeTraceFormatter()
        if not hasattr(profiler_pb2, "MemEvent"):
            # Old protobuf without memory events: return an empty trace
            # (returning None would crash callers using ._events).
            return chrome_trace
        place_to_str = {
            profiler_pb2.MemEvent.CPUPlace: "CPU",
            profiler_pb2.MemEvent.CUDAPlace: "GPU",
            profiler_pb2.MemEvent.CUDAPinnedPlace: "CUDAPinnedPlace"
        }

        for k, profile_pb in profile_dict.items():
            rankId = int(k.split(".")[-1])

            # Integer division: ranks map to trainers in gpuPerTrainer-sized
            # chunks (plain `/` would yield a float on Python 3).
            trainerId = rankId // self._gpuPerTrainer

            if trainerId >= self._displaySize:
                continue

            mem_list = []
            end_profiler = 0
            for mevent in profile_pb.mem_events:
                crt_info = dict()
                crt_info['time'] = mevent.start_ns
                crt_info['size'] = mevent.bytes
                if mevent.place in place_to_str:
                    place = place_to_str[mevent.place]
                else:
                    place = "UnDefine"

                if (mevent.place == profiler_pb2.MemEvent.CUDAPlace or
                        mevent.place == profiler_pb2.MemEvent.CUDAPinnedPlace
                    ) and mevent.device_id != gpuId:
                    continue

                crt_info['place'] = place
                pid = mem_devices[(k, mevent.device_id, place)]
                crt_info['pid'] = pid
                crt_info['thread_id'] = mevent.thread_id
                crt_info['device_id'] = mevent.device_id
                mem_list.append(crt_info)
                # The matching release event at end_ns.
                crt_info = dict()
                crt_info['place'] = place
                crt_info['pid'] = pid
                crt_info['thread_id'] = mevent.thread_id
                crt_info['device_id'] = mevent.device_id
                crt_info['time'] = mevent.end_ns
                crt_info['size'] = -mevent.bytes
                mem_list.append(crt_info)
                end_profiler = max(end_profiler, crt_info['time'])

            mem_list.sort(key=lambda tmp: (tmp.get('time', 0)))

            i = 0
            total_size = 0
            while i < len(mem_list):
                total_size += mem_list[i]['size']
                # Fold all events sharing the same timestamp into one sample.
                while i < len(mem_list) - 1 and mem_list[i]['time'] == mem_list[
                        i + 1]['time']:
                    total_size += mem_list[i + 1]['size']
                    i += 1

                chrome_trace.emit_counter(
                    "Memory", "Memory", mem_list[i]['pid'],
                    self._align_ts(mem_list[i]['time']), 0, total_size)
                i += 1

        return chrome_trace

    def _getOPTraceInfoByGpuId(self, groupId, gpuId):
        """Worker: build and dump the op-level trace (events + memory) for
        the ranks of *groupId* whose local GPU id equals *gpuId*."""
        fileList = self.getFileListByGroup(groupId)
        newFileList = []
        for file in fileList:
            rankId = self.getRankId(file)
            localRank = rankId % self._gpuPerTrainer
            # `//` keeps the trainer id integral on Python 3.
            if localRank == gpuId and (rankId // self._gpuPerTrainer
                                       ) % self._groupSize < self._displaySize:
                newFileList.append(file)

        profile_dict = self._parseTask(newFileList)
        # Op lanes start after the pipeline/DCGM/net reserved pid ranges.
        initPid = PIPELINEINFO_TRACE_NUM + DCGMINFO_TRACE_NUM + NETINFO_TRACE_NUM

        metaTrace, devicesPid, mem_devicesPid = self._allocate_pids(
            profile_dict, gpuId, initPid)
        eventsTrace = self._allocate_events(profile_dict, devicesPid, gpuId)
        memEventsTrace = self._allocate_memory_event(profile_dict,
                                                     mem_devicesPid, gpuId)

        trace = {}
        trace[
            'traceEvents'] = metaTrace._metadata + eventsTrace._events + memEventsTrace._events
        self.dumpOpInfoDict(trace, groupId, gpuId, True)

        return trace

    def getOPTraceInfo(self, groupId):
        """Fan out one worker per local GPU to build the op traces for
        *groupId*.  Workers persist their result via dumpOpInfoDict; this
        method itself returns an empty dict."""
        processPool = []
        pidList = []

        for gpuId in range(self._gpuPerTrainer):
            subproc = Process(
                target=self._getOPTraceInfoByGpuId, args=(
                    groupId,
                    gpuId, ))
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[op info]: process [%d] has been started, total task num is %d ..."
                % (subproc.pid, 1))

        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[op info]: process [%d] has exited! remained %d process!" %
                (t.pid, len(pidList)))

        opInfo = {}

        return opInfo

    def parseFileByGroup(self, groupId, processNum=8):
        """Parse all profile files of *groupId* serially.

        NOTE(review): *processNum* is accepted for interface symmetry with
        the other readers but is currently unused — both branches of the
        original code called the serial path.
        """
        fileList = self.getFileListByGroup(groupId)
        return self._parseTask(fileList)
def test_profileFileReader():
    """Manual smoke test: build the op trace for group 0 and dump the
    resulting JSON to ./jsonFile.json for inspection."""
    args = {
        "dataPath": "data/newdata/profile",
        "groupSize": 4,
        "displaySize": 8,
        "gpuPerTrainer": 8,
        "minTimeStamp": 0,
        "organizeForm": FILEORGANIZEFORM_BYRANK,
    }

    testReader = profileFileReader(getLogger(), args)
    testReader.printArgs()
    data = testReader.getOPTraceInfo(0)

    jsObj = json.dumps(data)
    # Use a context manager so the file is closed even if write() fails.
    with open('jsonFile.json', 'w') as fileObject:
        fileObject.write(jsObj)
# Script entry point: run the smoke test when executed directly.
if __name__ == "__main__":
    test_profileFileReader()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册