未验证 提交 47d7e276 编写于 作者: K Kaipeng Deng 提交者: GitHub

enable shared memory (#2356)

* enable shared memory
上级 4f6ffb40
...@@ -74,3 +74,6 @@ dataset/wider_face/WIDER_test ...@@ -74,3 +74,6 @@ dataset/wider_face/WIDER_test
dataset/wider_face/WIDER_train dataset/wider_face/WIDER_train
dataset/wider_face/WIDER_val dataset/wider_face/WIDER_val
dataset/wider_face/wider_face_split dataset/wider_face/wider_face_split
# distribute launch log
log*
...@@ -21,7 +21,7 @@ TrainReader: ...@@ -21,7 +21,7 @@ TrainReader:
shuffle: true shuffle: true
drop_last: true drop_last: true
mixup_epoch: 25000 mixup_epoch: 25000
use_shared_memory: true
EvalReader: EvalReader:
sample_transforms: sample_transforms:
......
...@@ -12,6 +12,7 @@ TrainReader: ...@@ -12,6 +12,7 @@ TrainReader:
batch_size: 12 batch_size: 12
shuffle: true shuffle: true
drop_last: true drop_last: true
use_shared_memory: true
EvalReader: EvalReader:
sample_transforms: sample_transforms:
......
...@@ -21,7 +21,7 @@ TrainReader: ...@@ -21,7 +21,7 @@ TrainReader:
shuffle: true shuffle: true
drop_last: true drop_last: true
mixup_epoch: 250 mixup_epoch: 250
use_shared_memory: true
EvalReader: EvalReader:
inputs_def: inputs_def:
......
...@@ -29,6 +29,7 @@ from paddle.io import DistributedBatchSampler ...@@ -29,6 +29,7 @@ from paddle.io import DistributedBatchSampler
from ppdet.core.workspace import register, serializable, create from ppdet.core.workspace import register, serializable, create
from . import transform from . import transform
from .shm_utils import _get_shared_memory_size_in_M
from ppdet.utils.logger import setup_logger from ppdet.utils.logger import setup_logger
logger = setup_logger('reader') logger = setup_logger('reader')
...@@ -111,8 +112,32 @@ class BatchCompose(Compose): ...@@ -111,8 +112,32 @@ class BatchCompose(Compose):
class BaseDataLoader(object): class BaseDataLoader(object):
"""
Base DataLoader implementation for detection models
Args:
sample_transforms (list): a list of transforms to perform
on each sample
batch_transforms (list): a list of transforms to perform
on batch
batch_size (int): batch size for batch collating, default 1.
shuffle (bool): whether to shuffle samples
drop_last (bool): whether to drop the last incomplete,
default False
drop_empty (bool): whether to drop samples with no ground
truth labels, default True
num_classes (int): class number of dataset, default 80
use_shared_memory (bool): whether to use shared memory to
accelerate data loading, enable this only if you
are sure that the shared memory size of your OS
is larger than memory cost of input datas of model.
Note that shared memory will be automatically
disabled if the shared memory of OS is less than
1G, which is not enough for detection models.
Default False.
"""
def __init__(self, def __init__(self,
inputs_def=None,
sample_transforms=[], sample_transforms=[],
batch_transforms=[], batch_transforms=[],
batch_size=1, batch_size=1,
...@@ -120,6 +145,7 @@ class BaseDataLoader(object): ...@@ -120,6 +145,7 @@ class BaseDataLoader(object):
drop_last=False, drop_last=False,
drop_empty=True, drop_empty=True,
num_classes=80, num_classes=80,
use_shared_memory=False,
**kwargs): **kwargs):
# sample transform # sample transform
self._sample_transforms = Compose( self._sample_transforms = Compose(
...@@ -131,14 +157,14 @@ class BaseDataLoader(object): ...@@ -131,14 +157,14 @@ class BaseDataLoader(object):
self.batch_size = batch_size self.batch_size = batch_size
self.shuffle = shuffle self.shuffle = shuffle
self.drop_last = drop_last self.drop_last = drop_last
self.use_shared_memory = use_shared_memory
self.kwargs = kwargs self.kwargs = kwargs
def __call__(self, def __call__(self,
dataset, dataset,
worker_num, worker_num,
batch_sampler=None, batch_sampler=None,
return_list=False, return_list=False):
use_prefetch=True):
self.dataset = dataset self.dataset = dataset
self.dataset.parse_dataset() self.dataset.parse_dataset()
# get data # get data
...@@ -155,14 +181,22 @@ class BaseDataLoader(object): ...@@ -155,14 +181,22 @@ class BaseDataLoader(object):
else: else:
self._batch_sampler = batch_sampler self._batch_sampler = batch_sampler
use_shared_memory = self.use_shared_memory
# check whether shared memory size is bigger than 1G(1024M)
if use_shared_memory:
shm_size = _get_shared_memory_size_in_M()
if shm_size is not None and shm_size < 1024.:
logger.warn("Shared memory size is less than 1G, "
"disable shared_memory in DataLoader")
use_shared_memory = False
self.dataloader = DataLoader( self.dataloader = DataLoader(
dataset=self.dataset, dataset=self.dataset,
batch_sampler=self._batch_sampler, batch_sampler=self._batch_sampler,
collate_fn=self._batch_transforms, collate_fn=self._batch_transforms,
num_workers=worker_num, num_workers=worker_num,
return_list=return_list, return_list=return_list,
use_buffer_reader=use_prefetch, use_shared_memory=use_shared_memory)
use_shared_memory=False)
self.loader = iter(self.dataloader) self.loader = iter(self.dataloader)
return self return self
...@@ -197,7 +231,6 @@ class TrainReader(BaseDataLoader): ...@@ -197,7 +231,6 @@ class TrainReader(BaseDataLoader):
__shared__ = ['num_classes'] __shared__ = ['num_classes']
def __init__(self, def __init__(self,
inputs_def=None,
sample_transforms=[], sample_transforms=[],
batch_transforms=[], batch_transforms=[],
batch_size=1, batch_size=1,
...@@ -206,9 +239,9 @@ class TrainReader(BaseDataLoader): ...@@ -206,9 +239,9 @@ class TrainReader(BaseDataLoader):
drop_empty=True, drop_empty=True,
num_classes=80, num_classes=80,
**kwargs): **kwargs):
super(TrainReader, self).__init__( super(TrainReader, self).__init__(sample_transforms, batch_transforms,
inputs_def, sample_transforms, batch_transforms, batch_size, batch_size, shuffle, drop_last,
shuffle, drop_last, drop_empty, num_classes, **kwargs) drop_empty, num_classes, **kwargs)
@register @register
...@@ -216,7 +249,6 @@ class EvalReader(BaseDataLoader): ...@@ -216,7 +249,6 @@ class EvalReader(BaseDataLoader):
__shared__ = ['num_classes'] __shared__ = ['num_classes']
def __init__(self, def __init__(self,
inputs_def=None,
sample_transforms=[], sample_transforms=[],
batch_transforms=[], batch_transforms=[],
batch_size=1, batch_size=1,
...@@ -225,9 +257,9 @@ class EvalReader(BaseDataLoader): ...@@ -225,9 +257,9 @@ class EvalReader(BaseDataLoader):
drop_empty=True, drop_empty=True,
num_classes=80, num_classes=80,
**kwargs): **kwargs):
super(EvalReader, self).__init__( super(EvalReader, self).__init__(sample_transforms, batch_transforms,
inputs_def, sample_transforms, batch_transforms, batch_size, batch_size, shuffle, drop_last,
shuffle, drop_last, drop_empty, num_classes, **kwargs) drop_empty, num_classes, **kwargs)
@register @register
...@@ -235,7 +267,6 @@ class TestReader(BaseDataLoader): ...@@ -235,7 +267,6 @@ class TestReader(BaseDataLoader):
__shared__ = ['num_classes'] __shared__ = ['num_classes']
def __init__(self, def __init__(self,
inputs_def=None,
sample_transforms=[], sample_transforms=[],
batch_transforms=[], batch_transforms=[],
batch_size=1, batch_size=1,
...@@ -244,6 +275,6 @@ class TestReader(BaseDataLoader): ...@@ -244,6 +275,6 @@ class TestReader(BaseDataLoader):
drop_empty=True, drop_empty=True,
num_classes=80, num_classes=80,
**kwargs): **kwargs):
super(TestReader, self).__init__( super(TestReader, self).__init__(sample_transforms, batch_transforms,
inputs_def, sample_transforms, batch_transforms, batch_size, batch_size, shuffle, drop_last,
shuffle, drop_last, drop_empty, num_classes, **kwargs) drop_empty, num_classes, **kwargs)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
SIZE_UNIT = ['K', 'M', 'G', 'T']
SHM_QUERY_CMD = 'df -h'
SHM_KEY = 'shm'
SHM_DEFAULT_MOUNT = '/dev/shm'
# [ shared memory size check ]
# In detection models, image/target data occupies a lot of memory, and
# will occupy lots of shared memory in multi-process DataLoader, we use
# following code to get shared memory size and perform a size check to
# disable shared memory use if shared memory size is not enough.
# Shared memory getting process as follows:
# 1. use `df -h` get all mount info
# 2. pick up spaces whose mount info contains 'shm'
# 3. if 'shm' space number is only 1, return its size
# 4. if there are multiple 'shm' space, try to find the default mount
# directory '/dev/shm' is Linux-like system, otherwise return the
# biggest space size.
def _parse_size_in_M(size_str):
num, unit = size_str[:-1], size_str[-1]
assert unit in SIZE_UNIT, \
"unknown shm size unit {}".format(unit)
return float(num) * \
(1024 ** (SIZE_UNIT.index(unit) - 1))
def _get_shared_memory_size_in_M():
try:
df_infos = os.popen(SHM_QUERY_CMD).readlines()
except:
return None
else:
shm_infos = []
for df_info in df_infos:
info = df_info.strip()
if info.find(SHM_KEY) >= 0:
shm_infos.append(info.split())
if len(shm_infos) == 0:
return None
elif len(shm_infos) == 1:
return _parse_size_in_M(shm_infos[0][3])
else:
shm_infos = [si for si in shm_infos \
if si[-1] == SHM_DEFAULT_MOUNT]
if len(shm_infos) == 0:
return _parse_size_in_M(shm_infos[0][3])
else:
return max([_parse_size_in_M(si[3]) \
for si in shm_infos])
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册