Commit 7aa4876f authored by barrierye

Merge branch 'pipeline-auto-batch' of https://github.com/barrierye/Serving into pipeline-auto-batch

@@ -4,12 +4,26 @@

## Compilation environment requirements

| module                       | version                           |
| :--------------------------: | :-------------------------------: |
| OS                           | CentOS 7                          |
| gcc                          | 4.8.5 and later                   |
| gcc-c++                      | 4.8.5 and later                   |
| cmake                        | 3.2.0 and later                   |
| Python                       | 2.7.2 and later / 3.6 and later   |
| Go                           | 1.9.2 and later                   |
| git                          | 2.17.1 and later                  |
| glibc-static                 | 2.17                              |
| openssl-devel                | 1.0.2k                            |
| bzip2-devel                  | 1.0.6 and later                   |
| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
| sqlite-devel                 | 3.7.17 and later                  |
| patchelf                     | 0.9 and later                     |
| libXext                      | 1.3.3                             |
| libSM                        | 1.2.2                             |
| libXrender                   | 0.9.10                            |
| python-whl                   | numpy>=1.12, <=1.16.4<br/>google>=2.0.3<br/>protobuf>=3.12.2<br/>grpcio-tools>=1.28.1<br/>grpcio>=1.28.1<br/>func-timeout>=4.3.5<br/>pyyaml>=1.3.0<br/>sentencepiece==0.1.92<br/>flask>=1.1.2<br/>ujson>=2.0.3 |

It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you, see [this document](DOCKER_IMAGES.md).
......
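For convenience, the python-whl row above can be restated as pip requirement strings. This is a derived sketch, not a requirements file shipped in the repository:

```python
# Derived from the python-whl row above; not a file in this repository.
PYTHON_WHL_REQUIREMENTS = [
    "numpy>=1.12,<=1.16.4",
    "google>=2.0.3",
    "protobuf>=3.12.2",
    "grpcio-tools>=1.28.1",
    "grpcio>=1.28.1",
    "func-timeout>=4.3.5",
    "pyyaml>=1.3.0",
    "sentencepiece==0.1.92",
    "flask>=1.1.2",
    "ujson>=2.0.3",
]

if __name__ == "__main__":
    # Write the pins out so they can be fed to `pip install -r`.
    with open("requirements.txt", "w") as f:
        f.write("\n".join(PYTHON_WHL_REQUIREMENTS) + "\n")
```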
@@ -4,12 +4,26 @@

## Compilation environment settings

| component                    | version requirement               |
| :--------------------------: | :-------------------------------: |
| OS                           | CentOS 7                          |
| gcc                          | 4.8.5 and later                   |
| gcc-c++                      | 4.8.5 and later                   |
| cmake                        | 3.2.0 and later                   |
| Python                       | 2.7.2 and later / 3.6 and later   |
| Go                           | 1.9.2 and later                   |
| git                          | 2.17.1 and later                  |
| glibc-static                 | 2.17                              |
| openssl-devel                | 1.0.2k                            |
| bzip2-devel                  | 1.0.6 and later                   |
| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
| sqlite-devel                 | 3.7.17 and later                  |
| patchelf                     | 0.9                               |
| libXext                      | 1.3.3                             |
| libSM                        | 1.2.2                             |
| libXrender                   | 0.9.10                            |
| python-whl                   | numpy>=1.12, <=1.16.4<br/>google>=2.0.3<br/>protobuf>=3.12.2<br/>grpcio-tools>=1.28.1<br/>grpcio>=1.28.1<br/>func-timeout>=4.3.5<br/>pyyaml>=1.3.0<br/>sentencepiece==0.1.92<br/>flask>=1.1.2<br/>ujson>=2.0.3 |

It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you; see [this document](DOCKER_IMAGES_CN.md).
......
@@ -95,7 +95,7 @@ The meaning of each parameter is as follows:

| fetch_list            | (list) List of fetch variable names for the remote Paddle Serving Service. |
| client_config         | (str) The path of the client configuration file corresponding to the Paddle Serving Service. |
| concurrency           | (int) The number of concurrent OPs. |
| timeout               | (int) The timeout of the process operation, in ms. If the value is less than zero, no timeout is applied. |
| retry                 | (int) The number of retries on timeout. When the value is 1, no retries are made. |
| batch_size            | (int) The expected batch_size of Auto-Batching; since building a batch may time out, the actual batch_size may be less than the set value. |
| auto_batching_timeout | (float) Timeout for building a batch in Auto-Batching (the unit is ms). |
......
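As a quick illustration of these parameters, here is a minimal sketch of constructing an Op. The endpoint, config path, and fetch name are hypothetical placeholders, and `input_ops`/`server_endpoints` are assumed from the elided part of the signature shown below:

```python
from paddle_serving_server.pipeline import Op, RequestOp

read_op = RequestOp()  # assumed upstream Op; real examples subclass RequestOp

# A hedged sketch: only the parameters documented in the table above are
# annotated; endpoint/config/fetch values are placeholders, not repo values.
imdb_op = Op(
    name="imdb",                           # must be globally unique in the DAG
    input_ops=[read_op],                   # assumed upstream Op list
    server_endpoints=["127.0.0.1:9393"],   # hypothetical remote service address
    fetch_list=["prediction"],             # fetch variable names
    client_config="serving_client_conf.prototxt",  # hypothetical path
    concurrency=1,                         # number of concurrent OPs
    timeout=-1,                            # process timeout in ms; <0 disables it
    retry=1,                               # 1 means no retry on timeout
    batch_size=2,                          # expected Auto-Batching batch size
    auto_batching_timeout=2000)            # ms to wait while building a batch
```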
@@ -95,7 +95,7 @@ def __init__(name=None,

| fetch_list            | (list) The fetch list of the remote Paddle Serving Service. |
| client_config         | (str) The path of the client-side configuration file corresponding to the Paddle Serving Service. |
| concurrency           | (int) The concurrency of the OP. |
| timeout               | (int) The timeout of the process operation, in seconds. If the value is less than zero, no timeout is applied. |
| retry                 | (int) The number of retries on timeout. When the value is 1, no retries are made. |
| batch_size            | (int) The expected batch_size of Auto-Batching; since building a batch may time out, the actual batch_size may be smaller than the set value. |
| auto_batching_timeout | (float) Timeout for building a batch in Auto-Batching, in ms. |
......
@@ -6,3 +6,5 @@ dag:
    client_type: brpc
    retry: 1
    use_profile: false
    tracer:
        interval_s: 10
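For cross-reference with the code below, this YAML corresponds to the `server_conf` dict that `DAGExecutor.__init__` reads. A sketch of that structure, with assumed values for the fields not shown in this hunk:

```python
# A sketch of the server_conf structure consumed by DAGExecutor.__init__
# (see dag.py below). Values for keys outside this YAML hunk are assumptions.
server_conf = {
    "worker_num": 1,                 # assumed gRPC worker count
    "build_dag_each_worker": False,  # assumed
    "dag": {
        "is_thread_op": True,        # assumed: thread Ops vs process Ops
        "client_type": "brpc",
        "retry": 1,
        "use_profile": False,
        "channel_size": 0,           # assumed: 0 means an unbounded channel
        "tracer": {
            "interval_s": 10,        # interval < 1 disables PerformanceTracer
        },
    },
}
```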
@@ -12,20 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import paddle_serving_server.pipeline as pipeline
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset
import logging

_LOGGER = logging.getLogger()
console_handler = pipeline.logger.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(console_handler)


class ImdbRequestOp(RequestOp):
......
@@ -11,7 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logger  # this module must be imported first
from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
......
@@ -17,7 +17,7 @@ import copy
import re
import logging

_LOGGER = logging.getLogger("pipeline.analyse")


class Analyst(object):
@@ -164,7 +164,7 @@ class OpAnalyst(object):
    def add(self, name_str, ts_list):
        if self._close:
            _LOGGER.error("Failed to add item: OpAnalyst is closed.")
            return
        op_name, curr_idx, step = self._parse(name_str)
        if op_name not in self.op_time_list_dict:
......
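The switch from `logging.getLogger()` to `logging.getLogger("pipeline.analyse")` lets handlers filter records by module name, which is what the new logger module introduced later in this commit relies on. A minimal self-contained demo of that mechanism (names here are illustrative):

```python
import logging

class NameFilter(object):
    """Keep only records whose logger name starts with a given prefix."""
    def __init__(self, prefix):
        self._prefix = prefix

    def filter(self, logRecord):
        return logRecord.name.startswith(self._prefix)

root = logging.getLogger()
root.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.addFilter(NameFilter("pipeline."))
root.addHandler(handler)

# Emitted: the named child logger matches the filter.
logging.getLogger("pipeline.analyse").info("kept")
# Dropped by the handler: the root logger's name is "root".
root.info("filtered out")
```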
@@ -26,9 +26,10 @@ else:
import numpy as np
import logging
import enum
import os
import copy

_LOGGER = logging.getLogger("pipeline.channel")


class ChannelDataEcode(enum.Enum):
@@ -69,21 +70,25 @@ class ChannelData(object):
        '''
        if ecode is not None:
            if data_id is None or error_info is None:
                _LOGGER.critical("Failed to generate ChannelData: data_id"
                                 " and error_info cannot be None")
                os._exit(-1)
            datatype = ChannelDataType.ERROR.value
        else:
            if datatype == ChannelDataType.CHANNEL_NPDATA.value:
                ecode, error_info = ChannelData.check_npdata(npdata)
                if ecode != ChannelDataEcode.OK.value:
                    datatype = ChannelDataType.ERROR.value
                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
            elif datatype == ChannelDataType.DICT.value:
                ecode, error_info = ChannelData.check_dictdata(dictdata)
                if ecode != ChannelDataEcode.OK.value:
                    datatype = ChannelDataType.ERROR.value
                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
            else:
                _LOGGER.critical("(logid={}) datatype not match".format(
                    data_id))
                os._exit(-1)
        self.datatype = datatype
        self.npdata = npdata
        self.dictdata = dictdata
@@ -107,13 +112,13 @@ class ChannelData(object):
            for sample in dictdata:
                if not isinstance(sample, dict):
                    ecode = ChannelDataEcode.TYPE_ERROR.value
                    error_info = "Failed to check data: the type of " \
                            "data must be dict, but get {}.".format(type(sample))
                    break
        elif not isinstance(dictdata, dict):
            # batch size = 1
            ecode = ChannelDataEcode.TYPE_ERROR.value
            error_info = "Failed to check data: the type of data must " \
                    "be dict, but get {}.".format(type(dictdata))
        return ecode, error_info
@@ -136,27 +141,30 @@ class ChannelData(object):
            for sample in npdata:
                if not isinstance(sample, dict):
                    ecode = ChannelDataEcode.TYPE_ERROR.value
                    error_info = "Failed to check data: the " \
                            "value of data must be dict, but get {}.".format(
                                type(sample))
                    break
                for _, value in sample.items():
                    if not isinstance(value, np.ndarray):
                        ecode = ChannelDataEcode.TYPE_ERROR.value
                        error_info = "Failed to check data: the" \
                                " value of data must be np.ndarray, but get {}.".format(
                                    type(value))
                        return ecode, error_info
        elif isinstance(npdata, dict):
            # batch_size = 1
            for _, value in npdata.items():
                if not isinstance(value, np.ndarray):
                    ecode = ChannelDataEcode.TYPE_ERROR.value
                    error_info = "Failed to check data: the value " \
                            "of data must be np.ndarray, but get {}.".format(
                                type(value))
                    break
        else:
            ecode = ChannelDataEcode.TYPE_ERROR.value
            error_info = "Failed to check data: the value of data " \
                    "must be dict, but get {}.".format(type(npdata))
        return ecode, error_info

    def parse(self):
@@ -168,7 +176,9 @@ class ChannelData(object):
            # return dict
            feed = self.dictdata
        else:
            _LOGGER.critical("Failed to parse channeldata: error " \
                    "type({}) in datatype.".format(self.datatype))
            os._exit(-1)
        return feed
    def __str__(self):
@@ -229,6 +239,12 @@ class ProcessChannel(object):
        self._base_cursor = manager.Value('i', 0)
        self._output_buf = manager.list()

    def get_maxsize(self):
        return self._maxsize

    def size(self):
        return self._que.qsize()

    def get_producers(self):
        return self._producers
@@ -241,30 +257,38 @@ class ProcessChannel(object):
    def add_producer(self, op_name):
        """ not thread safe, and can only be called during initialization. """
        if op_name in self._producers:
            _LOGGER.critical(
                self._log("Failed to add producer: producer({})"
                          " is already in channel".format(op_name)))
            os._exit(-1)
        self._producers.append(op_name)
        _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name)))

    def add_consumer(self, op_name):
        """ not thread safe, and can only be called during initialization. """
        if op_name in self._consumer_cursors:
            _LOGGER.critical(
                self._log("Failed to add consumer: consumer({})"
                          " is already in channel".format(op_name)))
            os._exit(-1)
        self._consumer_cursors[op_name] = 0
        if self._cursor_count.get(0) is None:
            self._cursor_count[0] = 0
        self._cursor_count[0] += 1
        _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name)))
    def push(self, channeldata, op_name=None):
        _LOGGER.debug(
            self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
                                                              op_name)))
        if len(self._producers) == 0:
            _LOGGER.critical(
                self._log(
                    "(logid={}) Op({}) Failed to push data: expected number"
                    " of producers to be greater than 0, but it is 0.".format(
                        channeldata.id, op_name)))
            os._exit(-1)
        elif len(self._producers) == 1:
            with self._cv:
                while self._stop.value == 0:
@@ -277,13 +301,16 @@ class ProcessChannel(object):
                    raise ChannelStopError()
                self._cv.notify_all()
            _LOGGER.debug(
                self._log("(logid={}) Op({}) Pushed data into internal queue.".
                          format(channeldata.id, op_name)))
            return True
        elif op_name is None:
            _LOGGER.critical(
                self._log(
                    "(logid={}) Op({}) Failed to push data: there are multiple "
                    "producers, so op_name cannot be None.".format(
                        channeldata.id, op_name)))
            os._exit(-1)

        producer_num = len(self._producers)
        data_id = channeldata.id
@@ -310,8 +337,9 @@ class ProcessChannel(object):
            if put_data is None:
                _LOGGER.debug(
                    self._log(
                        "(logid={}) Op({}) Pushed data into input_buffer.".
                        format(data_id, op_name)))
            else:
                while self._stop.value == 0:
                    try:
@@ -323,14 +351,15 @@ class ProcessChannel(object):
                        raise ChannelStopError()

                _LOGGER.debug(
                    self._log(
                        "(logid={}) Op({}) Pushed data into internal_queue.".
                        format(data_id, op_name)))
            self._cv.notify_all()
        return True
    def front(self, op_name=None, timeout=None):
        _LOGGER.debug(
            self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name,
                                                                     timeout)))
        endtime = None
        if timeout is not None:
@@ -340,10 +369,11 @@ class ProcessChannel(object):
                endtime = _time() + timeout

        if len(self._consumer_cursors) == 0:
            _LOGGER.critical(
                self._log(
                    "Op({}) Failed to get data: expected number of consumers "
                    "to be greater than 0, but it is 0.".format(op_name)))
            os._exit(-1)
        elif len(self._consumer_cursors) == 1:
            resp = None
            with self._cv:
@@ -356,8 +386,8 @@ class ProcessChannel(object):
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            _LOGGER.debug(
                                self._log("Op({}) Failed to get data: "
                                          "timeout".format(op_name)))
                            raise ChannelTimeoutError()
                        self._cv.wait(remaining)
                    else:
@@ -365,13 +395,15 @@ class ProcessChannel(object):
                if self._stop.value == 1:
                    raise ChannelStopError()
            _LOGGER.debug(
                self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
                                                              .id, op_name)))
            return resp
        elif op_name is None:
            _LOGGER.critical(
                self._log(
                    "Op({}) Failed to get data: there are multiple consumers, "
                    "so op_name cannot be None.".format(op_name)))
            os._exit(-1)

        # In output_buf, different Ops (according to op_name) have different
        # cursors. In addition, there is a base_cursor. Their difference is
@@ -392,16 +424,17 @@ class ProcessChannel(object):
                        channeldata = self._que.get(timeout=0)
                        self._output_buf.append(channeldata)
                        _LOGGER.debug(
                            self._log(
                                "(logid={}) Op({}) Pop ready item into "
                                "output_buffer".format(
                                    channeldata.values()[0].id, op_name)))
                        break
                    except Queue.Empty:
                        if timeout is not None:
                            remaining = endtime - _time()
                            if remaining <= 0.0:
                                _LOGGER.debug(
                                    self._log("Op({}) Failed to get data: "
                                              "timeout".format(op_name)))
                                raise ChannelTimeoutError()
                            self._cv.wait(remaining)
                        else:
@@ -424,7 +457,7 @@ class ProcessChannel(object):
                self._base_cursor.value += 1
                # to avoid cursor overflow
                if self._base_cursor.value >= self._reset_max_cursor:
                    _LOGGER.info(self._log("Reset cursor in Channel"))
                    self._base_cursor.value -= self._reset_max_cursor
                    for name in self._consumer_cursors.keys():
                        self._consumer_cursors[name] -= self._reset_max_cursor
@@ -445,12 +478,12 @@ class ProcessChannel(object):
            self._cv.notify_all()

        _LOGGER.debug(
            self._log("(logid={}) Op({}) Got data from output_buffer".format(
                resp.values()[0].id, op_name)))
        return resp

    def stop(self):
        _LOGGER.info(self._log("stop."))
        self._stop.value = 1
        with self._cv:
            self._cv.notify_all()
@@ -503,6 +536,12 @@ class ThreadChannel(Queue.Queue):
        self._base_cursor = 0
        self._output_buf = []

    def get_maxsize(self):
        return self._maxsize

    def size(self):
        return self.qsize()

    def get_producers(self):
        return self._producers
@@ -512,37 +551,41 @@ class ThreadChannel(Queue.Queue):
    def _log(self, info_str):
        return "[{}] {}".format(self.name, info_str)

    def add_producer(self, op_name):
        """ not thread safe, and can only be called during initialization. """
        if op_name in self._producers:
            _LOGGER.critical(
                self._log("Failed to add producer: producer({}) is "
                          "already in channel".format(op_name)))
            os._exit(-1)
        self._producers.append(op_name)
        _LOGGER.debug(self._log("Succ add a producer: {}".format(op_name)))

    def add_consumer(self, op_name):
        """ not thread safe, and can only be called during initialization. """
        if op_name in self._consumer_cursors:
            _LOGGER.critical(
                self._log("Failed to add consumer: consumer({}) is "
                          "already in channel".format(op_name)))
            os._exit(-1)
        self._consumer_cursors[op_name] = 0
        if self._cursor_count.get(0) is None:
            self._cursor_count[0] = 0
        self._cursor_count[0] += 1
        _LOGGER.debug(self._log("Succ add a consumer: {}".format(op_name)))

    def push(self, channeldata, op_name=None):
        _LOGGER.debug(
            self._log("(logid={}) Op({}) Pushing data".format(channeldata.id,
                                                              op_name)))
        if len(self._producers) == 0:
            _LOGGER.critical(
                self._log(
                    "(logid={}) Op({}) Failed to push data: expected number of "
                    "producers to be greater than 0, but it is 0.".format(
                        channeldata.id, op_name)))
            os._exit(-1)
        elif len(self._producers) == 1:
            with self._cv:
                while self._stop is False:
@@ -555,13 +598,16 @@ class ThreadChannel(Queue.Queue):
                        raise ChannelStopError()
                self._cv.notify_all()
            _LOGGER.debug(
                self._log("(logid={}) Op({}) Pushed data into internal_queue.".
                          format(channeldata.id, op_name)))
            return True
        elif op_name is None:
            _LOGGER.critical(
                self._log(
                    "(logid={}) Op({}) Failed to push data: there are multiple"
                    " producers, so op_name cannot be None.".format(
                        channeldata.id, op_name)))
            os._exit(-1)

        producer_num = len(self._producers)
        data_id = channeldata.id
@@ -583,8 +629,9 @@ class ThreadChannel(Queue.Queue):
            if put_data is None:
                _LOGGER.debug(
                    self._log(
                        "(logid={}) Op({}) Pushed data into input_buffer.".
                        format(data_id, op_name)))
            else:
                while self._stop is False:
                    try:
@@ -596,14 +643,15 @@ class ThreadChannel(Queue.Queue):
                        raise ChannelStopError()

                _LOGGER.debug(
                    self._log(
                        "(logid={}) Op({}) Pushed data into internal_queue.".
                        format(data_id, op_name)))
            self._cv.notify_all()
        return True

    def front(self, op_name=None, timeout=None):
        _LOGGER.debug(
            self._log("Op({}) Getting data[?]; timeout(s)={}".format(op_name,
                                                                     timeout)))
        endtime = None
        if timeout is not None:
@@ -613,10 +661,11 @@ class ThreadChannel(Queue.Queue):
                endtime = _time() + timeout

        if len(self._consumer_cursors) == 0:
            _LOGGER.critical(
                self._log(
                    "Op({}) Failed to get data: expected number of consumers "
                    "to be greater than 0, but it is 0.".format(op_name)))
            os._exit(-1)
        elif len(self._consumer_cursors) == 1:
            resp = None
            with self._cv:
@@ -629,8 +678,9 @@ class ThreadChannel(Queue.Queue):
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            _LOGGER.debug(
                                self._log(
                                    "Op({}) Failed to get data: timeout".
                                    format(op_name)))
                            raise ChannelTimeoutError()
                        self._cv.wait(remaining)
                    else:
@@ -638,13 +688,15 @@ class ThreadChannel(Queue.Queue):
                if self._stop:
                    raise ChannelStopError()
            _LOGGER.debug(
                self._log("(logid={}) Op({}) Got data".format(resp.values()[0]
                                                              .id, op_name)))
            return resp
        elif op_name is None:
            _LOGGER.critical(
                self._log("Op({}) Failed to get data: there are multiple "
                          "consumers, so op_name cannot be None.".format(
                              op_name)))
            os._exit(-1)

        # In output_buf, different Ops (according to op_name) have different
        # cursors. In addition, there is a base_cursor. Their difference is
@@ -665,16 +717,17 @@ class ThreadChannel(Queue.Queue):
                        channeldata = self.get(timeout=0)
                        self._output_buf.append(channeldata)
                        _LOGGER.debug(
                            self._log(
                                "(logid={}) Op({}) Pop ready item into "
                                "output_buffer".format(
                                    channeldata.values()[0].id, op_name)))
                        break
                    except Queue.Empty:
                        if timeout is not None:
                            remaining = endtime - _time()
                            if remaining <= 0.0:
                                _LOGGER.debug(
                                    self._log("Op({}) Failed to get data: "
                                              "timeout".format(op_name)))
                                raise ChannelTimeoutError()
                            self._cv.wait(remaining)
                        else:
@@ -698,7 +751,7 @@ class ThreadChannel(Queue.Queue):
                self._base_cursor += 1
                # to avoid cursor overflow
                if self._base_cursor >= self._reset_max_cursor:
                    _LOGGER.info(self._log("Reset cursor in Channel"))
                    self._base_cursor -= self._reset_max_cursor
                    for name in self._consumer_cursors:
                        self._consumer_cursors[name] -= self._reset_max_cursor
@@ -718,12 +771,12 @@ class ThreadChannel(Queue.Queue):
            self._cv.notify_all()

        _LOGGER.debug(
            self._log("(logid={}) Op({}) Got data from output_buffer".format(
                resp.values()[0].id, op_name)))
        return resp

    def stop(self):
        _LOGGER.info(self._log("stop."))
        self._stop = True
        with self._cv:
            self._cv.notify_all()
......
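To make the type checks above concrete, a small sketch of calling `ChannelData.check_npdata` directly (the batch_size = 1 path; this assumes the method is a staticmethod, as its call sites `ChannelData.check_npdata(npdata)` suggest):

```python
import numpy as np
from paddle_serving_server.pipeline.channel import (ChannelData,
                                                    ChannelDataEcode)

# Well-formed payload: a dict mapping feed names to np.ndarray.
ecode, error_info = ChannelData.check_npdata({"words": np.array([1, 2, 3])})
assert ecode == ChannelDataEcode.OK.value

# Ill-formed payload: a plain list is rejected with TYPE_ERROR.
ecode, error_info = ChannelData.check_npdata({"words": [1, 2, 3]})
assert ecode == ChannelDataEcode.TYPE_ERROR.value
print(error_info)  # "Failed to check data: the value of data must be np.ndarray, ..."
```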
@@ -28,48 +28,40 @@ import logging
from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
                      ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator
from .proto import pipeline_service_pb2

_LOGGER = logging.getLogger("pipeline.dag")


class DAGExecutor(object):
    def __init__(self, response_op, server_conf):
        build_dag_each_worker = server_conf["build_dag_each_worker"]
        server_worker_num = server_conf["worker_num"]
        dag_conf = server_conf["dag"]

        self._retry = dag_conf["retry"]
        client_type = dag_conf["client_type"]
        self._server_use_profile = dag_conf["use_profile"]
        channel_size = dag_conf["channel_size"]
        self._is_thread_op = dag_conf["is_thread_op"]

        tracer_conf = dag_conf["tracer"]
        tracer_interval_s = tracer_conf["interval_s"]

        self.name = "@DAGExecutor"
        self._profiler = TimeProfiler()
        self._profiler.enable(True)

        self._tracer = None
        if tracer_interval_s >= 1:
            self._tracer = PerformanceTracer(
                self._is_thread_op, tracer_interval_s, server_worker_num)

        self._dag = DAG(self.name, response_op, self._server_use_profile,
                        self._is_thread_op, client_type, channel_size,
                        build_dag_each_worker, self._tracer)
        (in_channel, out_channel, pack_rpc_func,
         unpack_rpc_func) = self._dag.build()
        self._dag.start()
@@ -79,12 +71,15 @@ class DAGExecutor(object):
        self._pack_rpc_func = pack_rpc_func
        self._unpack_rpc_func = unpack_rpc_func

        if self._tracer is not None:
            self._tracer.start()

        self._id_lock = threading.Lock()
        self._id_counter = 0
        self._reset_max_id = 1000000000000000000
        self._cv_pool = {}
        self._cv_for_cv_pool = threading.Condition()
        self._fetch_buffer = {}
        self._recive_func = None

        self._client_profile_key = "pipeline.profile"
@@ -93,32 +88,44 @@ class DAGExecutor(object):
    def start(self):
        self._recive_func = threading.Thread(
            target=DAGExecutor._recive_out_channel_func, args=(self, ))
        self._recive_func.daemon = True
        self._recive_func.start()
        _LOGGER.debug("[DAG Executor] Start recive thread")

    def stop(self):
        self._dag.stop()
        self._dag.join()
        _LOGGER.info("[DAG Executor] Stop")

    def _get_next_data_id(self):
        data_id = None
        with self._id_lock:
            if self._id_counter >= self._reset_max_id:
                _LOGGER.info("[DAG Executor] Reset request id")
                self._id_counter -= self._reset_max_id
            data_id = self._id_counter
            self._id_counter += 1
        cond_v = threading.Condition()
        with self._cv_for_cv_pool:
            self._cv_pool[data_id] = cond_v
            self._fetch_buffer[data_id] = None
        return data_id, cond_v

    def _set_in_channel(self, in_channel):
        if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical("[DAG Executor] Failed to set in_channel: "
                             "in_channel must be Channel type, but get {}".
                             format(type(in_channel)))
            os._exit(-1)
        in_channel.add_producer(self.name)
        self._in_channel = in_channel

    def _set_out_channel(self, out_channel):
        if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical("[DAG Executor] Failed to set out_channel: "
                             "must be Channel type, but get {}".format(
                                 type(out_channel)))
            os._exit(-1)
        out_channel.add_consumer(self.name)
        self._out_channel = out_channel
@@ -128,7 +135,7 @@ class DAGExecutor(object):
            try:
                channeldata_dict = self._out_channel.front(self.name)
            except ChannelStopError:
                _LOGGER.info("[DAG Executor] Stop.")
                with self._cv_for_cv_pool:
                    for data_id, cv in self._cv_pool.items():
                        closed_errror_data = ChannelData(
@@ -136,49 +143,61 @@ class DAGExecutor(object):
                            error_info="dag closed.",
                            data_id=data_id)
                        with cv:
                            self._fetch_buffer[data_id] = closed_errror_data
                            cv.notify_all()
                break

            if len(channeldata_dict) != 1:
                _LOGGER.critical(
                    "[DAG Executor] Failed to fetch result: out_channel "
                    "cannot have multiple input ops")
                os._exit(-1)
            (_, channeldata), = channeldata_dict.items()
            if not isinstance(channeldata, ChannelData):
                _LOGGER.critical(
                    "[DAG Executor] Failed to fetch result: data in "
                    "out_channel must be ChannelData type, but get {}".format(
                        type(channeldata)))
                os._exit(-1)

            data_id = channeldata.id
            _LOGGER.debug("(logid={}) [recive thread] Fetched data".format(
                data_id))
            with self._cv_for_cv_pool:
                cond_v = self._cv_pool[data_id]
            with cond_v:
                self._fetch_buffer[data_id] = channeldata
                cond_v.notify_all()

    def _get_channeldata_from_fetch_buffer(self, data_id, cond_v):
        ready_data = None

        with cond_v:
            with self._cv_for_cv_pool:
                if self._fetch_buffer[data_id] is not None:
                    # The requested data is already ready
                    ready_data = self._fetch_buffer[data_id]
                    self._cv_pool.pop(data_id)
                    self._fetch_buffer.pop(data_id)
            if ready_data is None:
                # Wait for data ready
                cond_v.wait()
                with self._cv_for_cv_pool:
                    ready_data = self._fetch_buffer[data_id]
                    self._cv_pool.pop(data_id)
                    self._fetch_buffer.pop(data_id)
        _LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id))
        return ready_data
    def _pack_channeldata(self, rpc_request, data_id):
        dictdata = None
        try:
            dictdata = self._unpack_rpc_func(rpc_request)
        except Exception as e:
            _LOGGER.error(
                "(logid={}) Failed to parse RPC request package: {}".format(
                    data_id, e),
                exc_info=True)
            return ChannelData(
                ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
                error_info="rpc package error: {}".format(e),
@@ -192,7 +211,7 @@ class DAGExecutor(object):
                profile_value = rpc_request.value[idx]
                break
        client_need_profile = (profile_value == self._client_profile_value)
        _LOGGER.debug("(logid={}) Need profile in client: {}".format(
            data_id, client_need_profile))
        return ChannelData(
            datatype=ChannelDataType.DICT.value,
@@ -201,60 +220,74 @@ class DAGExecutor(object):
            client_need_profile=client_need_profile)

    def call(self, rpc_request):
        if self._tracer is not None:
            trace_buffer = self._tracer.data_buffer()

        data_id, cond_v = self._get_next_data_id()
        _LOGGER.info("(logid={}) Succ generate id".format(data_id))

        start_call, end_call = None, None
        if not self._is_thread_op:
            start_call = self._profiler.record("call_{}#DAG-{}_0".format(
                data_id, data_id))
        else:
            start_call = self._profiler.record("call_{}#DAG_0".format(data_id))

        _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id))
        self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
        req_channeldata = self._pack_channeldata(rpc_request, data_id)
        self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))

        resp_channeldata = None
        for i in range(self._retry):
            _LOGGER.debug("(logid={}) Pushing data into Graph engine".format(
                data_id))
            try:
                self._in_channel.push(req_channeldata, self.name)
            except ChannelStopError:
                _LOGGER.debug("[DAG Executor] Stop")
                with self._cv_for_cv_pool:
                    self._cv_pool.pop(data_id)
                return self._pack_for_rpc_resp(
                    ChannelData(
                        ecode=ChannelDataEcode.CLOSED_ERROR.value,
                        error_info="dag closed.",
                        data_id=data_id))

            _LOGGER.debug("(logid={}) Wait for Graph engine...".format(
                data_id))
            resp_channeldata = self._get_channeldata_from_fetch_buffer(
                data_id, cond_v)

            if resp_channeldata.ecode == ChannelDataEcode.OK.value:
                _LOGGER.info("(logid={}) Succ predict".format(data_id))
                break
            else:
                _LOGGER.error("(logid={}) Failed to predict: {}".format(
                    data_id, resp_channeldata.error_info))
                if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value:
                    break

            if i + 1 < self._retry:
                _LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format(
                    data_id, i + 1, self._retry))

        _LOGGER.debug("(logid={}) Packing RPC response package".format(
            data_id))
        self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
        rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
        self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
        if not self._is_thread_op:
            end_call = self._profiler.record("call_{}#DAG-{}_1".format(
                data_id, data_id))
        else:
            end_call = self._profiler.record("call_{}#DAG_1".format(data_id))

        if self._tracer is not None:
            if resp_channeldata.ecode == ChannelDataEcode.OK.value:
                trace_buffer.put(("DAG", "call_{}".format(data_id), True,
                                  end_call - start_call))
            else:
                trace_buffer.put(("DAG", "call_{}".format(data_id), False,
                                  end_call - start_call))

        profile_str = self._profiler.gen_profile_str()
        if self._server_use_profile:
@@ -272,23 +305,33 @@ class DAGExecutor(object):
        return rpc_resp

    def _pack_for_rpc_resp(self, channeldata):
        try:
            return self._pack_rpc_func(channeldata)
        except Exception as e:
            _LOGGER.error(
                "(logid={}) Failed to pack RPC response package: {}".format(
                    channeldata.id, e),
                exc_info=True)
            resp = pipeline_service_pb2.Response()
            resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value
            resp.error_info = "rpc package error: {}".format(e)
            return resp
class DAG(object): class DAG(object):
def __init__(self, request_name, response_op, use_profile, is_thread_op, def __init__(self, request_name, response_op, use_profile, is_thread_op,
client_type, channel_size, show_info, build_dag_each_worker): client_type, channel_size, build_dag_each_worker, tracer):
self._request_name = request_name self._request_name = request_name
self._response_op = response_op self._response_op = response_op
self._use_profile = use_profile self._use_profile = use_profile
self._is_thread_op = is_thread_op self._is_thread_op = is_thread_op
self._channel_size = channel_size self._channel_size = channel_size
self._client_type = client_type self._client_type = client_type
self._show_info = show_info
self._build_dag_each_worker = build_dag_each_worker self._build_dag_each_worker = build_dag_each_worker
self._tracer = tracer
if not self._is_thread_op: if not self._is_thread_op:
self._manager = multiprocessing.Manager() self._manager = multiprocessing.Manager()
_LOGGER.info("[DAG] succ init") _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op): def get_use_ops(self, response_op):
unique_names = set() unique_names = set()
...@@ -308,8 +351,10 @@ class DAG(object): ...@@ -308,8 +351,10 @@ class DAG(object):
used_ops.add(pred_op) used_ops.add(pred_op)
# check the name of op is globally unique # check the name of op is globally unique
if pred_op.name in unique_names: if pred_op.name in unique_names:
raise Exception("the name of Op must be unique: {}". _LOGGER.critical("Failed to get used Ops: the"
" name of Op must be unique: {}".
format(pred_op.name)) format(pred_op.name))
os._exit(-1)
unique_names.add(pred_op.name) unique_names.add(pred_op.name)
return used_ops, succ_ops_of_use_op return used_ops, succ_ops_of_use_op
...@@ -321,12 +366,12 @@ class DAG(object): ...@@ -321,12 +366,12 @@ class DAG(object):
else: else:
channel = ProcessChannel( channel = ProcessChannel(
self._manager, name=name_gen.next(), maxsize=self._channel_size) self._manager, name=name_gen.next(), maxsize=self._channel_size)
_LOGGER.debug("[DAG] gen Channel: {}".format(channel.name)) _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name))
return channel return channel
def _gen_virtual_op(self, name_gen): def _gen_virtual_op(self, name_gen):
vir_op = VirtualOp(name=name_gen.next()) vir_op = VirtualOp(name=name_gen.next())
_LOGGER.debug("[DAG] gen VirtualOp: {}".format(vir_op.name)) _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name))
return vir_op return vir_op
def _topo_sort(self, used_ops, response_op, out_degree_ops): def _topo_sort(self, used_ops, response_op, out_degree_ops):
...@@ -341,7 +386,9 @@ class DAG(object): ...@@ -341,7 +386,9 @@ class DAG(object):
if len(op.get_input_ops()) == 0: if len(op.get_input_ops()) == 0:
zero_indegree_num += 1 zero_indegree_num += 1
if zero_indegree_num != 1: if zero_indegree_num != 1:
raise Exception("DAG contains multiple input Ops") _LOGGER.critical("Failed to topo sort: DAG contains "
"multiple RequestOps")
os._exit(-1)
last_op = response_op.get_input_ops()[0] last_op = response_op.get_input_ops()[0]
ques[que_idx].put(last_op) ques[que_idx].put(last_op)
...@@ -365,24 +412,28 @@ class DAG(object): ...@@ -365,24 +412,28 @@ class DAG(object):
break break
que_idx = (que_idx + 1) % 2 que_idx = (que_idx + 1) % 2
if sorted_op_num < len(used_ops): if sorted_op_num < len(used_ops):
raise Exception("not legal DAG") _LOGGER.critical("Failed to topo sort: not legal DAG")
os._exit(-1)
return dag_views, last_op return dag_views, last_op
def _build_dag(self, response_op): def _build_dag(self, response_op):
if response_op is None: if response_op is None:
raise Exception("response_op has not been set.") _LOGGER.critical("Failed to build DAG: ResponseOp"
" has not been set.")
os._exit(-1)
used_ops, out_degree_ops = self.get_use_ops(response_op) used_ops, out_degree_ops = self.get_use_ops(response_op)
if self._show_info: if not self._build_dag_each_worker:
_LOGGER.info("================= USED OP =================") _LOGGER.info("================= USED OP =================")
for op in used_ops: for op in used_ops:
if op.name != self._request_name: if op.name != self._request_name:
_LOGGER.info(op.name) _LOGGER.info(op.name)
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
if len(used_ops) <= 1: if len(used_ops) <= 1:
raise Exception( _LOGGER.critical(
"Besides RequestOp and ResponseOp, there should be at least one Op in DAG." "Failed to build DAG: besides RequestOp and ResponseOp, "
) "there should be at least one Op in DAG.")
os._exit(-1)
if self._build_dag_each_worker: if self._build_dag_each_worker:
_LOGGER.info("Because `build_dag_each_worker` mode is used, " _LOGGER.info("Because `build_dag_each_worker` mode is used, "
"Auto-batching is set to the default config: " "Auto-batching is set to the default config: "
...@@ -393,15 +444,15 @@ class DAG(object): ...@@ -393,15 +444,15 @@ class DAG(object):
dag_views, last_op = self._topo_sort(used_ops, response_op, dag_views, last_op = self._topo_sort(used_ops, response_op,
out_degree_ops) out_degree_ops)
dag_views = list(reversed(dag_views)) dag_views = list(reversed(dag_views))
        if not self._build_dag_each_worker:
            _LOGGER.debug("================== DAG ====================")
            for idx, view in enumerate(dag_views):
                _LOGGER.debug("(VIEW {})".format(idx))
                for op in view:
                    _LOGGER.debug(" [{}]".format(op.name))
                    for out_op in out_degree_ops[op.name]:
                        _LOGGER.debug("   - {}".format(out_op.name))
            _LOGGER.debug("-------------------------------------------")

        # create channels and virtual ops
        virtual_op_name_gen = NameGenerator("vir")
@@ -443,8 +494,6 @@ class DAG(object):
                    continue
                channel = self._gen_channel(channel_name_gen)
                channels.append(channel)
                op.add_input_channel(channel)
                pred_ops = pred_op_of_next_view_op[op.name]
                if v_idx == 0:
@@ -452,8 +501,6 @@ class DAG(object):
                else:
                    # if pred_op is virtual op, it will use ancestors as producers to channel
                    for pred_op in pred_ops:
                        pred_op.add_output_channel(channel)
                processed_op.add(op.name)
                # find same input op to combine channel
@@ -469,8 +516,6 @@ class DAG(object):
                            same_flag = False
                            break
                    if same_flag:
                        other_op.add_input_channel(channel)
                        processed_op.add(other_op.name)
        output_channel = self._gen_channel(channel_name_gen)
@@ -488,16 +533,19 @@ class DAG(object):
            actual_ops.append(op)

        for c in channels:
            _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}"
                          .format(c.name, c.get_producers(), c.get_consumers()))
        return (actual_ops, channels, input_channel, output_channel, pack_func,
                unpack_func)

    def get_channels(self):
        return self._channels

    def build(self):
        (actual_ops, channels, input_channel, output_channel, pack_func,
         unpack_func) = self._build_dag(self._response_op)
        _LOGGER.info("[DAG] Succ build DAG")

        self._actual_ops = actual_ops
        self._channels = channels
@@ -506,12 +554,15 @@ class DAG(object):
        self._pack_func = pack_func
        self._unpack_func = unpack_func

        self._tracer.set_channels(self._channels)

        return self._input_channel, self._output_channel, self._pack_func, self._unpack_func

    def start(self):
        self._threads_or_proces = []
        for op in self._actual_ops:
            op.use_profiler(self._use_profile)
            op.set_tracer(self._tracer)
            if self._is_thread_op:
                self._threads_or_proces.extend(
                    op.start_with_thread(self._client_type))
...
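For orientation, the channel-combining rule above (ops in one view whose predecessor sets are identical end up consuming a single shared channel) can be sketched standalone; the op names below are illustrative, not part of this commit:

# toy model: map each op to the set of ops feeding it
view = {"bow": {"read"}, "cnn": {"read"}, "combine": {"bow", "cnn"}}
shared = {}
for op, preds in view.items():
    shared.setdefault(frozenset(preds), []).append(op)
print(shared)
# roughly: {frozenset({'read'}): ['bow', 'cnn'], frozenset({'bow', 'cnn'}): ['combine']}
# 'bow' and 'cnn' share one input channel fed by 'read'; 'combine' fans them back in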
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging.handlers
import os
class SectionLevelFilter(object):
    """ Keep only records whose level is in the given list. """

    def __init__(self, levels):
        self._levels = levels

    def filter(self, logRecord):
        return logRecord.levelno in self._levels


class OutOfMouduleFilter(object):
    """ Filter out records coming from the given logger names. """

    def __init__(self, out_names):
        self._out_names = out_names

    def filter(self, logRecord):
        return logRecord.name not in self._out_names


class OutOfMouduleAndSectionLevelFilter(object):
    def __init__(self, out_names, levels):
        self._out_names = out_names
        self._levels = levels

    def filter(self, logRecord):
        if logRecord.name in self._out_names:
            return False
        return logRecord.levelno in self._levels


class StreamHandler(logging.StreamHandler):
    def __init__(self, *args, **kwargs):
        super(StreamHandler, self).__init__(*args, **kwargs)
        self.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))


log_dir = "PipelineServingLogs"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# root logger
_LOGGER = logging.getLogger()
_LOGGER.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s")

# INFO and WARNING records go to INFO.log; profiler records are kept out
file_info = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "INFO.log"))
file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING]))
file_info.setFormatter(formatter)

# ERROR and CRITICAL records go to ERROR.log
file_err = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "ERROR.log"))
file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_err.setLevel(logging.ERROR)
file_err.setFormatter(formatter)

_LOGGER.addHandler(file_info)
_LOGGER.addHandler(file_err)

# tracer logger: only "pipeline.profiler" records, written to TRACE.log
_TRACER = logging.getLogger("pipeline.profiler")
_TRACER.setLevel(logging.INFO)
_TRACER.addFilter(logging.Filter("pipeline.profiler"))

tracer_formatter = logging.Formatter("%(asctime)s %(message)s")
file_trace = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "TRACE.log"))
file_trace.setFormatter(tracer_formatter)
_TRACER.addHandler(file_trace)
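A quick sketch of how records route through these handlers once the module above is imported (stdlib logging only; the child logger names mirror those used elsewhere in this commit):

import logging

op_logger = logging.getLogger("pipeline.operator")  # propagates to the root logger
op_logger.info("recorded in PipelineServingLogs/INFO.log")
op_logger.error("recorded in PipelineServingLogs/ERROR.log")

# profiler records bypass INFO.log/ERROR.log and land in TRACE.log only
logging.getLogger("pipeline.profiler").info("recorded in TRACE.log")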
@@ -13,6 +13,7 @@
# limitations under the License.
# pylint: disable=doc-string-missing
from time import time as _time
import time
import threading
import multiprocessing
from paddle_serving_client import MultiLangClient, Client
@@ -31,7 +32,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler

_LOGGER = logging.getLogger("pipeline.operator")
_op_name_gen = NameGenerator("Op")
@@ -60,7 +61,10 @@ class Op(object):
        self._client_config = client_config
        self._fetch_names = fetch_list
        # timeout is configured in ms; convert to s for the process stage
        if timeout > 0:
            self._timeout = timeout / 1000.0
        else:
            self._timeout = -1
        self._retry = max(1, retry)
        self._input = None
        self._outputs = []
@@ -69,13 +73,34 @@ class Op(object):
        self._auto_batching_timeout = auto_batching_timeout
        if self._auto_batching_timeout is not None:
            if self._auto_batching_timeout <= 0 or self._batch_size == 1:
                _LOGGER.warning(
                    self._log(
                        "Because auto_batching_timeout <= 0 or batch_size == 1,"
                        " set auto_batching_timeout to None."))
                self._auto_batching_timeout = None
            else:
                self._auto_batching_timeout = self._auto_batching_timeout / 1000.0

        if not isinstance(self, RequestOp) and not isinstance(self, ResponseOp):
            _LOGGER.info(
                self._log("\n\tinput_ops: {},"
                          "\n\tserver_endpoints: {}"
                          "\n\tfetch_list: {}"
                          "\n\tclient_config: {}"
                          "\n\tconcurrency: {},"
                          "\n\ttimeout(s): {},"
                          "\n\tretry: {},"
                          "\n\tbatch_size: {},"
                          "\n\tauto_batching_timeout(s): {}".format(
                              ", ".join([op.name for op in input_ops
                                         ]), self._server_endpoints,
                              self._fetch_names, self._client_config,
                              self.concurrency, self._timeout, self._retry,
                              self._batch_size, self._auto_batching_timeout)))

        self._server_use_profile = False
        self._tracer = None

        # only for thread op
        self._for_init_op_lock = threading.Lock()
        self._for_close_op_lock = threading.Lock()
        self._succ_init_op = False
@@ -83,34 +108,35 @@ class Op(object):
    def use_default_auto_batching_config(self):
        if self._batch_size != 1:
            _LOGGER.warning("Op({}) reset batch_size=1 (original: {})"
                            .format(self.name, self._batch_size))
            self._batch_size = 1
        if self._auto_batching_timeout != None:
            _LOGGER.warning(
                "Op({}) reset auto_batching_timeout=None (original: {})"
                .format(self.name, self._auto_batching_timeout))
            self._auto_batching_timeout = None

    def use_profiler(self, use_profile):
        self._server_use_profile = use_profile

    def set_tracer(self, tracer):
        self._tracer = tracer

    def init_client(self, client_type, client_config, server_endpoints,
                    fetch_names):
        if self.with_serving == False:
            _LOGGER.info("Op({}) has no client (and it also does not "
                         "run the process function)".format(self.name))
            return None
        if client_type == 'brpc':
            client = Client()
            client.load_client_config(client_config)
        elif client_type == 'grpc':
            client = MultiLangClient()
        else:
            raise ValueError("Failed to init client: unknown client "
                             "type {}".format(client_type))
        client.connect(server_endpoints)
        self._fetch_names = fetch_names
        return client
@@ -124,16 +150,19 @@ class Op(object):
        self._input_ops = []
        for op in ops:
            if not isinstance(op, Op):
                _LOGGER.critical(
                    self._log("Failed to set input_ops: input op "
                              "must be Op type, not {}".format(type(op))))
                os._exit(-1)
            self._input_ops.append(op)

    def add_input_channel(self, channel):
        if not isinstance(channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical(
                self._log("Failed to set input_channel: input "
                          "channel must be Channel type, not {}".format(
                              type(channel))))
            os._exit(-1)
        channel.add_consumer(self.name)
        self._input = channel
@@ -145,9 +174,10 @@ class Op(object):
    def add_output_channel(self, channel):
        if not isinstance(channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical(
                self._log("Failed to add output_channel: output channel "
                          "must be Channel type, not {}".format(type(channel))))
            os._exit(-1)
        channel.add_producer(self.name)
        self._outputs.append(channel)
@@ -160,9 +190,11 @@ class Op(object):
    def preprocess(self, input_dicts):
        # multiple previous Op
        if len(input_dicts) != 1:
            _LOGGER.critical(
                self._log(
                    "Failed to run preprocess: this Op has multiple previous "
                    "inputs. Please override this func."))
            os._exit(-1)
        (_, input_dict), = input_dicts.items()
        return input_dict
@@ -170,10 +202,16 @@ class Op(object):
    def process(self, feed_batch):
        err, err_info = ChannelData.check_batch_npdata(feed_batch)
        if err != 0:
            _LOGGER.critical(
                self._log("Failed to run process: {}. Please override "
                          "preprocess func.".format(err_info)))
            os._exit(-1)
        call_result = self.client.predict(
            feed=feed_batch, fetch=self._fetch_names)
        if isinstance(self.client, MultiLangClient):
            if call_result is None or call_result["serving_status_code"] != 0:
                return None
            call_result.pop("serving_status_code")
        return call_result

    def postprocess(self, input_dict, fetch_dict):
@@ -218,23 +256,35 @@ class Op(object):
                channel.push(data, name)

    def start_with_process(self, client_type):
        trace_buffer = None
        if self._tracer is not None:
            trace_buffer = self._tracer.data_buffer()
        proces = []
        for concurrency_idx in range(self.concurrency):
            p = multiprocessing.Process(
                target=self._run,
                args=(concurrency_idx, self._get_input_channel(),
                      self._get_output_channels(), client_type, False,
                      trace_buffer))
            p.daemon = True
            p.start()
            proces.append(p)
        return proces

    def start_with_thread(self, client_type):
        trace_buffer = None
        if self._tracer is not None:
            trace_buffer = self._tracer.data_buffer()
        threads = []
        for concurrency_idx in range(self.concurrency):
            t = threading.Thread(
                target=self._run,
                args=(concurrency_idx, self._get_input_channel(),
                      self._get_output_channels(), client_type, True,
                      trace_buffer))
            # daemon threads do not keep the process alive: they are
            # terminated when the main process exits.
            t.daemon = True
            t.start()
            threads.append(t)
        return threads
@@ -242,35 +292,27 @@ class Op(object):
    def init_op(self):
        pass

    def _run_preprocess(self, parsed_data_dict, op_info_prefix):
        _LOGGER.debug("{} Running preprocess".format(op_info_prefix))
        preped_data_dict = {}
        err_channeldata_dict = {}
        for data_id, parsed_data in parsed_data_dict.items():
            preped_data, error_channeldata = None, None
            try:
                preped_data = self.preprocess(parsed_data)
            except TypeError as e:
                # Error type in channeldata.datatype
                error_info = "(logid={}) {} Failed to preprocess: {}".format(
                    data_id, op_info_prefix, e)
                _LOGGER.error(error_info, exc_info=True)
                error_channeldata = ChannelData(
                    ecode=ChannelDataEcode.TYPE_ERROR.value,
                    error_info=error_info,
                    data_id=data_id)
            except Exception as e:
                error_info = "(logid={}) {} Failed to preprocess: {}".format(
                    data_id, op_info_prefix, e)
                _LOGGER.error(error_info, exc_info=True)
                error_channeldata = ChannelData(
                    ecode=ChannelDataEcode.UNKNOW.value,
                    error_info=error_info,
@@ -279,11 +321,11 @@ class Op(object):
                err_channeldata_dict[data_id] = error_channeldata
            else:
                preped_data_dict[data_id] = preped_data
        _LOGGER.debug("{} Succ preprocess".format(op_info_prefix))
        return preped_data_dict, err_channeldata_dict

    def _run_process(self, preped_data_dict, op_info_prefix):
        _LOGGER.debug("{} Running process".format(op_info_prefix))
        midped_data_dict = {}
        err_channeldata_dict = {}
        if self.with_serving:
@@ -296,8 +338,9 @@ class Op(object):
                    midped_batch = self.process(feed_batch)
                except Exception as e:
                    ecode = ChannelDataEcode.UNKNOW.value
                    error_info = "{} Failed to process(batch: {}): {}".format(
                        op_info_prefix, data_ids, e)
                    _LOGGER.error(error_info, exc_info=True)
            else:
                for i in range(self._retry):
                    try:
@@ -306,30 +349,34 @@ class Op(object):
                    except func_timeout.FunctionTimedOut as e:
                        if i + 1 >= self._retry:
                            ecode = ChannelDataEcode.TIMEOUT.value
                            error_info = "{} Failed to process(batch: {}): " \
                                    "exceeded retry count.".format(
                                        op_info_prefix, data_ids)
                            _LOGGER.error(error_info)
                        else:
                            _LOGGER.warning(
                                "{} Failed to process(batch: {}): timeout,"
                                " and retrying({}/{})"
                                .format(op_info_prefix, data_ids, i + 1,
                                        self._retry))
                    except Exception as e:
                        ecode = ChannelDataEcode.UNKNOW.value
                        error_info = "{} Failed to process(batch: {}): {}".format(
                            op_info_prefix, data_ids, e)
                        _LOGGER.error(error_info, exc_info=True)
                        break
                    else:
                        break
            if ecode != ChannelDataEcode.OK.value:
                for data_id in data_ids:
                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
                    err_channeldata_dict[data_id] = ChannelData(
                        ecode=ecode, error_info=error_info, data_id=data_id)
            elif midped_batch is None:
                # op client return None
                error_info = "{} Failed to predict, please check if PaddleServingService" \
                        " is working properly.".format(op_info_prefix)
                for data_id in data_ids:
                    _LOGGER.error("(logid={}) {}".format(data_id, error_info))
                    err_channeldata_dict[data_id] = ChannelData(
                        ecode=ChannelDataEcode.CLIENT_ERROR.value,
                        error_info=error_info,
@@ -343,11 +390,12 @@ class Op(object):
                    }
        else:
            midped_data_dict = preped_data_dict
        _LOGGER.debug("{} Succ process".format(op_info_prefix))
        return midped_data_dict, err_channeldata_dict

    def _run_postprocess(self, parsed_data_dict, midped_data_dict,
                         op_info_prefix):
        _LOGGER.debug("{} Running postprocess".format(op_info_prefix))
        postped_data_dict = {}
        err_channeldata_dict = {}
        for data_id, midped_data in midped_data_dict.items():
@@ -356,9 +404,9 @@ class Op(object):
                postped_data = self.postprocess(parsed_data_dict[data_id],
                                                midped_data)
            except Exception as e:
                error_info = "(logid={}) {} Failed to postprocess: {}".format(
                    data_id, op_info_prefix, e)
                _LOGGER.error(error_info, exc_info=True)
                err_channeldata = ChannelData(
                    ecode=ChannelDataEcode.UNKNOW.value,
                    error_info=error_info,
@@ -368,8 +416,11 @@ class Op(object):
                continue
            else:
                if not isinstance(postped_data, dict):
                    error_info = "(logid={}) {} Failed to postprocess: " \
                            "output of postprocess function must be " \
                            "dict type, but get {}".format(
                                data_id, op_info_prefix,
                                type(postped_data))
                    _LOGGER.error(error_info)
                    err_channeldata = ChannelData(
                        ecode=ChannelDataEcode.UNKNOW.value,
@@ -391,16 +442,13 @@ class Op(object):
                        dictdata=postped_data,
                        data_id=data_id)
                    postped_data_dict[data_id] = output_data
        _LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
        return postped_data_dict, err_channeldata_dict

    def _auto_batching_generator(self, input_channel, op_name, batch_size,
                                 timeout, op_info_prefix):
        while True:
            batch = []
            while len(batch) == 0:
                endtime = None
                if timeout is not None:
@@ -411,7 +459,8 @@ class Op(object):
                        if timeout is not None:
                            remaining = endtime - _time()
                            if remaining <= 0.0:
                                _LOGGER.debug("{} Failed to generate batch: "
                                              "timeout".format(op_info_prefix))
                                break
                            channeldata_dict = input_channel.front(op_name,
                                                                   timeout)
@@ -419,10 +468,11 @@ class Op(object):
                            channeldata_dict = input_channel.front(op_name)
                        batch.append(channeldata_dict)
                    except ChannelTimeoutError:
                        _LOGGER.debug("{} Failed to generate batch: "
                                      "timeout".format(op_info_prefix))
                        break
            _LOGGER.debug("{} Got actual batch_size: {}".format(op_info_prefix,
                                                                len(batch)))
            yield batch
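The loop above implements a simple policy: block until at least one request arrives, then keep collecting until batch_size is reached or the timeout window closes, and emit whatever was gathered. A standalone sketch of the same policy (Python 3 queue as a stand-in for the input channel; no Serving imports):

import queue
import time

def auto_batching(q, batch_size, timeout_s):
    # collect up to batch_size items per window; a partial batch is
    # emitted when the window closes, and empty batches are never emitted
    while True:
        batch = []
        while len(batch) == 0:
            endtime = time.time() + timeout_s
            for _ in range(batch_size):
                remaining = endtime - time.time()
                if remaining <= 0.0:
                    break
                try:
                    batch.append(q.get(timeout=remaining))
                except queue.Empty:
                    break
        yield batch

q = queue.Queue()
for i in range(3):
    q.put(i)
gen = auto_batching(q, batch_size=2, timeout_s=0.1)
print(next(gen))  # [0, 1]
print(next(gen))  # [2]  (partial batch after the 0.1 s window)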
    def _parse_channeldata_batch(self, batch, output_channels):
@@ -446,15 +496,8 @@ class Op(object):
        return parsed_data_dict, need_profile_dict, profile_dict

    def _run(self, concurrency_idx, input_channel, output_channels, client_type,
             is_thread_op, trace_buffer):
        op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
        tid = threading.current_thread().ident

        # init op
@@ -463,24 +506,31 @@ class Op(object):
        try:
            profiler = self._initialize(is_thread_op, client_type,
                                        concurrency_idx)
        except Exception as e:
            _LOGGER.critical(
                "{} Failed to init op: {}".format(op_info_prefix, e),
                exc_info=True)
            os._exit(-1)
        _LOGGER.info("{} Succ init".format(op_info_prefix))

        batch_generator = self._auto_batching_generator(
            input_channel=input_channel,
            op_name=self.name,
            batch_size=self._batch_size,
            timeout=self._auto_batching_timeout,
            op_info_prefix=op_info_prefix)

        start, end = None, None
        while True:
            start = int(round(_time() * 1000000))
            try:
                channeldata_dict_batch = next(batch_generator)
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            end = int(round(_time() * 1000000))
            if trace_buffer is not None:
                trace_buffer.put((self.name, "in", True, end - start))

            # parse channeldata batch
            try:
@@ -488,7 +538,7 @@ class Op(object):
                        = self._parse_channeldata_batch(
                                channeldata_dict_batch, output_channels)
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            if len(parsed_data_dict) == 0:
@@ -496,10 +546,12 @@ class Op(object):
                continue

            # preprocess
            start = profiler.record("prep#{}_0".format(op_info_prefix))
            preped_data_dict, err_channeldata_dict \
                    = self._run_preprocess(parsed_data_dict, op_info_prefix)
            end = profiler.record("prep#{}_1".format(op_info_prefix))
            if trace_buffer is not None:
                trace_buffer.put((self.name, "prep", True, end - start))
            try:
                for data_id, err_channeldata in err_channeldata_dict.items():
                    self._push_to_output_channels(
@@ -508,17 +560,19 @@ class Op(object):
                        client_need_profile=need_profile_dict[data_id],
                        profile_set=profile_dict[data_id])
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            if len(parsed_data_dict) == 0:
                continue

            # process
            start = profiler.record("midp#{}_0".format(op_info_prefix))
            midped_data_dict, err_channeldata_dict \
                    = self._run_process(preped_data_dict, op_info_prefix)
            end = profiler.record("midp#{}_1".format(op_info_prefix))
            if trace_buffer is not None:
                trace_buffer.put((self.name, "midp", True, end - start))
            try:
                for data_id, err_channeldata in err_channeldata_dict.items():
                    self._push_to_output_channels(
@@ -527,18 +581,20 @@ class Op(object):
                        client_need_profile=need_profile_dict[data_id],
                        profile_set=profile_dict[data_id])
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            if len(midped_data_dict) == 0:
                continue

            # postprocess
            start = profiler.record("postp#{}_0".format(op_info_prefix))
            postped_data_dict, err_channeldata_dict \
                    = self._run_postprocess(
                            parsed_data_dict, midped_data_dict, op_info_prefix)
            end = profiler.record("postp#{}_1".format(op_info_prefix))
            if trace_buffer is not None:
                trace_buffer.put((self.name, "postp", True, end - start))
            try:
                for data_id, err_channeldata in err_channeldata_dict.items():
                    self._push_to_output_channels(
@@ -547,13 +603,14 @@ class Op(object):
                        client_need_profile=need_profile_dict[data_id],
                        profile_set=profile_dict[data_id])
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            if len(postped_data_dict) == 0:
                continue

            # push data to channel (if run succ)
            start = int(round(_time() * 1000000))
            try:
                profile_str = profiler.gen_profile_str()
                for data_id, postped_data in postped_data_dict.items():
@@ -566,9 +623,12 @@ class Op(object):
                        client_need_profile=need_profile_dict[data_id],
                        profile_set=profile_dict[data_id])
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
            end = int(round(_time() * 1000000))
            if trace_buffer is not None:
                trace_buffer.put((self.name, "out", True, end - start))
    def _initialize(self, is_thread_op, client_type, concurrency_idx):
        if is_thread_op:
@@ -615,13 +675,13 @@ class RequestOp(Op):
    """ RequestOp does not run preprocess, process, or postprocess. """

    def __init__(self):
        # PipelineService.name = "@DAGExecutor"
        super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
        # init op
        try:
            self.init_op()
        except Exception as e:
            _LOGGER.critical("Op(Request) Failed to init: {}".format(e))
            os._exit(-1)

    def unpack_request_package(self, request):
@@ -640,12 +700,14 @@ class ResponseOp(Op):
    """ ResponseOp does not run preprocess, process, or postprocess. """

    def __init__(self, input_ops):
        super(ResponseOp, self).__init__(
            name="@DAGExecutor", input_ops=input_ops)
        # init op
        try:
            self.init_op()
        except Exception as e:
            # exc_info is a logging kwarg, not a str.format() kwarg
            _LOGGER.critical(
                "Op(ResponseOp) Failed to init: {}".format(e), exc_info=True)
            os._exit(-1)

    def pack_response_package(self, channeldata):
@@ -668,14 +730,19 @@ class ResponseOp(Op):
                        resp.error_info = self._log(
                            "fetch var type must be str({}).".format(
                                type(var)))
                        _LOGGER.error("(logid={}) Failed to pack RPC "
                                      "response package: {}".format(
                                          channeldata.id, resp.error_info))
                        break
                    resp.value.append(var)
                    resp.key.append(name)
            else:
                resp.ecode = ChannelDataEcode.TYPE_ERROR.value
                resp.error_info = self._log(
                    "error type({}) in datatype.".format(channeldata.datatype))
                _LOGGER.error("(logid={}) Failed to pack RPC response"
                              " package: {}".format(channeldata.id,
                                                    resp.error_info))
        else:
            resp.error_info = channeldata.error_info
        return resp
@@ -693,6 +760,7 @@ class VirtualOp(Op):
        self._virtual_pred_ops.append(op)

    def _actual_pred_op_names(self, op):
        # can use disjoint-set, but it's not necessary
        if not isinstance(op, VirtualOp):
            return [op.name]
        names = []
@@ -702,9 +770,11 @@ class VirtualOp(Op):

    def add_output_channel(self, channel):
        if not isinstance(channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical(
                self._log("Failed to add output_channel: output_channel"
                          " must be Channel type, not {}".format(
                              type(channel))))
            os._exit(-1)
        for op in self._virtual_pred_ops:
            for op_name in self._actual_pred_op_names(op):
                channel.add_producer(op_name)
@@ -712,27 +782,31 @@ class VirtualOp(Op):

    def _run(self, concurrency_idx, input_channel, output_channels, client_type,
             is_thread_op):
        op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)

        batch_generator = self._auto_batching_generator(
            input_channel=input_channel,
            op_name=self.name,
            batch_size=1,
            timeout=None,
            op_info_prefix=op_info_prefix)

        while True:
            try:
                channeldata_dict_batch = next(batch_generator)
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break

            try:
                for channeldata_dict in channeldata_dict_batch:
                    for name, data in channeldata_dict.items():
                        self._push_to_output_channels(
                            data, channels=output_channels, name=name)
            except ChannelStopError:
                _LOGGER.debug("{} Stop.".format(op_info_prefix))
                self._finalize(is_thread_op)
                break
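To make the override points concrete, here is a minimal fan-in Op in the style of the pipeline examples (a sketch: the class name, field names, and weights are illustrative, not part of this commit). Because it has multiple input ops, it must override preprocess, exactly as the check above demands:

from paddle_serving_server.pipeline import Op

class CombineOp(Op):
    def preprocess(self, input_dicts):
        # input_dicts has one entry per upstream Op, keyed by op name
        combined = {"prediction": 0.0}
        for op_name, data in input_dicts.items():
            combined["prediction"] += 0.5 * data["prediction"]
        return combined

    def postprocess(self, input_dict, fetch_dict):
        # must return a dict, per the isinstance check in _run_postprocess
        return {"prediction": str(fetch_dict["prediction"])}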
@@ -22,7 +22,7 @@ from .channel import ChannelDataEcode
from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc

_LOGGER = logging.getLogger("pipeline.pipeline_client")


class PipelineClient(object):
...
@@ -15,6 +15,7 @@
from concurrent import futures
import grpc
import logging
import json
import socket
import contextlib
from contextlib import closing
@@ -25,15 +26,14 @@ from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp
from .dag import DAGExecutor

_LOGGER = logging.getLogger("pipeline.pipeline_server")


class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
    def __init__(self, response_op, dag_conf):
        super(PipelineServicer, self).__init__()
        # init dag executor
        self._dag_executor = DAGExecutor(response_op, dag_conf)
        self._dag_executor.start()
        _LOGGER.info("[PipelineServicer] succ init")
@@ -41,9 +41,6 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
        resp = self._dag_executor.call(request)
        return resp


@contextlib.contextmanager
def _reserve_port(port):
@@ -67,9 +64,11 @@ class PipelineServer(object):
    def set_response_op(self, response_op):
        if not isinstance(response_op, ResponseOp):
            raise Exception("Failed to set response_op: response_op "
                            "must be ResponseOp type.")
        if len(response_op.get_input_ops()) != 1:
            raise Exception("Failed to set response_op: response_op "
                            "can only have one previous op.")
        self._response_op = response_op

    def _port_is_available(self, port):
@@ -79,36 +78,25 @@ class PipelineServer(object):
        return result != 0

    def prepare_server(self, yml_file):
        conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file)

        self._port = conf["port"]
        if not self._port_is_available(self._port):
            raise SystemExit("Failed to prepare_server: port {} "
                             "is already used".format(self._port))
        self._worker_num = conf["worker_num"]
        self._build_dag_each_worker = conf["build_dag_each_worker"]

        _LOGGER.info("============= PIPELINE SERVER =============")
        _LOGGER.info("\n{}".format(
            json.dumps(
                conf, indent=4, separators=(',', ':'))))
        if self._build_dag_each_worker is True:
            _LOGGER.info(
                "(Make sure that install grpcio whl with --no-binary flag)")
        _LOGGER.info("-------------------------------------------")

        self._conf = conf

    def run_server(self):
        if self._build_dag_each_worker:
@@ -119,8 +107,7 @@ class PipelineServer(object):
                show_info = (i == 0)
                worker = multiprocessing.Process(
                    target=self._run_server_func,
                    args=(bind_address, self._response_op, self._conf))
                worker.start()
                workers.append(worker)
            for worker in workers:
@@ -129,19 +116,153 @@ class PipelineServer(object):
            server = grpc.server(
                futures.ThreadPoolExecutor(max_workers=self._worker_num))
            pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
                PipelineServicer(self._response_op, self._conf), server)
            server.add_insecure_port('[::]:{}'.format(self._port))
            server.start()
            server.wait_for_termination()

    def _run_server_func(self, bind_address, response_op, dag_conf):
        options = (('grpc.so_reuseport', 1), )
        server = grpc.server(
            futures.ThreadPoolExecutor(
                max_workers=1, ), options=options)
        pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
            PipelineServicer(response_op, dag_conf), server)
        server.add_insecure_port(bind_address)
        server.start()
        server.wait_for_termination()
class ServerYamlConfChecker(object):
    def __init__(self):
        pass

    @staticmethod
    def load_server_yaml_conf(yml_file):
        with open(yml_file) as f:
            conf = yaml.load(f.read())
        ServerYamlConfChecker.check_server_conf(conf)
        ServerYamlConfChecker.check_dag_conf(conf["dag"])
        ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"])
        return conf

    @staticmethod
    def check_conf(conf, default_conf, conf_type, conf_qualification):
        ServerYamlConfChecker.fill_with_default_conf(conf, default_conf)
        ServerYamlConfChecker.check_conf_type(conf, conf_type)
        ServerYamlConfChecker.check_conf_qualification(conf, conf_qualification)

    @staticmethod
    def check_server_conf(conf):
        default_conf = {
            "port": 9292,
            "worker_num": 1,
            "build_dag_each_worker": False,
            "dag": {},
        }
        conf_type = {
            "port": int,
            "worker_num": int,
            "build_dag_each_worker": bool,
        }
        conf_qualification = {
            "port": [(">=", 1024), ("<=", 65535)],
            "worker_num": (">=", 1),
        }
        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                         conf_qualification)

    @staticmethod
    def check_tracer_conf(conf):
        default_conf = {"interval_s": 600, }
        conf_type = {"interval_s": int, }
        conf_qualification = {}
        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                         conf_qualification)

    @staticmethod
    def check_dag_conf(conf):
        default_conf = {
            "retry": 1,
            "client_type": "brpc",
            "use_profile": False,
            "channel_size": 0,
            "is_thread_op": True,
            "tracer": {},
        }
        conf_type = {
            "retry": int,
            "client_type": str,
            "use_profile": bool,
            "channel_size": int,
            "is_thread_op": bool,
        }
        conf_qualification = {
            "retry": (">=", 1),
            "client_type": ("in", ["brpc", "grpc"]),
            "channel_size": (">=", 0),
        }
        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                         conf_qualification)

    @staticmethod
    def fill_with_default_conf(conf, default_conf):
        for key, val in default_conf.items():
            if conf.get(key) is None:
                _LOGGER.warning("[CONF] {} not set, use default: {}"
                                .format(key, val))
                conf[key] = val

    @staticmethod
    def check_conf_type(conf, conf_type):
        for key, val in conf_type.items():
            if not isinstance(conf[key], val):
                raise SystemExit("[CONF] {} must be {} type, but get {}."
                                 .format(key, val, type(conf[key])))

    @staticmethod
    def check_conf_qualification(conf, conf_qualification):
        for key, qualification in conf_qualification.items():
            if not isinstance(qualification, list):
                qualification = [qualification]
            if not ServerYamlConfChecker.qualification_check(conf[key],
                                                             qualification):
                raise SystemExit("[CONF] {} must be {}, but get {}."
                                 .format(key, ", ".join([
                                     "{} {}".format(q[0], q[1])
                                     for q in qualification
                                 ]), conf[key]))

    @staticmethod
    def qualification_check(value, qualifications):
        if not isinstance(qualifications, list):
            qualifications = [qualifications]
        ok = True
        for q in qualifications:
            operator, limit = q
            if operator == "<":
                ok = value < limit
            elif operator == "==":
                ok = value == limit
            elif operator == ">":
                ok = value > limit
            elif operator == "<=":
                ok = value <= limit
            elif operator == ">=":
                ok = value >= limit
            elif operator == "in":
                ok = value in limit
            else:
                raise SystemExit("unknown operator: {}".format(operator))
            if ok == False:
                break
        return ok
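A quick illustration of the qualification rules above (runnable wherever ServerYamlConfChecker is importable):

port_rules = [(">=", 1024), ("<=", 65535)]
assert ServerYamlConfChecker.qualification_check(18080, port_rules)
assert not ServerYamlConfChecker.qualification_check(80, port_rules)
assert ServerYamlConfChecker.qualification_check("brpc", ("in", ["brpc", "grpc"]))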
@@ -23,12 +23,124 @@
else:
    raise Exception("Error Python version")
from time import time as _time
import time
import threading
import multiprocessing

_TRACER = logging.getLogger("pipeline.profiler")


class PerformanceTracer(object):
    def __init__(self, is_thread_mode, interval_s, server_worker_num):
        self._is_thread_mode = is_thread_mode
        if is_thread_mode:
            # Because the Channel in the thread mode cannot be
            # accessed across processes, when using thread mode,
            # the PerformanceTracer is also the thread mode.
            # However, performance may be affected by GIL.
            self._data_buffer = Queue.Queue()
        else:
            self._data_buffer = multiprocessing.Manager().Queue()
        self._interval_s = interval_s
        self._thrd = None
        self._proc = None
        self._channels = []
        # The size of data in Channel will not exceed server_worker_num
        self._server_worker_num = server_worker_num

    def data_buffer(self):
        return self._data_buffer

    def start(self):
        if self._is_thread_mode:
            self._thrd = threading.Thread(
                target=self._trace_func, args=(self._channels, ))
            self._thrd.daemon = True
            self._thrd.start()
        else:
            self._proc = multiprocessing.Process(
                target=self._trace_func, args=(self._channels, ))
            self._proc.daemon = True
            self._proc.start()

    def set_channels(self, channels):
        self._channels = channels

    def _trace_func(self, channels):
        actions = ["in", "prep", "midp", "postp", "out"]
        calcu_actions = ["prep", "midp", "postp"]
        while True:
            op_cost = {}
            err_count = 0
            _TRACER.info("==================== TRACER ======================")

            # op
            while True:
                try:
                    name, action, stage, cost = self._data_buffer.get_nowait()
                    if stage == False:
                        # only for name == DAG
                        assert name == "DAG"
                        err_count += 1
                    if name not in op_cost:
                        op_cost[name] = {}
                    if action not in op_cost[name]:
                        op_cost[name][action] = []
                    op_cost[name][action].append(cost)
                except Queue.Empty:
                    break

            if len(op_cost) != 0:
                for name in op_cost:
                    tot_cost, calcu_cost = 0.0, 0.0
                    for action, costs in op_cost[name].items():
                        # mean cost per action, converted from us to ms
                        op_cost[name][action] = sum(costs) / (1e3 * len(costs))
                        tot_cost += op_cost[name][action]

                    if name != "DAG":
                        _TRACER.info("Op({}):".format(name))
                        for action in actions:
                            if action in op_cost[name]:
                                _TRACER.info("\t{}[{} ms]".format(
                                    action, op_cost[name][action]))
                        for action in calcu_actions:
                            if action in op_cost[name]:
                                calcu_cost += op_cost[name][action]
                        _TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
                                                         tot_cost))

            if "DAG" in op_cost:
                calls = list(op_cost["DAG"].values())
                calls.sort()
                tot = len(calls)
                qps = 1.0 * tot / self._interval_s
                ave_cost = sum(calls) / tot
                latencys = [50, 60, 70, 80, 90, 95, 99]
                _TRACER.info("DAGExecutor:")
                _TRACER.info("\tquery count[{}]".format(tot))
                _TRACER.info("\tqps[{} q/s]".format(qps))
                _TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot))
                _TRACER.info("\tlatency:")
                _TRACER.info("\t\tave[{} ms]".format(ave_cost))
                for latency in latencys:
                    _TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int(
                        tot * latency / 100.0)]))

            # channel
            _TRACER.info("Channel (server worker num[{}]):".format(
                self._server_worker_num))
            for channel in channels:
                _TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
                    channel.name,
                    channel.get_producers(),
                    channel.get_consumers(),
                    channel.size(), channel.get_maxsize()))
            time.sleep(self._interval_s)


class UnsafeTimeProfiler(object):
    """ thread unsafe profiler """

    def __init__(self):
        self.pid = os.getpid()
        self.print_head = 'PROFILE\tpid:{}\t'.format(self.pid)
@@ -41,8 +153,9 @@ class UnsafeTimeProfiler(object):
    def record(self, name):
        if self._enable is False:
            return
        timestamp = int(round(_time() * 1000000))
        self.time_record.append('{}:{} '.format(name, timestamp))
        return timestamp

    def print_profile(self):
        if self._enable is False:
@@ -78,6 +191,7 @@ class TimeProfiler(object):
        name = '_'.join(name_with_tag[:-1])
        with self._lock:
            self._time_record.put((name, tag, timestamp))
        return timestamp

    def print_profile(self):
        if self._enable is False:
...
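How an Op feeds this tracer, in miniature: each stage puts a (name, action, succ, cost-in-microseconds) tuple on the buffer, which _trace_func above drains and averages per action. A thread-mode sketch ("uci" is an illustrative op name):

import time
try:
    import Queue  # Python 2, matching the module above
except ImportError:
    import queue as Queue  # Python 3

buf = Queue.Queue()
start = int(round(time.time() * 1000000))
time.sleep(0.01)  # stand-in for the "prep" stage work
end = int(round(time.time() * 1000000))
buf.put(("uci", "prep", True, end - start))
print(buf.get_nowait())  # ('uci', 'prep', True, ~10000)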
numpy>=1.12, <=1.16.4 ; python_version<"3.5"
google>=2.0.3
protobuf>=3.12.2
grpcio-tools>=1.28.1
grpcio>=1.28.1
func-timeout>=4.3.5
pyyaml>=1.3.0
sentencepiece==0.1.92
flask>=1.1.2
ujson>=2.0.3
@@ -54,7 +54,6 @@ function build_app() {
    local DIRNAME=build-app-$TYPE
    mkdir $DIRNAME # pwd: /Serving
    cd $DIRNAME # pwd: /Serving/build-app-$TYPE
    case $TYPE in
        CPU|GPU)
            cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
@@ -295,8 +294,6 @@ function python_run_criteo_ctr_with_cube() {
function python_test_bert() {
    # pwd: /Serving/python/examples
    local TYPE=$1
    export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
    cd bert # pwd: /Serving/python/examples/bert
    case $TYPE in
@@ -779,7 +776,7 @@ function python_test_pipeline(){
    # test: thread servicer & thread op
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
    is_thread_op: true
@@ -796,7 +793,7 @@ EOF
    # test: thread servicer & process op
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
    is_thread_op: false
@@ -810,13 +807,13 @@ EOF
    ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
    kill_process_by_port 18080

    # test: process servicer & process op
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
    is_thread_op: false
    client_type: brpc
    retry: 1
    use_profile: false
@@ -827,11 +824,13 @@ EOF
    ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
    kill_process_by_port 18080

    # test: process servicer & thread op
    pip uninstall grpcio -y
    pip install grpcio --no-binary=grpcio
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: true
dag:
    is_thread_op: false
    client_type: brpc
@@ -854,7 +853,7 @@ EOF
    sleep 5
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
    is_thread_op: false
...