Commit ca56b3b8 authored by luopengting

Mainly create a new thread to load detail info

1. Create a new thread to load detail info. Loading detail info takes too much time,
so the summary list and lineage could not be loaded in time.
2. Add a status to DetailCacheManager to indicate whether it is INIT, LOADING or DONE
(a minimal sketch of this pattern is shown below).
3. Update UT/ST.
Parent 46099e82
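The change above amounts to a non-blocking load guarded by a status flag: update_cache only flips the status to LOADING and hands the slow work to a worker thread, while callers poll the status until it becomes DONE. The following is a minimal, self-contained sketch of that pattern; BackgroundLoader, LoadStatus and the sleep-based workload are illustrative stand-ins rather than MindInsight code, though the status transitions mirror DetailCacheManagerStatus in the diff below.

import threading
import time
from enum import Enum


class LoadStatus(Enum):
    """Illustrative status values mirroring INIT/LOADING/DONE from the commit."""
    INIT = 'INIT'
    LOADING = 'LOADING'
    DONE = 'DONE'


class BackgroundLoader:
    """Load slow 'detail' data in a background thread so fast data stays responsive."""

    def __init__(self):
        self._status = LoadStatus.INIT.value
        self._loading_mutex = threading.Lock()

    @property
    def status(self):
        """Current loading status as a string."""
        return self._status

    def update_cache(self):
        """Start loading in a thread; skip if a load is already in progress."""
        with self._loading_mutex:
            if self._status == LoadStatus.LOADING.value:
                return
            self._status = LoadStatus.LOADING.value
        threading.Thread(target=self._load, name="load_detail_in_cache").start()

    def _load(self):
        try:
            time.sleep(1)  # stand-in for the slow detail-loading work
        finally:
            # Always mark DONE so pollers do not wait forever after a failure.
            self._status = LoadStatus.DONE.value


if __name__ == '__main__':
    loader = BackgroundLoader()
    loader.update_cache()  # returns immediately; loading happens in the background
    while loader.status != LoadStatus.DONE.value:
        time.sleep(0.1)    # callers (e.g. tests) poll the status until the load finishes
    print(loader.status)   # DONE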
......@@ -32,6 +32,13 @@ class DataManagerStatus(BaseEnum):
INVALID = 'INVALID'
class DetailCacheManagerStatus(BaseEnum):
"""Data manager status."""
INIT = 'INIT'
LOADING = 'LOADING'
DONE = 'DONE'
class PluginNameEnum(BaseEnum):
"""Plugin Name Enum."""
IMAGE = 'image'
......
......@@ -35,7 +35,7 @@ from mindinsight.conf import settings
from mindinsight.datavisual.common import exceptions
from mindinsight.datavisual.common.enums import CacheStatus
from mindinsight.datavisual.common.log import logger
from mindinsight.datavisual.common.enums import DataManagerStatus
from mindinsight.datavisual.common.enums import DataManagerStatus, DetailCacheManagerStatus
from mindinsight.datavisual.common.enums import PluginNameEnum
from mindinsight.datavisual.common.exceptions import TrainJobNotExistError
from mindinsight.datavisual.data_transform.loader_generators.loader_generator import MAX_DATA_LOADER_SIZE
......@@ -44,6 +44,7 @@ from mindinsight.utils.computing_resource_mgr import ComputingResourceManager
from mindinsight.utils.exceptions import MindInsightException
from mindinsight.utils.exceptions import ParamValueError
from mindinsight.utils.exceptions import UnknownError
from mindinsight.datavisual.utils.tools import exception_wrapper
class _BasicTrainJob:
......@@ -415,6 +416,13 @@ class _DetailCacheManager(_BaseCacheManager):
self._loader_pool_mutex = threading.Lock()
self._max_threads_count = 30
self._loader_generators = loader_generators
self._status = DetailCacheManagerStatus.INIT.value
self._loading_mutex = threading.Lock()
@property
def status(self):
"""Get loading status, if it is loading, return True."""
return self._status
def has_content(self):
"""Whether this cache manager has train jobs."""
......@@ -435,6 +443,20 @@ class _DetailCacheManager(_BaseCacheManager):
"""Get loader pool size."""
return len(self._loader_pool)
def _load_in_cache(self):
"""Generate and execute loaders."""
def load():
self._generate_loaders()
self._execute_load_data()
try:
exception_wrapper(load)()
except UnknownError as ex:
logger.warning("Load event data failed. Detail: %s.", str(ex))
finally:
self._status = DetailCacheManagerStatus.DONE.value
logger.info("Load event data end, status: %r, and loader pool size is %r.",
self._status, self.loader_pool_size())
def update_cache(self, disk_train_jobs: Iterable[_BasicTrainJob]):
"""
Update cache.
......@@ -445,8 +467,13 @@ class _DetailCacheManager(_BaseCacheManager):
disk_train_jobs (Iterable[_BasicTrainJob]): Basic info about train jobs on disk.
"""
self._generate_loaders()
self._execute_load_data()
with self._loading_mutex:
if self._status == DetailCacheManagerStatus.LOADING.value:
logger.debug("Event data is loading, and loader pool size is %r.", self.loader_pool_size())
return
self._status = DetailCacheManagerStatus.LOADING.value
thread = threading.Thread(target=self._load_in_cache, name="load_detail_in_cache")
thread.start()
def cache_train_job(self, train_id):
"""Cache given train job."""
......@@ -711,8 +738,7 @@ class _DetailCacheManager(_BaseCacheManager):
loader = self._get_loader(train_id)
if loader is None:
logger.warning("No valid summary log in train job %s, "
"or it is not in the cache.", train_id)
logger.info("No valid summary log in train job %s, or it is not in the cache.", train_id)
return None
train_job = loader.to_dict()
......@@ -897,19 +923,11 @@ class DataManager:
"""Wrapper for load data in thread."""
try:
with self._load_data_lock:
self._load_data_in_thread()
except MindInsightException as exc:
exception_wrapper(self._load_data)()
except UnknownError as exc:
# Not raising the exception here to ensure that data reloading does not crash.
logger.warning(exc.message)
def _load_data_in_thread(self):
"""Log (but not swallow) exceptions in thread to help debugging."""
try:
self._load_data()
except Exception as exc:
logger.exception(exc)
raise UnknownError('Load data thread error.')
def _load_data(self):
"""This function will load data once and ignore it if the status is loading."""
logger.info("Start to load data, reload interval: %r.", self._reload_interval)
......@@ -939,13 +957,13 @@ class DataManager:
self._brief_cache.update_cache(basic_train_jobs)
self._detail_cache.update_cache(basic_train_jobs)
if not self._brief_cache.has_content() and not self._detail_cache.has_content():
if not self._brief_cache.has_content() and not self._detail_cache.has_content() \
and self._detail_cache.status == DetailCacheManagerStatus.DONE.value:
self.status = DataManagerStatus.INVALID.value
else:
self.status = DataManagerStatus.DONE.value
logger.info("Load event data end, status: %r, and loader pool size is %r.",
self.status, self._detail_cache.loader_pool_size())
logger.info("Load brief data end, and loader pool size is %r.", self._detail_cache.loader_pool_size())
@staticmethod
def check_reload_interval(reload_interval):
......@@ -1046,14 +1064,6 @@ class DataManager:
return TrainJob(brief_train_job, detail_train_job)
def list_train_jobs(self):
"""
List train jobs.
To be implemented.
"""
raise NotImplementedError()
@property
def status(self):
"""
......@@ -1088,5 +1098,9 @@ class DataManager:
"""Get brief train job."""
return self._brief_cache.get_train_job(train_id)
def get_detail_cache_status(self):
"""Get detail status, just for ut/st."""
return self._detail_cache.status
DATA_MANAGER = DataManager(settings.SUMMARY_BASE_DIR)
......@@ -21,7 +21,9 @@ from numbers import Number
from urllib.parse import unquote
from mindinsight.datavisual.common.exceptions import MaxCountExceededError
from mindinsight.datavisual.common.log import logger
from mindinsight.utils import exceptions
from mindinsight.utils.exceptions import UnknownError
_IMG_EXT_TO_MIMETYPE = {
'bmp': 'image/bmp',
......@@ -216,6 +218,16 @@ def if_nan_inf_to_none(name, value):
return value
def exception_wrapper(func):
"""Wrap a function so that any exception it raises is logged and re-raised as UnknownError."""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as exc:
logger.exception(exc)
raise UnknownError(str(exc))
return wrapper
class Counter:
"""Count accumulator with limit checking."""
......
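As a usage note for the helper above: exception_wrapper wraps the callable first, and the returned wrapper is invoked afterwards, i.e. exception_wrapper(func)() rather than exception_wrapper(func()). A standalone sketch of that wrap-then-call pattern follows; risky_job is a hypothetical function and RuntimeError stands in for UnknownError so the snippet runs on its own.

import logging

logger = logging.getLogger(__name__)


def exception_wrapper(func):
    """Trimmed copy of the helper above: log any exception, then re-raise it wrapped."""
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception as exc:
            logger.exception(exc)
            raise RuntimeError(str(exc))  # stands in for UnknownError here
    return wrapper


def risky_job():
    """Hypothetical workload used only for illustration."""
    raise ValueError("disk read failed")


try:
    exception_wrapper(risky_job)()  # wrap the callable, then call the wrapper
except RuntimeError as ex:
    print("caught:", ex)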
......@@ -15,28 +15,15 @@
"""
Description: This file is used for some common utils.
"""
from unittest.mock import Mock
import pytest
from flask import Response
from mindinsight.backend import datavisual
from mindinsight.datavisual.utils import tools
from mindinsight.backend.application import APP
@pytest.fixture
def client():
"""This fixture is flask client."""
mock_data_manager = Mock()
mock_data_manager.start_load_data = Mock()
datavisual.DATA_MANAGER = mock_data_manager
packages = ["mindinsight.backend.data_visual"]
mock_obj = Mock(return_value=packages)
tools.find_app_package = mock_obj
from mindinsight.backend.application import APP
APP.response_class = Response
app_client = APP.test_client()
......
......@@ -22,12 +22,10 @@ from unittest.mock import patch
from werkzeug.exceptions import MethodNotAllowed, NotFound
from mindinsight.datavisual.processors import scalars_processor
from mindinsight.datavisual.processors.scalars_processor import ScalarsProcessor
from ....utils.tools import get_url
from ...backend.datavisual.conftest import TRAIN_ROUTES
from ..mock import MockLogger
class TestErrorHandler:
......@@ -36,7 +34,6 @@ class TestErrorHandler:
@patch.object(ScalarsProcessor, 'get_metadata_list')
def test_handle_http_exception_error_not_found(self, mock_scalar_processor, client):
"""Test handle http exception error not found."""
scalars_processor.logger = MockLogger
text = 'Test Message'
# NotFound
......@@ -59,7 +56,6 @@ class TestErrorHandler:
@patch.object(ScalarsProcessor, 'get_metadata_list')
def test_handle_http_exception_error_method_not_allowed(self, mock_scalar_processor, client):
"""Test handling http exception error method not allowed."""
scalars_processor.logger = MockLogger
text = 'Test Message'
# MethodNotAllowed
......@@ -82,7 +78,6 @@ class TestErrorHandler:
@patch.object(ScalarsProcessor, 'get_metadata_list')
def test_handle_http_exception_error_method_other_errors(self, mock_scalar_processor, client):
"""Test handling http exception error method other errors."""
scalars_processor.logger = MockLogger
text = 'Test Message'
# Other errors
......
# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Description: This file is used for some common utils.
"""
from unittest.mock import Mock
import pytest
from flask import Response
from mindinsight.backend import datavisual
from mindinsight.datavisual import utils
@pytest.fixture
def client():
"""This fixture is flask client."""
mock_data_manager = Mock()
mock_data_manager.start_load_data = Mock()
datavisual.DATA_MANAGER = mock_data_manager
packages = ["mindinsight.backend.raw_dataset",
"mindinsight.backend.train_dataset",
"mindinsight.backend.data_visual"]
mock_obj = Mock(return_value=packages)
utils.find_app_package = mock_obj
from mindinsight.backend.application import APP
APP.response_class = Response
app_client = APP.test_client()
yield app_client
......@@ -81,8 +81,9 @@ class TestDataManager:
def test_start_load_data_success(self):
"""Test start_load_data method success."""
summary_base_dir = tempfile.mkdtemp()
dir_num = 3
train_ids = []
for i in range(3):
for i in range(dir_num):
log_path = os.path.join(summary_base_dir, f'dir{i}')
self._make_path_and_file_list(log_path)
train_ids.append(f'./dir{i}')
......@@ -215,7 +216,7 @@ class TestDataManager:
expected_loader_ids = expected_loader_ids[-MAX_DATA_LOADER_SIZE:]
# Make sure to finish loading, make it init.
mock_data_manager._status = DataManagerStatus.INIT
mock_data_manager._detail_cache._status = DataManagerStatus.INIT.value
mock_generate_loaders.return_value = loader_dict
mock_data_manager.start_load_data(reload_interval=0)
check_loading_done(mock_data_manager)
......
......@@ -26,7 +26,7 @@ from urllib.parse import urlencode
import numpy as np
from PIL import Image
from mindinsight.datavisual.common.enums import DataManagerStatus
from mindinsight.datavisual.common.enums import DetailCacheManagerStatus
def get_url(url, params):
......@@ -59,13 +59,13 @@ def check_loading_done(data_manager, time_limit=15, first_sleep_time=0):
if first_sleep_time > 0:
time.sleep(first_sleep_time)
start_time = time.time()
while data_manager.status not in (DataManagerStatus.DONE.value, DataManagerStatus.INVALID.value):
while data_manager.get_detail_cache_status() != DetailCacheManagerStatus.DONE.value:
time_used = time.time() - start_time
if time_used > time_limit:
break
time.sleep(0.1)
continue
return bool(data_manager.status == DataManagerStatus.DONE.value)
return bool(data_manager.get_detail_cache_status() == DetailCacheManagerStatus.DONE.value)
def get_image_tensor_from_bytes(image_string):
......