CspReporter.py 8.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import glob
import argparse

from multiprocessing import Process

from DCGMFileReader import dcgmFileReader
from ProfileFileReader import profileFileReader

from CspFileReader import getLogger
from CspFileReader import TIME_PATH, DCGM_PATH, NET_PATH, PROFILE_PATH
26
from CspFileReader import FILEORGANIZEFORM_BYRANK, FILEORGANIZEFORM_BYTRAINER
27 28 29 30


def get_argparse():
    parser = argparse.ArgumentParser(description=__doc__)
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
    parser.add_argument(
        '--profile_path',
        type=str,
        default='.',
        help='Working path that store the monitor data.',
    )

    parser.add_argument(
        '--timeline_path',
        type=str,
        default='.',
        help='Output timeline file name.',
    )

    parser.add_argument(
        '--gpuPerTrainer', type=int, default=8, help='Gpus per trainer.'
    )

    parser.add_argument(
        '--trainerNum', type=int, default=4, help='Num of trainer.'
    )

    parser.add_argument(
        '--groupSize', type=int, default=8, help='Num of trainer in a group.'
    )

    parser.add_argument(
        '--displaySize',
        type=int,
        default=2,
        help='Num of line need to display in a group.',
    )
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87

    return parser.parse_args()


class CspReporter(object):
    def __init__(self, args):
        self._args = args
        print(self._args)

        self._workPath = self._args.profile_path
        self._saveFilePath = self._args.timeline_path
        self._gpuPerTrainer = self._args.gpuPerTrainer
        self._groupSize = self._args.groupSize
        self._displaySize = self._args.displaySize
        self._trainerNum = self._args.trainerNum

        self._checkArgs()

        self._init_logger()
        self._init_timeInfo()
        self._init_reader()

    def _checkArgs(self):
        if self._trainerNum % self._groupSize != 0:
            raise Exception(
88 89 90
                "Input args error: trainerNum[%d] %% groupSize[%d] != 0"
                % (self._trainerNum, self._groupSize)
            )
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126

    def _init_logger(self):
        self._logger = getLogger()

    def _init_reader(self):
        self._dcgmPath = os.path.join(self._workPath, DCGM_PATH)
        self._netPath = os.path.join(self._workPath, NET_PATH)
        self._profilePath = os.path.join(self._workPath, PROFILE_PATH)

        self._netFileReaderArgs = {
            "dataPath": self._netPath,
            "groupSize": self._groupSize,
            "displaySize": self._displaySize,
            "gpuPerTrainer": self._gpuPerTrainer,
            "minTimeStamp": self._minTimeStamp,
            "organizeForm": FILEORGANIZEFORM_BYTRAINER,
        }

        self._dcgmFileReaderArgs = {
            "dataPath": self._dcgmPath,
            "groupSize": self._groupSize,
            "displaySize": self._displaySize,
            "gpuPerTrainer": self._gpuPerTrainer,
            "minTimeStamp": self._minTimeStamp,
            "organizeForm": FILEORGANIZEFORM_BYTRAINER,
        }

        self._profileFileReaderArgs = {
            "dataPath": self._profilePath,
            "groupSize": self._groupSize,
            "displaySize": self._displaySize,
            "gpuPerTrainer": self._gpuPerTrainer,
            "minTimeStamp": self._minTimeStamp,
            "organizeForm": FILEORGANIZEFORM_BYRANK,
        }

127 128 129 130 131 132
        self._dcgmFileReader = dcgmFileReader(
            self._logger, self._dcgmFileReaderArgs
        )
        self._profileFileReader = profileFileReader(
            self._logger, self._profileFileReaderArgs
        )
133 134 135 136 137 138 139 140 141

    def _init_timeInfo(self):
        self._timePath = os.path.join(self._workPath, TIME_PATH)
        self._timeInfo = {}
        self._minTimeStamp = 0
        self._set_timeInfo()

    def _set_timeInfo(self, timeFileNamePrefix="time.txt", sed="."):
        timeFileNameList = glob.glob(
142 143
            os.path.join(self._timePath, timeFileNamePrefix, sed, "*")
        )
144 145 146 147 148 149 150 151
        for timeFileName in timeFileNameList:
            trainerId = int(timeFileName.split(sed)[-1])
            gpuId = int(timeFileName.split(sed)[-2])
            info = {}
            with open(timeFileName, "r") as rf:
                for line in rf:
                    if line.startswith("start time:"):
                        info["start_time"] = int(
152 153
                            float(line.split(":")[-1]) * 1e9
                        )
154

155 156 157
                        self._minTimeStamp = min(
                            self._minTimeStamp, info["start_time"]
                        )
158 159 160 161 162 163

                    if line.startswith("end time:"):
                        info["end_time"] = int(float(line.split(":")[-1]) * 1e9)
            if not info:
                self._timeInfo[gpuId * trainerId] = info

164 165 166
    def _generateTraceFileByGroupAndGpuId(
        self, pipileInfo, netInfo, groupId, gpuId
    ):
167 168 169 170
        dcgmInfoDict = self._dcgmFileReader.getDcgmInfoDict(groupId, gpuId)
        opInfoDict = self._profileFileReader.getOpInfoDict(groupId, gpuId)

        traceObj = {}
171 172 173 174 175 176
        traceObj["traceEvents"] = (
            pipileInfo[str(gpuId)]
            + opInfoDict["traceEvents"]
            + dcgmInfoDict["traceEvents"]
            + netInfo["traceEvents"]
        )
177

178 179 180
        self._profileFileReader.dumpDict(
            traceObj, "traceFile", groupId, gpuId, False, self._saveFilePath
        )
181 182 183

    def _generateTraceFileByGroup(self, groupId, processNum):
        # first we need to generate pipeline info
184
        pipileInfo = self._profileFileReader.getPipeLineInfo(
185 186
            groupId, processNum
        )
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
        # second we need to generate dcgm info
        dcgmInfo = self._dcgmFileReader.getDCGMTraceInfo(groupId, processNum)

        # third we need to generate net info
        netInfo = {}
        netInfo["traceEvents"] = []
        # netInfo = self._netFileReader.parseFileByGroup(groupId, processNum)

        # forth we need to generate op info
        opInfo = self._profileFileReader.getOPTraceInfo(groupId)

        # finially we need dump this information into disk
        processPool = []
        pidList = []

        for gpuId in range(self._gpuPerTrainer):
203 204 205 206 207 208 209 210 211
            subproc = Process(
                target=self._generateTraceFileByGroupAndGpuId,
                args=(
                    pipileInfo,
                    netInfo,
                    groupId,
                    gpuId,
                ),
            )
212 213 214 215 216
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[traceFile]: process [%d] has been started, total task num is %d ..."
217 218
                % (subproc.pid, 1)
            )
219 220 221 222 223

        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
224 225 226
                "[traceFile]: process [%d] has exited! remained %d process!"
                % (t.pid, len(pidList))
            )
227 228 229 230 231

    def generateTraceFile(self, processNum=8):
        processPool = []
        pidList = []
        for groupId in range(self._trainerNum / self._groupSize):
232 233 234 235 236 237 238
            subproc = Process(
                target=self._generateTraceFileByGroup,
                args=(
                    groupId,
                    processNum,
                ),
            )
239 240 241 242 243
            processPool.append(subproc)
            subproc.start()
            pidList.append(subproc.pid)
            self._logger.info(
                "[GroupTraceFile]: process [%d] has been started, total task num is %d ..."
244 245
                % (subproc.pid, 1)
            )
246 247 248 249 250
        for t in processPool:
            t.join()
            pidList.remove(t.pid)
            self._logger.info(
                "[GroupTraceFile]: process [%d] has exited! remained %d process!"
251 252
                % (t.pid, len(pidList))
            )
253 254 255 256 257 258


if __name__ == '__main__':
    args = get_argparse()
    tl = CspReporter(args)
    tl.generateTraceFile()