Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Serving
提交
f942f5da
S
Serving
项目概览
PaddlePaddle
/
Serving
大约 1 年 前同步成功
通知
186
Star
833
Fork
253
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
105
列表
看板
标记
里程碑
合并请求
10
Wiki
2
Wiki
分析
仓库
DevOps
项目成员
Pages
S
Serving
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
105
Issue
105
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
2
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f942f5da
编写于
2月 10, 2022
作者:
S
ShiningZhang
提交者:
GitHub
2月 10, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1628 from ShiningZhang/v0.8.0
Merge pull request #1586 from develop to v0.8.0
上级
e999e9ae
8d37559d
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
107 addition
and
15 deletion
+107
-15
core/predictor/framework/prometheus_metric.cpp
core/predictor/framework/prometheus_metric.cpp
+5
-5
python/pipeline/dag.py
python/pipeline/dag.py
+71
-5
python/pipeline/operator.py
python/pipeline/operator.py
+4
-0
python/pipeline/profiler.py
python/pipeline/profiler.py
+18
-5
python/pipeline/prometheus_metrics.py
python/pipeline/prometheus_metrics.py
+8
-0
python/requirements.txt
python/requirements.txt
+1
-0
未找到文件。
core/predictor/framework/prometheus_metric.cpp
浏览文件 @
f942f5da
...
...
@@ -30,26 +30,26 @@ PrometheusMetric::PrometheusMetric()
serializer_
(
new
prometheus
::
TextSerializer
()),
query_success_family_
(
prometheus
::
BuildCounter
()
.
Name
(
"pd_query_request_success"
)
.
Name
(
"pd_query_request_success
_total
"
)
.
Help
(
"Number of successful query requests"
)
.
Register
(
*
registry_
)),
query_failure_family_
(
prometheus
::
BuildCounter
()
.
Name
(
"pd_query_request_failure"
)
.
Name
(
"pd_query_request_failure
_total
"
)
.
Help
(
"Number of failed query requests"
)
.
Register
(
*
registry_
)),
inf_count_family_
(
prometheus
::
BuildCounter
()
.
Name
(
"pd_inference_count"
)
.
Name
(
"pd_inference_count
_total
"
)
.
Help
(
"Number of inferences performed"
)
.
Register
(
*
registry_
)),
query_duration_us_family_
(
prometheus
::
BuildCounter
()
.
Name
(
"pd_query_request_duration_us"
)
.
Name
(
"pd_query_request_duration_us
_total
"
)
.
Help
(
"Cummulative query request duration in microseconds"
)
.
Register
(
*
registry_
)),
inf_duration_us_family_
(
prometheus
::
BuildCounter
()
.
Name
(
"pd_inference_duration_us"
)
.
Name
(
"pd_inference_duration_us
_total
"
)
.
Help
(
"Cummulative inference duration in microseconds"
)
.
Register
(
*
registry_
)),
metrics_enabled_
(
false
)
...
...
python/pipeline/dag.py
浏览文件 @
f942f5da
...
...
@@ -62,6 +62,13 @@ class DAGExecutor(object):
self
.
_retry
=
dag_conf
[
"retry"
]
self
.
_server_use_profile
=
dag_conf
[
"use_profile"
]
self
.
_enable_prometheus
=
False
if
"enable_prometheus"
in
dag_conf
:
self
.
_enable_prometheus
=
dag_conf
[
"enable_prometheus"
]
if
"prometheus_port"
in
dag_conf
and
self
.
_enable_prometheus
:
self
.
_prometheus_port
=
dag_conf
[
"prometheus_port"
]
else
:
self
.
_prometheus_port
=
None
channel_size
=
dag_conf
[
"channel_size"
]
channel_recv_frist_arrive
=
dag_conf
[
"channel_recv_frist_arrive"
]
self
.
_is_thread_op
=
dag_conf
[
"is_thread_op"
]
...
...
@@ -77,8 +84,10 @@ class DAGExecutor(object):
if
tracer_interval_s
>=
1
:
self
.
_tracer
=
PerformanceTracer
(
self
.
_is_thread_op
,
tracer_interval_s
,
server_worker_num
)
if
self
.
_enable_prometheus
:
self
.
_tracer
.
set_enable_dict
(
True
)
self
.
_dag
=
DAG
(
self
.
name
,
response_op
,
self
.
_server_use_profile
,
self
.
_dag
=
DAG
(
self
.
name
,
response_op
,
self
.
_server_use_profile
,
self
.
_prometheus_port
,
self
.
_is_thread_op
,
channel_size
,
build_dag_each_worker
,
self
.
_tracer
,
channel_recv_frist_arrive
)
(
in_channel
,
out_channel
,
pack_rpc_func
,
...
...
@@ -480,10 +489,10 @@ class DAG(object):
"""
Directed Acyclic Graph(DAG) engine, builds one DAG topology.
"""
def
__init__
(
self
,
request_name
,
response_op
,
use_profile
,
is_thread_op
,
def
__init__
(
self
,
request_name
,
response_op
,
use_profile
,
prometheus_port
,
is_thread_op
,
channel_size
,
build_dag_each_worker
,
tracer
,
channel_recv_frist_arrive
):
_LOGGER
.
info
(
"{}, {}, {}, {}, {}
,{} ,{} ,{}"
.
format
(
request_name
,
response_op
,
use_profile
,
is_thread_op
,
_LOGGER
.
info
(
"{}, {}, {}, {}, {}
, {} ,{} ,{} ,{}"
.
format
(
request_name
,
response_op
,
use_profile
,
prometheus_port
,
is_thread_op
,
channel_size
,
build_dag_each_worker
,
tracer
,
channel_recv_frist_arrive
))
@
ErrorCatch
...
...
@@ -491,6 +500,7 @@ class DAG(object):
def
init_helper
(
self
,
request_name
:
str
,
response_op
,
use_profile
:
[
bool
,
None
],
prometheus_port
:
[
int
,
None
],
is_thread_op
:
bool
,
channel_size
,
build_dag_each_worker
:
[
bool
,
None
],
...
...
@@ -499,6 +509,8 @@ class DAG(object):
self
.
_request_name
=
request_name
self
.
_response_op
=
response_op
self
.
_use_profile
=
use_profile
self
.
_prometheus_port
=
prometheus_port
self
.
_use_prometheus
=
(
self
.
_prometheus_port
is
not
None
)
self
.
_is_thread_op
=
is_thread_op
self
.
_channel_size
=
channel_size
self
.
_build_dag_each_worker
=
build_dag_each_worker
...
...
@@ -506,7 +518,7 @@ class DAG(object):
self
.
_channel_recv_frist_arrive
=
channel_recv_frist_arrive
if
not
self
.
_is_thread_op
:
self
.
_manager
=
PipelineProcSyncManager
()
init_helper
(
self
,
request_name
,
response_op
,
use_profile
,
is_thread_op
,
init_helper
(
self
,
request_name
,
response_op
,
use_profile
,
prometheus_port
,
is_thread_op
,
channel_size
,
build_dag_each_worker
,
tracer
,
channel_recv_frist_arrive
)
print
(
"[DAG] Succ init"
)
...
...
@@ -828,6 +840,56 @@ class DAG(object):
return
self
.
_input_channel
,
self
.
_output_channel
,
self
.
_pack_func
,
self
.
_unpack_func
def start_prom(self, prometheus_port):
    """
    Expose the pipeline's Prometheus counters over HTTP.

    A Flask app with a single ``/metrics`` route is served on
    ``0.0.0.0:prometheus_port`` from a daemon thread. On each scrape the
    tracer's current ``profile_dict`` snapshot is folded into the counters
    from ``prometheus_metrics`` and the snapshot is then cleared so the
    same interval is not counted twice.

    Args:
        prometheus_port: TCP port the metrics endpoint listens on.

    Returns:
        None. The server thread is started and left running.
    """
    import prometheus_client
    from flask import Response, Flask
    from .prometheus_metrics import registry
    from .prometheus_metrics import (
        metric_query_success, metric_query_failure, metric_inf_count,
        metric_query_duration_us, metric_inf_duration_us)

    app = Flask(__name__)

    def add(counter, amount):
        # Go through the public Counter.inc() API instead of the private
        # ``_value`` attribute. inc() rejects negative amounts, so anything
        # that is not a positive increment is skipped rather than turning
        # the scrape into an error.
        if amount > 0:
            counter.inc(amount)

    @app.route("/metrics")
    def requests_count():
        # ``profile_dict`` may not exist yet (the tracer only creates it in
        # its constructor when profiling is on), so fall back to an empty
        # snapshot instead of raising.
        item = getattr(self._tracer, "profile_dict", None) or {}
        _LOGGER.info("metrics: {}".format(item))
        # Snapshot shape, e.g.:
        # {'uci': {'in': 727.4, 'prep': 0.55, 'midp': 2.21, 'postp': 1.32,
        #          'out': 0.94},
        #  'DAG': {'call_0': 29.479, ..., 'query_count': 12, 'qps': 1.2,
        #          'succ': 1.0, 'avg': 9.77, '50': 8.045, ...}}
        # NOTE(review): the sample values look like milliseconds while the
        # counter names end in "_us" -- confirm the intended unit.
        dag = item.get("DAG", {})
        # The tracer fills the "DAG" entry in several steps; only account
        # for it once every field used below is present.
        if all(key in dag for key in ("query_count", "succ", "avg")):
            total = dag["query_count"]
            add(metric_query_success, total * dag["succ"])
            add(metric_query_failure, total * (1 - dag["succ"]))
            add(metric_query_duration_us, total * dag["avg"])

        inf_cnt = 0
        infer_duration = 0.0
        for name, stats in item.items():
            if name == "DAG":
                continue
            count = stats.get("count", 0)
            inf_cnt += count
            infer_duration += count * stats.get("midp", 0.0)
        add(metric_inf_count, inf_cnt)
        add(metric_inf_duration_us, infer_duration)

        # Drop the consumed snapshot so a second scrape inside the same
        # tracer interval adds nothing.
        if self._tracer is not None:
            self._tracer.profile_dict = {}
        return Response(
            prometheus_client.generate_latest(registry),
            mimetype="text/plain")

    def prom_run():
        # Listens on all interfaces; blocks for the life of the thread.
        app.run(host="0.0.0.0", port=prometheus_port)

    p = threading.Thread(target=prom_run, args=())
    _LOGGER.info("Prometheus Start 2")
    # Daemon thread: must not keep the serving process alive on shutdown.
    p.daemon = True
    p.start()
def
start
(
self
):
"""
Each OP starts a thread or process by _is_thread_op
...
...
@@ -842,11 +904,15 @@ class DAG(object):
for
op
in
self
.
_actual_ops
:
op
.
use_profiler
(
self
.
_use_profile
)
op
.
set_tracer
(
self
.
_tracer
)
op
.
set_use_prometheus
(
self
.
_use_prometheus
)
if
self
.
_is_thread_op
:
self
.
_threads_or_proces
.
extend
(
op
.
start_with_thread
())
else
:
self
.
_threads_or_proces
.
extend
(
op
.
start_with_process
())
_LOGGER
.
info
(
"[DAG] start"
)
if
self
.
_use_prometheus
:
_LOGGER
.
info
(
"Prometheus Start 1"
)
self
.
start_prom
(
self
.
_prometheus_port
)
# not join yet
return
self
.
_threads_or_proces
...
...
python/pipeline/operator.py
浏览文件 @
f942f5da
...
...
@@ -371,6 +371,9 @@ class Op(object):
def set_tracer(self, tracer):
    """
    Store ``tracer`` on this object as ``_tracer``.

    Args:
        tracer: the tracer object to keep; it is stored as given.
    """
    self._tracer = tracer
def set_use_prometheus(self, use_prometheus):
    """
    Store ``use_prometheus`` on this object as ``_use_prometheus``.

    Args:
        use_prometheus: flag value to keep; it is stored as given.
    """
    self._use_prometheus = use_prometheus
def
init_client
(
self
,
client_config
,
server_endpoints
):
"""
Initialize the client object. There are three types of clients, brpc,
...
...
@@ -1448,6 +1451,7 @@ class Op(object):
midped_data_dict
,
err_channeldata_dict
\
=
self
.
_run_process
(
preped_data_dict
,
op_info_prefix
,
skip_process_dict
,
logid_dict
)
end
=
profiler
.
record
(
"midp#{}_1"
.
format
(
op_info_prefix
))
_LOGGER
.
info
(
"prometheus inf count +1"
)
midp_time
=
end
-
start
_LOGGER
.
debug
(
"op:{} process_end:{}, cost:{}"
.
format
(
op_info_prefix
,
time
.
time
(),
midp_time
))
...
...
python/pipeline/profiler.py
浏览文件 @
f942f5da
...
...
@@ -49,13 +49,18 @@ class PerformanceTracer(object):
self
.
_channels
=
[]
# The size of data in Channel will not exceed server_worker_num
self
.
_server_worker_num
=
server_worker_num
if
_is_profile
:
self
.
profile_dict
=
{}
self
.
_enable_dict
=
False
def data_buffer(self):
    """
    Return the object held in ``self._data_buffer``.

    Returns:
        The ``_data_buffer`` attribute itself, not a copy.
    """
    return self._data_buffer
def
start
(
self
):
self
.
_thrd
=
threading
.
Thread
(
target
=
self
.
_trace_func
,
args
=
(
self
.
_channels
,
))
self
.
_thrd
.
daemon
=
True
self
.
_thrd
.
start
()
"""
if self._is_thread_mode:
self._thrd = threading.Thread(
target=self._trace_func, args=(self._channels, ))
...
...
@@ -66,10 +71,14 @@ class PerformanceTracer(object):
target=self._trace_func, args=(self._channels, ))
self._proc.daemon = True
self._proc.start()
"""
def set_channels(self, channels):
    """
    Store ``channels`` on this object as ``_channels``.

    Args:
        channels: the channel collection to keep; it is stored as given.
    """
    self._channels = channels
def set_enable_dict(self, enable):
    """
    Store ``enable`` on this object as ``_enable_dict``.

    Args:
        enable: flag value to keep; it is stored as given.
    """
    self._enable_dict = enable
def
_trace_func
(
self
,
channels
):
all_actions
=
[
"in"
,
"prep"
,
"midp"
,
"postp"
,
"out"
]
calcu_actions
=
[
"prep"
,
"midp"
,
"postp"
]
...
...
@@ -106,9 +115,14 @@ class PerformanceTracer(object):
if
len
(
op_cost
)
!=
0
:
for
name
in
op_cost
:
tot_cost
,
calcu_cost
=
0.0
,
0.0
count
=
0
for
action
,
costs
in
op_cost
[
name
].
items
():
op_cost
[
name
][
action
]
=
sum
(
costs
)
/
(
1e3
*
len
(
costs
))
tot_cost
+=
op_cost
[
name
][
action
]
if
action
==
"midp"
:
count
=
len
(
costs
)
if
"midp"
in
op_cost
[
name
].
keys
():
op_cost
[
name
][
'count'
]
=
count
if
name
!=
"DAG"
:
_LOGGER
.
info
(
"Op({}):"
.
format
(
name
))
...
...
@@ -121,7 +135,6 @@ class PerformanceTracer(object):
calcu_cost
+=
op_cost
[
name
][
action
]
_LOGGER
.
info
(
"
\t
idle[{}]"
.
format
(
1
-
1.0
*
calcu_cost
/
tot_cost
))
if
_is_profile
:
self
.
profile_dict
=
copy
.
deepcopy
(
op_cost
)
if
"DAG"
in
op_cost
:
...
...
@@ -142,7 +155,7 @@ class PerformanceTracer(object):
for
latency
in
latencys
:
_LOGGER
.
info
(
"
\t\t
.{}[{} ms]"
.
format
(
latency
,
calls
[
int
(
tot
*
latency
/
100.0
)]))
if
_is_profile
:
if
_is_profile
or
self
.
_enable_dict
:
self
.
profile_dict
[
"DAG"
][
"query_count"
]
=
tot
self
.
profile_dict
[
"DAG"
][
"qps"
]
=
qps
self
.
profile_dict
[
"DAG"
][
"succ"
]
=
1
-
1.0
*
err_count
/
tot
...
...
python/pipeline/prometheus_metrics.py
0 → 100644
浏览文件 @
f942f5da
from
prometheus_client
import
Counter
,
generate_latest
,
CollectorRegistry
,
Gauge
# Dedicated registry: only the counters declared below are exported when
# this registry is passed to generate_latest().
registry = CollectorRegistry()

# Metric names match the counters registered by the C++ server in
# core/predictor/framework/prometheus_metric.cpp; the help texts describe
# each metric in the same terms instead of repeating the variable name.
metric_query_success = Counter(
    "pd_query_request_success_total",
    "Number of successful query requests",
    registry=registry)
metric_query_failure = Counter(
    "pd_query_request_failure_total",
    "Number of failed query requests",
    registry=registry)
metric_inf_count = Counter(
    "pd_inference_count_total",
    "Number of inferences performed",
    registry=registry)
metric_query_duration_us = Counter(
    "pd_query_request_duration_us_total",
    "Cumulative query request duration in microseconds",
    registry=registry)
metric_inf_duration_us = Counter(
    "pd_inference_duration_us_total",
    "Cumulative inference duration in microseconds",
    registry=registry)
python/requirements.txt
浏览文件 @
f942f5da
...
...
@@ -21,3 +21,4 @@ sentencepiece; platform_machine == "aarch64"
opencv-python==4.3.0.38; platform_machine != "aarch64"
opencv-python; platform_machine == "aarch64"
pytest
prometheus-client==0.12.0
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录