提交 9c3eab21 编写于 作者: T TeslaZhao

fix pipeline bugs of py3.6

上级 60ad3f0a
...@@ -326,7 +326,10 @@ class ProcessChannel(object): ...@@ -326,7 +326,10 @@ class ProcessChannel(object):
with self._cv: with self._cv:
while self._stop.value == 0: while self._stop.value == 0:
try: try:
self._que.put({op_name: channeldata}, timeout=0) self._que.put((channeldata.id, {
op_name: channeldata
}),
timeout=0)
break break
except Queue.Full: except Queue.Full:
self._cv.wait() self._cv.wait()
...@@ -378,7 +381,7 @@ class ProcessChannel(object): ...@@ -378,7 +381,7 @@ class ProcessChannel(object):
else: else:
while self._stop.value == 0: while self._stop.value == 0:
try: try:
self._que.put(put_data, timeout=0) self._que.put((data_id, put_data), timeout=0)
break break
except Queue.Empty: except Queue.Empty:
self._cv.wait() self._cv.wait()
...@@ -414,7 +417,7 @@ class ProcessChannel(object): ...@@ -414,7 +417,7 @@ class ProcessChannel(object):
with self._cv: with self._cv:
while self._stop.value == 0 and resp is None: while self._stop.value == 0 and resp is None:
try: try:
resp = self._que.get(timeout=0) resp = self._que.get(timeout=0)[1]
break break
except Queue.Empty: except Queue.Empty:
if timeout is not None: if timeout is not None:
...@@ -459,7 +462,7 @@ class ProcessChannel(object): ...@@ -459,7 +462,7 @@ class ProcessChannel(object):
while self._stop.value == 0 and self._consumer_cursors[ while self._stop.value == 0 and self._consumer_cursors[
op_name] - self._base_cursor.value >= len(self._output_buf): op_name] - self._base_cursor.value >= len(self._output_buf):
try: try:
channeldata = self._que.get(timeout=0) channeldata = self._que.get(timeout=0)[1]
self._output_buf.append(channeldata) self._output_buf.append(channeldata)
list_values = list(channeldata.values()) list_values = list(channeldata.values())
_LOGGER.debug( _LOGGER.debug(
...@@ -633,7 +636,10 @@ class ThreadChannel(Queue.PriorityQueue): ...@@ -633,7 +636,10 @@ class ThreadChannel(Queue.PriorityQueue):
with self._cv: with self._cv:
while self._stop is False: while self._stop is False:
try: try:
self.put({op_name: channeldata}, timeout=0) self.put((channeldata.id, {
op_name: channeldata
}),
timeout=0)
break break
except Queue.Full: except Queue.Full:
self._cv.wait() self._cv.wait()
...@@ -680,7 +686,7 @@ class ThreadChannel(Queue.PriorityQueue): ...@@ -680,7 +686,7 @@ class ThreadChannel(Queue.PriorityQueue):
else: else:
while self._stop is False: while self._stop is False:
try: try:
self.put(put_data, timeout=0) self.put((data_id, put_data), timeout=0)
break break
except Queue.Empty: except Queue.Empty:
self._cv.wait() self._cv.wait()
...@@ -716,7 +722,7 @@ class ThreadChannel(Queue.PriorityQueue): ...@@ -716,7 +722,7 @@ class ThreadChannel(Queue.PriorityQueue):
with self._cv: with self._cv:
while self._stop is False and resp is None: while self._stop is False and resp is None:
try: try:
resp = self.get(timeout=0) resp = self.get(timeout=0)[1]
break break
except Queue.Empty: except Queue.Empty:
if timeout is not None: if timeout is not None:
......
...@@ -120,7 +120,7 @@ class PerformanceTracer(object): ...@@ -120,7 +120,7 @@ class PerformanceTracer(object):
tot_cost)) tot_cost))
if "DAG" in op_cost: if "DAG" in op_cost:
calls = op_cost["DAG"].values() calls = list(op_cost["DAG"].values())
calls.sort() calls.sort()
tot = len(calls) tot = len(calls)
qps = 1.0 * tot / self._interval_s qps = 1.0 * tot / self._interval_s
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册