diff --git a/doc/COMPILE.md b/doc/COMPILE.md
index 46ebfb4f1a882b6645cb1e9bb6155743e520951d..84b1b65cbdbb0dcf6079d30bd7ebc9baf4a8c6e1 100644
--- a/doc/COMPILE.md
+++ b/doc/COMPILE.md
@@ -4,12 +4,26 @@
## Compilation environment requirements
-- OS: CentOS 7
-- GCC: 4.8.2 and later
-- Golang: 1.9.2 and later
-- Git:2.17.1 and later
-- CMake:3.2.2 and later
-- Python:2.7.2 and later / 3.6 and later
+| module | version |
+| :--------------------------: | :----------------------------------------------------------: |
+| OS | CentOS 7 |
+| gcc | 4.8.5 and later |
+| gcc-c++ | 4.8.5 and later |
+| cmake | 3.2.0 and later |
+| Python | 2.7.2 and later / 3.6 and later |
+| Go | 1.9.2 and later |
+| git | 2.17.1 and later |
+| glibc-static | 2.17 |
+| openssl-devel | 1.0.2k |
+| bzip2-devel | 1.0.6 and later |
+| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
+| sqlite-devel | 3.7.17 and later |
+| patchelf | 0.9 and later |
+| libXext | 1.3.3 |
+| libSM | 1.2.2 |
+| libXrender | 0.9.10 |
+| python-whl | numpy>=1.12, <=1.16.4<br>google>=2.0.3<br>protobuf>=3.12.2<br>grpcio-tools>=1.28.1<br>grpcio>=1.28.1<br>func-timeout>=4.3.5<br>pyyaml>=1.3.0<br>sentencepiece==0.1.92<br>flask>=1.1.2<br>ujson>=2.0.3 |
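+
+The python-whl packages above match `python/requirements.txt` in this repository, so they can be installed in one step, for example:
+
+```shell
+pip install -r python/requirements.txt
+```
+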
It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you, see [this document](DOCKER_IMAGES.md).
diff --git a/doc/COMPILE_CN.md b/doc/COMPILE_CN.md
index 54f80d54d334835600d08846dc0fb42efe6558ee..a38faff4289a4946d82f8b4a71afd521c7cd48fd 100644
--- a/doc/COMPILE_CN.md
+++ b/doc/COMPILE_CN.md
@@ -4,12 +4,26 @@
## 编译环境设置
-- OS: CentOS 7
-- GCC: 4.8.2及以上
-- Golang: 1.9.2及以上
-- Git:2.17.1及以上
-- CMake:3.2.2及以上
-- Python:2.7.2及以上 / 3.6及以上
+| 组件 | 版本要求 |
+| :--------------------------: | :----------------------------------------------------------: |
+| OS | CentOS 7 |
+| gcc | 4.8.5 and later |
+| gcc-c++ | 4.8.5 and later |
+| cmake | 3.2.0 and later |
+| Python | 2.7.2 and later / 3.6 and later |
+| Go | 1.9.2 and later |
+| git | 2.17.1 and later |
+| glibc-static | 2.17 |
+| openssl-devel | 1.0.2k |
+| bzip2-devel | 1.0.6 and later |
+| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
+| sqlite-devel | 3.7.17 and later |
+| patchelf | 0.9 |
+| libXext | 1.3.3 |
+| libSM | 1.2.2 |
+| libXrender | 0.9.10 |
+| python-whl | numpy>=1.12, <=1.16.4<br>google>=2.0.3<br>protobuf>=3.12.2<br>grpcio-tools>=1.28.1<br>grpcio>=1.28.1<br>func-timeout>=4.3.5<br>pyyaml>=1.3.0<br>sentencepiece==0.1.92<br>flask>=1.1.2<br>ujson>=2.0.3 |
推荐使用Docker编译,我们已经为您准备好了Paddle Serving编译环境,详见[该文档](DOCKER_IMAGES_CN.md)。
diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py
index f720e4d2c851cec6270d31d6d44a766acc246291..913ee39f03d480663a79b1b2b1503835100ee176 100644
--- a/python/pipeline/__init__.py
+++ b/python/pipeline/__init__.py
@@ -11,7 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import logger # this module must be the first to import
+from operator import Op, RequestOp, ResponseOp
+from pipeline_server import PipelineServer
+from pipeline_client import PipelineClient
+from analyse import Analyst
from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py
index 65194889ae65bcd7a32f934613fe9514f3a1f6c3..b37484a3f31106ecee5c80d3f9cc6d81d6a25fc6 100644
--- a/python/pipeline/analyse.py
+++ b/python/pipeline/analyse.py
@@ -164,7 +164,7 @@ class OpAnalyst(object):
def add(self, name_str, ts_list):
if self._close:
- _LOGGER.error("OpAnalyst is closed.")
+ _LOGGER.error("Failed to add item: OpAnalyst is closed.")
return
op_name, curr_idx, step = self._parse(name_str)
if op_name not in self.op_time_list_dict:
diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py
index a85d28ddc9f06d4b8e2abf0aafe92f031ff9a80f..67deb530f6df88eb384848d2b16bc7f364b2ba55 100644
--- a/python/pipeline/channel.py
+++ b/python/pipeline/channel.py
@@ -70,7 +70,8 @@ class ChannelData(object):
'''
if ecode is not None:
if data_id is None or error_info is None:
- _LOGGER.critical("data_id and error_info cannot be None")
+ _LOGGER.critical("Failed to generate ChannelData: data_id"
+ " and error_info cannot be None")
os._exit(-1)
datatype = ChannelDataType.ERROR.value
else:
@@ -78,14 +79,15 @@ class ChannelData(object):
ecode, error_info = ChannelData.check_npdata(npdata)
if ecode != ChannelDataEcode.OK.value:
datatype = ChannelDataType.ERROR.value
- _LOGGER.error(error_info)
+ _LOGGER.error("(logid={}) {}".format(data_id, error_info))
elif datatype == ChannelDataType.DICT.value:
ecode, error_info = ChannelData.check_dictdata(dictdata)
if ecode != ChannelDataEcode.OK.value:
datatype = ChannelDataType.ERROR.value
- _LOGGER.error(error_info)
+ _LOGGER.error("(logid={}) {}".format(data_id, error_info))
else:
- _LOGGER.critical("datatype not match")
+ _LOGGER.critical("(logid={}) datatype not match".format(
+ data_id))
os._exit(-1)
self.datatype = datatype
self.npdata = npdata
@@ -110,14 +112,14 @@ class ChannelData(object):
for sample in dictdata:
if not isinstance(sample, dict):
ecode = ChannelDataEcode.TYPE_ERROR.value
- error_info = "the value of data must " \
- "be dict, but get {}.".format(type(sample))
+ error_info = "Failed to check data: the type of " \
+ "data must be dict, but get {}.".format(type(sample))
break
elif not isinstance(dictdata, dict):
# batch size = 1
ecode = ChannelDataEcode.TYPE_ERROR.value
- error_info = "the value of data must " \
- "be dict, but get {}.".format(type(dictdata))
+ error_info = "Failed to check data: the type of data must " \
+ "be dict, but get {}.".format(type(dictdata))
return ecode, error_info
@staticmethod
@@ -139,27 +141,30 @@ class ChannelData(object):
for sample in npdata:
if not isinstance(sample, dict):
ecode = ChannelDataEcode.TYPE_ERROR.value
- error_info = "the value of data must " \
- "be dict, but get {}.".format(type(sample))
+ error_info = "Failed to check data: the " \
+ "value of data must be dict, but get {}.".format(
+ type(sample))
break
for _, value in sample.items():
if not isinstance(value, np.ndarray):
ecode = ChannelDataEcode.TYPE_ERROR.value
- error_info = "the value of data must " \
- "be np.ndarray, but get {}.".format(type(value))
+ error_info = "Failed to check data: the" \
+ " value of data must be np.ndarray, but get {}.".format(
+ type(value))
return ecode, error_info
elif isinstance(npdata, dict):
# batch_size = 1
for _, value in npdata.items():
if not isinstance(value, np.ndarray):
ecode = ChannelDataEcode.TYPE_ERROR.value
- error_info = "the value of data must " \
- "be np.ndarray, but get {}.".format(type(value))
+ error_info = "Failed to check data: the value " \
+ "of data must be np.ndarray, but get {}.".format(
+ type(value))
break
else:
ecode = ChannelDataEcode.TYPE_ERROR.value
- error_info = "the value of data must " \
- "be dict, but get {}.".format(type(npdata))
+ error_info = "Failed to check data: the value of data " \
+ "must be dict, but get {}.".format(type(npdata))
return ecode, error_info
def parse(self):
@@ -171,8 +176,8 @@ class ChannelData(object):
# return dict
feed = self.dictdata
else:
- _LOGGER.critical("Error type({}) in datatype.".format(
- self.datatype))
+ _LOGGER.critical("Failed to parse channeldata: error " \
+ "type({}) in datatype.".format(self.datatype))
os._exit(-1)
return feed
@@ -247,33 +252,36 @@ class ProcessChannel(object):
""" not thread safe, and can only be called during initialization. """
if op_name in self._producers:
_LOGGER.critical(
- self._log("producer({}) is already in channel".format(op_name)))
+ self._log("Failed to add producer: producer({})" \
+ " is already in channel".format(op_name)))
os._exit(-1)
self._producers.append(op_name)
- _LOGGER.debug(self._log("add a producer: {}".format(op_name)))
+ _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name)))
def add_consumer(self, op_name):
""" not thread safe, and can only be called during initialization. """
if op_name in self._consumer_cursors:
_LOGGER.critical(
- self._log("consumer({}) is already in channel".format(op_name)))
+ self._log("Failed to add consumer: consumer({})" \
+ " is already in channel".format(op_name)))
os._exit(-1)
self._consumer_cursors[op_name] = 0
if self._cursor_count.get(0) is None:
self._cursor_count[0] = 0
self._cursor_count[0] += 1
- _LOGGER.debug(self._log("add a consumer: {}".format(op_name)))
+ _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name)))
def push(self, channeldata, op_name=None):
_LOGGER.debug(
- self._log("{} try to push data[{}]".format(op_name,
- channeldata.id)))
+ self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
+ op_name)))
if len(self._producers) == 0:
_LOGGER.critical(
self._log(
- "expected number of producers to be greater than 0, but the it is 0."
- ))
+ "(logid={}) Op({}) Failed to push data: expected number"
+ " of producers to be greater than 0, but the it is 0.".
+ format(channeldata.id, op_name)))
os._exit(-1)
elif len(self._producers) == 1:
with self._cv:
@@ -287,13 +295,15 @@ class ProcessChannel(object):
raise ChannelStopError()
self._cv.notify_all()
_LOGGER.debug(
- self._log("{} succ push data[{}] into internal queue.".format(
- op_name, channeldata.id)))
+ self._log("(logid={}) Op({}) Pushed data into internal queue.".
+ format(channeldata.id, op_name)))
return True
elif op_name is None:
_LOGGER.critical(
self._log(
- "There are multiple producers, so op_name cannot be None."))
+ "(logid={}) Op({}) Failed to push data: there are multiple "
+ "producers, so op_name cannot be None.".format(
+ channeldata.id, op_name)))
os._exit(-1)
producer_num = len(self._producers)
@@ -321,8 +331,9 @@ class ProcessChannel(object):
if put_data is None:
_LOGGER.debug(
- self._log("{} succ push data[{}] into input_buffer.".format(
- op_name, data_id)))
+ self._log(
+ "(logid={}) Op({}) Pushed data into input_buffer.".
+ format(data_id, op_name)))
else:
while self._stop.value == 0:
try:
@@ -334,15 +345,16 @@ class ProcessChannel(object):
raise ChannelStopError()
_LOGGER.debug(
- self._log("{} succ push data[{}] into internal queue.".
- format(op_name, data_id)))
+ self._log(
+ "(logid={}) Op({}) Pushed data into internal_queue.".
+ format(data_id, op_name)))
self._cv.notify_all()
return True
def front(self, op_name=None, timeout=None):
_LOGGER.debug(
- self._log("{} try to get data[?]; timeout(s)={}".format(op_name,
- timeout)))
+ self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name,
+ timeout)))
endtime = None
if timeout is not None:
if timeout <= 0:
@@ -353,8 +365,8 @@ class ProcessChannel(object):
if len(self._consumer_cursors) == 0:
_LOGGER.critical(
self._log(
- "expected number of consumers to be greater than 0, but the it is 0."
- ))
+ "Op({}) Failed to get data: expected number of consumers to be " \
+ "greater than 0, but the it is 0.".format(op_name)))
os._exit(-1)
elif len(self._consumer_cursors) == 1:
resp = None
@@ -368,8 +380,8 @@ class ProcessChannel(object):
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
- self._log("{} get data[?] timeout".format(
- op_name)))
+ self._log("Op({}) Failed to get data: "
+ "timeout".format(op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
@@ -377,13 +389,14 @@ class ProcessChannel(object):
if self._stop.value == 1:
raise ChannelStopError()
_LOGGER.debug(
- self._log("{} succ get data[{}]".format(op_name,
- resp.values()[0].id)))
+ self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
+ .id, op_name)))
return resp
elif op_name is None:
_LOGGER.critical(
self._log(
- "There are multiple consumers, so op_name cannot be None."))
+ "Op({}) Failed to get data: there are multiple consumers, "
+ "so op_name cannot be None.".format(op_name)))
os._exit(-1)
# In output_buf, different Ops (according to op_name) have different
@@ -405,16 +418,17 @@ class ProcessChannel(object):
channeldata = self._que.get(timeout=0)
self._output_buf.append(channeldata)
_LOGGER.debug(
- self._log("pop ready item[{}] into output_buffer".
- format(channeldata.values()[0].id)))
+ self._log(
+ "(logid={}) Op({}) Pop ready item into output_buffer".
+ format(channeldata.values()[0].id, op_name)))
break
except Queue.Empty:
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
- self._log("{} get data[?] timeout".format(
- op_name)))
+ self._log("Op({}) Failed to get data: timeout".
+ format(op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
@@ -437,7 +451,7 @@ class ProcessChannel(object):
self._base_cursor.value += 1
# to avoid cursor overflow
if self._base_cursor.value >= self._reset_max_cursor:
- _LOGGER.info(self._log("reset cursor in Channel"))
+ _LOGGER.info(self._log("Reset cursor in Channel"))
self._base_cursor.value -= self._reset_max_cursor
for name in self._consumer_cursors.keys():
self._consumer_cursors[name] -= self._reset_max_cursor
@@ -458,8 +472,8 @@ class ProcessChannel(object):
self._cv.notify_all()
_LOGGER.debug(
- self._log("{} succ get data[{}] from output_buffer".format(
- op_name, resp.values()[0].id)))
+ self._log("(logid={}) Op({}) Got data from output_buffer".format(
+ resp.values()[0].id, op_name)))
return resp
def stop(self):
@@ -529,33 +543,36 @@ class ThreadChannel(Queue.Queue):
""" not thread safe, and can only be called during initialization. """
if op_name in self._producers:
_LOGGER.critical(
- self._log("producer({}) is already in channel".format(op_name)))
+ self._log("Failed to add producer: producer({}) is "
+ "already in channel".format(op_name)))
os._exit(-1)
self._producers.append(op_name)
- _LOGGER.debug(self._log("add a producer: {}".format(op_name)))
+ _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name)))
def add_consumer(self, op_name):
""" not thread safe, and can only be called during initialization. """
if op_name in self._consumer_cursors:
_LOGGER.critical(
- self._log("consumer({}) is already in channel".format(op_name)))
+ self._log("Failed to add consumer: consumer({}) is "
+ "already in channel".format(op_name)))
os._exit(-1)
self._consumer_cursors[op_name] = 0
if self._cursor_count.get(0) is None:
self._cursor_count[0] = 0
self._cursor_count[0] += 1
- _LOGGER.debug(self._log("add a consumer: {}".format(op_name)))
+ _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name)))
def push(self, channeldata, op_name=None):
_LOGGER.debug(
- self._log("{} try to push data[{}]".format(op_name,
- channeldata.id)))
+ self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
+ op_name)))
if len(self._producers) == 0:
_LOGGER.critical(
self._log(
- "expected number of producers to be greater than 0, but the it is 0."
- ))
+ "(logid={}) Op({}) Failed to push data: expected number of "
+ "producers to be greater than 0, but the it is 0.".format(
+ channeldata.id, op_name)))
os._exit(-1)
elif len(self._producers) == 1:
with self._cv:
@@ -569,13 +586,15 @@ class ThreadChannel(Queue.Queue):
raise ChannelStopError()
self._cv.notify_all()
_LOGGER.debug(
- self._log("{} succ push data[{}] into internal queue.".format(
- op_name, channeldata.id)))
+ self._log("(logid={}) Op({}) Pushed data into internal_queue.".
+ format(channeldata.id, op_name)))
return True
elif op_name is None:
_LOGGER.critical(
self._log(
- "There are multiple producers, so op_name cannot be None."))
+ "(logid={}) Op({}) Failed to push data: there are multiple"
+ " producers, so op_name cannot be None.".format(
+ channeldata.id, op_name)))
os._exit(-1)
producer_num = len(self._producers)
@@ -598,8 +617,9 @@ class ThreadChannel(Queue.Queue):
if put_data is None:
_LOGGER.debug(
- self._log("{} succ push data[{}] into input_buffer.".format(
- op_name, data_id)))
+ self._log(
+ "(logid={}) Op({}) Pushed data into input_buffer.".
+ format(data_id, op_name)))
else:
while self._stop is False:
try:
@@ -611,15 +631,16 @@ class ThreadChannel(Queue.Queue):
raise ChannelStopError()
_LOGGER.debug(
- self._log("{} succ push data[{}] into internal queue.".
- format(op_name, data_id)))
+ self._log(
+ "(logid={}) Op({}) Pushed data into internal_queue.".
+ format(data_id, op_name)))
self._cv.notify_all()
return True
def front(self, op_name=None, timeout=None):
_LOGGER.debug(
- self._log("{} try to get data[?]; timeout(s)={}".format(op_name,
- timeout)))
+ self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name,
+ timeout)))
endtime = None
if timeout is not None:
if timeout <= 0:
@@ -630,8 +651,8 @@ class ThreadChannel(Queue.Queue):
if len(self._consumer_cursors) == 0:
_LOGGER.critical(
self._log(
- "expected number of consumers to be greater than 0, but the it is 0."
- ))
+ "Op({}) Failed to get data: expected number of consumers to be "
+ "greater than 0, but the it is 0.".format(op_name)))
os._exit(-1)
elif len(self._consumer_cursors) == 1:
resp = None
@@ -645,8 +666,9 @@ class ThreadChannel(Queue.Queue):
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
- self._log("{} get data[?] timeout".format(
- op_name)))
+ self._log(
+ "Op({}) Failed to get data: timeout".
+ format(op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
@@ -654,13 +676,14 @@ class ThreadChannel(Queue.Queue):
if self._stop:
raise ChannelStopError()
_LOGGER.debug(
- self._log("{} succ get data[{}]".format(op_name,
- resp.values()[0].id)))
+ self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
+ .id, op_name)))
return resp
elif op_name is None:
_LOGGER.critical(
- self._log(
- "There are multiple consumers, so op_name cannot be None."))
+ self._log("Op({}) Failed to get data: there are multiple "
+ "consumers, so op_name cannot be None.".format(
+ op_name)))
os._exit(-1)
# In output_buf, different Ops (according to op_name) have different
@@ -682,16 +705,17 @@ class ThreadChannel(Queue.Queue):
channeldata = self.get(timeout=0)
self._output_buf.append(channeldata)
_LOGGER.debug(
- self._log("pop ready item[{}] into output_buffer".
- format(channeldata.values()[0].id)))
+ self._log(
+ "(logid={}) Op({}) Pop ready item into output_buffer".
+ format(channeldata.values()[0].id, op_name)))
break
except Queue.Empty:
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
- self._log("{} get data[?] timeout".format(
- op_name)))
+ self._log("Op({}) Failed to get data: timeout".
+ format(op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
@@ -715,7 +739,7 @@ class ThreadChannel(Queue.Queue):
self._base_cursor += 1
# to avoid cursor overflow
if self._base_cursor >= self._reset_max_cursor:
- _LOGGER.info(self._log("reset cursor in Channel"))
+ _LOGGER.info(self._log("Reset cursor in Channel"))
self._base_cursor -= self._reset_max_cursor
for name in self._consumer_cursors:
self._consumer_cursors[name] -= self._reset_max_cursor
@@ -735,8 +759,8 @@ class ThreadChannel(Queue.Queue):
self._cv.notify_all()
_LOGGER.debug(
- self._log("{} succ get data[{}] from output_buffer".format(
- op_name, resp.values()[0].id)))
+ self._log("(logid={}) Op({}) Got data from output_buffer".format(
+ resp.values()[0].id, op_name)))
return resp
def stop(self):
diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 73b39a19dcb4c9e70ce084bf58acab09845c2c6e..87aa7dfe54ff1551ac1824901cf06aac733b0481 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -30,6 +30,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler
from .util import NameGenerator
+from .proto import pipeline_service_pb2
_LOGGER = logging.getLogger()
@@ -74,17 +75,18 @@ class DAGExecutor(object):
self._recive_func = threading.Thread(
target=DAGExecutor._recive_out_channel_func, args=(self, ))
self._recive_func.start()
- _LOGGER.debug("[DAG Executor] start recive thread")
+ _LOGGER.debug("[DAG Executor] Start recive thread")
def stop(self):
self._dag.stop()
self._dag.join()
- _LOGGER.info("[DAG Executor] succ stop")
+ _LOGGER.info("[DAG Executor] Stop")
def _get_next_data_id(self):
data_id = None
with self._id_lock:
if self._id_counter >= self._reset_max_id:
+ _LOGGER.info("[DAG Executor] Reset request id")
self._id_counter -= self._reset_max_id
data_id = self._id_counter
self._id_counter += 1
@@ -96,16 +98,18 @@ class DAGExecutor(object):
def _set_in_channel(self, in_channel):
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
- _LOGGER.critical("[DAG Executor] in_channel must be Channel"
- " type, but get {}".format(type(in_channel)))
+ _LOGGER.critical("[DAG Executor] Failed to set in_channel: "
+ "in_channel must be Channel type, but get {}".
+ format(type(in_channel)))
os._exit(-1)
in_channel.add_producer(self.name)
self._in_channel = in_channel
def _set_out_channel(self, out_channel):
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
- _LOGGER.critical("[DAG Executor]iout_channel must be Channel"
- " type, but get {}".format(type(out_channel)))
+ _LOGGER.critical("[DAG Executor] Failed to set out_channel: "
+ "must be Channel type, but get {}".format(
+ type(out_channel)))
os._exit(-1)
out_channel.add_consumer(self.name)
self._out_channel = out_channel
@@ -116,7 +120,7 @@ class DAGExecutor(object):
try:
channeldata_dict = self._out_channel.front(self.name)
except ChannelStopError:
- _LOGGER.info("[DAG Executor] channel stop.")
+ _LOGGER.info("[DAG Executor] Stop.")
with self._cv_for_cv_pool:
for data_id, cv in self._cv_pool.items():
closed_errror_data = ChannelData(
@@ -130,17 +134,20 @@ class DAGExecutor(object):
if len(channeldata_dict) != 1:
_LOGGER.critical(
- "[DAG Executor] out_channel cannot have multiple input ops")
+ "[DAG Executor] Failed to fetch result: out_channel "
+ "cannot have multiple input ops")
os._exit(-1)
(_, channeldata), = channeldata_dict.items()
if not isinstance(channeldata, ChannelData):
_LOGGER.critical(
- '[DAG Executor] data must be ChannelData type, but get {}'
+                "[DAG Executor] Failed to fetch result: data in out_channel "
+                "must be ChannelData type, but get {}"
.format(type(channeldata)))
os._exit(-1)
data_id = channeldata.id
- _LOGGER.debug("recive thread fetch data[{}]".format(data_id))
+ _LOGGER.debug("(logid={}) [recive thread] Fetched data".format(
+ data_id))
with self._cv_for_cv_pool:
cond_v = self._cv_pool[data_id]
with cond_v:
@@ -164,7 +171,7 @@ class DAGExecutor(object):
ready_data = self._fetch_buffer[data_id]
self._cv_pool.pop(data_id)
self._fetch_buffer.pop(data_id)
- _LOGGER.debug("resp thread get resp data[{}]".format(data_id))
+ _LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id))
return ready_data
def _pack_channeldata(self, rpc_request, data_id):
@@ -172,8 +179,10 @@ class DAGExecutor(object):
try:
dictdata = self._unpack_rpc_func(rpc_request)
except Exception as e:
- _LOGGER.error("parse RPC package to data[{}] Error: {}"
- .format(data_id, e))
+ _LOGGER.error(
+ "(logid={}) Failed to parse RPC request package: {}"
+ .format(data_id, e),
+ exc_info=True)
return ChannelData(
ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
error_info="rpc package error: {}".format(e),
@@ -187,7 +196,7 @@ class DAGExecutor(object):
profile_value = rpc_request.value[idx]
break
client_need_profile = (profile_value == self._client_profile_value)
- _LOGGER.debug("request[{}] need profile: {}".format(
+ _LOGGER.debug("(logid={}) Need profile in client: {}".format(
data_id, client_need_profile))
return ChannelData(
datatype=ChannelDataType.DICT.value,
@@ -197,26 +206,28 @@ class DAGExecutor(object):
def call(self, rpc_request):
data_id, cond_v = self._get_next_data_id()
- _LOGGER.debug("generate Request id: {}".format(data_id))
+ _LOGGER.info("(logid={}) Succ generate id".format(data_id))
+ start_call, end_call = None, None
if not self._is_thread_op:
- self._profiler.record("call_{}#DAG-{}_0".format(data_id, data_id))
+ start_call = self._profiler.record("call_{}#DAG-{}_0".format(
+ data_id, data_id))
else:
- self._profiler.record("call_{}#DAG_0".format(data_id))
+ start_call = self._profiler.record("call_{}#DAG_0".format(data_id))
- _LOGGER.debug("try parse RPC request to channeldata[{}]".format(
- data_id))
+ _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id))
self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
req_channeldata = self._pack_channeldata(rpc_request, data_id)
self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))
resp_channeldata = None
for i in range(self._retry):
- _LOGGER.debug("push data[{}] into Graph engine".format(data_id))
+ _LOGGER.debug("(logid={}) Pushing data into Graph engine".format(
+ data_id))
try:
self._in_channel.push(req_channeldata, self.name)
except ChannelStopError:
- _LOGGER.debug("[DAG Executor] channel stop.")
+ _LOGGER.debug("[DAG Executor] Stop")
with self._cv_for_cv_pool:
self._cv_pool.pop(data_id)
return self._pack_for_rpc_resp(
@@ -225,32 +236,35 @@ class DAGExecutor(object):
error_info="dag closed.",
data_id=data_id))
- _LOGGER.debug("wait Graph engine for data[{}]...".format(data_id))
+ _LOGGER.debug("(logid={}) Wait for Graph engine...".format(data_id))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
cond_v)
if resp_channeldata.ecode == ChannelDataEcode.OK.value:
- _LOGGER.debug("request[{}] succ predict".format(data_id))
+ _LOGGER.debug("(logid={}) Succ predict".format(data_id))
break
else:
- _LOGGER.warning("request[{}] predict failed: {}"
- .format(data_id, resp_channeldata.error_info))
+ _LOGGER.error("(logid={}) Failed to predict: {}"
+ .format(data_id, resp_channeldata.error_info))
if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value:
break
if i + 1 < self._retry:
- _LOGGER.warning("retry({}/{}) data[{}]".format(
- i + 1, self._retry, data_id))
+ _LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format(
+ data_id, i + 1, self._retry))
- _LOGGER.debug("unpack channeldata[{}] into RPC response".format(
- data_id))
+ _LOGGER.debug("(logid={}) Packing RPC response package".format(data_id))
self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
if not self._is_thread_op:
- self._profiler.record("call_{}#DAG-{}_1".format(data_id, data_id))
+ end_call = self._profiler.record("call_{}#DAG-{}_1".format(data_id,
+ data_id))
else:
- self._profiler.record("call_{}#DAG_1".format(data_id))
+ end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
+ _LOGGER.log(level=1,
+ msg="(logid={}) call[{} ms]".format(
+ data_id, (end_call - start_call) / 1e3))
profile_str = self._profiler.gen_profile_str()
if self._server_use_profile:
@@ -268,7 +282,17 @@ class DAGExecutor(object):
return rpc_resp
def _pack_for_rpc_resp(self, channeldata):
- return self._pack_rpc_func(channeldata)
+ try:
+ return self._pack_rpc_func(channeldata)
+ except Exception as e:
+ _LOGGER.error(
+ "(logid={}) Failed to pack RPC response package: {}"
+ .format(channeldata.id, e),
+ exc_info=True)
+ resp = pipeline_service_pb2.Response()
+ resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value
+ resp.error_info = "rpc package error: {}".format(e)
+ return resp
class DAG(object):
@@ -283,7 +307,7 @@ class DAG(object):
self._build_dag_each_worker = build_dag_each_worker
if not self._is_thread_op:
self._manager = multiprocessing.Manager()
- _LOGGER.info("[DAG] succ init")
+ _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op):
unique_names = set()
@@ -303,7 +327,8 @@ class DAG(object):
used_ops.add(pred_op)
# check the name of op is globally unique
if pred_op.name in unique_names:
- _LOGGER.critical("the name of Op must be unique: {}".
+ _LOGGER.critical("Failed to get used Ops: the"
+ " name of Op must be unique: {}".
format(pred_op.name))
os._exit(-1)
unique_names.add(pred_op.name)
@@ -317,12 +342,12 @@ class DAG(object):
else:
channel = ProcessChannel(
self._manager, name=name_gen.next(), maxsize=self._channel_size)
- _LOGGER.debug("[DAG] gen Channel: {}".format(channel.name))
+ _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name))
return channel
def _gen_virtual_op(self, name_gen):
vir_op = VirtualOp(name=name_gen.next())
- _LOGGER.debug("[DAG] gen VirtualOp: {}".format(vir_op.name))
+ _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name))
return vir_op
def _topo_sort(self, used_ops, response_op, out_degree_ops):
@@ -337,7 +362,8 @@ class DAG(object):
if len(op.get_input_ops()) == 0:
zero_indegree_num += 1
if zero_indegree_num != 1:
- _LOGGER.critical("DAG contains multiple RequestOps")
+ _LOGGER.critical("Failed to topo sort: DAG contains "
+ "multiple RequestOps")
os._exit(-1)
last_op = response_op.get_input_ops()[0]
ques[que_idx].put(last_op)
@@ -362,14 +388,15 @@ class DAG(object):
break
que_idx = (que_idx + 1) % 2
if sorted_op_num < len(used_ops):
- _LOGGER.critical("not legal DAG")
+ _LOGGER.critical("Failed to topo sort: not legal DAG")
os._exit(-1)
return dag_views, last_op
def _build_dag(self, response_op):
if response_op is None:
- _LOGGER.critical("ResponseOp has not been set.")
+ _LOGGER.critical("Failed to build DAG: ResponseOp"
+ " has not been set.")
os._exit(-1)
used_ops, out_degree_ops = self.get_use_ops(response_op)
if not self._build_dag_each_worker:
@@ -380,8 +407,8 @@ class DAG(object):
_LOGGER.info("-------------------------------------------")
if len(used_ops) <= 1:
_LOGGER.critical(
- "Besides RequestOp and ResponseOp, there should be at least one Op in DAG."
- )
+ "Failed to build DAG: besides RequestOp and ResponseOp, "
+ "there should be at least one Op in DAG.")
os._exit(-1)
if self._build_dag_each_worker:
_LOGGER.info("Because `build_dag_each_worker` mode is used, "
@@ -443,8 +470,6 @@ class DAG(object):
continue
channel = self._gen_channel(channel_name_gen)
channels.append(channel)
- _LOGGER.debug("[DAG] Channel({}) => Op({})"
- .format(channel.name, op.name))
op.add_input_channel(channel)
pred_ops = pred_op_of_next_view_op[op.name]
if v_idx == 0:
@@ -452,8 +477,6 @@ class DAG(object):
else:
# if pred_op is virtual op, it will use ancestors as producers to channel
for pred_op in pred_ops:
- _LOGGER.debug("[DAG] Op({}) => Channel({})"
- .format(pred_op.name, channel.name))
pred_op.add_output_channel(channel)
processed_op.add(op.name)
# find same input op to combine channel
@@ -469,8 +492,6 @@ class DAG(object):
same_flag = False
break
if same_flag:
- _LOGGER.debug("[DAG] Channel({}) => Op({})"
- .format(channel.name, other_op.name))
other_op.add_input_channel(channel)
processed_op.add(other_op.name)
output_channel = self._gen_channel(channel_name_gen)
@@ -488,7 +509,7 @@ class DAG(object):
actual_ops.append(op)
for c in channels:
- _LOGGER.debug("Channel({}):\n\t-producers: {}\n\t-consumers: {}"
+ _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}"
.format(c.name, c.get_producers(), c.get_consumers()))
return (actual_ops, channels, input_channel, output_channel, pack_func,
@@ -497,7 +518,7 @@ class DAG(object):
def build(self):
(actual_ops, channels, input_channel, output_channel, pack_func,
unpack_func) = self._build_dag(self._response_op)
- _LOGGER.info("[DAG] succ build dag")
+ _LOGGER.info("[DAG] Succ build DAG")
self._actual_ops = actual_ops
self._channels = channels
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 0e9c77e2d30c953ed607f80aa904eac3a777c95c..287893f92e9bbca1ed1cfad058ddf3a9028473e0 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -73,8 +73,9 @@ class Op(object):
if self._auto_batching_timeout is not None:
if self._auto_batching_timeout <= 0 or self._batch_size == 1:
_LOGGER.warning(
- "Because auto_batching_timeout <= 0 or batch_size == 1,"
- " set auto_batching_timeout to None.")
+ self._log(
+ "Because auto_batching_timeout <= 0 or batch_size == 1,"
+ " set auto_batching_timeout to None."))
self._auto_batching_timeout = None
else:
self._auto_batching_timeout = self._auto_batching_timeout / 1000.0
@@ -120,7 +121,8 @@ class Op(object):
def init_client(self, client_type, client_config, server_endpoints,
fetch_names):
if self.with_serving == False:
- _LOGGER.info("Op({}) no client".format(self.name))
+ _LOGGER.info("Op({}) has no client (and it also do not "
+ "run the process function".format(self.name))
return None
if client_type == 'brpc':
client = Client()
@@ -128,7 +130,8 @@ class Op(object):
elif client_type == 'grpc':
client = MultiLangClient()
else:
- raise ValueError("unknow client type: {}".format(client_type))
+ raise ValueError("Failed to init client: unknow client "
+ "type {}".format(client_type))
client.connect(server_endpoints)
self._fetch_names = fetch_names
return client
@@ -143,16 +146,17 @@ class Op(object):
for op in ops:
if not isinstance(op, Op):
_LOGGER.critical(
- self._log("input op must be Op type, not {}"
- .format(type(op))))
+ self._log("Failed to set input_ops: input op "
+ "must be Op type, not {}".format(type(op))))
os._exit(-1)
self._input_ops.append(op)
def add_input_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical(
- self._log("input channel must be Channel type, not {}"
- .format(type(channel))))
+ self._log("Failed to set input_channel: input "
+ "channel must be Channel type, not {}".format(
+ type(channel))))
os._exit(-1)
channel.add_consumer(self.name)
self._input = channel
@@ -166,8 +170,8 @@ class Op(object):
def add_output_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical(
- self._log("output channel must be Channel type, not {}"
- .format(type(channel))))
+ self._log("Failed to add output_channel: output channel "
+ "must be Channel type, not {}".format(type(channel))))
os._exit(-1)
channel.add_producer(self.name)
self._outputs.append(channel)
@@ -183,8 +187,8 @@ class Op(object):
if len(input_dicts) != 1:
_LOGGER.critical(
self._log(
- "this Op has multiple previous inputs. Please override this func."
- ))
+ "Failed to run preprocess: this Op has multiple previous "
+ "inputs. Please override this func."))
os._exit(-1)
(_, input_dict), = input_dicts.items()
@@ -194,8 +198,8 @@ class Op(object):
err, err_info = ChannelData.check_batch_npdata(feed_batch)
if err != 0:
_LOGGER.critical(
- self._log("{}, Please override preprocess func.".format(
- err_info)))
+ self._log("Failed to run process: {}. Please override "
+ "preprocess func.".format(err_info)))
os._exit(-1)
call_result = self.client.predict(
feed=feed_batch, fetch=self._fetch_names)
@@ -274,8 +278,8 @@ class Op(object):
def init_op(self):
pass
- def _run_preprocess(self, parsed_data_dict, log_func):
- _LOGGER.debug(log_func("try to run preprocess"))
+ def _run_preprocess(self, parsed_data_dict, op_info_prefix):
+ _LOGGER.debug("{} Running preprocess".format(op_info_prefix))
preped_data_dict = {}
err_channeldata_dict = {}
for data_id, parsed_data in parsed_data_dict.items():
@@ -284,17 +288,17 @@ class Op(object):
preped_data = self.preprocess(parsed_data)
except TypeError as e:
# Error type in channeldata.datatype
- error_info = log_func("preprocess data[{}] failed: {}"
- .format(data_id, e))
- _LOGGER.error(error_info)
+ error_info = "(logid={}) {} Failed to preprocess: {}".format(
+ data_id, op_info_prefix, e)
+ _LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData(
ecode=ChannelDataEcode.TYPE_ERROR.value,
error_info=error_info,
data_id=data_id)
except Exception as e:
- error_info = log_func("preprocess data[{}] failed: {}"
- .format(data_id, e))
- _LOGGER.error(error_info)
+ error_info = "(logid={}) {} Failed to preprocess: {}".format(
+ data_id, op_info_prefix, e)
+ _LOGGER.error(error_info, exc_info=True)
error_channeldata = ChannelData(
ecode=ChannelDataEcode.UNKNOW.value,
error_info=error_info,
@@ -303,11 +307,11 @@ class Op(object):
err_channeldata_dict[data_id] = error_channeldata
else:
preped_data_dict[data_id] = preped_data
- _LOGGER.debug(log_func("succ run preprocess"))
+ _LOGGER.debug("{} Succ preprocess".format(op_info_prefix))
return preped_data_dict, err_channeldata_dict
- def _run_process(self, preped_data_dict, log_func):
- _LOGGER.debug(log_func("try to run process"))
+ def _run_process(self, preped_data_dict, op_info_prefix):
+ _LOGGER.debug("{} Running process".format(op_info_prefix))
midped_data_dict = {}
err_channeldata_dict = {}
if self.with_serving:
@@ -320,8 +324,9 @@ class Op(object):
midped_batch = self.process(feed_batch)
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
- error_info = log_func("process batch failed: {}".format(e))
- _LOGGER.error(error_info)
+ error_info = "{} Failed to process(batch: {}): {}".format(
+ op_info_prefix, data_ids, e)
+ _LOGGER.error(error_info, exc_info=True)
else:
for i in range(self._retry):
try:
@@ -330,30 +335,34 @@ class Op(object):
except func_timeout.FunctionTimedOut as e:
if i + 1 >= self._retry:
ecode = ChannelDataEcode.TIMEOUT.value
- error_info = log_func(e)
+ error_info = "{} Failed to process(batch: {}): " \
+ "exceeded retry count.".format(
+ op_info_prefix, data_ids)
_LOGGER.error(error_info)
else:
_LOGGER.warning(
- log_func("PaddleService timeout, retry({}/{})"
- .format(i + 1, self._retry)))
+ "{} Failed to process(batch: {}): timeout, and retrying({}/{})"
+ .format(op_info_prefix, data_ids, i + 1,
+ self._retry))
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
- error_info = log_func("process batch failed: {}".format(
- e))
- _LOGGER.error(error_info)
+ error_info = "{} Failed to process(batch: {}): {}".format(
+ op_info_prefix, data_ids, e)
+ _LOGGER.error(error_info, exc_info=True)
break
else:
break
if ecode != ChannelDataEcode.OK.value:
for data_id in data_ids:
+ _LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData(
ecode=ecode, error_info=error_info, data_id=data_id)
elif midped_batch is None:
# op client return None
- error_info = log_func(
- "predict failed. pls check the server side.")
- _LOGGER.error(error_info)
+ error_info = "{} Failed to predict, please check if PaddleServingService" \
+ " is working properly.".format(op_info_prefix)
for data_id in data_ids:
+ _LOGGER.error("(logid={}) {}".format(data_id, error_info))
err_channeldata_dict[data_id] = ChannelData(
ecode=ChannelDataEcode.CLIENT_ERROR.value,
error_info=error_info,
@@ -367,11 +376,12 @@ class Op(object):
}
else:
midped_data_dict = preped_data_dict
- _LOGGER.debug(log_func("succ run process"))
+ _LOGGER.debug("{} Succ process".format(op_info_prefix))
return midped_data_dict, err_channeldata_dict
- def _run_postprocess(self, parsed_data_dict, midped_data_dict, log_func):
- _LOGGER.debug(log_func("try to run postprocess"))
+ def _run_postprocess(self, parsed_data_dict, midped_data_dict,
+ op_info_prefix):
+ _LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = {}
err_channeldata_dict = {}
for data_id, midped_data in midped_data_dict.items():
@@ -380,9 +390,9 @@ class Op(object):
postped_data = self.postprocess(parsed_data_dict[data_id],
midped_data)
except Exception as e:
- error_info = log_func("postprocess data[{}] failed: {}"
- .format(data_id, e))
- _LOGGER.error(error_info)
+ error_info = "(logid={}) {} Failed to postprocess: {}".format(
+ data_id, op_info_prefix, e)
+ _LOGGER.error(error_info, exc_info=True)
err_channeldata = ChannelData(
ecode=ChannelDataEcode.UNKNOW.value,
error_info=error_info,
@@ -392,9 +402,11 @@ class Op(object):
continue
else:
if not isinstance(postped_data, dict):
- error_info = log_func(
- "output of postprocess funticon must be "
- "dict type, but get {}".format(type(postped_data)))
+ error_info = "(logid={}) {} Failed to postprocess: " \
+ "output of postprocess funticon must be " \
+ "dict type, but get {}".format(
+ data_id, op_info_prefix,
+ type(postped_data))
_LOGGER.error(error_info)
err_channeldata = ChannelData(
ecode=ChannelDataEcode.UNKNOW.value,
@@ -416,16 +428,13 @@ class Op(object):
dictdata=postped_data,
data_id=data_id)
postped_data_dict[data_id] = output_data
- _LOGGER.debug(log_func("succ run postprocess"))
+ _LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
return postped_data_dict, err_channeldata_dict
def _auto_batching_generator(self, input_channel, op_name, batch_size,
- timeout, log_func):
+ timeout, op_info_prefix):
while True:
batch = []
- _LOGGER.debug(
- log_func("Auto-batching expect size: {}; timeout(s): {}".format(
- batch_size, timeout)))
while len(batch) == 0:
endtime = None
if timeout is not None:
@@ -436,7 +445,8 @@ class Op(object):
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
- _LOGGER.debug(log_func("Auto-batching timeout"))
+ _LOGGER.debug("{} Failed to generate batch: "
+ "timeout".format(op_info_prefix))
break
channeldata_dict = input_channel.front(op_name,
timeout)
@@ -444,10 +454,11 @@ class Op(object):
channeldata_dict = input_channel.front(op_name)
batch.append(channeldata_dict)
except ChannelTimeoutError:
- _LOGGER.debug(log_func("Auto-batching timeout"))
+ _LOGGER.debug("{} Failed to generate batch: "
+ "timeout".format(op_info_prefix))
break
- _LOGGER.debug(
- log_func("Auto-batching actual size: {}".format(len(batch))))
+ _LOGGER.debug("{} Got actual batch_size: {}".format(op_info_prefix,
+ len(batch)))
yield batch
def _parse_channeldata_batch(self, batch, output_channels):
@@ -472,14 +483,7 @@ class Op(object):
def _run(self, concurrency_idx, input_channel, output_channels, client_type,
is_thread_op):
- def get_log_func(op_info_prefix):
- def log_func(info_str):
- return "{} {}".format(op_info_prefix, info_str)
-
- return log_func
-
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
- log = get_log_func(op_info_prefix)
tid = threading.current_thread().ident
# init op
@@ -488,22 +492,27 @@ class Op(object):
profiler = self._initialize(is_thread_op, client_type,
concurrency_idx)
except Exception as e:
- _LOGGER.critical(log("init op failed: {}".format(e)))
+ _LOGGER.critical(
+ "{} Failed to init op: {}".format(op_info_prefix, e),
+ exc_info=True)
os._exit(-1)
- _LOGGER.info(log("succ init"))
+ _LOGGER.info("{} Succ init".format(op_info_prefix))
batch_generator = self._auto_batching_generator(
input_channel=input_channel,
op_name=self.name,
batch_size=self._batch_size,
timeout=self._auto_batching_timeout,
- log_func=log)
+ op_info_prefix=op_info_prefix)
+ start_prep, end_prep = None, None
+ start_midp, end_midp = None, None
+ start_postp, end_postp = None, None
while True:
try:
channeldata_dict_batch = next(batch_generator)
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
@@ -513,7 +522,7 @@ class Op(object):
= self._parse_channeldata_batch(
channeldata_dict_batch, output_channels)
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
if len(parsed_data_dict) == 0:
@@ -521,10 +530,14 @@ class Op(object):
continue
# preprecess
- profiler.record("prep#{}_0".format(op_info_prefix))
+ start_prep = profiler.record("prep#{}_0".format(op_info_prefix))
preped_data_dict, err_channeldata_dict \
- = self._run_preprocess(parsed_data_dict, log)
- profiler.record("prep#{}_1".format(op_info_prefix))
+ = self._run_preprocess(parsed_data_dict, op_info_prefix)
+ end_prep = profiler.record("prep#{}_1".format(op_info_prefix))
+ _LOGGER.log(level=1,
+ msg="(logid={}) {} prep[{} ms]".format(
+ parsed_data_dict.keys(), op_info_prefix,
+ (end_prep - start_prep) / 1e3))
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
@@ -533,17 +546,21 @@ class Op(object):
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
if len(parsed_data_dict) == 0:
continue
# process
- profiler.record("midp#{}_0".format(op_info_prefix))
+ start_midp = profiler.record("midp#{}_0".format(op_info_prefix))
midped_data_dict, err_channeldata_dict \
- = self._run_process(preped_data_dict, log)
- profiler.record("midp#{}_1".format(op_info_prefix))
+ = self._run_process(preped_data_dict, op_info_prefix)
+ end_midp = profiler.record("midp#{}_1".format(op_info_prefix))
+ _LOGGER.log(level=1,
+ msg="(logid={}) {} midp[{} ms]".format(
+ preped_data_dict.keys(), op_info_prefix,
+ (end_midp - start_midp) / 1e3))
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
@@ -552,18 +569,22 @@ class Op(object):
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
if len(midped_data_dict) == 0:
continue
# postprocess
- profiler.record("postp#{}_0".format(op_info_prefix))
+ start_postp = profiler.record("postp#{}_0".format(op_info_prefix))
postped_data_dict, err_channeldata_dict \
= self._run_postprocess(
- parsed_data_dict, midped_data_dict, log)
- profiler.record("postp#{}_1".format(op_info_prefix))
+ parsed_data_dict, midped_data_dict, op_info_prefix)
+ end_postp = profiler.record("postp#{}_1".format(op_info_prefix))
+ _LOGGER.log(level=1,
+ msg="(logid={}) {} postp[{} ms]".format(
+ midped_data_dict.keys(), op_info_prefix,
+                            (end_postp - start_postp) / 1e3))
try:
for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels(
@@ -572,7 +593,7 @@ class Op(object):
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
if len(postped_data_dict) == 0:
@@ -591,7 +612,7 @@ class Op(object):
client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id])
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
@@ -646,7 +667,7 @@ class RequestOp(Op):
try:
self.init_op()
except Exception as e:
- _LOGGER.critical("Op(Request) init op failed: {}".format(e))
+ _LOGGER.critical("Op(Request) Failed to init: {}".format(e))
os._exit(-1)
def unpack_request_package(self, request):
@@ -670,7 +691,8 @@ class ResponseOp(Op):
try:
self.init_op()
except Exception as e:
- _LOGGER.critical("Op(ResponseOp) init op failed: {}".format(e))
+ _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format(
+ e, exc_info=True))
os._exit(-1)
def pack_response_package(self, channeldata):
@@ -693,14 +715,19 @@ class ResponseOp(Op):
resp.error_info = self._log(
"fetch var type must be str({}).".format(
type(var)))
+ _LOGGER.error("(logid={}) Failed to pack RPC "
+ "response package: {}".format(
+ channeldata.id, resp.error_info))
break
resp.value.append(var)
resp.key.append(name)
else:
resp.ecode = ChannelDataEcode.TYPE_ERROR.value
resp.error_info = self._log(
- "Error type({}) in datatype.".format(channeldata.datatype))
- _LOGGER.error(resp.error_info)
+ "error type({}) in datatype.".format(channeldata.datatype))
+ _LOGGER.error("(logid={}) Failed to pack RPC response"
+ " package: {}".format(channeldata.id,
+ resp.error_info))
else:
resp.error_info = channeldata.error_info
return resp
@@ -718,6 +745,7 @@ class VirtualOp(Op):
self._virtual_pred_ops.append(op)
def _actual_pred_op_names(self, op):
+ # can use disjoint-set, but it's not necessary
if not isinstance(op, VirtualOp):
return [op.name]
names = []
@@ -728,8 +756,9 @@ class VirtualOp(Op):
def add_output_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical(
- self._log("output channel must be Channel type, not {}"
- .format(type(channel))))
+ self._log("Failed to add output_channel: output_channel"
+ " must be Channel type, not {}".format(
+ type(channel))))
os._exit(-1)
for op in self._virtual_pred_ops:
for op_name in self._actual_pred_op_names(op):
@@ -738,12 +767,6 @@ class VirtualOp(Op):
def _run(self, concurrency_idx, input_channel, output_channels, client_type,
is_thread_op):
- def get_log_func(op_info_prefix):
- def log_func(info_str):
- return "{} {}".format(op_info_prefix, info_str)
-
- return log_func
-
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
-        log = get_log_func(op_info_prefix)
tid = threading.current_thread().ident
@@ -759,7 +782,7 @@ class VirtualOp(Op):
try:
channeldata_dict_batch = next(batch_generator)
except ChannelStopError:
- _LOGGER.debug(log("channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
@@ -769,6 +792,6 @@ class VirtualOp(Op):
self._push_to_output_channels(
data, channels=output_channels, name=name)
except ChannelStopError:
- _LOGGER.debug(log("Channel stop."))
+ _LOGGER.debug("{} Stop.".format(op_info_prefix))
self._finalize(is_thread_op)
break
diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py
index d296af854bc08a214b8336ba703670623314b910..0d64a558078ce820109a94f51ea313b847536feb 100644
--- a/python/pipeline/pipeline_server.py
+++ b/python/pipeline/pipeline_server.py
@@ -67,9 +67,11 @@ class PipelineServer(object):
def set_response_op(self, response_op):
if not isinstance(response_op, ResponseOp):
- raise Exception("response_op must be ResponseOp type.")
+ raise Exception("Failed to set response_op: response_op "
+ "must be ResponseOp type.")
if len(response_op.get_input_ops()) != 1:
- raise Exception("response_op can only have one previous op.")
+ raise Exception("Failed to set response_op: response_op "
+ "can only have one previous op.")
self._response_op = response_op
def _port_is_available(self, port):
@@ -83,7 +85,8 @@ class PipelineServer(object):
self._port = conf["port"]
if not self._port_is_available(self._port):
- raise SystemExit("Prot {} is already used".format(self._port))
+ raise SystemExit("Failed to prepare_server: prot {} "
+ "is already used".format(self._port))
self._worker_num = conf["worker_num"]
self._build_dag_each_worker = conf["build_dag_each_worker"]
diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py
index 3adf3883c80dc797bafa68093bffbda79daff706..60e4883a43ea8b3a3c3fda6a62bd5b659d3d318e 100644
--- a/python/pipeline/profiler.py
+++ b/python/pipeline/profiler.py
@@ -43,8 +43,9 @@ class UnsafeTimeProfiler(object):
def record(self, name):
if self._enable is False:
return
- self.time_record.append('{}:{} '.format(name,
- int(round(_time() * 1000000))))
+ timestamp = int(round(_time() * 1000000))
+ self.time_record.append('{}:{} '.format(name, timestamp))
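+        # the timestamp is in microseconds; return it so callers can compute the cost of a span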
+ return timestamp
def print_profile(self):
if self._enable is False:
@@ -80,6 +81,7 @@ class TimeProfiler(object):
name = '_'.join(name_with_tag[:-1])
with self._lock:
self._time_record.put((name, tag, timestamp))
+ return timestamp
def print_profile(self):
if self._enable is False:
diff --git a/python/requirements.txt b/python/requirements.txt
index 5f5cfdc52464d5c9dc9ad40ec11be72c86dc6b2c..697b24fd4db6aff6b30913d8a5d23416dc208c80 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,6 +1,10 @@
numpy>=1.12, <=1.16.4 ; python_version<"3.5"
+google>=2.0.3
protobuf>=3.12.2
grpcio-tools>=1.28.1
grpcio>=1.28.1
func-timeout>=4.3.5
pyyaml>=1.3.0
+sentencepiece==0.1.92
+flask>=1.1.2
+ujson>=2.0.3
diff --git a/tools/serving_build.sh b/tools/serving_build.sh
index 99696ca7bf6a24f3df363bb444386e4b98fb4018..c54631a733fecc532f22d3ce1793ff8554e21f7d 100644
--- a/tools/serving_build.sh
+++ b/tools/serving_build.sh
@@ -54,7 +54,6 @@ function build_app() {
local DIRNAME=build-app-$TYPE
mkdir $DIRNAME # pwd: /Serving
cd $DIRNAME # pwd: /Serving/build-app-$TYPE
- pip install numpy sentencepiece
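+    # numpy and sentencepiece are now declared in python/requirements.txt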
case $TYPE in
CPU|GPU)
cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
@@ -295,8 +294,6 @@ function python_run_criteo_ctr_with_cube() {
function python_test_bert() {
# pwd: /Serving/python/examples
local TYPE=$1
- yum install -y libXext libSM libXrender >/dev/null
- pip install ujson
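+    # ujson is now declared in python/requirements.txt; libXext/libSM/libXrender are listed in doc/COMPILE.md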
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd bert # pwd: /Serving/python/examples/bert
case $TYPE in