Unverified commit d251028d authored by chenjian, committed by GitHub

Add profiler backend

Parent 8b1815f3
@@ -12,15 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
from visualdl.proto.record_pb2 import Record
import numpy as np
from PIL import Image
from visualdl.proto.record_pb2 import Record
def scalar(tag, value, step, walltime=None):
"""Package data to one scalar.
Args:
tag (string): Data identifier
value (float): Value of scalar
@@ -52,8 +51,7 @@ def meta_data(tag='meta_data_tag', display_name="", step=0, walltime=None):
"""
meta = Record.MetaData(display_name=display_name)
return Record(values=[
Record.Value(id=step, tag=tag, timestamp=walltime,
meta_data=meta)
Record.Value(id=step, tag=tag, timestamp=walltime, meta_data=meta)
])
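# A minimal usage sketch of the packaging helpers above (tag and step values
# invented; the writer that serializes these Record messages is outside this
# diff):
loss_record = scalar(tag='train/loss', value=0.23, step=100)
meta_record = meta_data(tag='meta_data_tag', display_name='run_1', step=0)
# Both calls return record_pb2.Record messages holding one Record.Value.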
@@ -82,8 +80,8 @@ def imgarray2bytes(np_array):
def make_grid(I, ncols=8): # noqa: E741
assert isinstance(
I, np.ndarray), 'plugin error, should pass numpy array here'
assert isinstance(I,
np.ndarray), 'plugin error, should pass numpy array here'
if I.shape[1] == 1:
I = np.concatenate([I, I, I], 1) # noqa: E741
assert I.ndim == 4 and (I.shape[1] == 3 or I.shape[1] == 4)
@@ -113,9 +111,11 @@ def convert_to_HWC(tensor, input_format):
Return:
Image of format `HWC`.
"""
assert(len(set(input_format)) == len(input_format)), "You cannot use the same dimension shorthand twice. \
assert (len(set(input_format)) == len(input_format)
), "You cannot use the same dimension shorthand twice. \
input_format: {}".format(input_format)
assert(len(tensor.shape) == len(input_format)), "sizes of the input tensor and input format are different. \
assert (len(tensor.shape) == len(input_format)
), "sizes of the input tensor and input format are different. \
tensor shape: {}, input_format: {}".format(tensor.shape, input_format)
input_format = input_format.upper()
@@ -129,7 +129,8 @@ def convert_to_HWC(tensor, input_format):
index = [input_format.find(c) for c in 'HWC']
tensor_HWC = tensor.transpose(index)
if tensor_HWC.shape[2] == 1:
tensor_HWC = np.concatenate([tensor_HWC, tensor_HWC, tensor_HWC], 2)
tensor_HWC = np.concatenate([tensor_HWC, tensor_HWC, tensor_HWC],
2)
return tensor_HWC
if len(input_format) == 2:
@@ -202,7 +203,8 @@ def embedding(tag, labels, hot_vectors, step, labels_meta=None, walltime=None):
for label, hot_vector in zip(labels, hot_vectors):
if not isinstance(label, list):
label = [label]
embeddings.embeddings.append(Record.Embedding(label=label, vectors=hot_vector))
embeddings.embeddings.append(
Record.Embedding(label=label, vectors=hot_vector))
return Record(values=[
Record.Value(
@@ -325,7 +327,8 @@ def hparam(name, hparam_dict, metric_list, walltime):
hparamInfo.string_value = v
hm.hparamInfos.append(hparamInfo)
else:
print("The value of %s must be int, float or str, not %s" % (k, str(type(v))))
print("The value of %s must be int, float or str, not %s" %
(k, str(type(v))))
for metric in metric_list:
metricInfo = Record.HParam.HparamInfo()
metricInfo.name = metric
@@ -333,8 +336,7 @@ def hparam(name, hparam_dict, metric_list, walltime):
hm.metricInfos.append(metricInfo)
return Record(values=[
Record.Value(
id=1, tag="hparam", timestamp=walltime, hparam=hm)
Record.Value(id=1, tag="hparam", timestamp=walltime, hparam=hm)
])
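# A sketch of the expected inputs (values invented): hparam_dict maps names
# to int/float/str values (anything else trips the warning above), and
# metric_list names the metrics to associate with this run.
record = hparam(
    name='run_0',
    hparam_dict={'lr': 0.001, 'batch_size': 32, 'optimizer': 'adam'},
    metric_list=['hparam/loss', 'hparam/accuracy'],
    walltime=0)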
@@ -389,7 +391,12 @@ def compute_curve(labels, predictions, num_thresholds=None, weights=None):
return data
def pr_curve(tag, labels, predictions, step, walltime, num_thresholds=127,
def pr_curve(tag,
labels,
predictions,
step,
walltime,
num_thresholds=127,
weights=None):
"""Package data to one pr_curve.
@@ -409,15 +416,16 @@
num_thresholds = min(num_thresholds, 127)
prcurve_map = compute_curve(labels, predictions, num_thresholds, weights)
return pr_curve_raw(tag=tag,
tp=prcurve_map['tp'],
fp=prcurve_map['fp'],
tn=prcurve_map['tn'],
fn=prcurve_map['fn'],
precision=prcurve_map['precision'],
recall=prcurve_map['recall'],
step=step,
walltime=walltime)
return pr_curve_raw(
tag=tag,
tp=prcurve_map['tp'],
fp=prcurve_map['fp'],
tn=prcurve_map['tn'],
fn=prcurve_map['fn'],
precision=prcurve_map['precision'],
recall=prcurve_map['recall'],
step=step,
walltime=walltime)
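# A usage sketch with invented labels and predictions: compute_curve buckets
# the scores into at most 127 thresholds, and pr_curve_raw packages the stats.
labels = np.array([1, 0, 1, 1, 0, 0, 1, 0])
predictions = np.array([0.9, 0.2, 0.7, 0.6, 0.4, 0.1, 0.8, 0.3])
record = pr_curve('metrics/pr', labels, predictions, step=1, walltime=0,
                  num_thresholds=10)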
def pr_curve_raw(tag, tp, fp, tn, fn, precision, recall, step, walltime):
@@ -441,7 +449,6 @@ def pr_curve_raw(tag, tp, fp, tn, fn, precision, recall, step, walltime):
Return:
Package with format of record_pb2.Record
"""
"""
if isinstance(tp, np.ndarray):
tp = tp.astype(int).tolist()
@@ -456,15 +463,10 @@ def pr_curve_raw(tag, tp, fp, tn, fn, precision, recall, step, walltime):
if isinstance(recall, np.ndarray):
recall = recall.astype(int).tolist()
"""
prcurve = Record.PRCurve(TP=tp,
FP=fp,
TN=tn,
FN=fn,
precision=precision,
recall=recall)
prcurve = Record.PRCurve(
TP=tp, FP=fp, TN=tn, FN=fn, precision=precision, recall=recall)
return Record(values=[
Record.Value(
id=step, tag=tag, timestamp=walltime, pr_curve=prcurve)
Record.Value(id=step, tag=tag, timestamp=walltime, pr_curve=prcurve)
])
@@ -518,7 +520,13 @@ def compute_roc_curve(labels, predictions, num_thresholds=None, weights=None):
return data
def roc_curve(tag, labels, predictions, step, walltime, num_thresholds=127, weights=None):
def roc_curve(tag,
labels,
predictions,
step,
walltime,
num_thresholds=127,
weights=None):
"""Package data to one roc_curve.
Args:
tag (string): Data identifier
@@ -533,17 +541,19 @@ def roc_curve(tag, labels, predictions, step, walltime, num_thresholds=127, weig
Package with format of record_pb2.Record
"""
num_thresholds = min(num_thresholds, 127)
roc_curve_map = compute_roc_curve(labels, predictions, num_thresholds, weights)
roc_curve_map = compute_roc_curve(labels, predictions, num_thresholds,
weights)
return roc_curve_raw(tag=tag,
tp=roc_curve_map['tp'],
fp=roc_curve_map['fp'],
tn=roc_curve_map['tn'],
fn=roc_curve_map['fn'],
tpr=roc_curve_map['tpr'],
fpr=roc_curve_map['fpr'],
step=step,
walltime=walltime)
return roc_curve_raw(
tag=tag,
tp=roc_curve_map['tp'],
fp=roc_curve_map['fp'],
tn=roc_curve_map['tn'],
fn=roc_curve_map['fn'],
tpr=roc_curve_map['tpr'],
fpr=roc_curve_map['fpr'],
step=step,
walltime=walltime)
def roc_curve_raw(tag, tp, fp, tn, fn, tpr, fpr, step, walltime):
@@ -563,7 +573,6 @@ def roc_curve_raw(tag, tp, fp, tn, fn, tpr, fpr, step, walltime):
Return:
Package with format of record_pb2.Record
"""
"""
if isinstance(tp, np.ndarray):
tp = tp.astype(int).tolist()
@@ -578,12 +587,7 @@ def roc_curve_raw(tag, tp, fp, tn, fn, tpr, fpr, step, walltime):
if isinstance(fpr, np.ndarray):
fpr = fpr.astype(int).tolist()
"""
roc_curve = Record.ROC_Curve(TP=tp,
FP=fp,
TN=tn,
FN=fn,
tpr=tpr,
fpr=fpr)
roc_curve = Record.ROC_Curve(TP=tp, FP=fp, TN=tn, FN=fn, tpr=tpr, fpr=fpr)
return Record(values=[
Record.Value(
id=step, tag=tag, timestamp=walltime, roc_curve=roc_curve)
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
__ALL__ = [
'TOOLTIP_DEVICE_INFO_CN', 'TOOLTIP_MODEL_PERSPECTIVE_CN',
'TOOLTIP_MODEL_PERSPECTIVE_PERSTEP_CN',
'TOOLTIP_EVENT_TYPE_PERSPECTIVE_CN',
'TOOLTIP_EVENT_TYPE_MODEL_PERSPECTIVE_CN', 'TOOLTIP_DEVICE_INFO_EN',
'TOOLTIP_MODEL_PERSPECTIVE_EN', 'TOOLTIP_MODEL_PERSPECTIVE_PERSTEP_EN',
'TOOLTIP_EVENT_TYPE_PERSPECTIVE_EN',
'TOOLTIP_EVENT_TYPE_MODEL_PERSPECTIVE_EN'
]
TOOLTIP_DEVICE_INFO_CN = \
'<b class="bold">CPU进程利用率:</b><br>'\
'进程所利用到的CPU的时间 / ProfileStep的时间(即性能分析的时间跨度)<br>'\
'<b class="bold">CPU系统利用率:</b><br>'\
'整个系统所有进程利用到的CPU时间 / CPU总时间(ProfileStep的时间*CPU核心数)<br>'\
'<b class="bold">GPU利用率:</b><br>'\
'进程利用GPU计算的时间 / ProfileStep的时间,进程利用GPU计算的时间即是GPU Kernel计算的时间,越高越好<br>'\
'<b class="bold">流处理器效率:</b><br>'\
'对于流处理器处理某个GPU Kernel, 其效率为SM_Eff_i = min(Kernel所用的Blocks数量 / GPU的流处理器数量, 100%)。'\
'流处理器效率为SM_Eff_i关于每个Kernel的执行时间加权和 / ProfileStep的时间<br>'\
'<b class="bold">流处理器占用率:</b><br>'\
'对于流处理器处理某个GPU Kernel, 其占用率Occu_i = 为活跃的warp数 / 能支持的最大warp数。流处理器占用率为Occu_i关于每个Kernel执行时间的加权平均<br>'\
'<b class="bold">Tensor cores使用时间占比:</b><br>'\
'使用Tensor Cores的GPU Kernel的计算时间 / 所有Kernel的计算时间<br>'
TOOLTIP_MODEL_PERSPECTIVE_CN = \
'展示模型各阶段DataLoader, Forward, Backward, Optimization以及Other的总CPU和GPU时间。<br>'\
'CPU时间即是各阶段代码执行的时间,GPU时间是各阶段所调用的GPU Kernel在GPU上的计算时间。<br>'\
'<b class="bold">DataLoader:</b> 表示使用paddle.io.DataLoader从数据集中取数据的阶段<br>'\
'<b class="bold">Forward:</b> 表示模型前向计算的阶段<br>'\
'<b class="bold">Backward:</b> 表示模型反向梯度计算的阶段<br>'\
'<b class="bold">Optimization:</b> 表示模型优化更新参数的阶段<br>'\
'<b class="bold">Other:</b> 其它时间<br>'
TOOLTIP_MODEL_PERSPECTIVE_PERSTEP_CN = \
'展示每一个ProfileStep内模型各阶段DataLoader, Forward, Backward, Optimization以及Other的CPU和GPU时间。<br>'\
'CPU时间即是各阶段代码执行的时间,GPU时间是各阶段所调用的GPU Kernel在GPU上的计算时间。<br>'\
'<b class="bold">DataLoader:</b> 表示使用paddle.io.DataLoader从数据集中取数据的阶段<br>'\
'<b class="bold">Forward:</b> 表示模型前向计算的阶段<br>'\
'<b class="bold">Backward:</b> 表示模型反向梯度计算的阶段<br>'\
'<b class="bold">Optimization:</b> 表示模型优化更新参数的阶段<br>'\
'<b class="bold">Other:</b> 其它时间<br>'
TOOLTIP_EVENT_TYPE_PERSPECTIVE_CN = \
'展示不同类型的事件在模型各阶段DataLoader, Forward, Backward, Optimization以及Other的分布。<br>'\
'<b class="bold">Operator:</b> 表示框架内的算子执行<br>'\
'<b class="bold">CudaRuntime:</b> 表示cuda runtime的函数执行<br>'\
'<b class="bold">Kernel:</b> 表示GPU上计算的Kernel函数执行<br>'\
'<b class="bold">Memcpy:</b> 表示CPU和GPU之间的数据传输<br>'\
'<b class="bold">Memset:</b> 表示GPU的显存值设置<br>'\
'<b class="bold">UserDefined:</b> 表示用户在python脚本中自定义的事件<br>'\
'<b class="bold">OperatorInner:</b> 表示框架内算子的执行子过程<br>'\
'<b class="bold">Communication:</b> 表示分布式通信有关的事件<br>'
TOOLTIP_EVENT_TYPE_MODEL_PERSPECTIVE_CN = \
'展示在模型各阶段DataLoader, Forward, Backward, Optimization以及Other所包含的各种事件的时间。<br>'\
'<b class="bold">Operator:</b> 表示框架内的算子执行<br>'\
'<b class="bold">CudaRuntime:</b> 表示cuda runtime的函数执行<br>'\
'<b class="bold">Kernel:</b> 表示GPU上计算的Kernel函数执行<br>'\
'<b class="bold">Memcpy:</b> 表示CPU和GPU之间的数据传输<br>'\
'<b class="bold">Memset:</b> 表示GPU的显存值设置<br>'\
'<b class="bold">UserDefined:</b> 表示用户在python脚本中自定义的事件<br>'\
'<b class="bold">OperatorInner:</b> 表示框架内算子的执行子过程<br>'\
'<b class="bold">Communication:</b> 表示分布式通信有关的数据通信和计算事件<br>'
TOOLTIP_EVENT_DISTRIBUTED_HISTOGRAM_CN = \
'展示模型在每个迭代过程中通信、计算以及两者重叠部分的时间。<br>'\
'<b class="bold">ProfileStep:</b> 表示某一步迭代的总时间<br>'\
'<b class="bold">Communication:</b> 表示和通信相关的时间,包括框架内打的Communication事件、和通信有关的算子和Kernel(nccl)执行的时间<br>'\
'<b class="bold">Computation:</b> 表示GPU Kernel计算的时间,但是去除了和通信有关的Kernel(nccl)<br>'\
'<b class="bold">Overlap:</b> 表示通信和计算过程并行执行时候时间相互重叠的部分<br>'\
'<b class="bold">Others:</b> 表示通信和计算之外的时间<br>'
TOOLTIP_DEVICE_INFO_EN = \
'<b class="bold">CPU Process Utilization:</b><br>'\
'Process CPU time / ProfileStep time (total time of profiling)<br>'\
'<b class="bold">CPU System Utilization:</b><br>'\
'Sum of CPU time of all processes in the system / CPU total time (ProfileStep time * #CPU cores)<br>'\
'<b class="bold">GPU Utilization:</b><br>'\
'GPU busy time / ProfileStep time. GPU busy time is the time during which at least one GPU kernel is\
running on it.<br>'\
'<b class="bold">Est. SM Efficiency:</b><br>'\
'The SM efficiency for one kernel can be denoted as SM_Eff_i = min(blocks of this kernel / SM number \
of this GPU, 100%). '\
'Est. SM efficiency of GPU is the weighted sum of SM_Eff_i across all kernels / ProfileStep time<br>'\
'<b class="bold">Est. Achieved Occupancy:</b><br>'\
'The SM occupancy for one kernel can be denoted as Occu_i = active warps on an SM / maximum number \
of active warps supported by the SM. \
Est. SM occupancy of GPU is the weighted average of Occu_i across all kernels<br>'\
'<b class="bold">Tensor cores ratio:</b><br>'\
'Sum of kernel time using Tensor Cores / Sum of total kernel time<br>'
TOOLTIP_MODEL_PERSPECTIVE_EN = \
'Present CPU and GPU time for each stage of a model, i.e. DataLoader, Forward, Backward, Optimization and Other.<br>'\
'CPU time is the execution time of the code; GPU time is the calculation time of kernels launched in the stage.<br>'\
'<b class="bold">DataLoader:</b> denote data fetching using paddle.io.DataLoader<br>'\
'<b class="bold">Forward:</b> denote model forward<br>'\
'<b class="bold">Backward:</b> denote gradient back-propagate<br>'\
'<b class="bold">Optimization:</b> denote parameters update<br>'\
'<b class="bold">Other:</b> other time out of above range'
TOOLTIP_MODEL_PERSPECTIVE_PERSTEP_EN = \
'Present CPU and GPU time in each ProfileStep for each stage of a model, \
i.e. DataLoader, Forward, Backward, Optimization and Other.<br>'\
'CPU time is the execution time of the code; GPU time is the calculation time of kernels launched in the stage.<br>'\
'<b class="bold">DataLoader:</b> denote data fetching using paddle.io.DataLoader<br>'\
'<b class="bold">Forward:</b> denote model forward<br>'\
'<b class="bold">Backward:</b> denote gradient back-propagate<br>'\
'<b class="bold">Optimization:</b> denote parameters update<br>'\
'<b class="bold">Other:</b> other time out of above range'
TOOLTIP_EVENT_TYPE_PERSPECTIVE_EN = \
'Present the distribution of each kind of event across DataLoader,\
Forward, Backward, Optimization and Other stage.<br>'\
'<b class="bold">Operator:</b> denote operator execution<br>'\
'<b class="bold">CudaRuntime:</b> denote cuda runtime function execution<br>'\
'<b class="bold">Kernel:</b> denote kernel execution on GPU<br>'\
'<b class="bold">Memcpy:</b> denote data transfer between CPU and GPU<br>'\
'<b class="bold">Memset:</b> denote memory data set on GPU<br>'\
'<b class="bold">UserDefined:</b> denote events defined by users in python script<br>'\
'<b class="bold">OperatorInner:</b> denote operator\'s subprocess execution<br>'\
'<b class="bold">Communication:</b> denote events associated with distributed data transfer and computation.<br>'
TOOLTIP_EVENT_TYPE_MODEL_PERSPECTIVE_EN = \
'Present the time of each kind of event included in DataLoader, Forward, Backward, Optimization \
and Other stage.<br>'\
'<b class="bold">Operator:</b> denote operator execution<br>'\
'<b class="bold">CudaRuntime:</b> denote cuda runtime function execution<br>'\
'<b class="bold">Kernel:</b> denote kernel execution on GPU<br>'\
'<b class="bold">Memcpy:</b> denote data transfer between CPU and GPU<br>'\
'<b class="bold">Memset:</b> denote memory data set on GPU<br>'\
'<b class="bold">UserDefined:</b> denote events defined by users in python script<br>'\
'<b class="bold">OperatorInner:</b> denote operator\'s subprocess execution<br>'\
'<b class="bold">Communication:</b> denote events associated with distributed data transfer and computation.<br>'
TOOLTIP_EVENT_DISTRIBUTED_HISTOGRAM_EN = \
'Present the time of communication, computation and their overlap in the program.<br>'\
'<b class="bold">ProfileStep:</b> denote an iteration step of training process<br>'\
'<b class="bold">Communication:</b> denote the time related to communication, including communication-type events\
in the paddle framework, communication-related operators and GPU Kernels (nccl)<br>'\
'<b class="bold">Computation:</b> denote the computation \
time of GPU Kernels, except communication-related Kernels (nccl)<br>'\
'<b class="bold">Overlap:</b> denote the overlap time between Communication and \
Computation when they are executed in parallel.<br>'\
'<b class="bold">Others:</b> denote the time outside Communication and Computation<br>'
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
from collections import defaultdict
from .utils import get_device_nodes
from .utils import intersection_ranges
from .utils import merge_ranges
from .utils import merge_self_ranges
from .utils import rebuild_node_trees
from .utils import sum_ranges
from .utils import traverse_tree
_CommunicationOpName = ['allreduce', 'broadcast', 'rpc']
class DistributedParser:
r"""
Analyze communication and computation time ranges and their overlap.
The computation time covers all kernels except communication kernels such as nccl.
"""
def __init__(self):
self.steps_data = defaultdict(lambda: defaultdict(list))
self.calls = defaultdict(lambda: defaultdict(int))
self.steps_time = defaultdict(lambda: defaultdict(float))
self.profile_steps_time = {}
def parse(self, nodetrees):
'''
Collect all communication and computation time ranges.
'''
total_time = 0.0
nodetrees = rebuild_node_trees(nodetrees)
thread2hostnodes = traverse_tree(nodetrees)
thread_count = 0
for threadid, hostnodes in thread2hostnodes.items():
for hostnode in hostnodes[1:]: # skip root node
if hostnode.type == 'ProfileStep':
if thread_count == 0:
total_time += (hostnode.end_ns - hostnode.start_ns)
self._parse_step(hostnode)
continue
thread_count += 1
new_steps_data = defaultdict(lambda: defaultdict(list))
self.profile_steps_time['All'] = total_time
for step, step_data in self.steps_data.items():
self.calls[step]['cpu_communication_range'] = len(
step_data['cpu_communication_range'])
self.calls[step]['gpu_communication_range'] = len(
step_data['gpu_communication_range'])
new_steps_data[step][
'cpu_communication_range'] = merge_self_ranges(
step_data['cpu_communication_range'], is_sorted=False)
new_steps_data[step][
'gpu_communication_range'] = merge_self_ranges(
step_data['gpu_communication_range'], is_sorted=False)
new_steps_data[step]['communication_range'] = merge_ranges(
new_steps_data[step]['cpu_communication_range'],
new_steps_data[step]['gpu_communication_range'],
is_sorted=True)
new_steps_data[step]['computation_range'] = merge_self_ranges(
step_data['computation_range'], is_sorted=False)
new_steps_data[step]['overlap_range'] = intersection_ranges(
new_steps_data[step]['communication_range'],
new_steps_data[step]['computation_range'],
is_sorted=True)
self.steps_time[step]['communication_time'] = sum_ranges(
new_steps_data[step]['communication_range'])
self.steps_time[step]['computation_time'] = sum_ranges(
new_steps_data[step]['computation_range'])
self.steps_time[step]['overlap_time'] = sum_ranges(
new_steps_data[step]['overlap_range'])
self.steps_time[step]['others_time'] = self.profile_steps_time[
step] - self.steps_time[step][
'communication_time'] - self.steps_time[step][
'computation_time'] + self.steps_time[step][
'overlap_time']
self.steps_data = new_steps_data
def _parse_step(self, profile_step_node):
step = profile_step_node.name.split('#')[1]
self.profile_steps_time[
step] = profile_step_node.end_ns - profile_step_node.start_ns
nodes = []
stack = []
stack.append(profile_step_node)
while stack:
current_node = stack.pop()
nodes.append(current_node)
for childnode in current_node.children_node:
stack.append(childnode)
for hostnode in nodes:
# case 1: TracerEventType is Communication
if hostnode.type == 'Communication':
self.steps_data[step]['cpu_communication_range'].append(
(hostnode.start_ns, hostnode.end_ns))
self.steps_data['All']['cpu_communication_range'].append(
(hostnode.start_ns, hostnode.end_ns))
device_nodes = get_device_nodes(hostnode)
for device_node in device_nodes:
if device_node.type == 'Kernel':
self.steps_data[step][
'gpu_communication_range'].append(
(device_node.start_ns, device_node.end_ns))
self.steps_data['All'][
'gpu_communication_range'].append(
(device_node.start_ns, device_node.end_ns))
# case 2: TracerEventType is Operator but is communication op
elif hostnode.type == 'Operator' and any([
name in hostnode.name.lower()
for name in _CommunicationOpName
]):
self.steps_data[step]['cpu_communication_range'].append(
(hostnode.start_ns, hostnode.end_ns))
self.steps_data['All']['cpu_communication_range'].append(
(hostnode.start_ns, hostnode.end_ns))
device_nodes = get_device_nodes(hostnode)
for device_node in device_nodes:
if device_node.type == 'Kernel':
self.steps_data[step][
'gpu_communication_range'].append(
(device_node.start_ns, device_node.end_ns))
self.steps_data['All'][
'gpu_communication_range'].append(
(device_node.start_ns, device_node.end_ns))
# case 3: Others, filter kernels named with nccl
else:
for runtimenode in hostnode.runtime_node:
for devicenode in runtimenode.device_node:
if devicenode.type == 'Kernel':
if 'nccl' in devicenode.name.lower():
self.steps_data[step][
'gpu_communication_range'].append(
(devicenode.start_ns,
devicenode.end_ns))
self.steps_data['All'][
'gpu_communication_range'].append(
(devicenode.start_ns,
devicenode.end_ns))
else:
self.steps_data[step][
'computation_range'].append(
(devicenode.start_ns,
devicenode.end_ns))
self.steps_data['All'][
'computation_range'].append(
(devicenode.start_ns,
devicenode.end_ns))
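# The others_time bookkeeping above is inclusion-exclusion over interval
# sets; a toy check, assuming the .utils helpers behave as their names say:
communication = [(0, 4), (10, 14)]  # merged ns ranges, total 8
computation = [(2, 8)]              # total 6
overlap = intersection_ranges(communication, computation, is_sorted=True)
# overlap == [(2, 4)], total 2
others = 20 - sum_ranges(communication) - sum_ranges(computation) \
    + sum_ranges(overlap)           # 20 - 8 - 6 + 2 = 8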
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import collections
import functools
import json
import re
import sys
_show_name_pattern = re.compile(r'(.+)(\[.+\])')
_show_tid_pattern = re.compile(r'\w+(\(.+\))')
host_node_type_map = {
"Operator", "Dataloader", "ProfileStep", "CudaRuntime", "UserDefined",
"OperatorInner", "Forward", "Backward", "Optimization", "Communication",
"PythonOp", "PythonUserDefined"
}
device_node_type_map = {"Kernel", "Memcpy", "Memset"}
memory_node_event_map = {
"Allocate", "Free", "ReservedAllocate", "ReservedFree"
}
class HostNode:
def __init__(self):
self.name = None
self.type = None
self.start_ns = 0
self.end_ns = 0
self.process_id = 0
self.thread_id = 0
self.correlation_id = -1
self.input_shapes = {}
self.dtypes = {}
self.callstack = ""
self.children_node = []
self.runtime_node = []
self.device_node = []
self.mem_node = []
@classmethod
def from_json(cls, json_obj):
self = cls()
self.name = json_obj['name'].replace(
_show_name_pattern.match(json_obj['name']).group(2), "")
self.type = json_obj['cat']
self.start_ns = int(
float(json_obj['args']['start_time'].split(' ')[0]) * 1000)
self.end_ns = int(
float(json_obj['args']['end_time'].split(' ')[0]) * 1000)
self.process_id = json_obj['pid']
self.thread_id = json_obj['tid'].replace(
_show_tid_pattern.match(json_obj['tid']).group(1), "")
self.correlation_id = json_obj['args'][
'correlation id'] if 'correlation id' in json_obj['args'] else -1
self.input_shapes = json_obj['args'][
'input_shapes'] if 'input_shapes' in json_obj['args'] else {}
self.dtypes = json_obj['args'][
'input_dtypes'] if 'input_dtypes' in json_obj['args'] else {}
self.callstack = json_obj['args'][
'callstack'] if 'callstack' in json_obj['args'] else ""
self.children_node = []
self.runtime_node = []
self.device_node = []
self.mem_node = []
return self
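# A sketch of the trace-event shape this parser expects (field values
# invented; the microsecond-to-nanosecond scaling of start_time/end_time is
# an assumption read off the * 1000):
event = {
    'name': 'matmul[fwd]',  # '[...]' suffix stripped via _show_name_pattern
    'cat': 'Operator',
    'pid': 1001,
    'tid': 'thread(2)',     # '(...)' suffix stripped via _show_tid_pattern
    'args': {'start_time': '10.0 us', 'end_time': '25.5 us'},
}
node = HostNode.from_json(event)
# node.name == 'matmul', node.thread_id == 'thread',
# node.start_ns == 10000, node.end_ns == 25500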
class MemNode:
def __init__(self):
self.type = None
self.timestamp_ns = 0
self.addr = 0
self.process_id = 0
self.thread_id = 0
self.increase_bytes = 0
self.place = None
self.current_allocated = 0
self.current_reserved = 0
self.peak_allocated = 0
self.peak_reserved = 0
@classmethod
def from_json(cls, json_obj):
self = cls()
self.type = json_obj['cat']
self.timestamp_ns = json_obj['ts'] * 1000
self.addr = hex(int(
json_obj['args']['addr'])) if 'addr' in json_obj['args'] else 0
self.process_id = json_obj['pid']
self.thread_id = json_obj['tid'].replace(
_show_tid_pattern.match(json_obj['tid']).group(1), "")
self.increase_bytes = json_obj['args'][
'increase_bytes'] if 'increase_bytes' in json_obj['args'] else 0
self.place = json_obj['args']['place'] if 'place' in json_obj[
'args'] else "Place(cpu)"
self.current_allocated = json_obj['args'][
'current_allocated'] if 'current_allocated' in json_obj[
'args'] else 0
self.current_reserved = json_obj['args'][
'current_reserved'] if 'current_reserved' in json_obj['args'] else 0
self.peak_allocated = json_obj['args'][
'peak_allocated'] if 'peak_allocated' in json_obj['args'] else 0
self.peak_reserved = json_obj['args'][
'peak_reserved'] if 'peak_reserved' in json_obj['args'] else 0
return self
class DeviceNode:
def __init__(self):
self.name = None
self.type = None
self.start_ns = 0
self.end_ns = 0
self.device_id = 0
self.stream_id = 0
self.context_id = 0
self.correlation_id = 0
self.block_x, self.block_y, self.block_z = [0, 0, 0]
self.grid_x, self.grid_y, self.grid_z = [0, 0, 0]
self.shared_memory = 0
self.registers_per_thread = 0
self.num_bytes = 0
self.value = 0
self.occupancy = 0
self.blocks_per_sm = 0
self.warps_per_sm = 0
@classmethod
def from_json(cls, json_obj):
self = cls()
self.name = json_obj['name'].replace(
_show_name_pattern.match(json_obj['name']).group(2), "")
self.type = json_obj['cat']
self.start_ns = int(
float(json_obj['args']['start_time'].split(' ')[0]) * 1000)
self.end_ns = int(
float(json_obj['args']['end_time'].split(' ')[0]) * 1000)
self.device_id = json_obj['pid']
self.stream_id = json_obj['tid']
self.context_id = json_obj['args']['context'] if 'context' in json_obj[
'args'] else 0
self.correlation_id = json_obj['args']['correlation id']
self.block_x, self.block_y, self.block_z = json_obj['args'][
'block'] if 'block' in json_obj['args'] else [0, 0, 0]
self.grid_x, self.grid_y, self.grid_z = json_obj['args'][
'grid'] if 'grid' in json_obj['args'] else [0, 0, 0]
self.shared_memory = json_obj['args'][
'shared memory'] if 'shared memory' in json_obj['args'] else 0
self.registers_per_thread = json_obj['args'][
'registers per thread'] if 'registers per thread' in json_obj[
'args'] else 0
self.num_bytes = json_obj['args']['bytes'] if 'bytes' in json_obj[
'args'] else 0
self.value = json_obj['args']['value'] if 'value' in json_obj[
'args'] else 0
self.occupancy = json_obj['args'][
'theoretical achieved occupancy %'] if 'theoretical achieved occupancy %' in json_obj[
'args'] else 0
self.blocks_per_sm = json_obj['args'][
"blocks per SM"] if "blocks per SM" in json_obj['args'] else 0
self.warps_per_sm = json_obj['args'][
"warps per SM"] if "warps per SM" in json_obj['args'] else 0
return self
class ProfilerResult:
def __init__(self, json_data):
self.device_infos = None
self.span_idx = None
self.data = None
self.extra_info = None
self.schema_version = None
self.has_hostnodes = True
self.has_devicenodes = True
self.has_memnodes = True
self.parse(json_data)
self.content = json_data
self.start_in_timeline_ns = None
def parse(self, json_data):
self.schema_version = json_data['schemaVersion']
self.span_idx = json_data['span_indx']
self.device_infos = {
device_info['id']: device_info
for device_info in json_data['deviceProperties']
}
hostnodes = []
runtimenodes = []
devicenodes = []
memnodes = []
for event in json_data['traceEvents']:
if not event or (event['ph'] != 'X' and event['ph'] != 'i'):
continue
if event['cat'] in host_node_type_map:
if event['cat'] == 'CudaRuntime' or event[
'cat'] == 'MluRuntime':
runtimenodes.append(HostNode.from_json(event))
else:
hostnodes.append(HostNode.from_json(event))
if hostnodes[-1].start_ns == 0:
self.start_in_timeline_ns = int(event['ts']) * 1000
elif event['cat'] in device_node_type_map:
devicenodes.append(DeviceNode.from_json(event))
elif event['cat'] in memory_node_event_map:
memnodes.append(MemNode.from_json(event))
if memnodes:
for memnode in memnodes:
assert self.start_in_timeline_ns is not None
memnode.timestamp_ns = memnode.timestamp_ns - self.start_in_timeline_ns
if not hostnodes:
self.has_hostnodes = False
if not devicenodes:
self.has_devicenodes = False
if not memnodes:
self.has_memnodes = False
self.data = self.build_tree(hostnodes, runtimenodes, devicenodes,
memnodes)
self.extra_info = json_data['ExtraInfo']
def build_tree( # noqa: C901
self, hostnodes, runtimenodes, devicenodes, memnodes):
thread2host_event_nodes = collections.defaultdict(list)
thread2runtime_event_nodes = collections.defaultdict(list)
thread2mem_event_nodes = collections.defaultdict(list)
correlation_id2runtime_event_node = {}
thread_event_trees = {}
thread_ids = set()
for hostnode in hostnodes:
thread2host_event_nodes[hostnode.thread_id].append(hostnode)
thread_ids.add(hostnode.thread_id)
# construct thread2runtime_event_nodes and correlation_id2runtime_event_node
for runtimenode in runtimenodes:
thread2runtime_event_nodes[runtimenode.thread_id].append(
runtimenode)
thread_ids.add(runtimenode.thread_id)
correlation_id2runtime_event_node[
runtimenode.correlation_id] = runtimenode
# associate CudaRuntimeTraceEventNode and DeviceTraceEventNode
# construct correlation_id2device_event_nodes
for devicenode in devicenodes:
if devicenode.correlation_id not in correlation_id2runtime_event_node:
continue
runtimenode = correlation_id2runtime_event_node[
devicenode.correlation_id]
runtimenode.device_node.append(devicenode)
# construct thread2mem_event_nodes
for memnode in memnodes:
thread2mem_event_nodes[memnode.thread_id].append(memnode)
# sort host event nodes and runtime event nodes by start_ns and end_ns:
# the smaller the start_ns, the earlier the position; when two nodes share
# the same start_ns, the one with the larger end_ns comes first.
def compare_hostnode_func(hostnode1, hostnode2):
if hostnode1.start_ns < hostnode2.start_ns:
return -1
if hostnode1.start_ns == hostnode2.start_ns:
if hostnode1.end_ns > hostnode2.end_ns:
return -1
return 1
def compare_memnode_func(memnode1, memnode2):
if memnode1.timestamp_ns <= memnode2.timestamp_ns:
return -1
return 1
for threadid, hostnodes in thread2host_event_nodes.items():
thread2host_event_nodes[threadid] = sorted(
hostnodes, key=functools.cmp_to_key(compare_hostnode_func))
for threadid, runtimenodes in thread2runtime_event_nodes.items():
thread2runtime_event_nodes[threadid] = sorted(
runtimenodes, key=functools.cmp_to_key(compare_hostnode_func))
for threadid, memnodes in thread2mem_event_nodes.items():
thread2mem_event_nodes[threadid] = sorted(
memnodes, key=functools.cmp_to_key(compare_memnode_func))
# construct trees
for threadid in thread_ids:
thread_event_trees[threadid] = self._build_tree_relationship(
thread2host_event_nodes[threadid],
thread2runtime_event_nodes[threadid],
thread2mem_event_nodes[threadid])
return thread_event_trees
def _build_tree_relationship( # noqa: C901
self, host_event_nodes, runtime_event_nodes, mem_event_nodes):
# root node
root_node = HostNode()
root_node.name, root_node.type, root_node.start_ns, root_node.end_ns = "root node", "UserDefined", \
0, sys.maxsize
# push root node into node_stack
node_stack = []
node_stack.append(root_node)
# handle host_event_nodes
for host_node in host_event_nodes:
while True:
stack_top_node = node_stack[-1]
if host_node.start_ns < stack_top_node.end_ns:
stack_top_node.children_node.append(host_node)
node_stack.append(host_node)
break
else:
node_stack.pop()
# insert runtime node
# select runtime nodes which time range within stack_top_node
hasenter = False
firstposition = 0
lastposition = len(runtime_event_nodes)
for i, runtimenode in enumerate(runtime_event_nodes):
if runtimenode.start_ns >= stack_top_node.start_ns and \
runtimenode.end_ns <= stack_top_node.end_ns:
if not hasenter:
firstposition = i
hasenter = True
stack_top_node.runtime_node.append(runtimenode)
else:
# from this runtime node on, nodes fall outside stack_top_node; erase
# the collected ones from runtime_event_nodes
if runtimenode.start_ns > stack_top_node.end_ns:
lastposition = i
break
if hasenter:
del runtime_event_nodes[firstposition:lastposition]
# insert the remaining runtime nodes into the host nodes left on the stack
while node_stack:
stack_top_node = node_stack.pop()
# insert runtime node
# select runtime nodes which time range within stack_top_node
firstposition = 0
lastposition = len(runtime_event_nodes)
hasenter = False
for i, runtimenode in enumerate(runtime_event_nodes):
if runtimenode.start_ns >= stack_top_node.start_ns and runtimenode.end_ns <= stack_top_node.end_ns:
if not hasenter:
firstposition = i
hasenter = True
stack_top_node.runtime_node.append(runtimenode)
else:
# from this runtime node on, nodes fall outside stack_top_node; erase
# the collected ones from runtime_event_nodes
if runtimenode.start_ns > stack_top_node.end_ns:
lastposition = i
break
if hasenter:
del runtime_event_nodes[firstposition:lastposition]
# build the relationship between host event nodes and mem event nodes:
# first traverse the tree in post-order, then insert the memory and op
# supplement nodes into the correct host nodes.
stack = []
flag_stack = []
post_order_nodes = []
stack.append(root_node)
flag_stack.append(0)
while stack:
current_node = stack.pop()
flag = flag_stack.pop()
if flag == 0:
stack.append(current_node)
flag_stack.append(1)
for child in current_node.children_node[::-1]:
stack.append(child)
flag_stack.append(0)
else:
post_order_nodes.append(current_node)
for node in post_order_nodes:
hasenter = False
firstposition = 0
lastposition = len(mem_event_nodes)
for i, mem_node in enumerate(mem_event_nodes):
if mem_node.timestamp_ns >= node.start_ns and mem_node.timestamp_ns <= node.end_ns:
node.mem_node.append(mem_node)
if not hasenter:
firstposition = i
hasenter = True
else:
if mem_node.timestamp_ns > node.end_ns:
lastposition = i
break
if hasenter:
del mem_event_nodes[firstposition:lastposition]
return root_node
def get_data(self):
return self.data
def get_extra_info(self):
return self.extra_info
def get_schema_version(self):
return self.schema_version
def get_device_infos(self):
return self.device_infos
def get_span_idx(self):
return self.span_idx
def has_device(self):
return self.has_devicenodes
def has_host(self):
return self.has_hostnodes
def has_memory(self):
return self.has_memnodes
def save(self, path, format):
pass
def load_profiler_json(file_name):
content = json.load(open(file_name, 'r'))
return ProfilerResult(content)
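# End to end, the module is driven like this (file name hypothetical):
result = load_profiler_json('profiler_run_0.json')
trees = result.get_data()  # thread_id -> root HostNode for that thread
print(result.get_schema_version(), result.get_span_idx())
if result.has_device():
    print('devices:', list(result.get_device_infos().keys()))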
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import collections
class TC_Allowlist(dict):
# Refer to https://github.com/NVIDIA/PyProf/blob/fd1b2902e3306119eee40ba6b6e8b2f816920c29/pyprof/prof/tc.py#L19
allowlist = [
'h884', 's884', 'h1688', 's1688', 'hmma', 'i8816', '16816',
'dgrad_1x1_stride_2x2', 'first_layer_wgrad_kernel', 'conv1x1',
'conv2d_c1_k1', 'direct_group', 'xmma_implicit_gemm',
'xmma_sparse_conv', 'xmma_warp_specialized_implicit_gemm', 'xmma_gemm',
'xmma_sparse_gemm', 'c1688'
]
def __init__(self):
pass
def __contains__(self, item):
# If the kernel name contains any substring from the allowlist, the kernel uses Tensor Cores.
for pattern in self.allowlist:
if pattern in item:
return True
return False
_allow_list = TC_Allowlist()
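# Since __contains__ matches by substring, decorated kernel names still hit
# the allowlist (kernel names below are invented):
assert 'volta_h884gemm_64x64_nt' in _allow_list       # matches 'h884'
assert 'eigen_elementwise_add_kernel' not in _allow_list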
class DeviceItem:
def __init__(self, name):
self.name = name
self.call = 0
self.gpu_time = 0
self.max_gpu_time = 0
self.min_gpu_time = float('inf')
self.tensorcore_used = True if name in _allow_list else False
self.sum_blocks_per_sm = 0.0
self.sum_occupancy = 0.0
@property
def avg_gpu_time(self):
return self.gpu_time / self.call
def add_gpu_time(self, time):
if time > self.max_gpu_time:
self.max_gpu_time = time
if time < self.min_gpu_time:
self.min_gpu_time = time
self.gpu_time += time
def add_item(self, node):
self.call += 1
self.add_gpu_time(node.end_ns - node.start_ns)
self.sum_blocks_per_sm += node.blocks_per_sm
self.sum_occupancy += node.occupancy
class KernelParser:
def __init__(self):
self.kernel_items = {} # for kernel summary
self.kernel_items_with_op_name_attributes = collections.defaultdict(
dict)
self.gpu_ids = set()
self.occupancy = 0.0
self.sm_efficiency = 0.0
self.tensor_core_ratio = 0.0
def parse(self, nodelists): # noqa: C901
total_duration = 0.0
weighted_occupancy = 0.0
weighted_sm_efficiency = 0.0
for threadid, nodes in nodelists.items():
for node in nodes:
if node.type == 'Operator':
op_name = node.name
for children in node.children_node:
if children.type == 'OperatorInner':
for runtime_node in children.runtime_node:
for device_node in runtime_node.device_node:
if device_node.type == 'Kernel':
op_attribute_name = self._translate_op_name_attributes_to_string(
op_name, device_node)
if op_attribute_name not in self.kernel_items_with_op_name_attributes[
device_node.name]:
self.kernel_items_with_op_name_attributes[
device_node.name][
op_attribute_name] = DeviceItem(
device_node.name)
self.kernel_items_with_op_name_attributes[
device_node.
name][op_attribute_name].add_item(
device_node)
for runtime_node in node.runtime_node:
for device_node in runtime_node.device_node:
if device_node.type == 'Kernel':
op_attribute_name = self._translate_op_name_attributes_to_string(
op_name, device_node)
if op_attribute_name not in self.kernel_items_with_op_name_attributes[
device_node.name]:
self.kernel_items_with_op_name_attributes[
device_node.
name][op_attribute_name] = DeviceItem(
device_node.name)
self.kernel_items_with_op_name_attributes[
device_node.
name][op_attribute_name].add_item(
device_node)
elif node.type == 'OperatorInner':
continue
op_name = node.name
for runtime_node in node.runtime_node:
for device_node in runtime_node.device_node:
if device_node.type == 'Kernel':
op_attribute_name = self._translate_op_name_attributes_to_string(
op_name, device_node)
if op_attribute_name not in self.kernel_items_with_op_name_attributes[
device_node.name]:
self.kernel_items_with_op_name_attributes[
device_node.
name][op_attribute_name] = DeviceItem(
device_node.name)
self.kernel_items_with_op_name_attributes[
device_node.name][op_attribute_name].add_item(
device_node)
for threadid, nodes in nodelists.items():
for node in nodes:
for runtime_node in node.runtime_node:
for device_node in runtime_node.device_node:
if device_node.type == 'Kernel':
name = device_node.name
if name not in self.kernel_items:
self.kernel_items[name] = DeviceItem(name)
self.kernel_items[name].add_item(device_node)
weighted_occupancy += (
device_node.occupancy / 100) * (
device_node.end_ns - device_node.start_ns)
if device_node.blocks_per_sm > 1:
sm_efficiency = 1
else:
sm_efficiency = device_node.blocks_per_sm
weighted_sm_efficiency += sm_efficiency * (
device_node.end_ns - device_node.start_ns)
total_duration += (
device_node.end_ns - device_node.start_ns)
self.gpu_ids.add(device_node.device_id)
self.occupancy = weighted_occupancy / total_duration if total_duration != 0 else 0.0
self.sm_efficiency = weighted_sm_efficiency # to be divided by ProfileStep time in ProfileData
total_time = 0
total_tensorcore_time = 0
for name, node in self.kernel_items.items():
if node.tensorcore_used:
total_tensorcore_time += node.gpu_time
total_time += node.gpu_time
self.tensor_core_ratio = total_tensorcore_time / total_time if total_time != 0 else 0.0
def _translate_op_name_attributes_to_string(self, op_name, event):
result = '{}-[{},{},{}]-[{},{},{}]-{}-{}'.format(
op_name, event.grid_x, event.grid_y, event.grid_z, event.block_x,
event.block_y, event.block_z, event.registers_per_thread,
event.shared_memory)
return result
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import collections
from .utils import traverse_tree
class MemoryItem:
def __init__(self, event_name, place, memory_type='Allocated'):
self.event_name = event_name
self.place = place
self.allocation_count = 0
self.free_count = 0
self.allocation_size = 0
self.free_size = 0
self.increase_size = 0
self.memory_type = memory_type
def add_memory_record(self, size, allocation_type):
if allocation_type == 'Allocate' or allocation_type == 'ReservedAllocate':
self.allocation_count += 1
self.allocation_size += size
elif allocation_type == 'Free' or allocation_type == 'ReservedFree':
self.free_count += 1
self.free_size -= size # size is negative when freeing.
else:
print("No corresponding type.")
self.increase_size = self.allocation_size - self.free_size
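# A sanity sketch of the bookkeeping (values invented); note that size
# arrives negative on Free/ReservedFree events:
item = MemoryItem('conv2d', 'Place(gpu:0)')
item.add_memory_record(2048, 'Allocate')
item.add_memory_record(-2048, 'Free')
assert (item.allocation_size, item.free_size, item.increase_size) \
    == (2048, 2048, 0)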
class MemoryParser:
def __init__(self):
self.allocated_items = collections.defaultdict(
dict) # for memory summary, device type: event
self.reserved_items = collections.defaultdict(
dict) # for memory summary, device type: event
self.peak_allocation_values = collections.defaultdict(int)
self.peak_reserved_values = collections.defaultdict(int)
self.memory_events = collections.defaultdict(lambda: collections.
defaultdict(list))
self.memory_curve = collections.defaultdict(lambda: collections.
defaultdict(list))
self.paired_events = collections.defaultdict(list)
self.size_ranges = {}
def parse(self, nodetrees): # noqa: C901
r"""
Analyze memory events in the node trees.
"""
thread2hostnodes = traverse_tree(nodetrees)
for threadid, host_nodes in thread2hostnodes.items():
for host_node in host_nodes[1:]: # skip root node
if host_node.type == 'OperatorInner':
continue
if host_node.type == 'Operator':
for child in host_node.children_node:
self._analyse_node_memory(host_node.name, child)
self._analyse_node_memory(host_node.name, host_node)
# pair for memory events
for device_type, memory_events in self.memory_events.items():
max_size = 0
for (addr, memory_type), memory_lists in memory_events.items():
memory_lists = sorted(memory_lists, key=lambda x: x[0])
paired_results = []
for memory_list in memory_lists:
timestamp, memory_type, hostnodename, size = memory_list
if memory_type == 'Allocate' or memory_type == 'ReservedAllocate':
if size > max_size:
max_size = size
if memory_type == 'Allocate':
paired_results.append([
addr, 'Allocated', hostnodename, timestamp,
None, None, size
])
else:
paired_results.append([
addr, 'ReservedAllocate', hostnodename,
timestamp, None, None, size
])
elif memory_type == 'Free' or memory_type == 'ReservedFree':
if -size > max_size:
max_size = -size
if paired_results:
if paired_results[-1][-3] is None:
paired_results[-1][-3] = hostnodename
paired_results[-1][-2] = timestamp
self.paired_events[device_type].append(
paired_results.pop())
else:
if memory_type == 'Free':
paired_results.append([
addr, 'Allocated', None, None,
hostnodename, timestamp, -size
])
else:
paired_results.append([
addr, 'ReservedAllocate', None, None,
hostnodename, timestamp, -size
])
self.paired_events[device_type].append(
paired_results.pop())
else:
if memory_type == 'Free':
paired_results.append([
addr, 'Allocated', None, None,
hostnodename, timestamp, -size
])
else:
paired_results.append([
addr, 'ReservedAllocate', None, None,
hostnodename, timestamp, -size
])
self.paired_events[device_type].append(
paired_results.pop())
self.paired_events[device_type].extend(paired_results)
self.size_ranges[device_type] = (0, max_size)
def _analyse_node_memory(self, event_name, node):
for memnode in node.mem_node: # self mem node
if memnode.type == 'Allocate' or memnode.type == 'Free':
if event_name not in self.allocated_items[memnode.place]:
self.allocated_items[
memnode.place][event_name] = MemoryItem(
event_name, memnode.place, 'Allocated')
self.allocated_items[
memnode.place][event_name].add_memory_record(
memnode.increase_bytes, memnode.type)
self.memory_events[memnode.place][(memnode.addr,
'Allocated')].append([
memnode.timestamp_ns,
memnode.type,
event_name,
memnode.increase_bytes
])
elif memnode.type == 'ReservedAllocate' or memnode.type == 'ReservedFree':
if event_name not in self.reserved_items[memnode.place]:
self.reserved_items[
memnode.place][event_name] = MemoryItem(
event_name, memnode.place, 'Reserved')
self.reserved_items[
memnode.place][event_name].add_memory_record(
memnode.increase_bytes, memnode.type)
self.memory_events[memnode.place][(memnode.addr,
"Reserved")].append([
memnode.timestamp_ns,
memnode.type,
event_name,
memnode.increase_bytes
])
self.memory_curve[memnode.place]['Allocated'].append(
(memnode.timestamp_ns, memnode.current_allocated, event_name))
self.memory_curve[memnode.place]['Reserved'].append(
(memnode.timestamp_ns, memnode.current_reserved, event_name))
self.memory_curve[memnode.place]['PeakAllocated'].append(
(memnode.timestamp_ns, memnode.peak_allocated, event_name))
self.memory_curve[memnode.place]['PeakReserved'].append(
(memnode.timestamp_ns, memnode.peak_reserved, event_name))
self.peak_allocation_values[memnode.place] = max(
self.peak_allocation_values[memnode.place],
memnode.peak_allocated)
self.peak_reserved_values[memnode.place] = max(
self.peak_reserved_values[memnode.place],
memnode.peak_reserved)
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import collections
from .kernel_parser import DeviceItem
from .utils import wrap_tree
class OperatorItem:
def __init__(self, name):
self.name = name
self.call = 0
self.cpu_time = 0
self.gpu_time = 0
self.max_cpu_time = 0
self.min_cpu_time = float('inf')
self.max_gpu_time = 0
self.min_gpu_time = float('inf')
self.devices = {}
self.operator_inners = {}
self.general_gpu_time = 0
self.min_general_gpu_time = float('inf')
self.max_general_gpu_time = 0
@property
def avg_cpu_time(self):
return self.cpu_time / self.call
@property
def avg_gpu_time(self):
return self.gpu_time / self.call
@property
def avg_general_gpu_time(self):
return self.general_gpu_time / self.call
def add_cpu_time(self, time):
if time > self.max_cpu_time:
self.max_cpu_time = time
if time < self.min_cpu_time:
self.min_cpu_time = time
self.cpu_time += time
def add_gpu_time(self, time):
if time > self.max_gpu_time:
self.max_gpu_time = time
if time < self.min_gpu_time:
self.min_gpu_time = time
self.gpu_time += time
def add_general_gpu_time(self, time):
if time > self.max_general_gpu_time:
self.max_general_gpu_time = time
if time < self.min_general_gpu_time:
self.min_general_gpu_time = time
self.general_gpu_time += time
def add_call(self):
self.call += 1
def add_item(self, node):
self.add_call()
self.add_cpu_time(node.cpu_time)
self.add_gpu_time(node.gpu_time)
self.add_general_gpu_time(node.general_gpu_time)
for child in node.children_node:
if child.type != 'Operator':
if child.name not in self.operator_inners:
self.operator_inners[child.name] = OperatorItem(child.name)
self.operator_inners[child.name].add_item(child)
for runtimenode in node.runtime_node:
for devicenode in runtimenode.device_node:
name = devicenode.name
if name not in self.devices:
self.devices[name] = DeviceItem(name)
self.devices[name].add_item(devicenode)
class OperatorParser:
r"""
Analyze operator events in profiling data and correlate them with their device events.
"""
def __init__(self):
self.items = {} # for operator summary
self.items_with_input_shape = collections.defaultdict(dict)
def parse(self, nodetrees):
r"""
Analyze operator events in the node trees.
"""
node_statistic_trees, thread2host_statistic_nodes = wrap_tree(
nodetrees)
for threadid, host_statistic_nodes in thread2host_statistic_nodes.items(
):
for host_statistic_node in host_statistic_nodes[
1:]: # skip root node
if host_statistic_node.type == 'Operator':
self.add_operator_item(host_statistic_node)
def add_operator_item(self, operator_node):
if operator_node.name not in self.items:
self.items[operator_node.name] = OperatorItem(operator_node.name)
input_shape_str = self._translate_op_input_shape_to_string(
operator_node.input_shapes)
if input_shape_str not in self.items_with_input_shape[
operator_node.name]:
self.items_with_input_shape[
operator_node.name][input_shape_str] = OperatorItem(
operator_node.name)
self.items[operator_node.name].add_item(operator_node)
self.items_with_input_shape[
operator_node.name][input_shape_str].add_item(operator_node)
def _translate_op_input_shape_to_string(self, input_shape):
result = ''
for arg, shape in input_shape.items():
result += '{}-{}\t'.format(arg, shape)
return result
This diff is collapsed.
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
class TraceParser:
def __init__(self):
pass
def parse(self, content):
self.content = content
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
StageType = ['Dataloader', 'Forward', 'Backward', 'Optimization']
def sum_ranges(ranges):
result = 0
for time_range in ranges:
result += (time_range[1] - time_range[0])
return result
def merge_self_ranges(src_ranges, is_sorted=False):
merged_ranges = []
if len(src_ranges) > 0:
if not is_sorted:
src_ranges.sort(key=lambda x: x[0])
cur_indx = 0
merged_ranges.append((src_ranges[cur_indx][0],
src_ranges[cur_indx][1]))
for cur_indx in range(1, len(src_ranges)):
if src_ranges[cur_indx][1] > merged_ranges[-1][1]:
if src_ranges[cur_indx][0] <= merged_ranges[-1][1]:
merged_ranges[-1] = (merged_ranges[-1][0],
src_ranges[cur_indx][1])
else:
merged_ranges.append((src_ranges[cur_indx][0],
src_ranges[cur_indx][1]))
return merged_ranges
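# For example (intervals invented): unsorted input is sorted first, then
# overlapping ranges are coalesced.
assert merge_self_ranges([(3, 8), (1, 5), (10, 12)]) == [(1, 8), (10, 12)]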
def merge_ranges(range_list1, range_list2, is_sorted=False): # noqa:C901
merged_ranges = []
if not is_sorted:
range_list1 = merge_self_ranges(range_list1)
range_list2 = merge_self_ranges(range_list2)
len1 = len(range_list1)
len2 = len(range_list2)
if len1 == 0 and len2 == 0:
return merged_ranges
elif len1 == 0:
return range_list2
elif len2 == 0:
return range_list1
else:
indx1 = 0
indx2 = 0
range1 = range_list1[indx1]
range2 = range_list2[indx2]
if range1[0] < range2[0]:
merged_ranges.append(range1)
indx1 += 1
else:
merged_ranges.append(range2)
indx2 += 1
while indx1 < len1 and indx2 < len2:
range1 = range_list1[indx1]
range2 = range_list2[indx2]
if range1[0] < range2[0]:
if range1[1] > merged_ranges[-1][1]:
if range1[0] <= merged_ranges[-1][1]:
merged_ranges[-1] = (merged_ranges[-1][0], range1[1])
else:
merged_ranges.append((range1[0], range1[1]))
indx1 += 1
else:
indx1 += 1
else:
if range2[1] > merged_ranges[-1][1]:
if range2[0] <= merged_ranges[-1][1]:
merged_ranges[-1] = (merged_ranges[-1][0], range2[1])
else:
merged_ranges.append((range2[0], range2[1]))
indx2 += 1
else:
indx2 += 1
while indx1 < len1:
range1 = range_list1[indx1]
if range1[1] > merged_ranges[-1][1]:
if range1[0] <= merged_ranges[-1][1]:
merged_ranges[-1] = (merged_ranges[-1][0], range1[1])
else:
merged_ranges.append((range1[0], range1[1]))
indx1 += 1
else:
indx1 += 1
while indx2 < len2:
range2 = range_list2[indx2]
if range2[1] > merged_ranges[-1][1]:
if range2[0] <= merged_ranges[-1][1]:
merged_ranges[-1] = (merged_ranges[-1][0], range2[1])
else:
merged_ranges.append((range2[0], range2[1]))
indx2 += 1
else:
indx2 += 1
return merged_ranges
def intersection_ranges(range_list1, range_list2, is_sorted=False):
result_range = []
if len(range_list1) == 0 or len(range_list2) == 0:
return result_range
if not is_sorted:
range_list1 = merge_self_ranges(range_list1)
range_list2 = merge_self_ranges(range_list2)
len1 = len(range_list1)
len2 = len(range_list2)
indx1 = 0
indx2 = 0
range1 = range_list1[indx1]
range2 = range_list2[indx2]
while indx1 < len1 and indx2 < len2:
if range2[1] <= range1[0]:
indx2 += 1
if indx2 == len2:
break
range2 = range_list2[indx2]
elif range2[0] <= range1[0] and range2[1] < range1[1]:
assert (range2[1] > range1[0])
result_range.append((range1[0], range2[1]))
range1 = (range2[1], range1[1])
indx2 += 1
if indx2 == len2:
break
range2 = range_list2[indx2]
elif range2[0] <= range1[0]:
assert (range2[1] >= range1[1])
result_range.append(range1)
range2 = (range1[1], range2[1])
indx1 += 1
if indx1 == len1:
break
range1 = range_list1[indx1]
elif range2[1] < range1[1]:
assert (range2[0] > range1[0])
result_range.append(range2)
range1 = (range2[1], range1[1])
indx2 += 1
if indx2 == len2:
break
range2 = range_list2[indx2]
elif range2[0] < range1[1]:
assert (range2[1] >= range1[1])
result_range.append((range2[0], range1[1]))
range2 = (range1[1], range2[1])
indx1 += 1
if indx1 == len1:
break
range1 = range_list1[indx1]
else:
assert (range2[0] >= range1[1])
indx1 += 1
if indx1 == len1:
break
range1 = range_list1[indx1]
return result_range
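# For example (intervals invented), clipping one list against the other:
assert intersection_ranges([(0, 10)], [(3, 5), (8, 12)]) == [(3, 5), (8, 10)]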
def subtract_ranges(range_list1, range_list2, is_sorted=False):
result_range = []
if not is_sorted:
range_list1 = merge_self_ranges(range_list1)
range_list2 = merge_self_ranges(range_list2)
if len(range_list1) == 0:
return result_range
if len(range_list2) == 0:
return range_list1
len1 = len(range_list1)
len2 = len(range_list2)
indx1 = 0
indx2 = 0
range1 = range_list1[indx1]
range2 = range_list2[indx2]
while indx1 < len(range_list1):
if indx2 == len(range_list2):
result_range.append(range1)
indx1 += 1
if indx1 == len1:
break
range1 = range_list1[indx1]
elif range2[1] <= range1[0]:
indx2 += 1
if indx2 != len2:
range2 = range_list2[indx2]
elif range2[0] <= range1[0] and range2[1] < range1[1]:
range1 = (range2[1], range1[1])
indx2 += 1
if indx2 != len2:
range2 = range_list2[indx2]
elif range2[0] <= range1[0]:
assert (range2[1] >= range1[1])
range2 = (range1[1], range2[1])
indx1 += 1
if indx1 != len1:
range1 = range_list1[indx1]
elif range2[0] < range1[1]:
assert (range2[0] > range1[0])
result_range.append((range1[0], range2[0]))
range1 = (range2[0], range1[1])
else:
assert (range2[0] >= range1[1])
result_range.append(range1)
indx1 += 1
if indx1 != len1:
range1 = range_list1[indx1]
return result_range
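# Illustrative example (not part of the original source): subtraction removes
# from range_list1 every sub-range that is covered by range_list2.
#
#     >>> subtract_ranges([(0, 10)], [(2, 4), (6, 12)], is_sorted=True)
#     [(0, 2), (4, 6)]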
class HostStatisticNode:
r'''
    Wrap the original profiler node for calculating statistical metrics.
'''
def __init__(self, hostnode):
self.hostnode = hostnode
self.children_node = []
self.runtime_node = []
self.cpu_time = 0
self.self_cpu_time = 0
self.gpu_time = 0 # kernel time
self.self_gpu_time = 0
        self.general_gpu_time = 0  # besides kernels, also includes GPU memcpy and memset events
self.self_general_gpu_time = 0
self.is_terminal_operator_node = True
def cal_statistic(self):
for child in self.children_node:
child.cal_statistic()
if child.is_terminal_operator_node is False:
self.is_terminal_operator_node = False
for rt in self.runtime_node:
rt.cal_statistic()
self.cpu_time = self.hostnode.end_ns - self.hostnode.start_ns
self.self_cpu_time = self.cpu_time
for child in self.children_node:
if child.type == 'Operator':
self.is_terminal_operator_node = False
self.gpu_time += child.gpu_time
self.general_gpu_time += child.general_gpu_time
self.self_cpu_time -= (child.end_ns - child.start_ns)
for rt in self.runtime_node:
self.self_cpu_time -= (rt.end_ns - rt.start_ns)
self.gpu_time += rt.gpu_time
self.self_gpu_time += rt.gpu_time
self.general_gpu_time += rt.general_gpu_time
self.self_general_gpu_time += rt.general_gpu_time
for device in self.hostnode.device_node:
if device.type == 'Kernel':
self.gpu_time += (device.end_ns - device.start_ns)
self.self_gpu_time += (device.end_ns - device.start_ns)
self.general_gpu_time += (device.end_ns - device.start_ns)
self.self_general_gpu_time += (device.end_ns - device.start_ns)
@property
def end_ns(self):
return self.hostnode.end_ns
@property
def start_ns(self):
return self.hostnode.start_ns
def __getattr__(self, name):
return getattr(self.hostnode, name)
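# A minimal usage sketch (illustrative; `profiler_node` is a hypothetical node
# from a parsed profiler result). Attributes the wrapper does not define, such
# as `type` or `device_node`, are delegated to the wrapped node by __getattr__.
#
#     stat_node = HostStatisticNode(profiler_node)
#     stat_node.cal_statistic()
#     print(stat_node.cpu_time, stat_node.self_cpu_time, stat_node.gpu_time)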
def traverse_tree(nodetrees):
results = collections.defaultdict(list)
for thread_id, rootnode in nodetrees.items():
stack = []
stack.append(rootnode)
threadlist = results[thread_id]
while stack:
current_node = stack.pop()
threadlist.append(current_node)
for childnode in current_node.children_node:
stack.append(childnode)
return results
def get_device_nodes(hostnode):
'''
Get all device nodes called in the time range of hostnode.
'''
stack = []
device_nodes = []
stack.append(hostnode)
while stack:
current_node = stack.pop()
for childnode in current_node.children_node:
stack.append(childnode)
for runtimenode in current_node.runtime_node:
for devicenode in runtimenode.device_node:
device_nodes.append(devicenode)
return device_nodes
def wrap_tree(nodetrees):
'''
    Use HostStatisticNode to wrap the original profiler result tree and calculate per-node statistical metrics.
'''
node_statistic_tree = {}
results = collections.defaultdict(list)
newresults = collections.defaultdict(list)
for thread_id, rootnode in nodetrees.items():
stack = []
stack.append(rootnode)
root_statistic_node = HostStatisticNode(rootnode)
newstack = []
newstack.append(root_statistic_node)
node_statistic_tree[thread_id] = root_statistic_node
threadlist = results[thread_id]
newthreadlist = newresults[thread_id]
while stack:
current_node = stack.pop()
threadlist.append(current_node)
current_statistic_node = newstack.pop()
newthreadlist.append(current_statistic_node)
for childnode in current_node.children_node:
stack.append(childnode)
child_statistic_node = HostStatisticNode(childnode)
current_statistic_node.children_node.append(
child_statistic_node)
newstack.append(child_statistic_node)
for runtimenode in current_node.runtime_node:
runtime_statistic_node = HostStatisticNode(runtimenode)
current_statistic_node.runtime_node.append(
runtime_statistic_node)
# recursive calculate node statistic values
for thread_id, root_statistic_node in node_statistic_tree.items():
root_statistic_node.cal_statistic()
return node_statistic_tree, newresults
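# A minimal usage sketch (illustrative; `nodetrees` is a hypothetical
# {thread_id: root_node} mapping from a parsed profiler result): wrap_tree
# returns the wrapped trees plus a flat per-thread node list, with all
# statistic fields already computed.
#
#     stat_trees, flat_nodes = wrap_tree(nodetrees)
#     for thread_id, nodes in flat_nodes.items():
#         busy_time = sum(node.self_cpu_time for node in nodes[1:])  # skip root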
def rebuild_node_trees(nodetrees): # noqa:C901
template_root = None
# First, we find the tree which includes Forward event.
for threadid, root in nodetrees.items():
has_find_template_root = False
template_root = HostStatisticNode(root)
for children in root.children_node:
if children.type == 'ProfileStep':
profiler_step_node = HostStatisticNode(children)
template_root.children_node.append(profiler_step_node)
has_find_template_root = True
for stage_node in children.children_node:
if stage_node.type in StageType:
profiler_step_node.children_node.append(
HostStatisticNode(stage_node))
else:
break
if has_find_template_root is True:
break
    # warn when no thread recorded ProfileStep events
    if template_root is None or has_find_template_root is False:
print('No profiler steps found, overview page will have no data.')
wrapped_tree = {}
for thread_id, rootnode in nodetrees.items():
has_find_template_root = False
for children in rootnode.children_node:
if children.type == 'ProfileStep':
has_find_template_root = True
break
unwrapped_stack = []
        wrapped_stack = []
root_statistic_node = HostStatisticNode(rootnode)
wrapped_tree[thread_id] = root_statistic_node
if has_find_template_root is False:
for profiler_step_node in template_root.children_node:
profiler_step_wrap_node = HostStatisticNode(
profiler_step_node.hostnode)
root_statistic_node.children_node.append(
profiler_step_wrap_node)
for stage_node in profiler_step_node.children_node:
stage_wrap_node = HostStatisticNode(stage_node.hostnode)
profiler_step_wrap_node.children_node.append(
stage_wrap_node)
# insert nodes in original root into new stage nodes
# algorithm: post order traversal the tree
stack = []
flag_stack = []
post_order_nodes = []
stack.append(root_statistic_node)
flag_stack.append(0)
while stack:
current_node = stack.pop()
flag = flag_stack.pop()
if flag == 0:
stack.append(current_node)
flag_stack.append(1)
for children_node in reversed(current_node.children_node):
stack.append(children_node)
flag_stack.append(0)
else:
post_order_nodes.append(current_node)
# traverse post_order_nodes and insert right position
for runtimenode in rootnode.runtime_node:
runtime_wrapped_node = HostStatisticNode(runtimenode)
root_statistic_node.runtime_node.append(runtime_wrapped_node)
for node in rootnode.children_node:
unwrapped_stack.append(node)
for wrapped_node in post_order_nodes:
if node.start_ns >= wrapped_node.start_ns and node.end_ns <= wrapped_node.end_ns:
child_wrapped_node = HostStatisticNode(node)
                        wrapped_stack.append(child_wrapped_node)
wrapped_node.children_node.append(child_wrapped_node)
break
else:
unwrapped_stack.append(rootnode)
            wrapped_stack.append(root_statistic_node)
while unwrapped_stack:
current_node = unwrapped_stack.pop()
                current_wrapped_node = wrapped_stack.pop()
for childnode in current_node.children_node:
unwrapped_stack.append(childnode)
child_wrapped_node = HostStatisticNode(childnode)
current_wrapped_node.children_node.append(child_wrapped_node)
                    wrapped_stack.append(child_wrapped_node)
for runtimenode in current_node.runtime_node:
runtime_wrapped_node = HostStatisticNode(runtimenode)
current_wrapped_node.runtime_node.append(runtime_wrapped_node)
# recursive calculate node statistic values
for thread_id, root_wrapped_node in wrapped_tree.items():
root_wrapped_node.cal_statistic()
return wrapped_tree
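# Note (illustrative summary, not part of the original source): threads that
# recorded ProfileStep events keep their own tree, while threads without them
# are grafted onto a copy of the template thread's ProfileStep/stage skeleton
# by post-order matching of time ranges, so that every thread's events can be
# attributed to a profiler step and stage.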
def format_time(time, unit='ms', inf_subs='-'):
r"""
Transform time in ns to time in unit.
"""
if time == float('inf'):
return inf_subs
else:
result = float(time)
if unit == 's':
result /= 1e9
elif unit == 'ms':
result /= 1e6
elif unit == 'us':
result /= 1e3
# return '{:.2f}'.format(result)
return round(result, 2)
def format_ratio(ratio):
r"""
Transform ratio within [0, 1] to percentage presentation.
"""
# return '{:.2f}'.format(ratio * 100)
return round(ratio * 100, 2)
def format_float(float_data):
return round(float_data, 2)
def format_memory(memory, memory_unit='KB'):
result = float(memory)
if memory_unit == 'GB':
result /= (1024 * 1024 * 1024)
elif memory_unit == 'MB':
result /= (1024 * 1024)
elif memory_unit == 'KB':
result /= 1024
# return '{:.2f}'.format(result)
return round(result, 2)
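# Illustrative examples (not part of the original source) for the formatting
# helpers above; profiler times are recorded in nanoseconds.
#
#     >>> format_time(2500000, unit='ms')
#     2.5
#     >>> format_time(float('inf'))
#     '-'
#     >>> format_ratio(0.3456)
#     34.56
#     >>> format_memory(3 * 1024 * 1024, memory_unit='MB')
#     3.0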
[diff collapsed]
# Copyright (c) 2022 VisualDL Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import os
import re
from threading import Thread
from multiprocess import Process
from multiprocess import Queue
from .parser.const_description import * # noqa: F403
from .parser.event_node import load_profiler_json
from .run_manager import RunManager
from visualdl.io import bfile
_name_pattern = re.compile(r"(.+)_time_(.+)\.paddle_trace\.((pb)|(json))")
def is_VDLProfiler_file(path):
"""Determine whether it is a paddle profile file that can be read by vdl according to the file name.
File name of a paddle profile file must contain `paddle_trace`.
Args:
path: File name to determine.
Returns:
True if the file is a paddle profile file, otherwise false.
"""
if "paddle_trace" not in path:
return False
return True
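# Illustrative example (not part of the original source; the file name is
# hypothetical): such a name passes is_VDLProfiler_file and matches
# _name_pattern, whose first group is the worker name.
#
#     >>> is_VDLProfiler_file('worker0_time_2022_06_20.paddle_trace.json')
#     True
#     >>> _name_pattern.match(
#     ...     'worker0_time_2022_06_20.paddle_trace.json').group(1)
#     'worker0'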
class ProfilerReader(object):
"""Profile reader to read paddle profile files, support for frontend api in lib.py.
"""
def __init__(self, logdir=''):
"""Instance of ProfileReader
Args:
            logdir: The dir that contains paddle profile files; multiple subfolders are allowed.
"""
if isinstance(logdir, str):
self.dir = [logdir]
else:
self.dir = logdir
self.walks = {}
self.displayname2runs = {}
self.runs2displayname = {}
self.run_managers = {}
self.profile_result_queue = Queue()
self.tempfile = None
self.runs()
Thread(target=self._get_data_from_queue, args=()).start()
@property
def logdir(self):
return self.dir
def get_all_walk(self):
flush_walks = {}
for dir in self.dir:
for root, dirs, files in bfile.walk(dir):
flush_walks.update({root: files})
return flush_walks
def get_run_manager(self, run):
if run in self.run_managers:
self.run_managers[run].join()
return self.run_managers[run]
else:
return None
def profile_runs(self, update=False):
"""Get profile run files.
        Every dir (a `run` in vdl) may contain more than one profiler file.
Returns:
walks: A dict like {"exp1": ["1587375595_paddle_trace.json", "1587375685_paddle_trace.json"],
"exp2": ["1587375686_paddle_trace.json"]}
"""
if not self.walks or update is True:
flush_walks = self.get_all_walk()
walks_temp = {}
for run, filenames in flush_walks.items():
tags_temp = [
filename for filename in filenames
if is_VDLProfiler_file(filename)
]
if len(tags_temp) > 0:
walks_temp.update({run: tags_temp})
self.walks = walks_temp
return self.walks
def runs(self, update=True):
self.profile_runs(update=update)
for run, filenames in self.walks.items():
if run not in self.run_managers:
self.run_managers[run] = RunManager(run)
self.run_managers[run].set_all_filenames(filenames)
for filename in filenames:
if self.run_managers[run].has_handled(filename):
continue
self._read_data(run, filename)
return list(self.walks.keys())
def get_descriptions(self, lang):
if lang == 'zh':
return {
"overview_environment": TOOLTIP_DEVICE_INFO_CN, # noqa: F405
"overview_model_perspective":
TOOLTIP_MODEL_PERSPECTIVE_CN, # noqa: F405
"overview_model_perspective_perstep":
TOOLTIP_MODEL_PERSPECTIVE_PERSTEP_CN, # noqa: F405
"overview_event_type_perspective":
TOOLTIP_EVENT_TYPE_PERSPECTIVE_CN, # noqa: F405
"overview_event_type_model_perspective":
TOOLTIP_EVENT_TYPE_MODEL_PERSPECTIVE_CN, # noqa: F405
"distributed_histogram":
TOOLTIP_EVENT_DISTRIBUTED_HISTOGRAM_CN # noqa: F405
}
else:
return {
"overview_environment": TOOLTIP_DEVICE_INFO_EN, # noqa: F405
"overview_model_perspective":
TOOLTIP_MODEL_PERSPECTIVE_EN, # noqa: F405
"overview_model_perspective_perstep":
TOOLTIP_MODEL_PERSPECTIVE_PERSTEP_EN, # noqa: F405
"overview_event_type_perspective":
TOOLTIP_EVENT_TYPE_PERSPECTIVE_EN, # noqa: F405
"overview_event_type_model_perspective":
TOOLTIP_EVENT_TYPE_MODEL_PERSPECTIVE_EN, # noqa: F405
"distributed_histogram":
TOOLTIP_EVENT_DISTRIBUTED_HISTOGRAM_EN # noqa: F405
}
def set_displayname(self, log_reader):
self.displayname2runs = log_reader.name2tags
self.runs2displayname = log_reader.tags2name
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def _get_data_from_queue(self):
while True:
try:
run, filename, worker_name, profile_result = self.profile_result_queue.get(
)
self.run_managers[run].add_profile_result(
filename, worker_name, profile_result)
except Exception as e:
                print('Error reading profiler data in subprocess: {}'.format(
                    e))
def _read_data(self, run, filename):
match = _name_pattern.match(filename)
if match:
worker_name = match.group(1)
if '.pb' in filename:
try:
from paddle.profiler import load_profiler_result
except Exception:
                    print(
                        'Failed to import paddle.profiler. Please make sure paddle >= 2.3.0 is installed.'
                    )
                    exit(1)
profile_result = load_profiler_result(
os.path.join(run, filename))
self.run_managers[run].add_profile_result(
filename, worker_name, profile_result)
else:
def _load_profiler_json(run, filename, worker_name):
profile_result = load_profiler_json(
os.path.join(run, filename))
self.profile_result_queue.put((run, filename, worker_name,
profile_result))
Process(
target=_load_profiler_json,
args=(run, filename, worker_name)).start()
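# A minimal usage sketch (illustrative; assumes `./log` contains paddle
# profile files): the reader scans the logdir, parses files in background
# threads and processes, and serves the results through RunManager objects.
#
#     reader = ProfilerReader('./log')
#     for run in reader.runs(update=False):
#         run_manager = reader.get_run_manager(run)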
# Copyright (c) 2022 VisualDL Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import json
from .profiler_reader import ProfilerReader
from visualdl.server.api import gen_result
from visualdl.server.api import result
class ProfilerApi(object):
def __init__(self, logdir):
self._reader = ProfilerReader(logdir)
@result()
def runs(self):
return self._reader.runs()
@result()
def views(self, run):
run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
return list(run_manager.get_views())
@result()
def workers(self, run, view):
if view == 'Distributed':
return ['All']
run_manager = self._reader.get_run_manager(run)
return run_manager.get_workers(view)
@result()
def spans(self, run, worker):
run_manager = self._reader.get_run_manager(run)
if worker == 'All':
return run_manager.get_distributed_spans()
return run_manager.get_spans(worker)
@result()
def timeunits(self):
return ['ns', 'us', 'ms', 's']
@result()
def descriptions(self, lang):
if lang == 'undefined' or lang is None:
lang = 'zh'
lang = lang.lower()
return self._reader.get_descriptions(lang)
@result()
def overview_environment(self, run, worker, span):
run_manager = self._reader.get_run_manager(run)
span = str(span)
profiler_data = run_manager.get_profiler_data(worker, span)
result = profiler_data.get_device_infos()
num_workers = len(run_manager.get_workers('Overview'))
result['num_workers'] = num_workers
return result
@result()
def model_perspective(self, run, worker, span, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_model_perspective(time_unit)
@result()
def model_perspective_perstep(self,
run,
worker,
span,
device_type,
time_unit='ms'):
device_type = device_type.lower()
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_model_perspective_perstep(
device_type, time_unit)
@result()
def event_type_perspective(self,
run,
worker,
span,
device_type,
time_unit='ms'):
device_type = device_type.lower()
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_event_type_perspective(device_type, time_unit)
@result()
def event_type_model_perspective(self, run, worker, span, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_event_type_model_perspective(time_unit)
@result()
def userdefined_perspective(self, run, worker, span, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_userdefined_perspective(time_unit)
@result()
def operator_pie(self, run, worker, span, topk, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
topk = int(topk)
return profiler_data.get_operator_pie(topk, time_unit)
@result()
def operator_pie_expand(self, run, worker, span, topk, device_type,
time_unit):
device_type = device_type.lower()
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
topk = int(topk)
return profiler_data.get_operator_pie_expand(topk, device_type,
time_unit)
@result()
def operator_table(self,
run,
worker,
span,
group_by,
search_name,
time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_operator_table(group_by, search_name,
time_unit)
@result()
def operator_stack_table(self,
run,
worker,
span,
op_name,
group_by,
input_shape,
time_unit='ms'):
pass
@result()
def kernel_pie(self, run, worker, span, topk, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
topk = int(topk)
return profiler_data.get_kernel_pie(topk, time_unit)
@result()
def kernel_table(self,
run,
worker,
span,
group_by,
search_name,
time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_kernel_table(group_by, search_name, time_unit)
@result()
def kernel_tc_pie(self, run, worker, span, topk, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
topk = int(topk)
return profiler_data.get_kernel_tc_pie(topk, time_unit)
@result()
def distributed_info(self, run, worker, span):
run_manager = self._reader.get_run_manager(run)
distributed_profiler_data = run_manager.get_distributed_profiler_data(
span)
if distributed_profiler_data is None:
return
return distributed_profiler_data.get_distributed_info()
@result()
def distributed_steps(self, run, worker, span):
run_manager = self._reader.get_run_manager(run)
distributed_profiler_data = run_manager.get_distributed_profiler_data(
span)
return distributed_profiler_data.get_distributed_steps()
@result()
def distributed_histogram(self, run, worker, span, step, time_unit='ms'):
run_manager = self._reader.get_run_manager(run)
distributed_profiler_data = run_manager.get_distributed_profiler_data(
span)
return distributed_profiler_data.get_distributed_histogram(
step, time_unit)
@result(headers={'content-encoding': 'gzip'})
def trace(self, run, worker, span):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_trace_data()
@result()
def memory_devices(self, run, worker, span):
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_memory_devices()
@result(headers={'content-encoding': 'gzip'})
def memory_curve(self, run, worker, span, device_type, time_unit='ms'):
if device_type == 'undefined':
return
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_memory_curve(device_type, time_unit)
@result(headers={'content-encoding': 'gzip'})
def memory_events(self,
run,
worker,
span,
device_type,
min_size=0,
max_size=float('inf'),
search_name=None,
time_unit='ms'):
if device_type == 'undefined':
return
try:
min_size = float(min_size)
except Exception:
min_size = 0
try:
max_size = float(max_size)
except Exception:
max_size = float('inf')
if search_name == 'undefined' or not search_name:
search_name = None
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_memory_events(device_type, min_size, max_size,
search_name, time_unit)
@result(headers={'content-encoding': 'gzip'})
def op_memory_events(self,
run,
worker,
span,
device_type,
search_name=None):
if search_name == 'undefined' or not search_name:
search_name = None
if device_type == 'undefined':
return
run_manager = self._reader.get_run_manager(run)
profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_op_memory_events(device_type, search_name)
@result()
def comparison_phase(self, base_run, base_worker, base_span, exp_run,
exp_worker, exp_span):
pass
@result()
def comparison_phase_diff(self, base_run, base_worker, base_span, exp_run,
exp_worker, exp_span):
pass
@result()
def comparison_phase_table(self, base_run, base_worker, base_span, exp_run,
exp_worker, exp_span):
pass
@result()
def comparison_phase_inner(self, base_run, base_worker, base_span, exp_run,
exp_worker, exp_span, phase_name):
pass
@result()
def comparison_phase_diff_inner(self, base_run, base_worker, base_span,
exp_run, exp_worker, exp_span, phase_name):
pass
@result()
def comparison_phase_table_inner(self, base_run, base_worker, base_span,
exp_run, exp_worker, exp_span,
phase_name):
pass
def create_profiler_api_call(logdir):
api = ProfilerApi(logdir)
routes = {
'runs': (api.runs, []),
'views': (api.views, ["run"]),
'workers': (api.workers, ["run", "view"]),
'spans': (api.spans, ["run", "worker"]),
'timeunits': (api.timeunits, []),
'descriptions': (api.descriptions, ["lang"]),
'overview/environment': (api.overview_environment,
["run", "worker", "span"]),
'overview/model_perspective': (api.model_perspective,
["run", "worker", "span", "time_unit"]),
'overview/model_perspective_perstep': (api.model_perspective_perstep, [
"run", "worker", "span", "device_type", "time_unit"
]),
'overview/event_type_perspective': (api.event_type_perspective, [
"run", "worker", "span", "device_type", "time_unit"
]),
'overview/event_type_model_perspective':
(api.event_type_model_perspective,
["run", "worker", "span", "time_unit"]),
'overview/userdefined_perspective':
(api.userdefined_perspective, ["run", "worker", "span", "time_unit"]),
'operator/pie': (api.operator_pie,
["run", "worker", "span", "topk", "time_unit"]),
'operator/pie_expand': (api.operator_pie_expand, [
"run", "worker", "span", "topk", "device_type", "time_unit"
]),
'operator/table': (api.operator_table, [
"run", "worker", "span", "group_by", "search_name", "time_unit"
]),
        'operator/stack_table': (api.operator_stack_table, [
            "run", "worker", "span", "op_name", "group_by", "input_shape",
            "time_unit"
        ]),
'kernel/pie': (api.kernel_pie,
["run", "worker", "span", "topk", "time_unit"]),
'kernel/tensorcore_pie':
(api.kernel_tc_pie, ["run", "worker", "span", "topk", "time_unit"]),
'kernel/table': (api.kernel_table, [
"run", "worker", "span", "group_by", "search_name", "time_unit"
]),
'distributed/info': (api.distributed_info, ["run", "worker", "span"]),
'distributed/steps': (api.distributed_steps, ["run", "worker",
"span"]),
'distributed/histogram': (api.distributed_histogram, [
"run", "worker", "span", "step", "time_unit"
]),
'trace': (api.trace, ["run", "worker", "span"]),
'memory/devices': (api.memory_devices, ["run", "worker", "span"]),
'memory/curve': (api.memory_curve,
["run", "worker", "span", "device_type",
"time_unit"]),
'memory/memory_events': (api.memory_events, [
"run", "worker", "span", "device_type", "min_size", "max_size",
"search_name", "time_unit"
]),
'memory/op_memory_events': (api.op_memory_events, [
"run", "worker", "span", "device_type", "search_name"
]),
'comparison/phase': (api.comparison_phase, [
"base_run", "base_worker", "base_span", "exp_run", "exp_worker",
"exp_span"
]),
'comparison/phase_diff': (api.comparison_phase_diff, [
"base_run", "base_worker", "base_span", "exp_run", "exp_worker",
"exp_span"
]),
'comparison/phase_table': (api.comparison_phase_table, [
"base_run", "base_worker", "base_span", "exp_run", "exp_worker",
"exp_span"
]),
'comparison/phase_inner': (api.comparison_phase_inner, [
"base_run", "base_worker", "base_span", "exp_run", "exp_worker",
"exp_span", "phase_name"
]),
'comparison/phase_diff_inner': (api.comparison_phase_diff_inner, [
"base_run", "base_worker", "base_span", "exp_run", "exp_worker",
"exp_span", "phase_name"
]),
'comparison/phase_table_inner': (api.comparison_phase_table_inner, [
"base_run", "base_worker", "base_span", "exp_run", "exp_worker",
"exp_span", "phase_name"
])
}
def call(path: str, args):
route = routes.get(path)
if not route:
return json.dumps(gen_result(
status=1, msg='api not found')), 'application/json', None
method, call_arg_names = route
call_args = [args.get(name) for name in call_arg_names]
return method(*call_args)
return call
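# A minimal usage sketch (illustrative; assumes `./log` contains paddle
# profile files): the returned callable dispatches an api path and a dict of
# query arguments to the matching ProfilerApi method; unknown paths yield the
# same (body, mimetype, headers) shape as the not-found branch above.
#
#     call = create_profiler_api_call('./log')
#     response = call('runs', {})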
# Copyright (c) 2022 VisualDL Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
from collections import defaultdict
from threading import Thread
from .profiler_data import DistributedProfilerData
from .profiler_data import ProfilerData
class RunManager:
'''
    Manage profile data for each run. A run may have multiple workers and spans,
    so profile data is managed per (worker, span) unit.
    Besides, a special worker "all" is created to merge all profile data for the distributed view.
'''
def __init__(self, run):
self.run = run
# worker:
# span:
# ProfileData
self.profiler_data = defaultdict(dict)
self.all_filenames = set()
self.handled_filenames = set()
# span:
# DistributedProfileData
self.distributed_data = {}
self.threads = {}
self.has_join = False
def get_profiler_data(self, worker, span):
if worker in self.profiler_data:
if span in self.profiler_data[worker]:
return self.profiler_data[worker][span]
def get_distributed_profiler_data(self, span):
if span in self.distributed_data:
return self.distributed_data[span]
def get_views(self):
'''
Return all views supported in current run data.
'''
all_views = set()
for worker, span_data in self.profiler_data.items():
for span, profiler_data in span_data.items():
all_views.update(profiler_data.get_views())
ordered_views = [
'Overview', 'Operator', 'GPU Kernel', 'Distributed', 'Trace',
'Memory'
]
final_views = []
for view in ordered_views:
if view in all_views:
final_views.append(view)
return final_views
def get_workers(self, view_name):
'''
        Return all workers (processes) in the current run data.
'''
workers = []
for worker, span_data in self.profiler_data.items():
for span, profiler_data in span_data.items():
if view_name in profiler_data.get_views():
workers.append(worker)
break
return workers
def get_spans(self, worker_name):
'''
        Return all spans in the current run data.
        spans: With the paddle.profiler api, profile data collection can be split
            into several ranges, e.g. you may profile steps 2-4 and 6-8. Each such
            range is called a span here, and spans are indexed in order.
'''
spans = list(self.profiler_data[worker_name].keys())
spans = sorted([int(span) for span in spans])
spans = [str(span) for span in spans]
return spans
def get_distributed_spans(self):
spans = list(self.distributed_data.keys())
spans = sorted([int(span) for span in spans])
spans = [str(span) for span in spans]
return spans
def _parse_file(self, worker_name, result):
span = result.get_span_idx()
self.profiler_data[worker_name][span] = ProfilerData(
self.run, worker_name, span, result)
return
def join(self):
if self.has_join:
return
for thread in self.threads.values():
thread.join()
self.has_join = True
distributed_profiler_data = defaultdict(list)
for worker_name, span_data in self.profiler_data.items():
for span_idx, profiler_data in span_data.items():
distributed_profiler_data[span_idx].append(profiler_data)
for span_idx, profiler_datas in distributed_profiler_data.items():
self.distributed_data[span_idx] = DistributedProfilerData(
self.run, span_idx, profiler_datas)
def add_profile_result(self, filename, worker_name, profile_result):
thread = Thread(
target=self._parse_file, args=(worker_name, profile_result))
thread.start()
self.handled_filenames.add(filename)
self.threads[filename] = thread
def set_all_filenames(self, filenames):
self.all_filenames.update(filenames)
def has_handled(self, filename):
if filename in self.handled_filenames:
return True
else:
return False
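# A minimal usage sketch (illustrative; `profile_result` stands for a parsed
# profiler result object): each file is parsed on its own thread, and join()
# must be called before reading per-span or distributed data.
#
#     manager = RunManager('exp1')
#     manager.add_profile_result('worker0_time_x.paddle_trace.json',
#                                'worker0', profile_result)
#     manager.join()
#     spans = manager.get_spans('worker0')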
[diff collapsed]
[diff collapsed]
......@@ -13,4 +13,4 @@
# limitations under the License.
# =======================================================================
vdl_version = '2.3.0'
vdl_version = '2.4.0'