提交 07ce3428 编写于 作者: B barrierye

support build process version op in each request

上级 2a267ac5
port: 18080 port: 18080
worker_num: 1 worker_num: 1
build_dag_each_request: true build_dag_each_request: false
dag: dag:
is_thread_op: false is_thread_op: true
client_type: brpc client_type: brpc
retry: 1 retry: 1
use_profile: false use_profile: false
...@@ -197,7 +197,7 @@ class ProcessChannel(object): ...@@ -197,7 +197,7 @@ class ProcessChannel(object):
self._maxsize = maxsize self._maxsize = maxsize
self._timeout = timeout self._timeout = timeout
self.name = name self.name = name
self._stop = False self._stop = manager.Value('i', 0)
self._cv = multiprocessing.Condition() self._cv = multiprocessing.Condition()
...@@ -253,13 +253,13 @@ class ProcessChannel(object): ...@@ -253,13 +253,13 @@ class ProcessChannel(object):
)) ))
elif len(self._producers) == 1: elif len(self._producers) == 1:
with self._cv: with self._cv:
while self._stop is False: while self._stop.value == 0:
try: try:
self._que.put({op_name: channeldata}, timeout=0) self._que.put({op_name: channeldata}, timeout=0)
break break
except Queue.Full: except Queue.Full:
self._cv.wait() self._cv.wait()
if self._stop: if self._stop.value == 1:
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
self._log("{} channel size: {}".format(op_name, self._log("{} channel size: {}".format(op_name,
...@@ -302,7 +302,7 @@ class ProcessChannel(object): ...@@ -302,7 +302,7 @@ class ProcessChannel(object):
self._log("{} push data succ, but not push to queue.". self._log("{} push data succ, but not push to queue.".
format(op_name))) format(op_name)))
else: else:
while self._stop is False: while self._stop.value == 0:
try: try:
_LOGGER.debug( _LOGGER.debug(
self._log("{} push data succ: {}".format( self._log("{} push data succ: {}".format(
...@@ -311,7 +311,7 @@ class ProcessChannel(object): ...@@ -311,7 +311,7 @@ class ProcessChannel(object):
break break
except Queue.Empty: except Queue.Empty:
self._cv.wait() self._cv.wait()
if self._stop: if self._stop.value == 1:
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
...@@ -329,7 +329,7 @@ class ProcessChannel(object): ...@@ -329,7 +329,7 @@ class ProcessChannel(object):
elif len(self._consumer_cursors) == 1: elif len(self._consumer_cursors) == 1:
resp = None resp = None
with self._cv: with self._cv:
while self._stop is False and resp is None: while self._stop.value == 0 and resp is None:
try: try:
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to get(with channel empty: {})". self._log("{} try to get(with channel empty: {})".
...@@ -342,7 +342,7 @@ class ProcessChannel(object): ...@@ -342,7 +342,7 @@ class ProcessChannel(object):
"{} wait for empty queue(with channel empty: {})". "{} wait for empty queue(with channel empty: {})".
format(op_name, self._que.empty()))) format(op_name, self._que.empty())))
self._cv.wait() self._cv.wait()
if self._stop: if self._stop.value == 1:
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
self._log("{} get data succ: {}".format(op_name, resp.__str__( self._log("{} get data succ: {}".format(op_name, resp.__str__(
...@@ -366,7 +366,7 @@ class ProcessChannel(object): ...@@ -366,7 +366,7 @@ class ProcessChannel(object):
with self._cv: with self._cv:
# When the data required by the current Op is not in output_buf, # When the data required by the current Op is not in output_buf,
# it is necessary to obtain a data from queue and add it to output_buf. # it is necessary to obtain a data from queue and add it to output_buf.
while self._stop is False and self._consumer_cursors[ while self._stop.value == 0 and self._consumer_cursors[
op_name] - self._base_cursor.value >= len(self._output_buf): op_name] - self._base_cursor.value >= len(self._output_buf):
_LOGGER.debug( _LOGGER.debug(
self._log( self._log(
...@@ -386,7 +386,7 @@ class ProcessChannel(object): ...@@ -386,7 +386,7 @@ class ProcessChannel(object):
"{} wait for empty queue(with channel size: {})". "{} wait for empty queue(with channel size: {})".
format(op_name, self._que.qsize()))) format(op_name, self._que.qsize())))
self._cv.wait() self._cv.wait()
if self._stop: if self._stop.value == 1:
raise ChannelStopError() raise ChannelStopError()
consumer_cursor = self._consumer_cursors[op_name] consumer_cursor = self._consumer_cursors[op_name]
...@@ -435,7 +435,7 @@ class ProcessChannel(object): ...@@ -435,7 +435,7 @@ class ProcessChannel(object):
def stop(self): def stop(self):
_LOGGER.info(self._log("stop.")) _LOGGER.info(self._log("stop."))
self._stop = True self._stop.value = 1
with self._cv: with self._cv:
self._cv.notify_all() self._cv.notify_all()
......
...@@ -470,3 +470,6 @@ class DAG(object): ...@@ -470,3 +470,6 @@ class DAG(object):
def stop(self): def stop(self):
for chl in self._channels: for chl in self._channels:
chl.stop() chl.stop()
for op in self._actual_ops:
op.clean_input_channel()
op.clean_output_channels()
...@@ -117,7 +117,7 @@ class Op(object): ...@@ -117,7 +117,7 @@ class Op(object):
channel.add_consumer(self.name) channel.add_consumer(self.name)
self._input = channel self._input = channel
def _clean_input_channel(self): def clean_input_channel(self):
self._input = None self._input = None
def _get_input_channel(self): def _get_input_channel(self):
...@@ -131,7 +131,7 @@ class Op(object): ...@@ -131,7 +131,7 @@ class Op(object):
channel.add_producer(self.name) channel.add_producer(self.name)
self._outputs.append(channel) self._outputs.append(channel)
def _clean_output_channels(self): def clean_output_channels(self):
self._outputs = [] self._outputs = []
def _get_output_channels(self): def _get_output_channels(self):
...@@ -316,7 +316,7 @@ class Op(object): ...@@ -316,7 +316,7 @@ class Op(object):
return output_data, error_channeldata return output_data, error_channeldata
def _run(self, concurrency_idx, input_channel, output_channels, client_type, def _run(self, concurrency_idx, input_channel, output_channels, client_type,
use_multithread): is_thread_op):
def get_log_func(op_info_prefix): def get_log_func(op_info_prefix):
def log_func(info_str): def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str) return "{} {}".format(op_info_prefix, info_str)
...@@ -330,7 +330,7 @@ class Op(object): ...@@ -330,7 +330,7 @@ class Op(object):
# init op # init op
self.concurrency_idx = concurrency_idx self.concurrency_idx = concurrency_idx
try: try:
if use_multithread: if is_thread_op:
with self._for_init_op_lock: with self._for_init_op_lock:
if not self._succ_init_op: if not self._succ_init_op:
# init profiler # init profiler
...@@ -364,14 +364,13 @@ class Op(object): ...@@ -364,14 +364,13 @@ class Op(object):
channeldata_dict = input_channel.front(self.name) channeldata_dict = input_channel.front(self.name)
except ChannelStopError: except ChannelStopError:
_LOGGER.info(log("stop.")) _LOGGER.info(log("stop."))
with self._for_close_op_lock: if is_thread_op:
if not self._succ_close_op: with self._for_close_op_lock:
self._clean_input_channel() if not self._succ_close_op:
self._clean_output_channels() self._profiler = None
self._profiler = None self.client = None
self.client = None self._succ_init_op = False
self._succ_init_op = False self._succ_close_op = True
self._succ_close_op = True
break break
#self._profiler_record("get#{}_1".format(op_info_prefix)) #self._profiler_record("get#{}_1".format(op_info_prefix))
_LOGGER.debug(log("input_data: {}".format(channeldata_dict))) _LOGGER.debug(log("input_data: {}".format(channeldata_dict)))
...@@ -547,7 +546,7 @@ class VirtualOp(Op): ...@@ -547,7 +546,7 @@ class VirtualOp(Op):
self._outputs.append(channel) self._outputs.append(channel)
def _run(self, concurrency_idx, input_channel, output_channels, client_type, def _run(self, concurrency_idx, input_channel, output_channels, client_type,
use_multithread): is_thread_op):
def get_log_func(op_info_prefix): def get_log_func(op_info_prefix):
def log_func(info_str): def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str) return "{} {}".format(op_info_prefix, info_str)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册