From e6de6b938fa7a7c31eb22ed82a21dcc722cb0805 Mon Sep 17 00:00:00 2001 From: barrierye Date: Mon, 15 Jun 2020 14:35:54 +0800 Subject: [PATCH] for python2, Queue is empty right after put from the same process/thread --- python/paddle_serving_server/pyserver.py | 46 +++++++++++++++----- python/paddle_serving_server_gpu/pyserver.py | 46 +++++++++++++++----- python/requirements.txt | 1 + 3 files changed, 69 insertions(+), 24 deletions(-) diff --git a/python/paddle_serving_server/pyserver.py b/python/paddle_serving_server/pyserver.py index f80e1e80..e9bc43be 100644 --- a/python/paddle_serving_server/pyserver.py +++ b/python/paddle_serving_server/pyserver.py @@ -215,7 +215,13 @@ class Channel(multiprocessing.queues.Queue): def __init__(self, manager, name=None, maxsize=0, timeout=None): # https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/ - multiprocessing.queues.Queue.__init__(self, maxsize=maxsize) + if sys.version_info.major == 2: + super(Channel, self).__init__(maxsize=maxsize) + elif sys.version_info.major == 3: + super(Channel, self).__init__( + maxsize=maxsize, ctx=multiprocessing.get_context()) + else: + raise Exception("Error Python version") self._maxsize = maxsize self._timeout = timeout self.name = name @@ -355,17 +361,25 @@ class Channel(multiprocessing.queues.Queue): while self._stop is False and resp is None: try: logging.debug( - self._log("{} try to get(with channel size: {})". - format(op_name, self.qsize()))) - #TODO: bug to fix - # (multiple processes) the queue is not empty, but it raise Queue.Empty - resp = self.get(timeout=1e-3) + self._log("{} try to get(with channel empty: {})". + format(op_name, self.empty()))) + # For Python2, after putting an object on an empty queue there may + # be an infinitessimal delay before the queue's :meth:`~Queue.empty` + # see more: + # - https://bugs.python.org/issue18277 + # - https://hg.python.org/cpython/rev/860fc6a2bd21 + if sys.version_info.major == 2: + resp = self.get(timeout=1e-3) + elif sys.version_info.major == 3: + resp = self.get(timeout=0) + else: + raise Exception("Error Python version") break except Queue.Empty: logging.debug( self._log( - "{} wait for empty queue(with channel size: {})". - format(op_name, self.qsize()))) + "{} wait for empty queue(with channel empty: {})". + format(op_name, self.empty()))) self._cv.wait() logging.debug( self._log("{} get data succ: {}".format(op_name, resp.__str__( @@ -390,9 +404,17 @@ class Channel(multiprocessing.queues.Queue): logging.debug( self._log("{} try to get(with channel size: {})".format( op_name, self.qsize()))) - #TODO: bug to fix - # (multiple processes) the queue is not empty, but it raise Queue.Empty - channeldata = self.get(timeout=1e-3) + # For Python2, after putting an object on an empty queue there may + # be an infinitessimal delay before the queue's :meth:`~Queue.empty` + # see more: + # - https://bugs.python.org/issue18277 + # - https://hg.python.org/cpython/rev/860fc6a2bd21 + if sys.version_info.major == 2: + channeldata = self.get(timeout=1e-3) + elif sys.version_info.major == 3: + channeldata = self.get(timeout=0) + else: + raise Exception("Error Python version") self._front_res.append(channeldata) break except Queue.Empty: @@ -798,7 +820,7 @@ class VirtualOp(Op): class GeneralPythonService( - general_python_service_pb2_grpc.GeneralPythonService): + general_python_service_pb2_grpc.GeneralPythonServiceServicer): def __init__(self, in_channel, out_channel, retry=2): super(GeneralPythonService, self).__init__() self.name = "#G" diff --git a/python/paddle_serving_server_gpu/pyserver.py b/python/paddle_serving_server_gpu/pyserver.py index f80e1e80..e9bc43be 100644 --- a/python/paddle_serving_server_gpu/pyserver.py +++ b/python/paddle_serving_server_gpu/pyserver.py @@ -215,7 +215,13 @@ class Channel(multiprocessing.queues.Queue): def __init__(self, manager, name=None, maxsize=0, timeout=None): # https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/ - multiprocessing.queues.Queue.__init__(self, maxsize=maxsize) + if sys.version_info.major == 2: + super(Channel, self).__init__(maxsize=maxsize) + elif sys.version_info.major == 3: + super(Channel, self).__init__( + maxsize=maxsize, ctx=multiprocessing.get_context()) + else: + raise Exception("Error Python version") self._maxsize = maxsize self._timeout = timeout self.name = name @@ -355,17 +361,25 @@ class Channel(multiprocessing.queues.Queue): while self._stop is False and resp is None: try: logging.debug( - self._log("{} try to get(with channel size: {})". - format(op_name, self.qsize()))) - #TODO: bug to fix - # (multiple processes) the queue is not empty, but it raise Queue.Empty - resp = self.get(timeout=1e-3) + self._log("{} try to get(with channel empty: {})". + format(op_name, self.empty()))) + # For Python2, after putting an object on an empty queue there may + # be an infinitessimal delay before the queue's :meth:`~Queue.empty` + # see more: + # - https://bugs.python.org/issue18277 + # - https://hg.python.org/cpython/rev/860fc6a2bd21 + if sys.version_info.major == 2: + resp = self.get(timeout=1e-3) + elif sys.version_info.major == 3: + resp = self.get(timeout=0) + else: + raise Exception("Error Python version") break except Queue.Empty: logging.debug( self._log( - "{} wait for empty queue(with channel size: {})". - format(op_name, self.qsize()))) + "{} wait for empty queue(with channel empty: {})". + format(op_name, self.empty()))) self._cv.wait() logging.debug( self._log("{} get data succ: {}".format(op_name, resp.__str__( @@ -390,9 +404,17 @@ class Channel(multiprocessing.queues.Queue): logging.debug( self._log("{} try to get(with channel size: {})".format( op_name, self.qsize()))) - #TODO: bug to fix - # (multiple processes) the queue is not empty, but it raise Queue.Empty - channeldata = self.get(timeout=1e-3) + # For Python2, after putting an object on an empty queue there may + # be an infinitessimal delay before the queue's :meth:`~Queue.empty` + # see more: + # - https://bugs.python.org/issue18277 + # - https://hg.python.org/cpython/rev/860fc6a2bd21 + if sys.version_info.major == 2: + channeldata = self.get(timeout=1e-3) + elif sys.version_info.major == 3: + channeldata = self.get(timeout=0) + else: + raise Exception("Error Python version") self._front_res.append(channeldata) break except Queue.Empty: @@ -798,7 +820,7 @@ class VirtualOp(Op): class GeneralPythonService( - general_python_service_pb2_grpc.GeneralPythonService): + general_python_service_pb2_grpc.GeneralPythonServiceServicer): def __init__(self, in_channel, out_channel, retry=2): super(GeneralPythonService, self).__init__() self.name = "#G" diff --git a/python/requirements.txt b/python/requirements.txt index 4b61fa6a..6f5e809c 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,3 +1,4 @@ numpy>=1.12, <=1.16.4 ; python_version<"3.5" grpcio-tools>=1.28.1 grpcio>=1.28.1 +func-timeout==4.3.5 -- GitLab