Commit ac472045 authored by: wangshuide2020

1. Remove redundant data to save memory and simplify the TensorContainer.

2. Kill the child processes of a gunicorn worker once the worker itself has been killed by the gunicorn master.

Parent d4d952e0
@@ -23,7 +23,7 @@ MindInsight provides MindSpore with easy-to-use tuning and debugging capabilities. During training...
 Please download and install the whl package from the [MindSpore download page](https://www.mindspore.cn/versions).
 ```
-pip install mindinsight-{version}-cp37-cp37m-linux_{arch}.whl
+pip install -U mindinsight-{version}-cp37-cp37m-linux_{arch}.whl
 ```
 For more ways to install MindInsight, see the MindInsight section of the [installation tutorial](https://www.mindspore.cn/install/).
......
@@ -15,9 +15,13 @@
 """Config file for gunicorn."""
 import os
+import multiprocessing
+import signal
 import threading
+import time
 from importlib import import_module
+import psutil
 import gunicorn
@@ -43,3 +47,44 @@ def on_starting(server):
     hook_module = import_module('mindinsight.utils.hook')
     for hook in hook_module.HookUtils.instance().hooks():
         threading.Thread(target=hook.on_startup, args=(server.log,)).start()
+def post_fork(server, worker):
+    """
+    Launch a process to monitor the worker after gunicorn forks it.
+
+    Child processes of a gunicorn worker should be killed when the worker itself
+    has been killed, because the gunicorn master may terminate a worker for
+    reasons such as a worker timeout.
+
+    Args:
+        server (Arbiter): gunicorn server instance.
+        worker (ThreadWorker): worker instance.
+    """
+    def murder_worker_children_processes():
+        processes_to_kill = []
+        # Sleep 3 seconds so that all worker children processes have been launched.
+        time.sleep(3)
+        process = psutil.Process(worker.pid)
+        for child in process.children(recursive=True):
+            if child.pid != os.getpid():
+                processes_to_kill.append(child)
+        while True:
+            if os.getppid() != worker.pid:
+                current_worker_pid = os.getppid()
+                for proc in processes_to_kill:
+                    server.log.info("Original worker pid: %d, current worker pid: %d, stop process %d",
+                                    worker.pid, current_worker_pid, proc.pid)
+                    try:
+                        proc.send_signal(signal.SIGKILL)
+                    except psutil.NoSuchProcess:
+                        continue
+                    except psutil.Error as ex:
+                        server.log.error("Stop process %d failed. Detail: %s.", proc.pid, str(ex))
+                server.log.info("%d processes have been killed.", len(processes_to_kill))
+                break
+            time.sleep(1)
+
+    listen_process = multiprocessing.Process(target=murder_worker_children_processes,
+                                             name="murder_worker_children_processes")
+    listen_process.start()
+    server.log.info("Server pid: %d, start listening.", server.pid)
@@ -209,7 +209,7 @@ class GunicornLogger(Logger):
         super(GunicornLogger, self).__init__(cfg)

     def now(self):
-        """return the log format"""
+        """Get log format."""
         return time.strftime('[%Y-%m-%d-%H:%M:%S %z]')

     def setup(self, cfg):
......
@@ -188,6 +188,15 @@ class ResponseDataExceedMaxValueError(MindInsightException):
                          http_code=400)
+
+class DataTypeError(MindInsightException):
+    """Data type is not supported."""
+
+    def __init__(self, error_detail):
+        error_msg = f'Data type is not supported. Detail: {error_detail}'
+        super(DataTypeError, self).__init__(DataVisualErrors.DATA_TYPE_NOT_SUPPORT,
+                                            error_msg,
+                                            http_code=400)
 class TrainJobDetailNotInCacheError(MindInsightException):
     """Detail info of given train job is not in cache."""
     def __init__(self, error_detail="no detail provided."):
......
@@ -296,6 +296,7 @@ class _SummaryParser(_Parser):
                 self._load_single_file(self._summary_file_handler, executor)
                 # Wait for data in this file to be processed to avoid loading multiple files at the same time.
                 executor.wait_all_tasks_finish()
+                logger.info("Parse summary file finished, file path: %s.", file_path)
             except UnknownError as ex:
                 logger.warning("Parse summary file failed, detail: %r,"
                                "file path: %s.", str(ex), file_path)
@@ -383,7 +384,7 @@ class _SummaryParser(_Parser):
         # read the header
         header_str = file_handler.read(HEADER_SIZE)
         if not header_str:
-            logger.info("End of file, file_path=%s.", file_handler.file_path)
+            logger.info("Load summary file finished, file_path=%s.", file_handler.file_path)
             return None
         header_crc_str = file_handler.read(CRC_STR_SIZE)
         if not header_crc_str:
@@ -441,12 +442,9 @@ class _SummaryParser(_Parser):
         elif plugin == PluginNameEnum.TENSOR.value:
             tensor_event_value = TensorContainer(tensor_event_value)
-            tensor_count = 1
-            for d in tensor_event_value.dims:
-                tensor_count *= d
-            if tensor_count > MAX_TENSOR_COUNT:
+            if tensor_event_value.size > MAX_TENSOR_COUNT:
                 logger.warning('tag: %s/tensor, dims: %s, tensor count: %d exceeds %d and drop it.',
-                               value.tag, tensor_event_value.dims, tensor_count, MAX_TENSOR_COUNT)
+                               value.tag, tensor_event_value.dims, tensor_event_value.size, MAX_TENSOR_COUNT)
                 return None

         elif plugin == PluginNameEnum.IMAGE.value:
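Editor's note: the removed loop multiplied the dims by hand; `numpy.ndarray.size` is by definition that same product, so the new `size` property on TensorContainer (added later in this diff) makes the check a one-liner. For instance:

```
import numpy as np

arr = np.zeros((2, 3, 4))
print(arr.size)  # 24, i.e. 2 * 3 * 4, the product of the dims
```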
......
@@ -13,12 +13,12 @@
 # limitations under the License.
 # ============================================================================
 """Tensor data container."""
-import threading
 import numpy as np

+from mindinsight.datavisual.common.exceptions import DataTypeError
 from mindinsight.datavisual.common.log import logger
 from mindinsight.datavisual.data_transform.histogram import Histogram, Bucket
+from mindinsight.datavisual.proto_files import mindinsight_anf_ir_pb2 as anf_ir_pb2
 from mindinsight.datavisual.utils.utils import calc_histogram_bins
 from mindinsight.utils.exceptions import ParamValueError
@@ -139,19 +139,6 @@ def get_statistics_from_tensor(tensors):
     return statistics

-def _get_data_from_tensor(tensor):
-    """
-    Get data from tensor and convert to tuple.
-
-    Args:
-        tensor (TensorProto): Tensor proto data.
-
-    Returns:
-        tuple, the item of tensor value.
-    """
-    return tuple(tensor.float_data)

 def calc_original_buckets(np_value, stats):
     """
     Calculate buckets from tensor data.
@@ -199,19 +186,24 @@ class TensorContainer:
     """

     def __init__(self, tensor_message):
-        self._lock = threading.Lock
         # Original dims can not be pickled to transfer to other process, so tuple is used.
         self._dims = tuple(tensor_message.dims)
         self._data_type = tensor_message.data_type
-        self._np_array = None
-        self._data = _get_data_from_tensor(tensor_message)
-        self._stats = get_statistics_from_tensor(self.get_or_calc_ndarray())
-        original_buckets = calc_original_buckets(self.get_or_calc_ndarray(), self._stats)
+        self._np_array = self.get_ndarray(tensor_message.float_data)
+        self._stats = get_statistics_from_tensor(self._np_array)
+        original_buckets = calc_original_buckets(self._np_array, self._stats)
         self._count = sum(bucket.count for bucket in original_buckets)
-        self._max = self._stats.max
-        self._min = self._stats.min
+        # Convert max and min to np.float64 so that they cannot overflow
+        # when calculating the width of the histogram.
+        self._max = np.float64(self._stats.max)
+        self._min = np.float64(self._stats.min)
         self._histogram = Histogram(tuple(original_buckets), self._max, self._min, self._count)

+    @property
+    def size(self):
+        """Get size of tensor."""
+        return self._np_array.size
+
     @property
     def dims(self):
         """Get dims of tensor."""
@@ -222,6 +214,11 @@
         """Get data type of tensor."""
         return self._data_type

+    @property
+    def ndarray(self):
+        """Get ndarray of tensor."""
+        return self._np_array
+
     @property
     def max(self):
         """Get max value of tensor."""
@@ -251,19 +248,24 @@
         """Get histogram buckets."""
         return self._histogram.buckets()

-    def get_or_calc_ndarray(self):
-        """Get or calculate ndarray."""
-        with self._lock():
-            if self._np_array is None:
-                self._convert_to_numpy_array()
-            return self._np_array
-
-    def _convert_to_numpy_array(self):
-        """Convert a list data to numpy array."""
-        try:
-            ndarray = np.array(self._data).reshape(self._dims)
-        except ValueError as ex:
-            logger.error("Reshape array fail, detail: %r", str(ex))
-            return
-
-        self._np_array = ndarray
+    def get_ndarray(self, tensor):
+        """
+        Get ndarray of tensor.
+
+        Args:
+            tensor (float16|float32|float64): tensor data.
+
+        Returns:
+            numpy.ndarray, ndarray of tensor.
+
+        Raises:
+            DataTypeError: If the data type of the tensor is not float16, float32 or float64.
+        """
+        data_type_str = anf_ir_pb2.DataType.Name(self.data_type)
+        if data_type_str == 'DT_FLOAT16':
+            return np.array(tuple(tensor), dtype=np.float16).reshape(self.dims)
+        if data_type_str == 'DT_FLOAT32':
+            return np.array(tuple(tensor), dtype=np.float32).reshape(self.dims)
+        if data_type_str == 'DT_FLOAT64':
+            return np.array(tuple(tensor), dtype=np.float64).reshape(self.dims)
+        raise DataTypeError("Data type: {}.".format(data_type_str))
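Editor's note: `DataType.Name()` maps the numeric protobuf enum stored in the message back to its symbolic name, which is what the dispatch above keys on; anything other than the three float types raises the new DataTypeError and maps to HTTP 400 via DATA_TYPE_NOT_SUPPORT. A simplified, self-contained stand-in for that dispatch (hypothetical `to_ndarray` helper, not part of the commit):

```
import numpy as np

# Map of proto enum names to numpy dtypes, mirroring the branches above.
_SUPPORTED_DTYPES = {
    'DT_FLOAT16': np.float16,
    'DT_FLOAT32': np.float32,
    'DT_FLOAT64': np.float64,
}

def to_ndarray(values, dims, data_type_str):
    """Convert flat values to an ndarray, rejecting unsupported types."""
    dtype = _SUPPORTED_DTYPES.get(data_type_str)
    if dtype is None:
        raise ValueError("Data type: {}.".format(data_type_str))
    return np.array(tuple(values), dtype=dtype).reshape(dims)

print(to_ndarray([1.0, 2.0, 3.0, 4.0], (2, 2), 'DT_FLOAT32'))
# [[1. 2.]
#  [3. 4.]]
```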
@@ -99,9 +99,9 @@ def get_statistics_dict(stats):
         dict, a dict including 'max', 'min', 'avg', 'count', 'nan_count', 'neg_inf_count', 'pos_inf_count'.
     """
     statistics = {
-        "max": stats.max,
-        "min": stats.min,
-        "avg": stats.avg,
+        "max": float(stats.max),
+        "min": float(stats.min),
+        "avg": float(stats.avg),
         "count": stats.count,
         "nan_count": stats.nan_count,
         "neg_inf_count": stats.neg_inf_count,
@@ -302,8 +302,7 @@ class TensorProcessor(BaseProcessor):
             if step != tensor.step:
                 continue
             step_in_cache = True
-            ndarray = value.get_or_calc_ndarray()
-            res_data = get_specific_dims_data(ndarray, dims, list(value.dims))
+            res_data = get_specific_dims_data(value.ndarray, dims, list(value.dims))
             flatten_data = res_data.flatten().tolist()
             if len(flatten_data) > MAX_TENSOR_RESPONSE_DATA_SIZE:
                 raise ResponseDataExceedMaxValueError("the size of response data: {} exceed max value: {}."
@@ -326,7 +325,7 @@ class TensorProcessor(BaseProcessor):
                 elif np.isposinf(data):
                     transfer_data[index] = 'INF'
                 else:
-                    transfer_data[index] = data
+                    transfer_data[index] = float(data)
             return transfer_data

         stats = get_statistics_from_tensor(res_data)
......
@@ -77,6 +77,7 @@ class DataVisualErrors(Enum):
     TENSOR_NOT_EXIST = 18
     MAX_RESPONSE_DATA_EXCEEDED_ERROR = 19
     STEP_TENSOR_DATA_NOT_IN_CACHE = 20
+    DATA_TYPE_NOT_SUPPORT = 21

 class ScriptConverterErrors(Enum):
......