queue.py 3.1 KB
Newer Older
W
wuyefeilin 已提交
1 2
# coding: utf8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserve.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import sys
import six
if six.PY3:
    import pickle
    from io import BytesIO as StringIO
else:
    import cPickle as pickle
    from cStringIO import StringIO

import logging
import traceback
import multiprocessing as mp
from multiprocessing.queues import Queue
from .sharedmemory import SharedMemoryMgr

logger = logging.getLogger(__name__)


class SharedQueueError(ValueError):
    """ SharedQueueError
    """
    pass


class SharedQueue(Queue):
    """ a Queue based on shared memory to communicate data between Process,
        and it's interface is compatible with 'multiprocessing.queues.Queue'
    """

    def __init__(self, maxsize=0, mem_mgr=None, memsize=None, pagesize=None):
        """ init
        """
        if six.PY3:
            super(SharedQueue, self).__init__(maxsize, ctx=mp.get_context())
        else:
            super(SharedQueue, self).__init__(maxsize)

        if mem_mgr is not None:
            self._shared_mem = mem_mgr
        else:
            self._shared_mem = SharedMemoryMgr(
                capacity=memsize, pagesize=pagesize)

    def put(self, obj, **kwargs):
        """ put an object to this queue
        """
        obj = pickle.dumps(obj, -1)
        buff = None
        try:
            buff = self._shared_mem.malloc(len(obj))
            buff.put(obj)
            super(SharedQueue, self).put(buff, **kwargs)
        except Exception as e:
            stack_info = traceback.format_exc()
            err_msg = 'failed to put a element to SharedQueue '\
                'with stack info[%s]' % (stack_info)
            logger.warn(err_msg)

            if buff is not None:
                buff.free()
            raise e

    def get(self, **kwargs):
        """ get an object from this queue
        """
        buff = None
        try:
            buff = super(SharedQueue, self).get(**kwargs)
            data = buff.get()
            return pickle.load(StringIO(data))
        except Exception as e:
            stack_info = traceback.format_exc()
            err_msg = 'failed to get element from SharedQueue '\
                        'with stack info[%s]' % (stack_info)
            logger.warn(err_msg)
            raise e
        finally:
            if buff is not None:
                buff.free()

    def release(self):
        self._shared_mem.release()
        self._shared_mem = None