Commit 5e907604 authored by B barrierye

Merge branch 'pipeline-auto-batch' of https://github.com/barrierye/Serving into pipeline-auto-batch

@@ -4,12 +4,26 @@
## Compilation environment requirements

| module | version |
| :--------------------------: | :----------------------------------------------------------: |
| OS | CentOS 7 |
| gcc | 4.8.5 and later |
| gcc-c++ | 4.8.5 and later |
| git | 3.82 and later |
| cmake | 3.2.0 and later |
| Python | 2.7.2 and later / 3.6 and later |
| Go | 1.9.2 and later |
| git | 2.17.1 and later |
| glibc-static | 2.17 |
| openssl-devel | 1.0.2k |
| bzip2-devel | 1.0.6 and later |
| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
| sqlite-devel | 3.7.17 and later |
| patchelf | 0.9 and later |
| libXext | 1.3.3 |
| libSM | 1.2.2 |
| libXrender | 0.9.10 |
| python-whl | numpy>=1.12, <=1.16.4<br/>google>=2.0.3<br/>protobuf>=3.12.2<br/>grpcio-tools>=1.28.1<br/>grpcio>=1.28.1<br/>func-timeout>=4.3.5<br/>pyyaml>=1.3.0<br/>sentencepiece==0.1.92<br/>flask>=1.1.2<br/>ujson>=2.0.3 |

It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you, see [this document](DOCKER_IMAGES.md).
......
@@ -4,12 +4,26 @@
## Compilation environment settings

| component | version requirement |
| :--------------------------: | :----------------------------------------------------------: |
| OS | CentOS 7 |
| gcc | 4.8.5 and later |
| gcc-c++ | 4.8.5 and later |
| git | 3.82 and later |
| cmake | 3.2.0 and later |
| Python | 2.7.2 and later / 3.6 and later |
| Go | 1.9.2 and later |
| git | 2.17.1 and later |
| glibc-static | 2.17 |
| openssl-devel | 1.0.2k |
| bzip2-devel | 1.0.6 and later |
| python-devel / python3-devel | 2.7.5 and later / 3.6.8 and later |
| sqlite-devel | 3.7.17 and later |
| patchelf | 0.9 |
| libXext | 1.3.3 |
| libSM | 1.2.2 |
| libXrender | 0.9.10 |
| python-whl | numpy>=1.12, <=1.16.4<br/>google>=2.0.3<br/>protobuf>=3.12.2<br/>grpcio-tools>=1.28.1<br/>grpcio>=1.28.1<br/>func-timeout>=4.3.5<br/>pyyaml>=1.3.0<br/>sentencepiece==0.1.92<br/>flask>=1.1.2<br/>ujson>=2.0.3 |

It is recommended to use Docker for compilation. We have prepared the Paddle Serving compilation environment for you; see [this document](DOCKER_IMAGES_CN.md).
......
@@ -95,7 +95,7 @@ The meaning of each parameter is as follows:

| fetch_list | (list) List of fetch variable names for the remote Paddle Serving Service. |
| client_config | (str) Path of the client configuration file corresponding to the Paddle Serving Service. |
| concurrency | (int) Number of concurrent OPs. |
| timeout | (int) Timeout of the process operation, in ms. If the value is less than zero, no timeout is applied. |
| retry | (int) Number of retries after a timeout. When the value is 1, no retries are made. |
| batch_size | (int) Expected batch_size for Auto-Batching; since building a batch may time out, the actual batch_size may be smaller than the set value. |
| auto_batching_timeout | (float) Timeout for building a batch in Auto-Batching (unit: ms). |
......
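As a quick illustration of the parameters above, here is a hedged sketch of constructing such an Op in a pipeline program; the import path follows the example code later in this commit, while the `input_ops`/`server_endpoints` wiring and all concrete values are assumptions:

```python
from paddle_serving_server.pipeline import Op, RequestOp

read_op = RequestOp()
# Hypothetical Op backed by a remote Paddle Serving service; each keyword
# mirrors a row of the parameter table above.
imdb_op = Op(name="imdb",
             input_ops=[read_op],                  # assumed upstream op
             server_endpoints=["127.0.0.1:9393"],  # assumed endpoint
             fetch_list=["prediction"],
             client_config="serving_client_conf.prototxt",
             concurrency=1,
             timeout=-1,                  # ms; a negative value disables the timeout
             retry=1,                     # 1 means no retry
             batch_size=4,                # expected Auto-Batching batch size
             auto_batching_timeout=1000)  # ms to wait while building a batch
```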
@@ -95,7 +95,7 @@ def __init__(name=None,

| fetch_list | (list) Fetch list of the remote Paddle Serving Service. |
| client_config | (str) Path of the client-side configuration file corresponding to the Paddle Serving Service. |
| concurrency | (int) Concurrency of the OP. |
| timeout | (int) Timeout of the process operation, in ms. If the value is less than zero, no timeout is applied. |
| retry | (int) Number of retries after a timeout. When the value is 1, no retry is performed. |
| batch_size | (int) Expected batch_size for Auto-Batching; since building a batch may time out, the actual batch_size may be smaller than the set value. |
| auto_batching_timeout | (float) Timeout for building a batch in Auto-Batching, in ms. |
......
@@ -6,3 +6,5 @@ dag:
  client_type: brpc
  retry: 1
  use_profile: false
  tracer:
    interval_s: 10
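As a hedged sketch of how a config like this is consumed: `ServerYamlConfChecker.load_server_yaml_conf` (added to the pipeline server module later in this commit) loads the yaml, fills unset keys with defaults, and validates types and ranges; the module path and file name below are assumptions:

```python
from paddle_serving_server.pipeline.pipeline_server import ServerYamlConfChecker

# Unset keys fall back to defaults (e.g. tracer.interval_s defaults to 600 s),
# and invalid types or out-of-range values abort with SystemExit.
conf = ServerYamlConfChecker.load_server_yaml_conf("config.yml")
print(conf["dag"]["tracer"]["interval_s"])  # -> 10 for the snippet above
```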
@@ -12,20 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import paddle_serving_server.pipeline as pipeline
import logging
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset

_LOGGER = logging.getLogger()
console_handler = pipeline.logger.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"))
_LOGGER.addHandler(console_handler)


class ImdbRequestOp(RequestOp):
......
@@ -11,7 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logger  # this module must be the first to import
from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
......
@@ -17,7 +17,7 @@ import copy
import re
import logging

_LOGGER = logging.getLogger("pipeline.analyse")


class Analyst(object):
@@ -164,7 +164,7 @@ class OpAnalyst(object):
    def add(self, name_str, ts_list):
        if self._close:
            _LOGGER.error("Failed to add item: OpAnalyst is closed.")
            return
        op_name, curr_idx, step = self._parse(name_str)
        if op_name not in self.op_time_list_dict:
......
This diff is collapsed.
This diff is collapsed.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging.handlers
import os


class SectionLevelFilter(object):
    def __init__(self, levels):
        self._levels = levels

    def filter(self, logRecord):
        return logRecord.levelno in self._levels


class OutOfMouduleFilter(object):
    def __init__(self, out_names):
        self._out_names = out_names

    def filter(self, logRecord):
        return logRecord.name not in self._out_names


class OutOfMouduleAndSectionLevelFilter(object):
    def __init__(self, out_names, levels):
        self._out_names = out_names
        self._levels = levels

    def filter(self, logRecord):
        if logRecord.name in self._out_names:
            return False
        return logRecord.levelno in self._levels


class StreamHandler(logging.StreamHandler):
    def __init__(self, *args, **kwargs):
        super(StreamHandler, self).__init__(*args, **kwargs)
        self.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))


log_dir = "PipelineServingLogs"
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# root logger
_LOGGER = logging.getLogger()
_LOGGER.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s")

# info and warn
file_info = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "INFO.log"))
file_info.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_info.addFilter(SectionLevelFilter([logging.INFO, logging.WARNING]))
file_info.setFormatter(formatter)

# err and critical
file_err = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "ERROR.log"))
file_err.addFilter(OutOfMouduleFilter(["pipeline.profiler"]))
file_err.setLevel(logging.ERROR)
file_err.setFormatter(formatter)

_LOGGER.addHandler(file_info)
_LOGGER.addHandler(file_err)

# tracer logger
_TRACER = logging.getLogger("pipeline.profiler")
_TRACER.setLevel(logging.INFO)
_TRACER.addFilter(logging.Filter("pipeline.profiler"))

# tracer
tracer_formatter = logging.Formatter("%(asctime)s %(message)s")
file_trace = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "TRACE.log"))
file_trace.setFormatter(tracer_formatter)
_TRACER.addHandler(file_trace)
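A brief, hedged sketch of how other pipeline modules are expected to pick up these handlers; the child logger names follow the `logging.getLogger("pipeline.*")` calls elsewhere in this commit:

```python
import logging

# Loggers that propagate to the root logger inherit the rotating file
# handlers configured above.
_LOGGER = logging.getLogger("pipeline.pipeline_server")
_LOGGER.info("goes to PipelineServingLogs/INFO.log")
_LOGGER.error("goes to PipelineServingLogs/ERROR.log")

# The profiler logger is filtered out of INFO.log/ERROR.log by
# OutOfMouduleFilter and written to TRACE.log instead.
_TRACER = logging.getLogger("pipeline.profiler")
_TRACER.info("goes to PipelineServingLogs/TRACE.log")
```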
This diff is collapsed.
@@ -22,7 +22,7 @@ from .channel import ChannelDataEcode
from .proto import pipeline_service_pb2
from .proto import pipeline_service_pb2_grpc

_LOGGER = logging.getLogger("pipeline.pipeline_client")


class PipelineClient(object):
......
@@ -15,6 +15,7 @@
from concurrent import futures
import grpc
import logging
import json
import socket
import contextlib
from contextlib import closing
@@ -25,15 +26,14 @@ from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp
from .dag import DAGExecutor

_LOGGER = logging.getLogger("pipeline.pipeline_server")


class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
    def __init__(self, response_op, dag_conf):
        super(PipelineServicer, self).__init__()
        # init dag executor
        self._dag_executor = DAGExecutor(response_op, dag_conf)
        self._dag_executor.start()
        _LOGGER.info("[PipelineServicer] succ init")
@@ -41,9 +41,6 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
        resp = self._dag_executor.call(request)
        return resp


@contextlib.contextmanager
def _reserve_port(port):
@@ -67,9 +64,11 @@ class PipelineServer(object):
    def set_response_op(self, response_op):
        if not isinstance(response_op, ResponseOp):
            raise Exception("Failed to set response_op: response_op "
                            "must be ResponseOp type.")
        if len(response_op.get_input_ops()) != 1:
            raise Exception("Failed to set response_op: response_op "
                            "can only have one previous op.")
        self._response_op = response_op

    def _port_is_available(self, port):
@@ -79,36 +78,25 @@ class PipelineServer(object):
        return result != 0

    def prepare_server(self, yml_file):
        conf = ServerYamlConfChecker.load_server_yaml_conf(yml_file)

        self._port = conf["port"]
        if not self._port_is_available(self._port):
            raise SystemExit("Failed to prepare_server: port {} "
                             "is already used".format(self._port))
        self._worker_num = conf["worker_num"]
        self._build_dag_each_worker = conf["build_dag_each_worker"]

        _LOGGER.info("============= PIPELINE SERVER =============")
        _LOGGER.info("\n{}".format(
            json.dumps(
                conf, indent=4, separators=(',', ':'))))
        if self._build_dag_each_worker is True:
            _LOGGER.info(
                "(Make sure that install grpcio whl with --no-binary flag)")
        _LOGGER.info("-------------------------------------------")

        self._conf = conf

    def run_server(self):
        if self._build_dag_each_worker:
@@ -119,8 +107,7 @@ class PipelineServer(object):
                    show_info = (i == 0)
                    worker = multiprocessing.Process(
                        target=self._run_server_func,
                        args=(bind_address, self._response_op, self._conf))
                    worker.start()
                    workers.append(worker)
                for worker in workers:
@@ -129,19 +116,153 @@ class PipelineServer(object):
            server = grpc.server(
                futures.ThreadPoolExecutor(max_workers=self._worker_num))
            pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
                PipelineServicer(self._response_op, self._conf), server)
            server.add_insecure_port('[::]:{}'.format(self._port))
            server.start()
            server.wait_for_termination()

    def _run_server_func(self, bind_address, response_op, dag_conf):
        options = (('grpc.so_reuseport', 1), )
        server = grpc.server(
            futures.ThreadPoolExecutor(
                max_workers=1, ), options=options)
        pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
            PipelineServicer(response_op, dag_conf), server)
        server.add_insecure_port(bind_address)
        server.start()
        server.wait_for_termination()


class ServerYamlConfChecker(object):
    def __init__(self):
        pass

    @staticmethod
    def load_server_yaml_conf(yml_file):
        with open(yml_file) as f:
            conf = yaml.load(f.read())
        ServerYamlConfChecker.check_server_conf(conf)
        ServerYamlConfChecker.check_dag_conf(conf["dag"])
        ServerYamlConfChecker.check_tracer_conf(conf["dag"]["tracer"])
        return conf

    @staticmethod
    def check_conf(conf, default_conf, conf_type, conf_qualification):
        ServerYamlConfChecker.fill_with_default_conf(conf, default_conf)
        ServerYamlConfChecker.check_conf_type(conf, conf_type)
        ServerYamlConfChecker.check_conf_qualification(conf, conf_qualification)

    @staticmethod
    def check_server_conf(conf):
        default_conf = {
            "port": 9292,
            "worker_num": 1,
            "build_dag_each_worker": False,
            "dag": {},
        }
        conf_type = {
            "port": int,
            "worker_num": int,
            "build_dag_each_worker": bool,
        }
        conf_qualification = {
            "port": [(">=", 1024), ("<=", 65535)],
            "worker_num": (">=", 1),
        }
        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                         conf_qualification)

    @staticmethod
    def check_tracer_conf(conf):
        default_conf = {"interval_s": 600, }
        conf_type = {"interval_s": int, }
        conf_qualification = {}
        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                         conf_qualification)

    @staticmethod
    def check_dag_conf(conf):
        default_conf = {
            "retry": 1,
            "client_type": "brpc",
            "use_profile": False,
            "channel_size": 0,
            "is_thread_op": True,
            "tracer": {},
        }
        conf_type = {
            "retry": int,
            "client_type": str,
            "use_profile": bool,
            "channel_size": int,
            "is_thread_op": bool,
        }
        conf_qualification = {
            "retry": (">=", 1),
            "client_type": ("in", ["brpc", "grpc"]),
            "channel_size": (">=", 0),
        }
        ServerYamlConfChecker.check_conf(conf, default_conf, conf_type,
                                         conf_qualification)

    @staticmethod
    def fill_with_default_conf(conf, default_conf):
        for key, val in default_conf.items():
            if conf.get(key) is None:
                _LOGGER.warning("[CONF] {} not set, use default: {}"
                                .format(key, val))
                conf[key] = val

    @staticmethod
    def check_conf_type(conf, conf_type):
        for key, val in conf_type.items():
            if not isinstance(conf[key], val):
                raise SystemExit("[CONF] {} must be {} type, but get {}."
                                 .format(key, val, type(conf[key])))

    @staticmethod
    def check_conf_qualification(conf, conf_qualification):
        for key, qualification in conf_qualification.items():
            if not isinstance(qualification, list):
                qualification = [qualification]
            if not ServerYamlConfChecker.qualification_check(conf[key],
                                                             qualification):
                raise SystemExit("[CONF] {} must be {}, but get {}."
                                 .format(key, ", ".join([
                                     "{} {}".format(q[0], q[1])
                                     for q in qualification
                                 ]), conf[key]))

    @staticmethod
    def qualification_check(value, qualifications):
        if not isinstance(qualifications, list):
            qualifications = [qualifications]
        ok = True
        for q in qualifications:
            operator, limit = q
            if operator == "<":
                ok = value < limit
            elif operator == "==":
                ok = value == limit
            elif operator == ">":
                ok = value > limit
            elif operator == "<=":
                ok = value <= limit
            elif operator == ">=":
                ok = value >= limit
            elif operator == "in":
                ok = value in limit
            else:
                raise SystemExit("unknown operator: {}".format(operator))
            if ok == False:
                break
        return ok
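To tie the pieces together, a minimal, hypothetical driver for this server class; the trivial RequestOp → ResponseOp DAG and the config.yml file name are assumptions, and prepare_server routes the yaml through ServerYamlConfChecker above:

```python
from paddle_serving_server.pipeline import PipelineServer, RequestOp, ResponseOp

read_op = RequestOp()
response_op = ResponseOp(input_ops=[read_op])  # must have exactly one input op

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server("config.yml")  # defaults filled and validated; bad values raise SystemExit
server.run_server()
```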
@@ -23,12 +23,124 @@ elif sys.version_info.major == 3:
else:
    raise Exception("Error Python version")
from time import time as _time
import time
import threading
import multiprocessing

_TRACER = logging.getLogger("pipeline.profiler")


class PerformanceTracer(object):
    def __init__(self, is_thread_mode, interval_s, server_worker_num):
        self._is_thread_mode = is_thread_mode
        if is_thread_mode:
            # Because the Channel in the thread mode cannot be
            # accessed across processes, when using thread mode,
            # the PerformanceTracer is also the thread mode.
            # However, performance may be affected by GIL.
            self._data_buffer = Queue.Queue()
        else:
            self._data_buffer = multiprocessing.Manager().Queue()
        self._interval_s = interval_s
        self._thrd = None
        self._proc = None
        self._channels = []
        # The size of data in Channel will not exceed server_worker_num
        self._server_worker_num = server_worker_num

    def data_buffer(self):
        return self._data_buffer

    def start(self):
        if self._is_thread_mode:
            self._thrd = threading.Thread(
                target=self._trace_func, args=(self._channels, ))
            self._thrd.daemon = True
            self._thrd.start()
        else:
            self._proc = multiprocessing.Process(
                target=self._trace_func, args=(self._channels, ))
            self._proc.daemon = True
            self._proc.start()

    def set_channels(self, channels):
        self._channels = channels

    def _trace_func(self, channels):
        actions = ["in", "prep", "midp", "postp", "out"]
        calcu_actions = ["prep", "midp", "postp"]
        while True:
            op_cost = {}
            err_count = 0
            _TRACER.info("==================== TRACER ======================")

            # op
            while True:
                try:
                    name, action, stage, cost = self._data_buffer.get_nowait()
                    if stage == False:
                        # only for name == DAG
                        assert name == "DAG"
                        err_count += 1
                    if name not in op_cost:
                        op_cost[name] = {}
                    if action not in op_cost[name]:
                        op_cost[name][action] = []
                    op_cost[name][action].append(cost)
                except Queue.Empty:
                    break

            if len(op_cost) != 0:
                for name in op_cost:
                    tot_cost, calcu_cost = 0.0, 0.0
                    for action, costs in op_cost[name].items():
                        op_cost[name][action] = sum(costs) / (1e3 * len(costs))
                        tot_cost += op_cost[name][action]

                    if name != "DAG":
                        _TRACER.info("Op({}):".format(name))
                        for action in actions:
                            if action in op_cost[name]:
                                _TRACER.info("\t{}[{} ms]".format(
                                    action, op_cost[name][action]))
                        for action in calcu_actions:
                            if action in op_cost[name]:
                                calcu_cost += op_cost[name][action]
                        _TRACER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
                                                         tot_cost))

            if "DAG" in op_cost:
                calls = op_cost["DAG"].values()
                calls.sort()
                tot = len(calls)
                qps = 1.0 * tot / self._interval_s
                ave_cost = sum(calls) / tot
                latencys = [50, 60, 70, 80, 90, 95, 99]
                _TRACER.info("DAGExecutor:")
                _TRACER.info("\tquery count[{}]".format(tot))
                _TRACER.info("\tqps[{} q/s]".format(qps))
                _TRACER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot))
                _TRACER.info("\tlatency:")
                _TRACER.info("\t\tave[{} ms]".format(ave_cost))
                for latency in latencys:
                    _TRACER.info("\t\t.{}[{} ms]".format(latency, calls[int(
                        tot * latency / 100.0)]))

            # channel
            _TRACER.info("Channel (server worker num[{}]):".format(
                self._server_worker_num))
            for channel in channels:
                _TRACER.info("\t{}(In: {}, Out: {}) size[{}/{}]".format(
                    channel.name,
                    channel.get_producers(),
                    channel.get_consumers(),
                    channel.size(), channel.get_maxsize()))
            time.sleep(self._interval_s)
class UnsafeTimeProfiler(object):
    """ thread unsafe profiler """

    def __init__(self):
        self.pid = os.getpid()
        self.print_head = 'PROFILE\tpid:{}\t'.format(self.pid)
@@ -41,8 +153,9 @@ class UnsafeTimeProfiler(object):
    def record(self, name):
        if self._enable is False:
            return
        timestamp = int(round(_time() * 1000000))
        self.time_record.append('{}:{} '.format(name, timestamp))
        return timestamp

    def print_profile(self):
        if self._enable is False:
@@ -78,6 +191,7 @@ class TimeProfiler(object):
        name = '_'.join(name_with_tag[:-1])
        with self._lock:
            self._time_record.put((name, tag, timestamp))
        return timestamp

    def print_profile(self):
        if self._enable is False:
......
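A rough sketch of how producers are expected to feed the tracer above; the tuple layout mirrors the `get_nowait()` unpacking in `_trace_func`, while the op name, the action label for DAG entries, and the cost values are made up:

```python
tracer = PerformanceTracer(is_thread_mode=False, interval_s=10, server_worker_num=4)
buffer = tracer.data_buffer()

# (name, action, stage, cost): cost is in microseconds, since _trace_func
# divides by 1e3 before reporting milliseconds; the timestamps returned by
# the profilers' record() methods can be subtracted to obtain such costs.
buffer.put(("imdb", "midp", True, 2500))  # 2.5 ms spent in mid-process
buffer.put(("DAG", "call", True, 5300))   # one successful end-to-end call
buffer.put(("DAG", "call", False, 8100))  # stage=False is counted as an error

tracer.start()  # emits a report to the pipeline.profiler logger every interval_s seconds
```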
numpy>=1.12, <=1.16.4 ; python_version<"3.5"
google>=2.0.3
protobuf>=3.12.2
grpcio-tools>=1.28.1
grpcio>=1.28.1
func-timeout>=4.3.5
pyyaml>=1.3.0
sentencepiece==0.1.92
flask>=1.1.2
ujson>=2.0.3
@@ -54,7 +54,6 @@ function build_app() {
    local DIRNAME=build-app-$TYPE
    mkdir $DIRNAME # pwd: /Serving
    cd $DIRNAME # pwd: /Serving/build-app-$TYPE
    case $TYPE in
        CPU|GPU)
            cmake -DPYTHON_INCLUDE_DIR=$PYTHONROOT/include/python2.7/ \
@@ -295,8 +294,6 @@ function python_run_criteo_ctr_with_cube() {
function python_test_bert() {
    # pwd: /Serving/python/examples
    local TYPE=$1
    export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
    cd bert # pwd: /Serving/python/examples/bert
    case $TYPE in
@@ -779,7 +776,7 @@ function python_test_pipeline(){
    # test: thread servicer & thread op
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
  is_thread_op: true
@@ -796,7 +793,7 @@ EOF
    # test: thread servicer & process op
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
  is_thread_op: false
@@ -810,13 +807,13 @@ EOF
    ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
    kill_process_by_port 18080

    # test: process servicer & process op
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
  is_thread_op: false
  client_type: brpc
  retry: 1
  use_profile: false
@@ -826,12 +823,14 @@ EOF
    check_cmd "python test_pipeline_client.py"
    ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
    kill_process_by_port 18080

    # test: process servicer & thread op
    pip uninstall grpcio -y
    pip install grpcio --no-binary=grpcio
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: true
dag:
  is_thread_op: false
  client_type: brpc
@@ -843,7 +842,7 @@ EOF
    check_cmd "python test_pipeline_client.py"
    ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
    kill_process_by_port 18080
    kill_server_process
    kill_process_by_port 9292
    kill_process_by_port 9393
@@ -854,7 +853,7 @@ EOF
    sleep 5
    cat << EOF > config.yml
port: 18080
worker_num: 4
build_dag_each_worker: false
dag:
  is_thread_op: false
......