提交 1a4a6df1 编写于 作者: C chenguowei01

update dataset

上级 e5cabbb1
......@@ -12,5 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .dataset import Dataset
from .optic_disc_seg import OpticDiscSeg
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from threading import Thread
import multiprocessing
import collections
import numpy as np
import six
import sys
import copy
import random
import platform
import chardet
import utils.logging as logging
class EndSignal():
pass
def is_pic(img_name):
valid_suffix = ['jpeg', 'jpg', 'bmp', 'png']
suffix = img_name.split('.')[-1]
suffix = suffix.lower()
if suffix not in valid_suffix:
return False
return True
def is_valid(sample):
if sample is None:
return False
if isinstance(sample, tuple):
for s in sample:
if s is None:
return False
elif isinstance(s, np.ndarray) and s.size == 0:
return False
elif isinstance(s, collections.Sequence) and len(s) == 0:
return False
return True
def get_encoding(path):
f = open(path, 'rb')
data = f.read()
file_encoding = chardet.detect(data).get('encoding')
return file_encoding
def multithread_reader(mapper,
reader,
num_workers=4,
buffer_size=1024,
batch_size=8,
drop_last=True):
from queue import Queue
end = EndSignal()
# define a worker to read samples from reader to in_queue
def read_worker(reader, in_queue):
for i in reader():
in_queue.put(i)
in_queue.put(end)
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue
def handle_worker(in_queue, out_queue, mapper):
sample = in_queue.get()
while not isinstance(sample, EndSignal):
if len(sample) == 2:
r = mapper(sample[0], sample[1])
elif len(sample) == 3:
r = mapper(sample[0], sample[1], sample[2])
else:
raise Exception('The sample\'s length must be 2 or 3.')
if is_valid(r):
out_queue.put(r)
sample = in_queue.get()
in_queue.put(end)
out_queue.put(end)
def xreader():
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
# start a read worker in a thread
target = read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()
# start several handle_workers
target = handle_worker
args = (in_queue, out_queue, mapper)
workers = []
for i in range(num_workers):
worker = Thread(target=target, args=args)
worker.daemon = True
workers.append(worker)
for w in workers:
w.start()
batch_data = []
sample = out_queue.get()
while not isinstance(sample, EndSignal):
batch_data.append(sample)
if len(batch_data) == batch_size:
yield batch_data
batch_data = []
sample = out_queue.get()
finish = 1
while finish < num_workers:
sample = out_queue.get()
if isinstance(sample, EndSignal):
finish += 1
else:
batch_data.append(sample)
if len(batch_data) == batch_size:
yield batch_data
batch_data = []
if not drop_last and len(batch_data) != 0:
yield batch_data
batch_data = []
return xreader
def multiprocess_reader(mapper,
reader,
num_workers=4,
buffer_size=1024,
batch_size=8,
drop_last=True):
from .shared_queue import SharedQueue as Queue
def _read_into_queue(samples, mapper, queue):
end = EndSignal()
try:
for sample in samples:
if sample is None:
raise ValueError("sample has None")
if len(sample) == 2:
result = mapper(sample[0], sample[1])
elif len(sample) == 3:
result = mapper(sample[0], sample[1], sample[2])
else:
raise Exception('The sample\'s length must be 2 or 3.')
if is_valid(result):
queue.put(result)
queue.put(end)
except:
queue.put("")
six.reraise(*sys.exc_info())
def queue_reader():
queue = Queue(buffer_size, memsize=3 * 1024**3)
total_samples = [[] for i in range(num_workers)]
for i, sample in enumerate(reader()):
index = i % num_workers
total_samples[index].append(sample)
for i in range(num_workers):
p = multiprocessing.Process(target=_read_into_queue,
args=(total_samples[i], mapper, queue))
p.start()
finish_num = 0
batch_data = list()
while finish_num < num_workers:
sample = queue.get()
if isinstance(sample, EndSignal):
finish_num += 1
elif sample == "":
raise ValueError("multiprocess reader raises an exception")
else:
batch_data.append(sample)
if len(batch_data) == batch_size:
yield batch_data
batch_data = []
if len(batch_data) != 0 and not drop_last:
yield batch_data
batch_data = []
return queue_reader
class Dataset:
def __init__(self,
data_dir,
file_list,
label_list=None,
transforms=None,
num_workers='auto',
buffer_size=100,
parallel_method='thread',
shuffle=False):
if num_workers == 'auto':
import multiprocessing as mp
num_workers = mp.cpu_count() // 2 if mp.cpu_count() // 2 < 8 else 8
if transforms is None:
raise Exception("transform should be defined.")
self.transforms = transforms
self.num_workers = num_workers
self.buffer_size = buffer_size
self.parallel_method = parallel_method
self.shuffle = shuffle
self.file_list = list()
self.labels = list()
self._epoch = 0
if label_list is not None:
with open(label_list, encoding=get_encoding(label_list)) as f:
for line in f:
item = line.strip()
self.labels.append(item)
with open(file_list, encoding=get_encoding(file_list)) as f:
for line in f:
items = line.strip().split()
if not is_pic(items[0]):
continue
full_path_im = os.path.join(data_dir, items[0])
full_path_label = os.path.join(data_dir, items[1])
if not os.path.exists(full_path_im):
raise IOError(
'The image file {} is not exist!'.format(full_path_im))
if not os.path.exists(full_path_label):
raise IOError('The image file {} is not exist!'.format(
full_path_label))
self.file_list.append([full_path_im, full_path_label])
self.num_samples = len(self.file_list)
logging.info("{} samples in file {}".format(len(self.file_list),
file_list))
def iterator(self):
self._epoch += 1
self._pos = 0
files = copy.deepcopy(self.file_list)
if self.shuffle:
random.shuffle(files)
files = files[:self.num_samples]
self.num_samples = len(files)
for f in files:
label_path = f[1]
sample = [f[0], None, label_path]
yield sample
def generator(self, batch_size=1, drop_last=True):
self.batch_size = batch_size
parallel_reader = multithread_reader
if self.parallel_method == "process":
if platform.platform().startswith("Windows"):
logging.debug(
"multiprocess_reader is not supported in Windows platform, force to use multithread_reader."
)
else:
parallel_reader = multiprocess_reader
return parallel_reader(self.transforms,
self.iterator,
num_workers=self.num_workers,
buffer_size=self.buffer_size,
batch_size=batch_size,
drop_last=drop_last)
......@@ -22,16 +22,13 @@ import cv2
from utils.download import download_file_and_uncompress
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
DATA_HOME = os.path.expanduser('~/.cache/paddle/dataset')
URL = "https://paddleseg.bj.bcebos.com/dataset/optic_disc_seg.zip"
class OpticDiscSeg(Dataset):
def __init__(self,
data_dir=None,
train_list=None,
val_list=None,
test_list=None,
transforms=None,
mode='train',
download=True):
......@@ -39,6 +36,7 @@ class OpticDiscSeg(Dataset):
self.transforms = transforms
self.file_list = list()
self.mode = mode
self.num_classes = 2
if mode.lower() not in ['train', 'eval', 'test']:
raise Exception(
......@@ -53,20 +51,14 @@ class OpticDiscSeg(Dataset):
if not download:
raise Exception("data_file not set and auto download disabled.")
self.data_dir = download_file_and_uncompress(
url=URL, savepath=LOCAL_PATH, extrapath=LOCAL_PATH)
if mode == 'train':
file_list = os.path.join(self.data_dir, 'train_list.txt')
elif mode == 'eval':
file_list = os.path.join(self.data_dir, 'val_list.txt')
else:
file_list = os.path.join(self.data_dir, 'test_list.txt')
url=URL, savepath=DATA_HOME, extrapath=DATA_HOME)
if mode == 'train':
file_list = os.path.join(self.data_dir, 'train_list.txt')
elif mode == 'eval':
file_list = os.path.join(self.data_dir, 'val_list.txt')
else:
if mode == 'train':
file_list = train_list
elif mode == 'eval':
file_list = val_list
else:
file_list = test_list
file_list = os.path.join(self.data_dir, 'test_list.txt')
with open(file_list, 'r') as f:
for line in f:
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
__all__ = ['SharedBuffer', 'SharedMemoryMgr', 'SharedQueue']
from .sharedmemory import SharedBuffer
from .sharedmemory import SharedMemoryMgr
from .sharedmemory import SharedMemoryError
from .queue import SharedQueue
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import sys
import six
if six.PY3:
import pickle
from io import BytesIO as StringIO
else:
import cPickle as pickle
from cStringIO import StringIO
import logging
import traceback
import multiprocessing as mp
from multiprocessing.queues import Queue
from .sharedmemory import SharedMemoryMgr
logger = logging.getLogger(__name__)
class SharedQueueError(ValueError):
""" SharedQueueError
"""
pass
class SharedQueue(Queue):
""" a Queue based on shared memory to communicate data between Process,
and it's interface is compatible with 'multiprocessing.queues.Queue'
"""
def __init__(self, maxsize=0, mem_mgr=None, memsize=None, pagesize=None):
""" init
"""
if six.PY3:
super(SharedQueue, self).__init__(maxsize, ctx=mp.get_context())
else:
super(SharedQueue, self).__init__(maxsize)
if mem_mgr is not None:
self._shared_mem = mem_mgr
else:
self._shared_mem = SharedMemoryMgr(
capacity=memsize, pagesize=pagesize)
def put(self, obj, **kwargs):
""" put an object to this queue
"""
obj = pickle.dumps(obj, -1)
buff = None
try:
buff = self._shared_mem.malloc(len(obj))
buff.put(obj)
super(SharedQueue, self).put(buff, **kwargs)
except Exception as e:
stack_info = traceback.format_exc()
err_msg = 'failed to put a element to SharedQueue '\
'with stack info[%s]' % (stack_info)
logger.warn(err_msg)
if buff is not None:
buff.free()
raise e
def get(self, **kwargs):
""" get an object from this queue
"""
buff = None
try:
buff = super(SharedQueue, self).get(**kwargs)
data = buff.get()
return pickle.load(StringIO(data))
except Exception as e:
stack_info = traceback.format_exc()
err_msg = 'failed to get element from SharedQueue '\
'with stack info[%s]' % (stack_info)
logger.warn(err_msg)
raise e
finally:
if buff is not None:
buff.free()
def release(self):
self._shared_mem.release()
self._shared_mem = None
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import os
import time
import math
import struct
import sys
import six
if six.PY3:
import pickle
else:
import cPickle as pickle
import json
import uuid
import random
import numpy as np
import weakref
import logging
from multiprocessing import Lock
from multiprocessing import RawArray
logger = logging.getLogger(__name__)
class SharedMemoryError(ValueError):
""" SharedMemoryError
"""
pass
class SharedBufferError(SharedMemoryError):
""" SharedBufferError
"""
pass
class MemoryFullError(SharedMemoryError):
""" MemoryFullError
"""
def __init__(self, errmsg=''):
super(MemoryFullError, self).__init__()
self.errmsg = errmsg
def memcopy(dst, src, offset=0, length=None):
""" copy data from 'src' to 'dst' in bytes
"""
length = length if length is not None else len(src)
assert type(dst) == np.ndarray, 'invalid type for "dst" in memcopy'
if type(src) is not np.ndarray:
if type(src) is str and six.PY3:
src = src.encode()
src = np.frombuffer(src, dtype='uint8', count=len(src))
dst[:] = src[offset:offset + length]
class SharedBuffer(object):
""" Buffer allocated from SharedMemoryMgr, and it stores data on shared memory
note that:
every instance of this should be freed explicitely by calling 'self.free'
"""
def __init__(self, owner, capacity, pos, size=0, alloc_status=''):
""" Init
Args:
owner (str): manager to own this buffer
capacity (int): capacity in bytes for this buffer
pos (int): page position in shared memory
size (int): bytes already used
alloc_status (str): debug info about allocator when allocate this
"""
self._owner = owner
self._cap = capacity
self._pos = pos
self._size = size
self._alloc_status = alloc_status
assert self._pos >= 0 and self._cap > 0, \
"invalid params[%d:%d] to construct SharedBuffer" \
% (self._pos, self._cap)
def owner(self):
""" get owner
"""
return SharedMemoryMgr.get_mgr(self._owner)
def put(self, data, override=False):
""" put data to this buffer
Args:
data (str): data to be stored in this buffer
Returns:
None
Raises:
SharedMemoryError when not enough space in this buffer
"""
assert type(data) in [str, bytes], \
'invalid type[%s] for SharedBuffer::put' % (str(type(data)))
if self._size > 0 and not override:
raise SharedBufferError('already has already been setted before')
if self.capacity() < len(data):
raise SharedBufferError('data[%d] is larger than size of buffer[%s]'\
% (len(data), str(self)))
self.owner().put_data(self, data)
self._size = len(data)
def get(self, offset=0, size=None, no_copy=True):
""" get the data stored this buffer
Args:
offset (int): position for the start point to 'get'
size (int): size to get
Returns:
data (np.ndarray('uint8')): user's data in numpy
which is passed in by 'put'
None: if no data stored in
"""
offset = offset if offset >= 0 else self._size + offset
if self._size <= 0:
return None
size = self._size if size is None else size
assert offset + size <= self._cap, 'invalid offset[%d] '\
'or size[%d] for capacity[%d]' % (offset, size, self._cap)
return self.owner().get_data(self, offset, size, no_copy=no_copy)
def size(self):
""" bytes of used memory
"""
return self._size
def resize(self, size):
""" resize the used memory to 'size', should not be greater than capacity
"""
assert size >= 0 and size <= self._cap, \
"invalid size[%d] for resize" % (size)
self._size = size
def capacity(self):
""" size of allocated memory
"""
return self._cap
def __str__(self):
""" human readable format
"""
return "SharedBuffer(owner:%s, pos:%d, size:%d, "\
"capacity:%d, alloc_status:[%s], pid:%d)" \
% (str(self._owner), self._pos, self._size, \
self._cap, self._alloc_status, os.getpid())
def free(self):
""" free this buffer to it's owner
"""
if self._owner is not None:
self.owner().free(self)
self._owner = None
self._cap = 0
self._pos = -1
self._size = 0
return True
else:
return False
class PageAllocator(object):
""" allocator used to malloc and free shared memory which
is split into pages
"""
s_allocator_header = 12
def __init__(self, base, total_pages, page_size):
""" init
"""
self._magic_num = 1234321000 + random.randint(100, 999)
self._base = base
self._total_pages = total_pages
self._page_size = page_size
header_pages = int(
math.ceil((total_pages + self.s_allocator_header) / page_size))
self._header_pages = header_pages
self._free_pages = total_pages - header_pages
self._header_size = self._header_pages * page_size
self._reset()
def _dump_alloc_info(self, fname):
hpages, tpages, pos, used = self.header()
start = self.s_allocator_header
end = start + self._page_size * hpages
alloc_flags = self._base[start:end].tostring()
info = {
'magic_num': self._magic_num,
'header_pages': hpages,
'total_pages': tpages,
'pos': pos,
'used': used
}
info['alloc_flags'] = alloc_flags
fname = fname + '.' + str(uuid.uuid4())[:6]
with open(fname, 'wb') as f:
f.write(pickle.dumps(info, -1))
logger.warn('dump alloc info to file[%s]' % (fname))
def _reset(self):
alloc_page_pos = self._header_pages
used_pages = self._header_pages
header_info = struct.pack(
str('III'), self._magic_num, alloc_page_pos, used_pages)
assert len(header_info) == self.s_allocator_header, \
'invalid size of header_info'
memcopy(self._base[0:self.s_allocator_header], header_info)
self.set_page_status(0, self._header_pages, '1')
self.set_page_status(self._header_pages, self._free_pages, '0')
def header(self):
""" get header info of this allocator
"""
header_str = self._base[0:self.s_allocator_header].tostring()
magic, pos, used = struct.unpack(str('III'), header_str)
assert magic == self._magic_num, \
'invalid header magic[%d] in shared memory' % (magic)
return self._header_pages, self._total_pages, pos, used
def empty(self):
""" are all allocatable pages available
"""
header_pages, pages, pos, used = self.header()
return header_pages == used
def full(self):
""" are all allocatable pages used
"""
header_pages, pages, pos, used = self.header()
return header_pages + used == pages
def __str__(self):
header_pages, pages, pos, used = self.header()
desc = '{page_info[magic:%d,total:%d,used:%d,header:%d,alloc_pos:%d,pagesize:%d]}' \
% (self._magic_num, pages, used, header_pages, pos, self._page_size)
return 'PageAllocator:%s' % (desc)
def set_alloc_info(self, alloc_pos, used_pages):
""" set allocating position to new value
"""
memcopy(self._base[4:12], struct.pack(str('II'), alloc_pos, used_pages))
def set_page_status(self, start, page_num, status):
""" set pages from 'start' to 'end' with new same status 'status'
"""
assert status in ['0', '1'], 'invalid status[%s] for page status '\
'in allocator[%s]' % (status, str(self))
start += self.s_allocator_header
end = start + page_num
assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
'in allocator[%s]' % (end, str(self))
memcopy(self._base[start:end], str(status * page_num))
def get_page_status(self, start, page_num, ret_flag=False):
start += self.s_allocator_header
end = start + page_num
assert start >= 0 and end <= self._header_size, 'invalid end[%d] of pages '\
'in allocator[%s]' % (end, str(self))
status = self._base[start:end].tostring().decode()
if ret_flag:
return status
zero_num = status.count('0')
if zero_num == 0:
return (page_num, 1)
else:
return (zero_num, 0)
def malloc_page(self, page_num):
header_pages, pages, pos, used = self.header()
end = pos + page_num
if end > pages:
pos = self._header_pages
end = pos + page_num
start_pos = pos
flags = ''
while True:
# maybe flags already has some '0' pages,
# so just check 'page_num - len(flags)' pages
flags = self.get_page_status(pos, page_num, ret_flag=True)
if flags.count('0') == page_num:
break
# not found enough pages, so shift to next few pages
free_pos = flags.rfind('1') + 1
pos += free_pos
end = pos + page_num
if end > pages:
pos = self._header_pages
end = pos + page_num
flags = ''
# not found available pages after scan all pages
if pos <= start_pos and end >= start_pos:
logger.debug('not found available pages after scan all pages')
break
page_status = (flags.count('0'), 0)
if page_status != (page_num, 0):
free_pages = self._total_pages - used
if free_pages == 0:
err_msg = 'all pages have been used:%s' % (str(self))
else:
err_msg = 'not found available pages with page_status[%s] '\
'and %d free pages' % (str(page_status), free_pages)
err_msg = 'failed to malloc %d pages at pos[%d] for reason[%s] and allocator status[%s]' \
% (page_num, pos, err_msg, str(self))
raise MemoryFullError(err_msg)
self.set_page_status(pos, page_num, '1')
used += page_num
self.set_alloc_info(end, used)
return pos
def free_page(self, start, page_num):
""" free 'page_num' pages start from 'start'
"""
page_status = self.get_page_status(start, page_num)
assert page_status == (page_num, 1), \
'invalid status[%s] when free [%d, %d]' \
% (str(page_status), start, page_num)
self.set_page_status(start, page_num, '0')
_, _, pos, used = self.header()
used -= page_num
self.set_alloc_info(pos, used)
DEFAULT_SHARED_MEMORY_SIZE = 1024 * 1024 * 1024
class SharedMemoryMgr(object):
""" manage a continouse block of memory, provide
'malloc' to allocate new buffer, and 'free' to free buffer
"""
s_memory_mgrs = weakref.WeakValueDictionary()
s_mgr_num = 0
s_log_statis = False
@classmethod
def get_mgr(cls, id):
""" get a SharedMemoryMgr with size of 'capacity'
"""
assert id in cls.s_memory_mgrs, 'invalid id[%s] for memory managers' % (
id)
return cls.s_memory_mgrs[id]
def __init__(self, capacity=None, pagesize=None):
""" init
"""
logger.debug('create SharedMemoryMgr')
pagesize = 64 * 1024 if pagesize is None else pagesize
assert type(pagesize) is int, "invalid type of pagesize[%s]" \
% (str(pagesize))
capacity = DEFAULT_SHARED_MEMORY_SIZE if capacity is None else capacity
assert type(capacity) is int, "invalid type of capacity[%s]" \
% (str(capacity))
assert capacity > 0, '"size of shared memory should be greater than 0'
self._released = False
self._cap = capacity
self._page_size = pagesize
assert self._cap % self._page_size == 0, \
"capacity[%d] and pagesize[%d] are not consistent" \
% (self._cap, self._page_size)
self._total_pages = self._cap // self._page_size
self._pid = os.getpid()
SharedMemoryMgr.s_mgr_num += 1
self._id = self._pid * 100 + SharedMemoryMgr.s_mgr_num
SharedMemoryMgr.s_memory_mgrs[self._id] = self
self._locker = Lock()
self._setup()
def _setup(self):
self._shared_mem = RawArray('c', self._cap)
self._base = np.frombuffer(
self._shared_mem, dtype='uint8', count=self._cap)
self._locker.acquire()
try:
self._allocator = PageAllocator(self._base, self._total_pages,
self._page_size)
finally:
self._locker.release()
def malloc(self, size, wait=True):
""" malloc a new SharedBuffer
Args:
size (int): buffer size to be malloc
wait (bool): whether to wait when no enough memory
Returns:
SharedBuffer
Raises:
SharedMemoryError when not found available memory
"""
page_num = int(math.ceil(size / self._page_size))
size = page_num * self._page_size
start = None
ct = 0
errmsg = ''
while True:
self._locker.acquire()
try:
start = self._allocator.malloc_page(page_num)
alloc_status = str(self._allocator)
except MemoryFullError as e:
start = None
errmsg = e.errmsg
if not wait:
raise e
finally:
self._locker.release()
if start is None:
time.sleep(0.1)
if ct % 100 == 0:
logger.warn('not enough space for reason[%s]' % (errmsg))
ct += 1
else:
break
return SharedBuffer(self._id, size, start, alloc_status=alloc_status)
def free(self, shared_buf):
""" free a SharedBuffer
Args:
shared_buf (SharedBuffer): buffer to be freed
Returns:
None
Raises:
SharedMemoryError when failed to release this buffer
"""
assert shared_buf._owner == self._id, "invalid shared_buf[%s] "\
"for it's not allocated from me[%s]" % (str(shared_buf), str(self))
cap = shared_buf.capacity()
start_page = shared_buf._pos
page_num = cap // self._page_size
#maybe we don't need this lock here
self._locker.acquire()
try:
self._allocator.free_page(start_page, page_num)
finally:
self._locker.release()
def put_data(self, shared_buf, data):
""" fill 'data' into 'shared_buf'
"""
assert len(data) <= shared_buf.capacity(), 'too large data[%d] '\
'for this buffer[%s]' % (len(data), str(shared_buf))
start = shared_buf._pos * self._page_size
end = start + len(data)
assert start >= 0 and end <= self._cap, "invalid start "\
"position[%d] when put data to buff:%s" % (start, str(shared_buf))
self._base[start:end] = np.frombuffer(data, 'uint8', len(data))
def get_data(self, shared_buf, offset, size, no_copy=True):
""" extract 'data' from 'shared_buf' in range [offset, offset + size)
"""
start = shared_buf._pos * self._page_size
start += offset
if no_copy:
return self._base[start:start + size]
else:
return self._base[start:start + size].tostring()
def __str__(self):
return 'SharedMemoryMgr:{id:%d, %s}' % (self._id, str(self._allocator))
def __del__(self):
if SharedMemoryMgr.s_log_statis:
logger.info('destroy [%s]' % (self))
if not self._released and not self._allocator.empty():
logger.debug(
'not empty when delete this SharedMemoryMgr[%s]' % (self))
else:
self._released = True
if self._id in SharedMemoryMgr.s_memory_mgrs:
del SharedMemoryMgr.s_memory_mgrs[self._id]
SharedMemoryMgr.s_mgr_num -= 1
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册