提交 e6de6b93 编写于 作者: B barrierye

for python2, Queue is empty right after put from the same process/thread

上级 c5d98039
......@@ -215,7 +215,13 @@ class Channel(multiprocessing.queues.Queue):
def __init__(self, manager, name=None, maxsize=0, timeout=None):
# https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/
multiprocessing.queues.Queue.__init__(self, maxsize=maxsize)
if sys.version_info.major == 2:
super(Channel, self).__init__(maxsize=maxsize)
elif sys.version_info.major == 3:
super(Channel, self).__init__(
maxsize=maxsize, ctx=multiprocessing.get_context())
else:
raise Exception("Error Python version")
self._maxsize = maxsize
self._timeout = timeout
self.name = name
......@@ -355,17 +361,25 @@ class Channel(multiprocessing.queues.Queue):
while self._stop is False and resp is None:
try:
logging.debug(
self._log("{} try to get(with channel size: {})".
format(op_name, self.qsize())))
#TODO: bug to fix
# (multiple processes) the queue is not empty, but it raise Queue.Empty
resp = self.get(timeout=1e-3)
self._log("{} try to get(with channel empty: {})".
format(op_name, self.empty())))
# For Python2, after putting an object on an empty queue there may
# be an infinitessimal delay before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
if sys.version_info.major == 2:
resp = self.get(timeout=1e-3)
elif sys.version_info.major == 3:
resp = self.get(timeout=0)
else:
raise Exception("Error Python version")
break
except Queue.Empty:
logging.debug(
self._log(
"{} wait for empty queue(with channel size: {})".
format(op_name, self.qsize())))
"{} wait for empty queue(with channel empty: {})".
format(op_name, self.empty())))
self._cv.wait()
logging.debug(
self._log("{} get data succ: {}".format(op_name, resp.__str__(
......@@ -390,9 +404,17 @@ class Channel(multiprocessing.queues.Queue):
logging.debug(
self._log("{} try to get(with channel size: {})".format(
op_name, self.qsize())))
#TODO: bug to fix
# (multiple processes) the queue is not empty, but it raise Queue.Empty
channeldata = self.get(timeout=1e-3)
# For Python2, after putting an object on an empty queue there may
# be an infinitessimal delay before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
if sys.version_info.major == 2:
channeldata = self.get(timeout=1e-3)
elif sys.version_info.major == 3:
channeldata = self.get(timeout=0)
else:
raise Exception("Error Python version")
self._front_res.append(channeldata)
break
except Queue.Empty:
......@@ -798,7 +820,7 @@ class VirtualOp(Op):
class GeneralPythonService(
general_python_service_pb2_grpc.GeneralPythonService):
general_python_service_pb2_grpc.GeneralPythonServiceServicer):
def __init__(self, in_channel, out_channel, retry=2):
super(GeneralPythonService, self).__init__()
self.name = "#G"
......
......@@ -215,7 +215,13 @@ class Channel(multiprocessing.queues.Queue):
def __init__(self, manager, name=None, maxsize=0, timeout=None):
# https://stackoverflow.com/questions/39496554/cannot-subclass-multiprocessing-queue-in-python-3-5/
multiprocessing.queues.Queue.__init__(self, maxsize=maxsize)
if sys.version_info.major == 2:
super(Channel, self).__init__(maxsize=maxsize)
elif sys.version_info.major == 3:
super(Channel, self).__init__(
maxsize=maxsize, ctx=multiprocessing.get_context())
else:
raise Exception("Error Python version")
self._maxsize = maxsize
self._timeout = timeout
self.name = name
......@@ -355,17 +361,25 @@ class Channel(multiprocessing.queues.Queue):
while self._stop is False and resp is None:
try:
logging.debug(
self._log("{} try to get(with channel size: {})".
format(op_name, self.qsize())))
#TODO: bug to fix
# (multiple processes) the queue is not empty, but it raise Queue.Empty
resp = self.get(timeout=1e-3)
self._log("{} try to get(with channel empty: {})".
format(op_name, self.empty())))
# For Python2, after putting an object on an empty queue there may
# be an infinitessimal delay before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
if sys.version_info.major == 2:
resp = self.get(timeout=1e-3)
elif sys.version_info.major == 3:
resp = self.get(timeout=0)
else:
raise Exception("Error Python version")
break
except Queue.Empty:
logging.debug(
self._log(
"{} wait for empty queue(with channel size: {})".
format(op_name, self.qsize())))
"{} wait for empty queue(with channel empty: {})".
format(op_name, self.empty())))
self._cv.wait()
logging.debug(
self._log("{} get data succ: {}".format(op_name, resp.__str__(
......@@ -390,9 +404,17 @@ class Channel(multiprocessing.queues.Queue):
logging.debug(
self._log("{} try to get(with channel size: {})".format(
op_name, self.qsize())))
#TODO: bug to fix
# (multiple processes) the queue is not empty, but it raise Queue.Empty
channeldata = self.get(timeout=1e-3)
# For Python2, after putting an object on an empty queue there may
# be an infinitessimal delay before the queue's :meth:`~Queue.empty`
# see more:
# - https://bugs.python.org/issue18277
# - https://hg.python.org/cpython/rev/860fc6a2bd21
if sys.version_info.major == 2:
channeldata = self.get(timeout=1e-3)
elif sys.version_info.major == 3:
channeldata = self.get(timeout=0)
else:
raise Exception("Error Python version")
self._front_res.append(channeldata)
break
except Queue.Empty:
......@@ -798,7 +820,7 @@ class VirtualOp(Op):
class GeneralPythonService(
general_python_service_pb2_grpc.GeneralPythonService):
general_python_service_pb2_grpc.GeneralPythonServiceServicer):
def __init__(self, in_channel, out_channel, retry=2):
super(GeneralPythonService, self).__init__()
self.name = "#G"
......
numpy>=1.12, <=1.16.4 ; python_version<"3.5"
grpcio-tools>=1.28.1
grpcio>=1.28.1
func-timeout==4.3.5
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册