Commit d0a08407 authored by barriery, committed by GitHub

Merge pull request #19 from barrierye/pipeling-log

Pipeling log
@@ -4,12 +4,26 @@
 ## Compilation environment requirements
-- OS: CentOS 7
-- GCC: 4.8.2 and later
-- Golang: 1.9.2 and later
-- Git:2.17.1 and later
-- CMake:3.2.2 and later
-- Python:2.7.2 and later / 3.6 and later
+| module | version |
+| :--------------------------: | :----------------------------------------------------------: |
+| OS | CentOS 7 |
+| gcc | 4.8.5 and later |
+| gcc-c++ | 4.8.5 and later |
+| git | 3.82 and later |
+| cmake | 3.2.0 and later |
+| Python | 2.7.2 and later / 3.6 and later |
+| Go | 1.9.2 and later |
+| git | 2.17.1 and later |
+| glibc-static | 2.17 |
+| openssl-devel | 1.0.2k |
+| bzip2-devel | 1.0.6 and later |
+| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
+| sqlite-devel | 3.7.17 and later |
+| patchelf | 0.9 and later |
+| libXext | 1.3.3 |
+| libSM | 1.2.2 |
+| libXrender | 0.9.10 |
+| python-whl | numpy>=1.12, <=1.16.4<br/>google>=2.0.3<br/>protobuf>=3.12.2<br/>grpcio-tools>=1.28.1<br/>grpcio>=1.28.1<br/>func-timeout>=4.3.5<br/>pyyaml>=1.3.0<br/>sentencepiece==0.1.92<br>flask>=1.1.2<br>ujson>=2.0.3 |
 It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you, see [this document](DOCKER_IMAGES.md).
......
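The python-whl row above pins the Python packages that the serving pipeline expects at runtime. As a convenience (not part of this commit), here is a minimal sketch that checks a build environment against those pins, assuming only that `pkg_resources` (setuptools) is available:

```python
# Sketch: verify the python-whl requirements listed in the table above.
# The requirement strings are copied from the table; adjust if it changes.
import pkg_resources

REQUIREMENTS = [
    "numpy>=1.12,<=1.16.4",
    "google>=2.0.3",
    "protobuf>=3.12.2",
    "grpcio-tools>=1.28.1",
    "grpcio>=1.28.1",
    "func-timeout>=4.3.5",
    "pyyaml>=1.3.0",
    "sentencepiece==0.1.92",
    "flask>=1.1.2",
    "ujson>=2.0.3",
]

for req in REQUIREMENTS:
    try:
        pkg_resources.require(req)  # raises if missing or wrong version
        print("OK   {}".format(req))
    except (pkg_resources.DistributionNotFound,
            pkg_resources.VersionConflict) as exc:
        print("FAIL {} ({})".format(req, exc))
```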
@@ -4,12 +4,26 @@
 ## Compilation environment settings
-- OS: CentOS 7
-- GCC: 4.8.2 and later
-- Golang: 1.9.2 and later
-- Git: 2.17.1 and later
-- CMake: 3.2.2 and later
-- Python: 2.7.2 and later / 3.6 and later
+| component | version requirement |
+| :--------------------------: | :----------------------------------------------------------: |
+| OS | CentOS 7 |
+| gcc | 4.8.5 and later |
+| gcc-c++ | 4.8.5 and later |
+| git | 3.82 and later |
+| cmake | 3.2.0 and later |
+| Python | 2.7.2 and later / 3.6 and later |
+| Go | 1.9.2 and later |
+| git | 2.17.1 and later |
+| glibc-static | 2.17 |
+| openssl-devel | 1.0.2k |
+| bzip2-devel | 1.0.6 and later |
+| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
+| sqlite-devel | 3.7.17 and later |
+| patchelf | 0.9 |
+| libXext | 1.3.3 |
+| libSM | 1.2.2 |
+| libXrender | 0.9.10 |
+| python-whl | numpy>=1.12, <=1.16.4<br/>google>=2.0.3<br/>protobuf>=3.12.2<br/>grpcio-tools>=1.28.1<br/>grpcio>=1.28.1<br/>func-timeout>=4.3.5<br/>pyyaml>=1.3.0<br/>sentencepiece==0.1.92<br/>flask>=1.1.2<br/>ujson>=2.0.3 |
 It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you, see [this document](DOCKER_IMAGES_CN.md).
......
@@ -11,7 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logger # this module must be the first to import
 from operator import Op, RequestOp, ResponseOp
 from pipeline_server import PipelineServer
 from pipeline_client import PipelineClient
+from analyse import Analyst
......
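The new first line, `import logger`, with its "must be the first to import" comment, implies that this module installs the pipeline's logging configuration before any other pipeline module creates its module-level `_LOGGER`. The commit's actual `logger.py` is not shown in this diff; the following is only a hedged sketch of that pattern, with illustrative handler names and file paths:

```python
# Hypothetical sketch of a "logger" module meant to be imported first:
# it configures the root logger before other modules call logging.getLogger().
# Handler names and the log file path below are assumptions for illustration.
import logging.config

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s %(levelname)s %(name)s: %(message)s",
        },
    },
    "handlers": {
        "file": {
            "class": "logging.FileHandler",
            "filename": "pipeline.log",
            "formatter": "default",
        },
    },
    "root": {
        "level": "DEBUG",
        "handlers": ["file"],
    },
})
```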
@@ -164,7 +164,7 @@ class OpAnalyst(object):
     def add(self, name_str, ts_list):
         if self._close:
-            _LOGGER.error("OpAnalyst is closed.")
+            _LOGGER.error("Failed to add item: OpAnalyst is closed.")
             return
         op_name, curr_idx, step = self._parse(name_str)
         if op_name not in self.op_time_list_dict:
......
@@ -70,7 +70,8 @@ class ChannelData(object):
         '''
         if ecode is not None:
             if data_id is None or error_info is None:
-                _LOGGER.critical("data_id and error_info cannot be None")
+                _LOGGER.critical("Failed to generate ChannelData: data_id"
+                                 " and error_info cannot be None")
                 os._exit(-1)
             datatype = ChannelDataType.ERROR.value
         else:
@@ -78,14 +79,15 @@ class ChannelData(object):
                 ecode, error_info = ChannelData.check_npdata(npdata)
                 if ecode != ChannelDataEcode.OK.value:
                     datatype = ChannelDataType.ERROR.value
-                    _LOGGER.error(error_info)
+                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
             elif datatype == ChannelDataType.DICT.value:
                 ecode, error_info = ChannelData.check_dictdata(dictdata)
                 if ecode != ChannelDataEcode.OK.value:
                     datatype = ChannelDataType.ERROR.value
-                    _LOGGER.error(error_info)
+                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
             else:
-                _LOGGER.critical("datatype not match")
+                _LOGGER.critical("(logid={}) datatype not match".format(
+                    data_id))
                 os._exit(-1)
         self.datatype = datatype
         self.npdata = npdata
@@ -110,13 +112,13 @@ class ChannelData(object):
             for sample in dictdata:
                 if not isinstance(sample, dict):
                     ecode = ChannelDataEcode.TYPE_ERROR.value
-                    error_info = "the value of data must " \
-                            "be dict, but get {}.".format(type(sample))
+                    error_info = "Failed to check data: the type of " \
+                            "data must be dict, but get {}.".format(type(sample))
                     break
         elif not isinstance(dictdata, dict):
             # batch size = 1
             ecode = ChannelDataEcode.TYPE_ERROR.value
-            error_info = "the value of data must " \
-                    "be dict, but get {}.".format(type(dictdata))
+            error_info = "Failed to check data: the type of data must " \
+                    "be dict, but get {}.".format(type(dictdata))
         return ecode, error_info
@@ -139,27 +141,30 @@ class ChannelData(object):
             for sample in npdata:
                 if not isinstance(sample, dict):
                     ecode = ChannelDataEcode.TYPE_ERROR.value
-                    error_info = "the value of data must " \
-                            "be dict, but get {}.".format(type(sample))
+                    error_info = "Failed to check data: the " \
+                            "value of data must be dict, but get {}.".format(
+                                type(sample))
                     break
                 for _, value in sample.items():
                     if not isinstance(value, np.ndarray):
                         ecode = ChannelDataEcode.TYPE_ERROR.value
-                        error_info = "the value of data must " \
-                                "be np.ndarray, but get {}.".format(type(value))
+                        error_info = "Failed to check data: the" \
+                                " value of data must be np.ndarray, but get {}.".format(
+                                    type(value))
                         return ecode, error_info
         elif isinstance(npdata, dict):
             # batch_size = 1
             for _, value in npdata.items():
                 if not isinstance(value, np.ndarray):
                     ecode = ChannelDataEcode.TYPE_ERROR.value
-                    error_info = "the value of data must " \
-                            "be np.ndarray, but get {}.".format(type(value))
+                    error_info = "Failed to check data: the value " \
+                            "of data must be np.ndarray, but get {}.".format(
+                                type(value))
                     break
         else:
             ecode = ChannelDataEcode.TYPE_ERROR.value
-            error_info = "the value of data must " \
-                    "be dict, but get {}.".format(type(npdata))
+            error_info = "Failed to check data: the value of data " \
+                    "must be dict, but get {}.".format(type(npdata))
         return ecode, error_info

     def parse(self):
@@ -171,8 +176,8 @@ class ChannelData(object):
             # return dict
             feed = self.dictdata
         else:
-            _LOGGER.critical("Error type({}) in datatype.".format(
-                self.datatype))
+            _LOGGER.critical("Failed to parse channeldata: error " \
+                "type({}) in datatype.".format(self.datatype))
             os._exit(-1)
         return feed
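The `check_dictdata` / `check_npdata` changes above only reword the error messages; the accepted shapes are unchanged: a dict (or list of dicts) for DICT data, and a dict (or list of dicts) mapping names to `np.ndarray` for numpy data. A small sketch of inputs that pass or fail those checks; the import path is an assumption based on this repository's layout:

```python
import numpy as np
# Assumption: ChannelData is importable from the pipeline channel module.
from paddle_serving_server.pipeline.channel import ChannelData

# batch_size = 1: one dict mapping feed names to np.ndarray -> ecode OK (0)
good = {"image": np.zeros((1, 3, 224, 224), dtype="float32")}
ecode, error_info = ChannelData.check_npdata(good)

# a list of such dicts is treated as a batch -> also OK
ecode, error_info = ChannelData.check_npdata([good, good])

# anything else (e.g. a bare ndarray) -> ChannelDataEcode.TYPE_ERROR with a
# "Failed to check data: ..." message, as produced by the code above
ecode, error_info = ChannelData.check_npdata(np.zeros((3, 224, 224)))
```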
...@@ -247,33 +252,36 @@ class ProcessChannel(object): ...@@ -247,33 +252,36 @@ class ProcessChannel(object):
""" not thread safe, and can only be called during initialization. """ """ not thread safe, and can only be called during initialization. """
if op_name in self._producers: if op_name in self._producers:
_LOGGER.critical( _LOGGER.critical(
self._log("producer({}) is already in channel".format(op_name))) self._log("Failed to add producer: producer({})" \
" is already in channel".format(op_name)))
os._exit(-1) os._exit(-1)
self._producers.append(op_name) self._producers.append(op_name)
_LOGGER.debug(self._log("add a producer: {}".format(op_name))) _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name)))
def add_consumer(self, op_name): def add_consumer(self, op_name):
""" not thread safe, and can only be called during initialization. """ """ not thread safe, and can only be called during initialization. """
if op_name in self._consumer_cursors: if op_name in self._consumer_cursors:
_LOGGER.critical( _LOGGER.critical(
self._log("consumer({}) is already in channel".format(op_name))) self._log("Failed to add consumer: consumer({})" \
" is already in channel".format(op_name)))
os._exit(-1) os._exit(-1)
self._consumer_cursors[op_name] = 0 self._consumer_cursors[op_name] = 0
if self._cursor_count.get(0) is None: if self._cursor_count.get(0) is None:
self._cursor_count[0] = 0 self._cursor_count[0] = 0
self._cursor_count[0] += 1 self._cursor_count[0] += 1
_LOGGER.debug(self._log("add a consumer: {}".format(op_name))) _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name)))
def push(self, channeldata, op_name=None): def push(self, channeldata, op_name=None):
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to push data[{}]".format(op_name, self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
channeldata.id))) op_name)))
if len(self._producers) == 0: if len(self._producers) == 0:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"expected number of producers to be greater than 0, but the it is 0." "(logid={}) Op({}) Failed to push data: expected number"
)) " of producers to be greater than 0, but the it is 0.".
format(channeldata.id, op_name)))
os._exit(-1) os._exit(-1)
elif len(self._producers) == 1: elif len(self._producers) == 1:
with self._cv: with self._cv:
...@@ -287,13 +295,15 @@ class ProcessChannel(object): ...@@ -287,13 +295,15 @@ class ProcessChannel(object):
raise ChannelStopError() raise ChannelStopError()
self._cv.notify_all() self._cv.notify_all()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ push data[{}] into internal queue.".format( self._log("(logid={}) Op({}) Pushed data into internal queue.".
op_name, channeldata.id))) format(channeldata.id, op_name)))
return True return True
elif op_name is None: elif op_name is None:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"There are multiple producers, so op_name cannot be None.")) "(logid={}) Op({}) Failed to push data: there are multiple "
"producers, so op_name cannot be None.".format(
channeldata.id, op_name)))
os._exit(-1) os._exit(-1)
producer_num = len(self._producers) producer_num = len(self._producers)
...@@ -321,8 +331,9 @@ class ProcessChannel(object): ...@@ -321,8 +331,9 @@ class ProcessChannel(object):
if put_data is None: if put_data is None:
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ push data[{}] into input_buffer.".format( self._log(
op_name, data_id))) "(logid={}) Op({}) Pushed data into input_buffer.".
format(data_id, op_name)))
else: else:
while self._stop.value == 0: while self._stop.value == 0:
try: try:
...@@ -334,14 +345,15 @@ class ProcessChannel(object): ...@@ -334,14 +345,15 @@ class ProcessChannel(object):
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ push data[{}] into internal queue.". self._log(
format(op_name, data_id))) "(logid={}) Op({}) Pushed data into internal_queue.".
format(data_id, op_name)))
self._cv.notify_all() self._cv.notify_all()
return True return True
def front(self, op_name=None, timeout=None): def front(self, op_name=None, timeout=None):
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to get data[?]; timeout(s)={}".format(op_name, self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name,
timeout))) timeout)))
endtime = None endtime = None
if timeout is not None: if timeout is not None:
...@@ -353,8 +365,8 @@ class ProcessChannel(object): ...@@ -353,8 +365,8 @@ class ProcessChannel(object):
if len(self._consumer_cursors) == 0: if len(self._consumer_cursors) == 0:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"expected number of consumers to be greater than 0, but the it is 0." "Op({}) Failed to get data: expected number of consumers to be " \
)) "greater than 0, but the it is 0.".format(op_name)))
os._exit(-1) os._exit(-1)
elif len(self._consumer_cursors) == 1: elif len(self._consumer_cursors) == 1:
resp = None resp = None
...@@ -368,8 +380,8 @@ class ProcessChannel(object): ...@@ -368,8 +380,8 @@ class ProcessChannel(object):
remaining = endtime - _time() remaining = endtime - _time()
if remaining <= 0.0: if remaining <= 0.0:
_LOGGER.debug( _LOGGER.debug(
self._log("{} get data[?] timeout".format( self._log("Op({}) Failed to get data: "
op_name))) "timeout".format(op_name)))
raise ChannelTimeoutError() raise ChannelTimeoutError()
self._cv.wait(remaining) self._cv.wait(remaining)
else: else:
...@@ -377,13 +389,14 @@ class ProcessChannel(object): ...@@ -377,13 +389,14 @@ class ProcessChannel(object):
if self._stop.value == 1: if self._stop.value == 1:
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ get data[{}]".format(op_name, self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
resp.values()[0].id))) .id, op_name)))
return resp return resp
elif op_name is None: elif op_name is None:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"There are multiple consumers, so op_name cannot be None.")) "Op({}) Failed to get data: there are multiple consumers, "
"so op_name cannot be None.".format(op_name)))
os._exit(-1) os._exit(-1)
# In output_buf, different Ops (according to op_name) have different # In output_buf, different Ops (according to op_name) have different
...@@ -405,16 +418,17 @@ class ProcessChannel(object): ...@@ -405,16 +418,17 @@ class ProcessChannel(object):
channeldata = self._que.get(timeout=0) channeldata = self._que.get(timeout=0)
self._output_buf.append(channeldata) self._output_buf.append(channeldata)
_LOGGER.debug( _LOGGER.debug(
self._log("pop ready item[{}] into output_buffer". self._log(
format(channeldata.values()[0].id))) "(logid={}) Op({}) Pop ready item into output_buffer".
format(channeldata.values()[0].id, op_name)))
break break
except Queue.Empty: except Queue.Empty:
if timeout is not None: if timeout is not None:
remaining = endtime - _time() remaining = endtime - _time()
if remaining <= 0.0: if remaining <= 0.0:
_LOGGER.debug( _LOGGER.debug(
self._log("{} get data[?] timeout".format( self._log("Op({}) Failed to get data: timeout".
op_name))) format(op_name)))
raise ChannelTimeoutError() raise ChannelTimeoutError()
self._cv.wait(remaining) self._cv.wait(remaining)
else: else:
...@@ -437,7 +451,7 @@ class ProcessChannel(object): ...@@ -437,7 +451,7 @@ class ProcessChannel(object):
self._base_cursor.value += 1 self._base_cursor.value += 1
# to avoid cursor overflow # to avoid cursor overflow
if self._base_cursor.value >= self._reset_max_cursor: if self._base_cursor.value >= self._reset_max_cursor:
_LOGGER.info(self._log("reset cursor in Channel")) _LOGGER.info(self._log("Reset cursor in Channel"))
self._base_cursor.value -= self._reset_max_cursor self._base_cursor.value -= self._reset_max_cursor
for name in self._consumer_cursors.keys(): for name in self._consumer_cursors.keys():
self._consumer_cursors[name] -= self._reset_max_cursor self._consumer_cursors[name] -= self._reset_max_cursor
...@@ -458,8 +472,8 @@ class ProcessChannel(object): ...@@ -458,8 +472,8 @@ class ProcessChannel(object):
self._cv.notify_all() self._cv.notify_all()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ get data[{}] from output_buffer".format( self._log("(logid={}) Op({}) Got data from output_buffer".format(
op_name, resp.values()[0].id))) resp.values()[0].id, op_name)))
return resp return resp
def stop(self): def stop(self):
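Nearly every message rewritten in these ProcessChannel hunks follows one convention: prefix the text with `(logid={})` and the Op name, so a single request can be traced across channels and processes. The commit inlines the `format` calls; the helper below is only an illustration of that convention, not code from the diff:

```python
# Illustration of the logging convention used above (not part of the commit).
def op_log(data_id, op_name, msg):
    return "(logid={}) Op({}) {}".format(data_id, op_name, msg)

# e.g. _LOGGER.debug(self._log(op_log(channeldata.id, op_name, "Pushing data")))
```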
...@@ -529,33 +543,36 @@ class ThreadChannel(Queue.Queue): ...@@ -529,33 +543,36 @@ class ThreadChannel(Queue.Queue):
""" not thread safe, and can only be called during initialization. """ """ not thread safe, and can only be called during initialization. """
if op_name in self._producers: if op_name in self._producers:
_LOGGER.critical( _LOGGER.critical(
self._log("producer({}) is already in channel".format(op_name))) self._log("Failed to add producer: producer({}) is "
"already in channel".format(op_name)))
os._exit(-1) os._exit(-1)
self._producers.append(op_name) self._producers.append(op_name)
_LOGGER.debug(self._log("add a producer: {}".format(op_name))) _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name)))
def add_consumer(self, op_name): def add_consumer(self, op_name):
""" not thread safe, and can only be called during initialization. """ """ not thread safe, and can only be called during initialization. """
if op_name in self._consumer_cursors: if op_name in self._consumer_cursors:
_LOGGER.critical( _LOGGER.critical(
self._log("consumer({}) is already in channel".format(op_name))) self._log("Failed to add consumer: consumer({}) is "
"already in channel".format(op_name)))
os._exit(-1) os._exit(-1)
self._consumer_cursors[op_name] = 0 self._consumer_cursors[op_name] = 0
if self._cursor_count.get(0) is None: if self._cursor_count.get(0) is None:
self._cursor_count[0] = 0 self._cursor_count[0] = 0
self._cursor_count[0] += 1 self._cursor_count[0] += 1
_LOGGER.debug(self._log("add a consumer: {}".format(op_name))) _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name)))
def push(self, channeldata, op_name=None): def push(self, channeldata, op_name=None):
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to push data[{}]".format(op_name, self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
channeldata.id))) op_name)))
if len(self._producers) == 0: if len(self._producers) == 0:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"expected number of producers to be greater than 0, but the it is 0." "(logid={}) Op({}) Failed to push data: expected number of "
)) "producers to be greater than 0, but the it is 0.".format(
channeldata.id, op_name)))
os._exit(-1) os._exit(-1)
elif len(self._producers) == 1: elif len(self._producers) == 1:
with self._cv: with self._cv:
...@@ -569,13 +586,15 @@ class ThreadChannel(Queue.Queue): ...@@ -569,13 +586,15 @@ class ThreadChannel(Queue.Queue):
raise ChannelStopError() raise ChannelStopError()
self._cv.notify_all() self._cv.notify_all()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ push data[{}] into internal queue.".format( self._log("(logid={}) Op({}) Pushed data into internal_queue.".
op_name, channeldata.id))) format(channeldata.id, op_name)))
return True return True
elif op_name is None: elif op_name is None:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"There are multiple producers, so op_name cannot be None.")) "(logid={}) Op({}) Failed to push data: there are multiple"
" producers, so op_name cannot be None.".format(
channeldata.id, op_name)))
os._exit(-1) os._exit(-1)
producer_num = len(self._producers) producer_num = len(self._producers)
...@@ -598,8 +617,9 @@ class ThreadChannel(Queue.Queue): ...@@ -598,8 +617,9 @@ class ThreadChannel(Queue.Queue):
if put_data is None: if put_data is None:
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ push data[{}] into input_buffer.".format( self._log(
op_name, data_id))) "(logid={}) Op({}) Pushed data into input_buffer.".
format(data_id, op_name)))
else: else:
while self._stop is False: while self._stop is False:
try: try:
...@@ -611,14 +631,15 @@ class ThreadChannel(Queue.Queue): ...@@ -611,14 +631,15 @@ class ThreadChannel(Queue.Queue):
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ push data[{}] into internal queue.". self._log(
format(op_name, data_id))) "(logid={}) Op({}) Pushed data into internal_queue.".
format(data_id, op_name)))
self._cv.notify_all() self._cv.notify_all()
return True return True
def front(self, op_name=None, timeout=None): def front(self, op_name=None, timeout=None):
_LOGGER.debug( _LOGGER.debug(
self._log("{} try to get data[?]; timeout(s)={}".format(op_name, self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name,
timeout))) timeout)))
endtime = None endtime = None
if timeout is not None: if timeout is not None:
...@@ -630,8 +651,8 @@ class ThreadChannel(Queue.Queue): ...@@ -630,8 +651,8 @@ class ThreadChannel(Queue.Queue):
if len(self._consumer_cursors) == 0: if len(self._consumer_cursors) == 0:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"expected number of consumers to be greater than 0, but the it is 0." "Op({}) Failed to get data: expected number of consumers to be "
)) "greater than 0, but the it is 0.".format(op_name)))
os._exit(-1) os._exit(-1)
elif len(self._consumer_cursors) == 1: elif len(self._consumer_cursors) == 1:
resp = None resp = None
...@@ -645,8 +666,9 @@ class ThreadChannel(Queue.Queue): ...@@ -645,8 +666,9 @@ class ThreadChannel(Queue.Queue):
remaining = endtime - _time() remaining = endtime - _time()
if remaining <= 0.0: if remaining <= 0.0:
_LOGGER.debug( _LOGGER.debug(
self._log("{} get data[?] timeout".format( self._log(
op_name))) "Op({}) Failed to get data: timeout".
format(op_name)))
raise ChannelTimeoutError() raise ChannelTimeoutError()
self._cv.wait(remaining) self._cv.wait(remaining)
else: else:
...@@ -654,13 +676,14 @@ class ThreadChannel(Queue.Queue): ...@@ -654,13 +676,14 @@ class ThreadChannel(Queue.Queue):
if self._stop: if self._stop:
raise ChannelStopError() raise ChannelStopError()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ get data[{}]".format(op_name, self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
resp.values()[0].id))) .id, op_name)))
return resp return resp
elif op_name is None: elif op_name is None:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log("Op({}) Failed to get data: there are multiple "
"There are multiple consumers, so op_name cannot be None.")) "consumers, so op_name cannot be None.".format(
op_name)))
os._exit(-1) os._exit(-1)
# In output_buf, different Ops (according to op_name) have different # In output_buf, different Ops (according to op_name) have different
...@@ -682,16 +705,17 @@ class ThreadChannel(Queue.Queue): ...@@ -682,16 +705,17 @@ class ThreadChannel(Queue.Queue):
channeldata = self.get(timeout=0) channeldata = self.get(timeout=0)
self._output_buf.append(channeldata) self._output_buf.append(channeldata)
_LOGGER.debug( _LOGGER.debug(
self._log("pop ready item[{}] into output_buffer". self._log(
format(channeldata.values()[0].id))) "(logid={}) Op({}) Pop ready item into output_buffer".
format(channeldata.values()[0].id, op_name)))
break break
except Queue.Empty: except Queue.Empty:
if timeout is not None: if timeout is not None:
remaining = endtime - _time() remaining = endtime - _time()
if remaining <= 0.0: if remaining <= 0.0:
_LOGGER.debug( _LOGGER.debug(
self._log("{} get data[?] timeout".format( self._log("Op({}) Failed to get data: timeout".
op_name))) format(op_name)))
raise ChannelTimeoutError() raise ChannelTimeoutError()
self._cv.wait(remaining) self._cv.wait(remaining)
else: else:
...@@ -715,7 +739,7 @@ class ThreadChannel(Queue.Queue): ...@@ -715,7 +739,7 @@ class ThreadChannel(Queue.Queue):
self._base_cursor += 1 self._base_cursor += 1
# to avoid cursor overflow # to avoid cursor overflow
if self._base_cursor >= self._reset_max_cursor: if self._base_cursor >= self._reset_max_cursor:
_LOGGER.info(self._log("reset cursor in Channel")) _LOGGER.info(self._log("Reset cursor in Channel"))
self._base_cursor -= self._reset_max_cursor self._base_cursor -= self._reset_max_cursor
for name in self._consumer_cursors: for name in self._consumer_cursors:
self._consumer_cursors[name] -= self._reset_max_cursor self._consumer_cursors[name] -= self._reset_max_cursor
...@@ -735,8 +759,8 @@ class ThreadChannel(Queue.Queue): ...@@ -735,8 +759,8 @@ class ThreadChannel(Queue.Queue):
self._cv.notify_all() self._cv.notify_all()
_LOGGER.debug( _LOGGER.debug(
self._log("{} succ get data[{}] from output_buffer".format( self._log("(logid={}) Op({}) Got data from output_buffer".format(
op_name, resp.values()[0].id))) resp.values()[0].id, op_name)))
return resp return resp
def stop(self): def stop(self):
......
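Both `ProcessChannel.front` and `ThreadChannel.front` take `(op_name, timeout)` and, as the hunks above show, raise `ChannelTimeoutError` when the deadline passes and `ChannelStopError` once the channel is stopped. A hedged sketch of a consumer loop built on that behaviour (the import path is assumed):

```python
# Assumption: the exceptions live in the same channel module as above.
from paddle_serving_server.pipeline.channel import (ChannelTimeoutError,
                                                    ChannelStopError)

def consume(channel, op_name, handle):
    """Drain a channel for one Op until the channel is stopped."""
    while True:
        try:
            channeldata_dict = channel.front(op_name, timeout=1.0)
        except ChannelTimeoutError:
            continue          # nothing arrived within the timeout; retry
        except ChannelStopError:
            break             # channel stopped; exit cleanly
        handle(channeldata_dict)
```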
...@@ -30,6 +30,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelData, ...@@ -30,6 +30,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataEcode, ChannelDataType, ChannelStopError) ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler from .profiler import TimeProfiler
from .util import NameGenerator from .util import NameGenerator
from .proto import pipeline_service_pb2
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
...@@ -74,17 +75,18 @@ class DAGExecutor(object): ...@@ -74,17 +75,18 @@ class DAGExecutor(object):
self._recive_func = threading.Thread( self._recive_func = threading.Thread(
target=DAGExecutor._recive_out_channel_func, args=(self, )) target=DAGExecutor._recive_out_channel_func, args=(self, ))
self._recive_func.start() self._recive_func.start()
_LOGGER.debug("[DAG Executor] start recive thread") _LOGGER.debug("[DAG Executor] Start recive thread")
def stop(self): def stop(self):
self._dag.stop() self._dag.stop()
self._dag.join() self._dag.join()
_LOGGER.info("[DAG Executor] succ stop") _LOGGER.info("[DAG Executor] Stop")
def _get_next_data_id(self): def _get_next_data_id(self):
data_id = None data_id = None
with self._id_lock: with self._id_lock:
if self._id_counter >= self._reset_max_id: if self._id_counter >= self._reset_max_id:
_LOGGER.info("[DAG Executor] Reset request id")
self._id_counter -= self._reset_max_id self._id_counter -= self._reset_max_id
data_id = self._id_counter data_id = self._id_counter
self._id_counter += 1 self._id_counter += 1
...@@ -96,16 +98,18 @@ class DAGExecutor(object): ...@@ -96,16 +98,18 @@ class DAGExecutor(object):
def _set_in_channel(self, in_channel): def _set_in_channel(self, in_channel):
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)): if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical("[DAG Executor] in_channel must be Channel" _LOGGER.critical("[DAG Executor] Failed to set in_channel: "
" type, but get {}".format(type(in_channel))) "in_channel must be Channel type, but get {}".
format(type(in_channel)))
os._exit(-1) os._exit(-1)
in_channel.add_producer(self.name) in_channel.add_producer(self.name)
self._in_channel = in_channel self._in_channel = in_channel
def _set_out_channel(self, out_channel): def _set_out_channel(self, out_channel):
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)): if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical("[DAG Executor]iout_channel must be Channel" _LOGGER.critical("[DAG Executor] Failed to set out_channel: "
" type, but get {}".format(type(out_channel))) "must be Channel type, but get {}".format(
type(out_channel)))
os._exit(-1) os._exit(-1)
out_channel.add_consumer(self.name) out_channel.add_consumer(self.name)
self._out_channel = out_channel self._out_channel = out_channel
...@@ -116,7 +120,7 @@ class DAGExecutor(object): ...@@ -116,7 +120,7 @@ class DAGExecutor(object):
try: try:
channeldata_dict = self._out_channel.front(self.name) channeldata_dict = self._out_channel.front(self.name)
except ChannelStopError: except ChannelStopError:
_LOGGER.info("[DAG Executor] channel stop.") _LOGGER.info("[DAG Executor] Stop.")
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
for data_id, cv in self._cv_pool.items(): for data_id, cv in self._cv_pool.items():
closed_errror_data = ChannelData( closed_errror_data = ChannelData(
...@@ -130,17 +134,20 @@ class DAGExecutor(object): ...@@ -130,17 +134,20 @@ class DAGExecutor(object):
if len(channeldata_dict) != 1: if len(channeldata_dict) != 1:
_LOGGER.critical( _LOGGER.critical(
"[DAG Executor] out_channel cannot have multiple input ops") "[DAG Executor] Failed to fetch result: out_channel "
"cannot have multiple input ops")
os._exit(-1) os._exit(-1)
(_, channeldata), = channeldata_dict.items() (_, channeldata), = channeldata_dict.items()
if not isinstance(channeldata, ChannelData): if not isinstance(channeldata, ChannelData):
_LOGGER.critical( _LOGGER.critical(
'[DAG Executor] data must be ChannelData type, but get {}' '[DAG Executor] Failed to fetch result: data in out_channel" \
" must be ChannelData type, but get {}'
.format(type(channeldata))) .format(type(channeldata)))
os._exit(-1) os._exit(-1)
data_id = channeldata.id data_id = channeldata.id
_LOGGER.debug("recive thread fetch data[{}]".format(data_id)) _LOGGER.debug("(logid={}) [recive thread] Fetched data".format(
data_id))
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
cond_v = self._cv_pool[data_id] cond_v = self._cv_pool[data_id]
with cond_v: with cond_v:
...@@ -164,7 +171,7 @@ class DAGExecutor(object): ...@@ -164,7 +171,7 @@ class DAGExecutor(object):
ready_data = self._fetch_buffer[data_id] ready_data = self._fetch_buffer[data_id]
self._cv_pool.pop(data_id) self._cv_pool.pop(data_id)
self._fetch_buffer.pop(data_id) self._fetch_buffer.pop(data_id)
_LOGGER.debug("resp thread get resp data[{}]".format(data_id)) _LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id))
return ready_data return ready_data
def _pack_channeldata(self, rpc_request, data_id): def _pack_channeldata(self, rpc_request, data_id):
...@@ -172,8 +179,10 @@ class DAGExecutor(object): ...@@ -172,8 +179,10 @@ class DAGExecutor(object):
try: try:
dictdata = self._unpack_rpc_func(rpc_request) dictdata = self._unpack_rpc_func(rpc_request)
except Exception as e: except Exception as e:
_LOGGER.error("parse RPC package to data[{}] Error: {}" _LOGGER.error(
.format(data_id, e)) "(logid={}) Failed to parse RPC request package: {}"
.format(data_id, e),
exc_info=True)
return ChannelData( return ChannelData(
ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value, ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
error_info="rpc package error: {}".format(e), error_info="rpc package error: {}".format(e),
...@@ -187,7 +196,7 @@ class DAGExecutor(object): ...@@ -187,7 +196,7 @@ class DAGExecutor(object):
profile_value = rpc_request.value[idx] profile_value = rpc_request.value[idx]
break break
client_need_profile = (profile_value == self._client_profile_value) client_need_profile = (profile_value == self._client_profile_value)
_LOGGER.debug("request[{}] need profile: {}".format( _LOGGER.debug("(logid={}) Need profile in client: {}".format(
data_id, client_need_profile)) data_id, client_need_profile))
return ChannelData( return ChannelData(
datatype=ChannelDataType.DICT.value, datatype=ChannelDataType.DICT.value,
...@@ -197,26 +206,28 @@ class DAGExecutor(object): ...@@ -197,26 +206,28 @@ class DAGExecutor(object):
def call(self, rpc_request): def call(self, rpc_request):
data_id, cond_v = self._get_next_data_id() data_id, cond_v = self._get_next_data_id()
_LOGGER.debug("generate Request id: {}".format(data_id)) _LOGGER.info("(logid={}) Succ generate id".format(data_id))
start_call, end_call = None, None
if not self._is_thread_op: if not self._is_thread_op:
self._profiler.record("call_{}#DAG-{}_0".format(data_id, data_id)) start_call = self._profiler.record("call_{}#DAG-{}_0".format(
data_id, data_id))
else: else:
self._profiler.record("call_{}#DAG_0".format(data_id)) start_call = self._profiler.record("call_{}#DAG_0".format(data_id))
_LOGGER.debug("try parse RPC request to channeldata[{}]".format( _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id))
data_id))
self._profiler.record("prepack_{}#{}_0".format(data_id, self.name)) self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
req_channeldata = self._pack_channeldata(rpc_request, data_id) req_channeldata = self._pack_channeldata(rpc_request, data_id)
self._profiler.record("prepack_{}#{}_1".format(data_id, self.name)) self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
_LOGGER.debug("push data[{}] into Graph engine".format(data_id)) _LOGGER.debug("(logid={}) Pushing data into Graph engine".format(
data_id))
try: try:
self._in_channel.push(req_channeldata, self.name) self._in_channel.push(req_channeldata, self.name)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug("[DAG Executor] channel stop.") _LOGGER.debug("[DAG Executor] Stop")
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
self._cv_pool.pop(data_id) self._cv_pool.pop(data_id)
return self._pack_for_rpc_resp( return self._pack_for_rpc_resp(
...@@ -225,32 +236,35 @@ class DAGExecutor(object): ...@@ -225,32 +236,35 @@ class DAGExecutor(object):
error_info="dag closed.", error_info="dag closed.",
data_id=data_id)) data_id=data_id))
_LOGGER.debug("wait Graph engine for data[{}]...".format(data_id)) _LOGGER.debug("(logid={}) Wait for Graph engine...".format(data_id))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id, resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
cond_v) cond_v)
if resp_channeldata.ecode == ChannelDataEcode.OK.value: if resp_channeldata.ecode == ChannelDataEcode.OK.value:
_LOGGER.debug("request[{}] succ predict".format(data_id)) _LOGGER.debug("(logid={}) Succ predict".format(data_id))
break break
else: else:
_LOGGER.warning("request[{}] predict failed: {}" _LOGGER.error("(logid={}) Failed to predict: {}"
.format(data_id, resp_channeldata.error_info)) .format(data_id, resp_channeldata.error_info))
if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value: if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value:
break break
if i + 1 < self._retry: if i + 1 < self._retry:
_LOGGER.warning("retry({}/{}) data[{}]".format( _LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format(
i + 1, self._retry, data_id)) data_id, i + 1, self._retry))
_LOGGER.debug("unpack channeldata[{}] into RPC response".format( _LOGGER.debug("(logid={}) Packing RPC response package".format(data_id))
data_id))
self._profiler.record("postpack_{}#{}_0".format(data_id, self.name)) self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata) rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("postpack_{}#{}_1".format(data_id, self.name)) self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
if not self._is_thread_op: if not self._is_thread_op:
self._profiler.record("call_{}#DAG-{}_1".format(data_id, data_id)) end_call = self._profiler.record("call_{}#DAG-{}_1".format(data_id,
data_id))
else: else:
self._profiler.record("call_{}#DAG_1".format(data_id)) end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
_LOGGER.log(level=1,
msg="(logid={}) call[{} ms]".format(
data_id, (end_call - start_call) / 1e3))
profile_str = self._profiler.gen_profile_str() profile_str = self._profiler.gen_profile_str()
if self._server_use_profile: if self._server_use_profile:
@@ -268,7 +282,17 @@
         return rpc_resp

     def _pack_for_rpc_resp(self, channeldata):
-        return self._pack_rpc_func(channeldata)
+        try:
+            return self._pack_rpc_func(channeldata)
+        except Exception as e:
+            _LOGGER.error(
+                "(logid={}) Failed to pack RPC response package: {}"
+                .format(channeldata.id, e),
+                exc_info=True)
+            resp = pipeline_service_pb2.Response()
+            resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value
+            resp.error_info = "rpc package error: {}".format(e)
+            return resp


 class DAG(object):
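With the try/except added above, a serialization failure in `_pack_for_rpc_resp` no longer escapes `call()`; the client instead receives a `Response` whose `ecode` is `RPC_PACKAGE_ERROR` and whose `error_info` carries the exception text. A caller therefore only needs to branch on `ecode`; a minimal sketch (0 appears to be `ChannelDataEcode.OK.value` in this codebase, see the `if err != 0` check in operator.py below):

```python
def handle_response(resp):
    # resp is a pipeline_service_pb2.Response; ecode == 0 means OK.
    if resp.ecode != 0:
        raise RuntimeError("pipeline error (ecode={}): {}".format(
            resp.ecode, resp.error_info))
    return resp
```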
...@@ -283,7 +307,7 @@ class DAG(object): ...@@ -283,7 +307,7 @@ class DAG(object):
self._build_dag_each_worker = build_dag_each_worker self._build_dag_each_worker = build_dag_each_worker
if not self._is_thread_op: if not self._is_thread_op:
self._manager = multiprocessing.Manager() self._manager = multiprocessing.Manager()
_LOGGER.info("[DAG] succ init") _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op): def get_use_ops(self, response_op):
unique_names = set() unique_names = set()
...@@ -303,7 +327,8 @@ class DAG(object): ...@@ -303,7 +327,8 @@ class DAG(object):
used_ops.add(pred_op) used_ops.add(pred_op)
# check the name of op is globally unique # check the name of op is globally unique
if pred_op.name in unique_names: if pred_op.name in unique_names:
_LOGGER.critical("the name of Op must be unique: {}". _LOGGER.critical("Failed to get used Ops: the"
" name of Op must be unique: {}".
format(pred_op.name)) format(pred_op.name))
os._exit(-1) os._exit(-1)
unique_names.add(pred_op.name) unique_names.add(pred_op.name)
...@@ -317,12 +342,12 @@ class DAG(object): ...@@ -317,12 +342,12 @@ class DAG(object):
else: else:
channel = ProcessChannel( channel = ProcessChannel(
self._manager, name=name_gen.next(), maxsize=self._channel_size) self._manager, name=name_gen.next(), maxsize=self._channel_size)
_LOGGER.debug("[DAG] gen Channel: {}".format(channel.name)) _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name))
return channel return channel
def _gen_virtual_op(self, name_gen): def _gen_virtual_op(self, name_gen):
vir_op = VirtualOp(name=name_gen.next()) vir_op = VirtualOp(name=name_gen.next())
_LOGGER.debug("[DAG] gen VirtualOp: {}".format(vir_op.name)) _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name))
return vir_op return vir_op
def _topo_sort(self, used_ops, response_op, out_degree_ops): def _topo_sort(self, used_ops, response_op, out_degree_ops):
...@@ -337,7 +362,8 @@ class DAG(object): ...@@ -337,7 +362,8 @@ class DAG(object):
if len(op.get_input_ops()) == 0: if len(op.get_input_ops()) == 0:
zero_indegree_num += 1 zero_indegree_num += 1
if zero_indegree_num != 1: if zero_indegree_num != 1:
_LOGGER.critical("DAG contains multiple RequestOps") _LOGGER.critical("Failed to topo sort: DAG contains "
"multiple RequestOps")
os._exit(-1) os._exit(-1)
last_op = response_op.get_input_ops()[0] last_op = response_op.get_input_ops()[0]
ques[que_idx].put(last_op) ques[que_idx].put(last_op)
...@@ -362,14 +388,15 @@ class DAG(object): ...@@ -362,14 +388,15 @@ class DAG(object):
break break
que_idx = (que_idx + 1) % 2 que_idx = (que_idx + 1) % 2
if sorted_op_num < len(used_ops): if sorted_op_num < len(used_ops):
_LOGGER.critical("not legal DAG") _LOGGER.critical("Failed to topo sort: not legal DAG")
os._exit(-1) os._exit(-1)
return dag_views, last_op return dag_views, last_op
def _build_dag(self, response_op): def _build_dag(self, response_op):
if response_op is None: if response_op is None:
_LOGGER.critical("ResponseOp has not been set.") _LOGGER.critical("Failed to build DAG: ResponseOp"
" has not been set.")
os._exit(-1) os._exit(-1)
used_ops, out_degree_ops = self.get_use_ops(response_op) used_ops, out_degree_ops = self.get_use_ops(response_op)
if not self._build_dag_each_worker: if not self._build_dag_each_worker:
...@@ -380,8 +407,8 @@ class DAG(object): ...@@ -380,8 +407,8 @@ class DAG(object):
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
if len(used_ops) <= 1: if len(used_ops) <= 1:
_LOGGER.critical( _LOGGER.critical(
"Besides RequestOp and ResponseOp, there should be at least one Op in DAG." "Failed to build DAG: besides RequestOp and ResponseOp, "
) "there should be at least one Op in DAG.")
os._exit(-1) os._exit(-1)
if self._build_dag_each_worker: if self._build_dag_each_worker:
_LOGGER.info("Because `build_dag_each_worker` mode is used, " _LOGGER.info("Because `build_dag_each_worker` mode is used, "
...@@ -443,8 +470,6 @@ class DAG(object): ...@@ -443,8 +470,6 @@ class DAG(object):
continue continue
channel = self._gen_channel(channel_name_gen) channel = self._gen_channel(channel_name_gen)
channels.append(channel) channels.append(channel)
_LOGGER.debug("[DAG] Channel({}) => Op({})"
.format(channel.name, op.name))
op.add_input_channel(channel) op.add_input_channel(channel)
pred_ops = pred_op_of_next_view_op[op.name] pred_ops = pred_op_of_next_view_op[op.name]
if v_idx == 0: if v_idx == 0:
...@@ -452,8 +477,6 @@ class DAG(object): ...@@ -452,8 +477,6 @@ class DAG(object):
else: else:
# if pred_op is virtual op, it will use ancestors as producers to channel # if pred_op is virtual op, it will use ancestors as producers to channel
for pred_op in pred_ops: for pred_op in pred_ops:
_LOGGER.debug("[DAG] Op({}) => Channel({})"
.format(pred_op.name, channel.name))
pred_op.add_output_channel(channel) pred_op.add_output_channel(channel)
processed_op.add(op.name) processed_op.add(op.name)
# find same input op to combine channel # find same input op to combine channel
...@@ -469,8 +492,6 @@ class DAG(object): ...@@ -469,8 +492,6 @@ class DAG(object):
same_flag = False same_flag = False
break break
if same_flag: if same_flag:
_LOGGER.debug("[DAG] Channel({}) => Op({})"
.format(channel.name, other_op.name))
other_op.add_input_channel(channel) other_op.add_input_channel(channel)
processed_op.add(other_op.name) processed_op.add(other_op.name)
output_channel = self._gen_channel(channel_name_gen) output_channel = self._gen_channel(channel_name_gen)
...@@ -488,7 +509,7 @@ class DAG(object): ...@@ -488,7 +509,7 @@ class DAG(object):
actual_ops.append(op) actual_ops.append(op)
for c in channels: for c in channels:
_LOGGER.debug("Channel({}):\n\t-producers: {}\n\t-consumers: {}" _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}"
.format(c.name, c.get_producers(), c.get_consumers())) .format(c.name, c.get_producers(), c.get_consumers()))
return (actual_ops, channels, input_channel, output_channel, pack_func, return (actual_ops, channels, input_channel, output_channel, pack_func,
...@@ -497,7 +518,7 @@ class DAG(object): ...@@ -497,7 +518,7 @@ class DAG(object):
def build(self): def build(self):
(actual_ops, channels, input_channel, output_channel, pack_func, (actual_ops, channels, input_channel, output_channel, pack_func,
unpack_func) = self._build_dag(self._response_op) unpack_func) = self._build_dag(self._response_op)
_LOGGER.info("[DAG] succ build dag") _LOGGER.info("[DAG] Succ build DAG")
self._actual_ops = actual_ops self._actual_ops = actual_ops
self._channels = channels self._channels = channels
......
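The dag.py changes above are validation and log-message rewrites around building the DAG from a `ResponseOp` (exactly one zero-indegree `RequestOp`, at least one user Op in between). For orientation only, a hedged sketch of the graph shape those checks expect, using the names exported by the package `__init__` shown earlier; the import path and constructor arguments are assumptions, not taken from this diff:

```python
# Hedged sketch of a DAG that passes the checks in _build_dag()/_topo_sort().
from paddle_serving_server.pipeline import (Op, RequestOp, ResponseOp,
                                            PipelineServer)

class MyOp(Op):
    def preprocess(self, input_dict):
        return input_dict                # identity preprocess for the sketch

read_op = RequestOp()                    # the single zero-indegree op
my_op = MyOp(name="my_op", input_ops=[read_op])    # argument names assumed
response_op = ResponseOp(input_ops=[my_op])

server = PipelineServer()
server.set_response_op(response_op)      # the DAG is built from the ResponseOp
```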
@@ -73,8 +73,9 @@ class Op(object):
         if self._auto_batching_timeout is not None:
             if self._auto_batching_timeout <= 0 or self._batch_size == 1:
                 _LOGGER.warning(
-                    "Because auto_batching_timeout <= 0 or batch_size == 1,"
-                    " set auto_batching_timeout to None.")
+                    self._log(
+                        "Because auto_batching_timeout <= 0 or batch_size == 1,"
+                        " set auto_batching_timeout to None."))
                 self._auto_batching_timeout = None
             else:
                 self._auto_batching_timeout = self._auto_batching_timeout / 1000.0
...@@ -120,7 +121,8 @@ class Op(object): ...@@ -120,7 +121,8 @@ class Op(object):
def init_client(self, client_type, client_config, server_endpoints, def init_client(self, client_type, client_config, server_endpoints,
fetch_names): fetch_names):
if self.with_serving == False: if self.with_serving == False:
_LOGGER.info("Op({}) no client".format(self.name)) _LOGGER.info("Op({}) has no client (and it also do not "
"run the process function".format(self.name))
return None return None
if client_type == 'brpc': if client_type == 'brpc':
client = Client() client = Client()
...@@ -128,7 +130,8 @@ class Op(object): ...@@ -128,7 +130,8 @@ class Op(object):
elif client_type == 'grpc': elif client_type == 'grpc':
client = MultiLangClient() client = MultiLangClient()
else: else:
raise ValueError("unknow client type: {}".format(client_type)) raise ValueError("Failed to init client: unknow client "
"type {}".format(client_type))
client.connect(server_endpoints) client.connect(server_endpoints)
self._fetch_names = fetch_names self._fetch_names = fetch_names
return client return client
...@@ -143,16 +146,17 @@ class Op(object): ...@@ -143,16 +146,17 @@ class Op(object):
for op in ops: for op in ops:
if not isinstance(op, Op): if not isinstance(op, Op):
_LOGGER.critical( _LOGGER.critical(
self._log("input op must be Op type, not {}" self._log("Failed to set input_ops: input op "
.format(type(op)))) "must be Op type, not {}".format(type(op))))
os._exit(-1) os._exit(-1)
self._input_ops.append(op) self._input_ops.append(op)
def add_input_channel(self, channel): def add_input_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)): if not isinstance(channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical( _LOGGER.critical(
self._log("input channel must be Channel type, not {}" self._log("Failed to set input_channel: input "
.format(type(channel)))) "channel must be Channel type, not {}".format(
type(channel))))
os._exit(-1) os._exit(-1)
channel.add_consumer(self.name) channel.add_consumer(self.name)
self._input = channel self._input = channel
...@@ -166,8 +170,8 @@ class Op(object): ...@@ -166,8 +170,8 @@ class Op(object):
def add_output_channel(self, channel): def add_output_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)): if not isinstance(channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical( _LOGGER.critical(
self._log("output channel must be Channel type, not {}" self._log("Failed to add output_channel: output channel "
.format(type(channel)))) "must be Channel type, not {}".format(type(channel))))
os._exit(-1) os._exit(-1)
channel.add_producer(self.name) channel.add_producer(self.name)
self._outputs.append(channel) self._outputs.append(channel)
...@@ -183,8 +187,8 @@ class Op(object): ...@@ -183,8 +187,8 @@ class Op(object):
if len(input_dicts) != 1: if len(input_dicts) != 1:
_LOGGER.critical( _LOGGER.critical(
self._log( self._log(
"this Op has multiple previous inputs. Please override this func." "Failed to run preprocess: this Op has multiple previous "
)) "inputs. Please override this func."))
os._exit(-1) os._exit(-1)
(_, input_dict), = input_dicts.items() (_, input_dict), = input_dicts.items()
...@@ -194,8 +198,8 @@ class Op(object): ...@@ -194,8 +198,8 @@ class Op(object):
err, err_info = ChannelData.check_batch_npdata(feed_batch) err, err_info = ChannelData.check_batch_npdata(feed_batch)
if err != 0: if err != 0:
_LOGGER.critical( _LOGGER.critical(
self._log("{}, Please override preprocess func.".format( self._log("Failed to run process: {}. Please override "
err_info))) "preprocess func.".format(err_info)))
os._exit(-1) os._exit(-1)
call_result = self.client.predict( call_result = self.client.predict(
feed=feed_batch, fetch=self._fetch_names) feed=feed_batch, fetch=self._fetch_names)
...@@ -274,8 +278,8 @@ class Op(object): ...@@ -274,8 +278,8 @@ class Op(object):
def init_op(self): def init_op(self):
pass pass
def _run_preprocess(self, parsed_data_dict, log_func): def _run_preprocess(self, parsed_data_dict, op_info_prefix):
_LOGGER.debug(log_func("try to run preprocess")) _LOGGER.debug("{} Running preprocess".format(op_info_prefix))
preped_data_dict = {} preped_data_dict = {}
err_channeldata_dict = {} err_channeldata_dict = {}
for data_id, parsed_data in parsed_data_dict.items(): for data_id, parsed_data in parsed_data_dict.items():
...@@ -284,17 +288,17 @@ class Op(object): ...@@ -284,17 +288,17 @@ class Op(object):
preped_data = self.preprocess(parsed_data) preped_data = self.preprocess(parsed_data)
except TypeError as e: except TypeError as e:
# Error type in channeldata.datatype # Error type in channeldata.datatype
error_info = log_func("preprocess data[{}] failed: {}" error_info = "(logid={}) {} Failed to preprocess: {}".format(
.format(data_id, e)) data_id, op_info_prefix, e)
_LOGGER.error(error_info) _LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData( error_channeldata = ChannelData(
ecode=ChannelDataEcode.TYPE_ERROR.value, ecode=ChannelDataEcode.TYPE_ERROR.value,
error_info=error_info, error_info=error_info,
data_id=data_id) data_id=data_id)
except Exception as e: except Exception as e:
error_info = log_func("preprocess data[{}] failed: {}" error_info = "(logid={}) {} Failed to preprocess: {}".format(
.format(data_id, e)) data_id, op_info_prefix, e)
_LOGGER.error(error_info) _LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData( error_channeldata = ChannelData(
ecode=ChannelDataEcode.UNKNOW.value, ecode=ChannelDataEcode.UNKNOW.value,
error_info=error_info, error_info=error_info,
...@@ -303,11 +307,11 @@ class Op(object): ...@@ -303,11 +307,11 @@ class Op(object):
err_channeldata_dict[data_id] = error_channeldata err_channeldata_dict[data_id] = error_channeldata
else: else:
preped_data_dict[data_id] = preped_data preped_data_dict[data_id] = preped_data
_LOGGER.debug(log_func("succ run preprocess")) _LOGGER.debug("{} Succ preprocess".format(op_info_prefix))
return preped_data_dict, err_channeldata_dict return preped_data_dict, err_channeldata_dict
def _run_process(self, preped_data_dict, log_func): def _run_process(self, preped_data_dict, op_info_prefix):
_LOGGER.debug(log_func("try to run process")) _LOGGER.debug("{} Running process".format(op_info_prefix))
midped_data_dict = {} midped_data_dict = {}
err_channeldata_dict = {} err_channeldata_dict = {}
if self.with_serving: if self.with_serving:
...@@ -320,8 +324,9 @@ class Op(object): ...@@ -320,8 +324,9 @@ class Op(object):
midped_batch = self.process(feed_batch) midped_batch = self.process(feed_batch)
except Exception as e: except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value ecode = ChannelDataEcode.UNKNOW.value
error_info = log_func("process batch failed: {}".format(e)) error_info = "{} Failed to process(batch: {}): {}".format(
_LOGGER.error(error_info) op_info_prefix, data_ids, e)
_LOGGER.error(error_info, exc_info=True)
else: else:
for i in range(self._retry): for i in range(self._retry):
try: try:
...@@ -330,30 +335,34 @@ class Op(object): ...@@ -330,30 +335,34 @@ class Op(object):
except func_timeout.FunctionTimedOut as e: except func_timeout.FunctionTimedOut as e:
if i + 1 >= self._retry: if i + 1 >= self._retry:
ecode = ChannelDataEcode.TIMEOUT.value ecode = ChannelDataEcode.TIMEOUT.value
error_info = log_func(e) error_info = "{} Failed to process(batch: {}): " \
"exceeded retry count.".format(
op_info_prefix, data_ids)
_LOGGER.error(error_info) _LOGGER.error(error_info)
else: else:
_LOGGER.warning( _LOGGER.warning(
log_func("PaddleService timeout, retry({}/{})" "{} Failed to process(batch: {}): timeout, and retrying({}/{})"
.format(i + 1, self._retry))) .format(op_info_prefix, data_ids, i + 1,
self._retry))
except Exception as e: except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value ecode = ChannelDataEcode.UNKNOW.value
error_info = log_func("process batch failed: {}".format( error_info = "{} Failed to process(batch: {}): {}".format(
e)) op_info_prefix, data_ids, e)
_LOGGER.error(error_info) _LOGGER.error(error_info, exc_info=True)
break break
else: else:
break break
if ecode != ChannelDataEcode.OK.value: if ecode != ChannelDataEcode.OK.value:
for data_id in data_ids: for data_id in data_ids:
_LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData( err_channeldata_dict[data_id] = ChannelData(
ecode=ecode, error_info=error_info, data_id=data_id) ecode=ecode, error_info=error_info, data_id=data_id)
elif midped_batch is None: elif midped_batch is None:
# op client return None # op client return None
error_info = log_func( error_info = "{} Failed to predict, please check if PaddleServingService" \
"predict failed. pls check the server side.") " is working properly.".format(op_info_prefix)
_LOGGER.error(error_info)
for data_id in data_ids: for data_id in data_ids:
_LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData( err_channeldata_dict[data_id] = ChannelData(
ecode=ChannelDataEcode.CLIENT_ERROR.value, ecode=ChannelDataEcode.CLIENT_ERROR.value,
error_info=error_info, error_info=error_info,
...@@ -367,11 +376,12 @@ class Op(object): ...@@ -367,11 +376,12 @@ class Op(object):
} }
else: else:
midped_data_dict = preped_data_dict midped_data_dict = preped_data_dict
_LOGGER.debug(log_func("succ run process")) _LOGGER.debug("{} Succ process".format(op_info_prefix))
return midped_data_dict, err_channeldata_dict return midped_data_dict, err_channeldata_dict
def _run_postprocess(self, parsed_data_dict, midped_data_dict, log_func): def _run_postprocess(self, parsed_data_dict, midped_data_dict,
_LOGGER.debug(log_func("try to run postprocess")) op_info_prefix):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = {} postped_data_dict = {}
err_channeldata_dict = {} err_channeldata_dict = {}
for data_id, midped_data in midped_data_dict.items(): for data_id, midped_data in midped_data_dict.items():
...@@ -380,9 +390,9 @@ class Op(object): ...@@ -380,9 +390,9 @@ class Op(object):
postped_data = self.postprocess(parsed_data_dict[data_id], postped_data = self.postprocess(parsed_data_dict[data_id],
midped_data) midped_data)
except Exception as e: except Exception as e:
error_info = log_func("postprocess data[{}] failed: {}" error_info = "(logid={}) {} Failed to postprocess: {}".format(
.format(data_id, e)) data_id, op_info_prefix, e)
_LOGGER.error(error_info) _LOGGER.error(error_info, exc_info=True)
err_channeldata = ChannelData( err_channeldata = ChannelData(
ecode=ChannelDataEcode.UNKNOW.value, ecode=ChannelDataEcode.UNKNOW.value,
error_info=error_info, error_info=error_info,
...@@ -392,9 +402,11 @@ class Op(object): ...@@ -392,9 +402,11 @@ class Op(object):
continue continue
else: else:
if not isinstance(postped_data, dict): if not isinstance(postped_data, dict):
error_info = log_func( error_info = "(logid={}) {} Failed to postprocess: " \
"output of postprocess funticon must be " "output of postprocess funticon must be " \
"dict type, but get {}".format(type(postped_data))) "dict type, but get {}".format(
data_id, op_info_prefix,
type(postped_data))
_LOGGER.error(error_info) _LOGGER.error(error_info)
err_channeldata = ChannelData( err_channeldata = ChannelData(
ecode=ChannelDataEcode.UNKNOW.value, ecode=ChannelDataEcode.UNKNOW.value,
...@@ -416,16 +428,13 @@ class Op(object): ...@@ -416,16 +428,13 @@ class Op(object):
dictdata=postped_data, dictdata=postped_data,
data_id=data_id) data_id=data_id)
postped_data_dict[data_id] = output_data postped_data_dict[data_id] = output_data
_LOGGER.debug(log_func("succ run postprocess")) _LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
return postped_data_dict, err_channeldata_dict return postped_data_dict, err_channeldata_dict
def _auto_batching_generator(self, input_channel, op_name, batch_size, def _auto_batching_generator(self, input_channel, op_name, batch_size,
timeout, log_func): timeout, op_info_prefix):
while True: while True:
batch = [] batch = []
_LOGGER.debug(
log_func("Auto-batching expect size: {}; timeout(s): {}".format(
batch_size, timeout)))
while len(batch) == 0: while len(batch) == 0:
endtime = None endtime = None
if timeout is not None: if timeout is not None:
...@@ -436,7 +445,8 @@ class Op(object): ...@@ -436,7 +445,8 @@ class Op(object):
if timeout is not None: if timeout is not None:
remaining = endtime - _time() remaining = endtime - _time()
if remaining <= 0.0: if remaining <= 0.0:
_LOGGER.debug(log_func("Auto-batching timeout")) _LOGGER.debug("{} Failed to generate batch: "
"timeout".format(op_info_prefix))
break break
channeldata_dict = input_channel.front(op_name, channeldata_dict = input_channel.front(op_name,
timeout) timeout)
...@@ -444,10 +454,11 @@ class Op(object): ...@@ -444,10 +454,11 @@ class Op(object):
channeldata_dict = input_channel.front(op_name) channeldata_dict = input_channel.front(op_name)
batch.append(channeldata_dict) batch.append(channeldata_dict)
except ChannelTimeoutError: except ChannelTimeoutError:
_LOGGER.debug(log_func("Auto-batching timeout")) _LOGGER.debug("{} Failed to generate batch: "
"timeout".format(op_info_prefix))
break break
_LOGGER.debug( _LOGGER.debug("{} Got actual batch_size: {}".format(op_info_prefix,
log_func("Auto-batching actual size: {}".format(len(batch)))) len(batch)))
yield batch yield batch
def _parse_channeldata_batch(self, batch, output_channels): def _parse_channeldata_batch(self, batch, output_channels):
...@@ -472,14 +483,7 @@ class Op(object): ...@@ -472,14 +483,7 @@ class Op(object):
def _run(self, concurrency_idx, input_channel, output_channels, client_type, def _run(self, concurrency_idx, input_channel, output_channels, client_type,
is_thread_op): is_thread_op):
def get_log_func(op_info_prefix):
def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str)
return log_func
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = get_log_func(op_info_prefix)
tid = threading.current_thread().ident tid = threading.current_thread().ident
# init op # init op
...@@ -488,22 +492,27 @@ class Op(object): ...@@ -488,22 +492,27 @@ class Op(object):
profiler = self._initialize(is_thread_op, client_type, profiler = self._initialize(is_thread_op, client_type,
concurrency_idx) concurrency_idx)
except Exception as e: except Exception as e:
_LOGGER.critical(log("init op failed: {}".format(e))) _LOGGER.critical(
"{} Failed to init op: {}".format(op_info_prefix, e),
exc_info=True)
os._exit(-1) os._exit(-1)
_LOGGER.info(log("succ init")) _LOGGER.info("{} Succ init".format(op_info_prefix))
batch_generator = self._auto_batching_generator( batch_generator = self._auto_batching_generator(
input_channel=input_channel, input_channel=input_channel,
op_name=self.name, op_name=self.name,
batch_size=self._batch_size, batch_size=self._batch_size,
timeout=self._auto_batching_timeout, timeout=self._auto_batching_timeout,
log_func=log) op_info_prefix=op_info_prefix)
start_prep, end_prep = None, None
start_midp, end_midp = None, None
start_postp, end_postp = None, None
while True: while True:
try: try:
channeldata_dict_batch = next(batch_generator) channeldata_dict_batch = next(batch_generator)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
...@@ -513,7 +522,7 @@ class Op(object): ...@@ -513,7 +522,7 @@ class Op(object):
= self._parse_channeldata_batch( = self._parse_channeldata_batch(
channeldata_dict_batch, output_channels) channeldata_dict_batch, output_channels)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
if len(parsed_data_dict) == 0: if len(parsed_data_dict) == 0:
...@@ -521,10 +530,14 @@ class Op(object): ...@@ -521,10 +530,14 @@ class Op(object):
continue continue
# preprocess # preprocess
profiler.record("prep#{}_0".format(op_info_prefix)) start_prep = profiler.record("prep#{}_0".format(op_info_prefix))
preped_data_dict, err_channeldata_dict \ preped_data_dict, err_channeldata_dict \
= self._run_preprocess(parsed_data_dict, log) = self._run_preprocess(parsed_data_dict, op_info_prefix)
profiler.record("prep#{}_1".format(op_info_prefix)) end_prep = profiler.record("prep#{}_1".format(op_info_prefix))
_LOGGER.log(level=1,
msg="(logid={}) {} prep[{} ms]".format(
parsed_data_dict.keys(), op_info_prefix,
(end_prep - start_prep) / 1e3))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -533,17 +546,21 @@ class Op(object): ...@@ -533,17 +546,21 @@ class Op(object):
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
if len(parsed_data_dict) == 0: if len(parsed_data_dict) == 0:
continue continue
# process # process
profiler.record("midp#{}_0".format(op_info_prefix)) start_midp = profiler.record("midp#{}_0".format(op_info_prefix))
midped_data_dict, err_channeldata_dict \ midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, log) = self._run_process(preped_data_dict, op_info_prefix)
profiler.record("midp#{}_1".format(op_info_prefix)) end_midp = profiler.record("midp#{}_1".format(op_info_prefix))
_LOGGER.log(level=1,
msg="(logid={}) {} midp[{} ms]".format(
preped_data_dict.keys(), op_info_prefix,
(end_midp - start_midp) / 1e3))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -552,18 +569,22 @@ class Op(object): ...@@ -552,18 +569,22 @@ class Op(object):
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
if len(midped_data_dict) == 0: if len(midped_data_dict) == 0:
continue continue
# postprocess # postprocess
profiler.record("postp#{}_0".format(op_info_prefix)) start_postp = profiler.record("postp#{}_0".format(op_info_prefix))
postped_data_dict, err_channeldata_dict \ postped_data_dict, err_channeldata_dict \
= self._run_postprocess( = self._run_postprocess(
parsed_data_dict, midped_data_dict, log) parsed_data_dict, midped_data_dict, op_info_prefix)
profiler.record("postp#{}_1".format(op_info_prefix)) end_postp = profiler.record("postp#{}_1".format(op_info_prefix))
_LOGGER.log(level=1,
msg="(logid={}) {} postp[{} ms]".format(
midped_data_dict.keys(), op_info_prefix,
(end_postp - start_postp) / 1e3))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -572,7 +593,7 @@ class Op(object): ...@@ -572,7 +593,7 @@ class Op(object):
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
if len(postped_data_dict) == 0: if len(postped_data_dict) == 0:
...@@ -591,7 +612,7 @@ class Op(object): ...@@ -591,7 +612,7 @@ class Op(object):
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
...@@ -646,7 +667,7 @@ class RequestOp(Op): ...@@ -646,7 +667,7 @@ class RequestOp(Op):
try: try:
self.init_op() self.init_op()
except Exception as e: except Exception as e:
_LOGGER.critical("Op(Request) init op failed: {}".format(e)) _LOGGER.critical("Op(Request) Failed to init: {}".format(e))
os._exit(-1) os._exit(-1)
def unpack_request_package(self, request): def unpack_request_package(self, request):
...@@ -670,7 +691,8 @@ class ResponseOp(Op): ...@@ -670,7 +691,8 @@ class ResponseOp(Op):
try: try:
self.init_op() self.init_op()
except Exception as e: except Exception as e:
_LOGGER.critical("Op(ResponseOp) init op failed: {}".format(e)) _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format(
e, exc_info=True))
os._exit(-1) os._exit(-1)
def pack_response_package(self, channeldata): def pack_response_package(self, channeldata):
...@@ -693,14 +715,19 @@ class ResponseOp(Op): ...@@ -693,14 +715,19 @@ class ResponseOp(Op):
resp.error_info = self._log( resp.error_info = self._log(
"fetch var type must be str({}).".format( "fetch var type must be str({}).".format(
type(var))) type(var)))
_LOGGER.error("(logid={}) Failed to pack RPC "
"response package: {}".format(
channeldata.id, resp.error_info))
break break
resp.value.append(var) resp.value.append(var)
resp.key.append(name) resp.key.append(name)
else: else:
resp.ecode = ChannelDataEcode.TYPE_ERROR.value resp.ecode = ChannelDataEcode.TYPE_ERROR.value
resp.error_info = self._log( resp.error_info = self._log(
"Error type({}) in datatype.".format(channeldata.datatype)) "error type({}) in datatype.".format(channeldata.datatype))
_LOGGER.error(resp.error_info) _LOGGER.error("(logid={}) Failed to pack RPC response"
" package: {}".format(channeldata.id,
resp.error_info))
else: else:
resp.error_info = channeldata.error_info resp.error_info = channeldata.error_info
return resp return resp
...@@ -718,6 +745,7 @@ class VirtualOp(Op): ...@@ -718,6 +745,7 @@ class VirtualOp(Op):
self._virtual_pred_ops.append(op) self._virtual_pred_ops.append(op)
def _actual_pred_op_names(self, op): def _actual_pred_op_names(self, op):
# can use disjoint-set, but it's not necessary
if not isinstance(op, VirtualOp): if not isinstance(op, VirtualOp):
return [op.name] return [op.name]
names = [] names = []
...@@ -728,8 +756,9 @@ class VirtualOp(Op): ...@@ -728,8 +756,9 @@ class VirtualOp(Op):
def add_output_channel(self, channel): def add_output_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)): if not isinstance(channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical( _LOGGER.critical(
self._log("output channel must be Channel type, not {}" self._log("Failed to add output_channel: output_channel"
.format(type(channel)))) " must be Channel type, not {}".format(
type(channel))))
os._exit(-1) os._exit(-1)
for op in self._virtual_pred_ops: for op in self._virtual_pred_ops:
for op_name in self._actual_pred_op_names(op): for op_name in self._actual_pred_op_names(op):
...@@ -738,12 +767,6 @@ class VirtualOp(Op): ...@@ -738,12 +767,6 @@ class VirtualOp(Op):
def _run(self, concurrency_idx, input_channel, output_channels, client_type, def _run(self, concurrency_idx, input_channel, output_channels, client_type,
is_thread_op): is_thread_op):
def get_log_func(op_info_prefix):
def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str)
return log_func
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = get_log_func(op_info_prefix) log = get_log_func(op_info_prefix)
tid = threading.current_thread().ident tid = threading.current_thread().ident
...@@ -759,7 +782,7 @@ class VirtualOp(Op): ...@@ -759,7 +782,7 @@ class VirtualOp(Op):
try: try:
channeldata_dict_batch = next(batch_generator) channeldata_dict_batch = next(batch_generator)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
...@@ -769,6 +792,6 @@ class VirtualOp(Op): ...@@ -769,6 +792,6 @@ class VirtualOp(Op):
self._push_to_output_channels( self._push_to_output_channels(
data, channels=output_channels, name=name) data, channels=output_channels, name=name)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug(log("Channel stop.")) _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
...@@ -67,9 +67,11 @@ class PipelineServer(object): ...@@ -67,9 +67,11 @@ class PipelineServer(object):
def set_response_op(self, response_op): def set_response_op(self, response_op):
if not isinstance(response_op, ResponseOp): if not isinstance(response_op, ResponseOp):
raise Exception("response_op must be ResponseOp type.") raise Exception("Failed to set response_op: response_op "
"must be ResponseOp type.")
if len(response_op.get_input_ops()) != 1: if len(response_op.get_input_ops()) != 1:
raise Exception("response_op can only have one previous op.") raise Exception("Failed to set response_op: response_op "
"can only have one previous op.")
self._response_op = response_op self._response_op = response_op
def _port_is_available(self, port): def _port_is_available(self, port):
...@@ -83,7 +85,8 @@ class PipelineServer(object): ...@@ -83,7 +85,8 @@ class PipelineServer(object):
self._port = conf["port"] self._port = conf["port"]
if not self._port_is_available(self._port): if not self._port_is_available(self._port):
raise SystemExit("Prot {} is already used".format(self._port)) raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._port))
self._worker_num = conf["worker_num"] self._worker_num = conf["worker_num"]
self._build_dag_each_worker = conf["build_dag_each_worker"] self._build_dag_each_worker = conf["build_dag_each_worker"]
......
...@@ -43,8 +43,9 @@ class UnsafeTimeProfiler(object): ...@@ -43,8 +43,9 @@ class UnsafeTimeProfiler(object):
def record(self, name): def record(self, name):
if self._enable is False: if self._enable is False:
return return
self.time_record.append('{}:{} '.format(name, timestamp = int(round(_time() * 1000000))
int(round(_time() * 1000000)))) self.time_record.append('{}:{} '.format(name, timestamp))
return timestamp
def print_profile(self): def print_profile(self):
if self._enable is False: if self._enable is False:
...@@ -80,6 +81,7 @@ class TimeProfiler(object): ...@@ -80,6 +81,7 @@ class TimeProfiler(object):
name = '_'.join(name_with_tag[:-1]) name = '_'.join(name_with_tag[:-1])
with self._lock: with self._lock:
self._time_record.put((name, tag, timestamp)) self._time_record.put((name, tag, timestamp))
return timestamp
def print_profile(self): def print_profile(self):
if self._enable is False: if self._enable is False:
......
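The change above makes record() hand back the microsecond timestamp it just stored, which is what the Op loop earlier in this diff uses to compute per-stage latency. A minimal usage sketch under that assumption (the profiler instance, the tag, and run_stage() below are illustrative, not part of this commit):

    # Assumes profiling is enabled; record() returns None when _enable is False.
    # Timestamps are in microseconds, so dividing by 1e3 yields milliseconds.
    start = profiler.record("demo#[op|0]_0")   # illustrative tag
    run_stage()                                # illustrative stage being timed
    end = profiler.record("demo#[op|0]_1")
    elapsed_ms = (end - start) / 1e3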
numpy>=1.12, <=1.16.4 ; python_version<"3.5" numpy>=1.12, <=1.16.4 ; python_version<"3.5"
google>=2.0.3
protobuf>=3.12.2 protobuf>=3.12.2
grpcio-tools>=1.28.1 grpcio-tools>=1.28.1
grpcio>=1.28.1 grpcio>=1.28.1
func-timeout>=4.3.5 func-timeout>=4.3.5
pyyaml>=1.3.0 pyyaml>=1.3.0
sentencepiece==0.1.92
flask>=1.1.2
ujson>=2.0.3
...@@ -54,7 +54,6 @@ function build_app() { ...@@ -54,7 +54,6 @@ function build_app() {
local DIRNAME=build-app-$TYPE local DIRNAME=build-app-$TYPE
mkdir $DIRNAME # pwd: /Serving mkdir $DIRNAME # pwd: /Serving
cd $DIRNAME # pwd: /Serving/build-app-$TYPE cd $DIRNAME # pwd: /Serving/build-app-$TYPE
pip install numpy sentencepiece
case $TYPE in case $TYPE in
CPU|GPU) CPU|GPU)
cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \ cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
...@@ -295,8 +294,6 @@ function python_run_criteo_ctr_with_cube() { ...@@ -295,8 +294,6 @@ function python_run_criteo_ctr_with_cube() {
function python_test_bert() { function python_test_bert() {
# pwd: /Serving/python/examples # pwd: /Serving/python/examples
local TYPE=$1 local TYPE=$1
yum install -y libXext libSM libXrender >/dev/null
pip install ujson
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd bert # pwd: /Serving/python/examples/bert cd bert # pwd: /Serving/python/examples/bert
case $TYPE in case $TYPE in
......