Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Serving
提交
7f8e1139
S
Serving
项目概览
PaddlePaddle
/
Serving
大约 1 年 前同步成功
通知
186
Star
833
Fork
253
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
105
列表
看板
标记
里程碑
合并请求
10
Wiki
2
Wiki
分析
仓库
DevOps
项目成员
Pages
S
Serving
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
105
Issue
105
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
2
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7f8e1139
编写于
5月 11, 2021
作者:
T
TeslaZhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Update Pipeline benchmark & trace logs & OCR Examples
上级
fa52ca27
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
219 addition
and
77 deletion
+219
-77
python/examples/pipeline/ocr/benchmark.py
python/examples/pipeline/ocr/benchmark.py
+81
-18
python/examples/pipeline/ocr/benchmark.sh
python/examples/pipeline/ocr/benchmark.sh
+66
-37
python/examples/pipeline/ocr/web_service.py
python/examples/pipeline/ocr/web_service.py
+7
-4
python/pipeline/channel.py
python/pipeline/channel.py
+43
-6
python/pipeline/operator.py
python/pipeline/operator.py
+22
-12
未找到文件。
python/examples/pipeline/ocr/benchmark.py
浏览文件 @
7f8e1139
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
sys
import
sys
import
os
import
os
import
base64
import
base64
...
@@ -12,6 +26,8 @@ except ImportError:
...
@@ -12,6 +26,8 @@ except ImportError:
import
numpy
as
np
import
numpy
as
np
from
paddle_serving_client.utils
import
MultiThreadRunner
from
paddle_serving_client.utils
import
MultiThreadRunner
from
paddle_serving_client.utils
import
benchmark_args
,
show_latency
from
paddle_serving_client.utils
import
benchmark_args
,
show_latency
def
parse_benchmark
(
filein
,
fileout
):
def
parse_benchmark
(
filein
,
fileout
):
with
open
(
filein
,
"r"
)
as
fin
:
with
open
(
filein
,
"r"
)
as
fin
:
res
=
yaml
.
load
(
fin
)
res
=
yaml
.
load
(
fin
)
...
@@ -24,6 +40,7 @@ def parse_benchmark(filein, fileout):
...
@@ -24,6 +40,7 @@ def parse_benchmark(filein, fileout):
with
open
(
fileout
,
"w"
)
as
fout
:
with
open
(
fileout
,
"w"
)
as
fout
:
yaml
.
dump
(
res
,
fout
,
default_flow_style
=
False
)
yaml
.
dump
(
res
,
fout
,
default_flow_style
=
False
)
def
gen_yml
(
device
):
def
gen_yml
(
device
):
fin
=
open
(
"config.yml"
,
"r"
)
fin
=
open
(
"config.yml"
,
"r"
)
config
=
yaml
.
load
(
fin
)
config
=
yaml
.
load
(
fin
)
...
@@ -37,48 +54,95 @@ def gen_yml(device):
...
@@ -37,48 +54,95 @@ def gen_yml(device):
with
open
(
"config2.yml"
,
"w"
)
as
fout
:
with
open
(
"config2.yml"
,
"w"
)
as
fout
:
yaml
.
dump
(
config
,
fout
,
default_flow_style
=
False
)
yaml
.
dump
(
config
,
fout
,
default_flow_style
=
False
)
def
cv2_to_base64
(
image
):
def
cv2_to_base64
(
image
):
return
base64
.
b64encode
(
image
).
decode
(
'utf8'
)
return
base64
.
b64encode
(
image
).
decode
(
'utf8'
)
def
run_http
(
idx
,
batch_size
):
def
run_http
(
idx
,
batch_size
):
print
(
"start thread ({})"
.
format
(
idx
))
print
(
"start thread ({})"
.
format
(
idx
))
url
=
"http://127.0.0.1:9999/ocr/prediction"
url
=
"http://127.0.0.1:9999/ocr/prediction"
start
=
time
.
time
()
start
=
time
.
time
()
test_img_dir
=
"rctw_test/images/"
test_img_dir
=
"imgs/"
latency_list
=
[]
total_number
=
0
for
img_file
in
os
.
listdir
(
test_img_dir
):
for
img_file
in
os
.
listdir
(
test_img_dir
):
l_start
=
time
.
time
()
with
open
(
os
.
path
.
join
(
test_img_dir
,
img_file
),
'rb'
)
as
file
:
with
open
(
os
.
path
.
join
(
test_img_dir
,
img_file
),
'rb'
)
as
file
:
image_data1
=
file
.
read
()
image_data1
=
file
.
read
()
image
=
cv2_to_base64
(
image_data1
)
image
=
cv2_to_base64
(
image_data1
)
data
=
{
"key"
:
[
"image"
],
"value"
:
[
image
]}
data
=
{
"key"
:
[
"image"
],
"value"
:
[
image
]}
for
i
in
range
(
100
):
#
for i in range(100):
r
=
requests
.
post
(
url
=
url
,
data
=
json
.
dumps
(
data
))
r
=
requests
.
post
(
url
=
url
,
data
=
json
.
dumps
(
data
))
print
(
r
.
json
())
end
=
time
.
time
()
end
=
time
.
time
()
return
[[
end
-
start
]]
l_end
=
time
.
time
()
latency_list
.
append
(
l_end
*
1000
-
l_start
*
1000
)
total_number
=
total_number
+
1
return
[[
end
-
start
],
latency_list
,
[
total_number
]]
def
multithread_http
(
thread
,
batch_size
):
def
multithread_http
(
thread
,
batch_size
):
multi_thread_runner
=
MultiThreadRunner
()
multi_thread_runner
=
MultiThreadRunner
()
result
=
multi_thread_runner
.
run
(
run_http
,
thread
,
batch_size
)
start
=
time
.
time
()
result
=
multi_thread_runner
.
run
(
run_http
,
thread
,
batch_size
)
end
=
time
.
time
()
total_cost
=
end
-
start
avg_cost
=
0
total_number
=
0
for
i
in
range
(
thread
):
avg_cost
+=
result
[
0
][
i
]
total_number
+=
result
[
2
][
i
]
avg_cost
=
avg_cost
/
thread
print
(
"Total cost: {}s"
.
format
(
total_cost
))
print
(
"Each thread cost: {}s. "
.
format
(
avg_cost
))
print
(
"Total count: {}. "
.
format
(
total_number
))
print
(
"AVG QPS: {} samples/s"
.
format
(
batch_size
*
total_number
/
total_cost
))
show_latency
(
result
[
1
])
def
run_rpc
(
thread
,
batch_size
):
def
run_rpc
(
thread
,
batch_size
):
client
=
PipelineClient
()
client
=
PipelineClient
()
client
.
connect
([
'127.0.0.1:18090'
])
client
.
connect
([
'127.0.0.1:18090'
])
start
=
time
.
time
()
start
=
time
.
time
()
test_img_dir
=
"imgs/"
#test_img_dir = "imgs/"
test_img_dir
=
"rctw_test/images/"
latency_list
=
[]
total_number
=
0
for
img_file
in
os
.
listdir
(
test_img_dir
):
for
img_file
in
os
.
listdir
(
test_img_dir
):
l_start
=
time
.
time
()
with
open
(
os
.
path
.
join
(
test_img_dir
,
img_file
),
'rb'
)
as
file
:
with
open
(
os
.
path
.
join
(
test_img_dir
,
img_file
),
'rb'
)
as
file
:
image_data
=
file
.
read
()
image_data
=
file
.
read
()
image
=
cv2_to_base64
(
image_data
)
image
=
cv2_to_base64
(
image_data
)
for
i
in
range
(
100
):
ret
=
client
.
predict
(
feed_dict
=
{
"image"
:
image
},
fetch
=
[
"res"
])
ret
=
client
.
predict
(
feed_dict
=
{
"image"
:
image
},
fetch
=
[
"res"
])
print
(
ret
)
l_end
=
time
.
time
()
latency_list
.
append
(
l_end
*
1000
-
l_start
*
1000
)
total_number
=
total_number
+
1
end
=
time
.
time
()
end
=
time
.
time
()
return
[[
end
-
start
]]
return
[[
end
-
start
]
,
latency_list
,
[
total_number
]
]
def
multithread_rpc
(
thraed
,
batch_size
):
def
multithread_rpc
(
thraed
,
batch_size
):
multi_thread_runner
=
MultiThreadRunner
()
multi_thread_runner
=
MultiThreadRunner
()
result
=
multi_thread_runner
.
run
(
run_rpc
,
thread
,
batch_size
)
start
=
time
.
time
()
result
=
multi_thread_runner
.
run
(
run_rpc
,
thread
,
batch_size
)
end
=
time
.
time
()
total_cost
=
end
-
start
avg_cost
=
0
total_number
=
0
for
i
in
range
(
thread
):
avg_cost
+=
result
[
0
][
i
]
total_number
+=
result
[
2
][
i
]
avg_cost
=
avg_cost
/
thread
print
(
"Total cost: {}s"
.
format
(
total_cost
))
print
(
"Each thread cost: {}s. "
.
format
(
avg_cost
))
print
(
"Total count: {}. "
.
format
(
total_number
))
print
(
"AVG QPS: {} samples/s"
.
format
(
batch_size
*
total_number
/
total_cost
))
show_latency
(
result
[
1
])
if
__name__
==
"__main__"
:
if
__name__
==
"__main__"
:
if
sys
.
argv
[
1
]
==
"yaml"
:
if
sys
.
argv
[
1
]
==
"yaml"
:
...
@@ -98,4 +162,3 @@ if __name__ == "__main__":
...
@@ -98,4 +162,3 @@ if __name__ == "__main__":
filein
=
sys
.
argv
[
2
]
filein
=
sys
.
argv
[
2
]
fileout
=
sys
.
argv
[
3
]
fileout
=
sys
.
argv
[
3
]
parse_benchmark
(
filein
,
fileout
)
parse_benchmark
(
filein
,
fileout
)
python/examples/pipeline/ocr/benchmark.sh
浏览文件 @
7f8e1139
export
FLAGS_profile_pipeline
=
1
export
FLAGS_profile_pipeline
=
1
alias
python3
=
"python3.
7
"
alias
python3
=
"python3.
6
"
modelname
=
"ocr"
modelname
=
"ocr"
# HTTP
# HTTP
ps
-ef
|
grep
web_service |
awk
'{print $2}'
| xargs
kill
-9
#
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep
3
sleep
3
python3 benchmark.py yaml local_predictor 1 gpu
# Create yaml,If you already have the config.yaml, ignore it.
#python3 benchmark.py yaml local_predictor 1 gpu
rm
-rf
profile_log_
$modelname
rm
-rf
profile_log_
$modelname
for
thread_num
in
1 8 16
echo
"Starting HTTP Clients..."
# Start a client in each thread, tesing the case of multiple threads.
for
thread_num
in
1 2 4 8 12 16
do
do
for
batch_size
in
1
for
batch_size
in
1
do
do
echo
"----Bert thread num:
$thread_num
batch size:
$batch_size
mode:http ----"
>>
profile_log_
$modelname
echo
'----$modelname thread num: $thread_num batch size: $batch_size mode:http ----'
>>
profile_log_
$modelname
rm
-rf
PipelineServingLogs
# Start one web service, If you start the service yourself, you can ignore it here.
rm
-rf
cpu_utilization.py
#python3 web_service.py >web.log 2>&1 &
python3 web_service.py
>
web.log 2>&1 &
#sleep 3
sleep
3
nvidia-smi
--id
=
2
--query-compute-apps
=
used_memory
--format
=
csv
-lms
100
>
gpu_use.log 2>&1 &
# --id is the serial number of the GPU card, Must be the same as the gpu id used by the server.
nvidia-smi
--id
=
2
--query-gpu
=
utilization.gpu
--format
=
csv
-lms
100
>
gpu_utilization.log 2>&1 &
nvidia-smi
--id
=
3
--query-gpu
=
memory.used
--format
=
csv
-lms
1000
>
gpu_use.log 2>&1 &
nvidia-smi
--id
=
3
--query-gpu
=
utilization.gpu
--format
=
csv
-lms
1000
>
gpu_utilization.log 2>&1 &
echo
"import psutil
\n
cpu_utilization=psutil.cpu_percent(1,False)
\n
print('CPU_UTILIZATION:', cpu_utilization)
\n
"
>
cpu_utilization.py
echo
"import psutil
\n
cpu_utilization=psutil.cpu_percent(1,False)
\n
print('CPU_UTILIZATION:', cpu_utilization)
\n
"
>
cpu_utilization.py
python3 benchmark.py run http
$thread_num
$batch_size
# Start http client
python3 cpu_utilization.py
>>
profile_log_
$modelname
python3 benchmark.py run http
$thread_num
$batch_size
>
profile 2>&1
ps
-ef
|
grep
web_service |
awk
'{print $2}'
| xargs
kill
-9
python3 benchmark.py dump benchmark.log benchmark.tmp
# Collect CPU metrics, Filter data that is zero momentarily, Record the maximum value of GPU memory and the average value of GPU utilization
mv
benchmark.tmp benchmark.log
python3 cpu_utilization.py
>>
profile_log_
$modelname
grep
-av
'^0 %'
gpu_utilization.log
>
gpu_utilization.log.tmp
awk
'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}'
gpu_use.log
>>
profile_log_
$modelname
awk
'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}'
gpu_use.log
>>
profile_log_
$modelname
awk
'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}'
gpu_utilization.log
>>
profile_log_
$modelname
awk
-F
' '
'{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }'
gpu_utilization.log.tmp
>>
profile_log_
$modelname
cat
benchmark.log
>>
profile_log_
$modelname
#rm -rf gpu_use.log gpu_utilization.log
# Show profiles
python3 ../../util/show_profile.py profile
$thread_num
>>
profile_log_
$modelname
tail
-n
8 profile
>>
profile_log_
$modelname
echo
''
>>
profile_log_
$modelname
done
done
done
done
# Kill all nvidia-smi background task.
pkill nvidia-smi
echo
"Starting RPC Clients..."
# RPC
# RPC
ps
-ef
|
grep
web_service |
awk
'{print $2}'
| xargs
kill
-9
#
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep
3
sleep
3
python3 benchmark.py yaml local_predictor 1 gpu
for
thread_num
in
1 8 16
# Create yaml,If you already have the config.yaml, ignore it.
#python3 benchmark.py yaml local_predictor 1 gpu
rm
-rf
profile_log_
$modelname
# Start a client in each thread, tesing the case of multiple threads.
for
thread_num
in
1 2 4 6 8 12 16
do
do
for
batch_size
in
1
for
batch_size
in
1
do
do
echo
"----Bert thread num:
$thread_num
batch size:
$batch_size
mode:rpc ----"
>>
profile_log_
$modelname
echo
"----
$modelname
thread num:
$thread_num
batch size:
$batch_size
mode:rpc ----"
>>
profile_log_
$modelname
rm
-rf
PipelineServingLogs
# Start one web service, If you start the service yourself, you can ignore it here.
rm
-rf
cpu_utilization.py
#python3 web_service.py >web.log 2>&1 &
python3 web_service.py
>
web.log 2>&1 &
#sleep 3
sleep
3
nvidia-smi
--id
=
2
--query-compute-apps
=
used_memory
--format
=
csv
-lms
100
>
gpu_use.log 2>&1 &
# --id is the serial number of the GPU card, Must be the same as the gpu id used by the server.
nvidia-smi
--id
=
2
--query-gpu
=
utilization.gpu
--format
=
csv
-lms
100
>
gpu_utilization.log 2>&1 &
nvidia-smi
--id
=
3
--query-compute-apps
=
used_memory
--format
=
csv
-lms
100
>
gpu_use.log 2>&1 &
nvidia-smi
--id
=
3
--query-gpu
=
utilization.gpu
--format
=
csv
-lms
100
>
gpu_utilization.log 2>&1 &
echo
"import psutil
\n
cpu_utilization=psutil.cpu_percent(1,False)
\n
print('CPU_UTILIZATION:', cpu_utilization)
\n
"
>
cpu_utilization.py
echo
"import psutil
\n
cpu_utilization=psutil.cpu_percent(1,False)
\n
print('CPU_UTILIZATION:', cpu_utilization)
\n
"
>
cpu_utilization.py
python3 benchmark.py run rpc
$thread_num
$batch_size
python3 cpu_utilization.py
>>
profile_log_
$modelname
# Start http client
ps
-ef
|
grep
web_service |
awk
'{print $2}'
| xargs
kill
-9
python3 benchmark.py run rpc
$thread_num
$batch_size
>
profile 2>&1
python3 benchmark.py dump benchmark.log benchmark.tmp
mv
benchmark.tmp benchmark.log
# Collect CPU metrics, Filter data that is zero momentarily, Record the maximum value of GPU memory and the average value of GPU utilization
python3 cpu_utilization.py
>>
profile_log_
$modelname
grep
-av
'^0 %'
gpu_utilization.log
>
gpu_utilization.log.tmp
awk
'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}'
gpu_use.log
>>
profile_log_
$modelname
awk
'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}'
gpu_use.log
>>
profile_log_
$modelname
awk
'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}'
gpu_utilization.log
>>
profile_log_
$modelname
awk
-F
" "
'{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }'
gpu_utilization.log.tmp
>>
profile_log_
$modelname
#rm -rf gpu_use.log gpu_utilization.log
cat
benchmark.log
>>
profile_log_
$modelname
# Show profiles
python3 ../../util/show_profile.py profile
$thread_num
>>
profile_log_
$modelname
tail
-n
8 profile
>>
profile_log_
$modelname
echo
""
>>
profile_log_
$modelname
done
done
done
done
# Kill all nvidia-smi background task.
pkill nvidia-smi
python/examples/pipeline/ocr/web_service.py
浏览文件 @
7f8e1139
...
@@ -11,7 +11,7 @@
...
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.
from
paddle_serving_server.web_service
import
WebService
,
Op
from
paddle_serving_server
_gpu
.web_service
import
WebService
,
Op
import
logging
import
logging
import
numpy
as
np
import
numpy
as
np
import
cv2
import
cv2
...
@@ -45,6 +45,7 @@ class DetOp(Op):
...
@@ -45,6 +45,7 @@ class DetOp(Op):
imgs
=
[]
imgs
=
[]
for
key
in
input_dict
.
keys
():
for
key
in
input_dict
.
keys
():
data
=
base64
.
b64decode
(
input_dict
[
key
].
encode
(
'utf8'
))
data
=
base64
.
b64decode
(
input_dict
[
key
].
encode
(
'utf8'
))
self
.
raw_im
=
data
data
=
np
.
frombuffer
(
data
,
np
.
uint8
)
data
=
np
.
frombuffer
(
data
,
np
.
uint8
)
self
.
im
=
cv2
.
imdecode
(
data
,
cv2
.
IMREAD_COLOR
)
self
.
im
=
cv2
.
imdecode
(
data
,
cv2
.
IMREAD_COLOR
)
self
.
ori_h
,
self
.
ori_w
,
_
=
self
.
im
.
shape
self
.
ori_h
,
self
.
ori_w
,
_
=
self
.
im
.
shape
...
@@ -61,7 +62,7 @@ class DetOp(Op):
...
@@ -61,7 +62,7 @@ class DetOp(Op):
]
]
dt_boxes_list
=
self
.
post_func
(
det_out
,
[
ratio_list
])
dt_boxes_list
=
self
.
post_func
(
det_out
,
[
ratio_list
])
dt_boxes
=
self
.
filter_func
(
dt_boxes_list
[
0
],
[
self
.
ori_h
,
self
.
ori_w
])
dt_boxes
=
self
.
filter_func
(
dt_boxes_list
[
0
],
[
self
.
ori_h
,
self
.
ori_w
])
out_dict
=
{
"dt_boxes"
:
dt_boxes
,
"image"
:
self
.
im
}
out_dict
=
{
"dt_boxes"
:
dt_boxes
,
"image"
:
self
.
raw_
im
}
return
out_dict
,
None
,
""
return
out_dict
,
None
,
""
...
@@ -73,7 +74,9 @@ class RecOp(Op):
...
@@ -73,7 +74,9 @@ class RecOp(Op):
def
preprocess
(
self
,
input_dicts
,
data_id
,
log_id
):
def
preprocess
(
self
,
input_dicts
,
data_id
,
log_id
):
(
_
,
input_dict
),
=
input_dicts
.
items
()
(
_
,
input_dict
),
=
input_dicts
.
items
()
im
=
input_dict
[
"image"
]
raw_im
=
input_dict
[
"image"
]
data
=
np
.
frombuffer
(
raw_im
,
np
.
uint8
)
im
=
cv2
.
imdecode
(
data
,
cv2
.
IMREAD_COLOR
)
dt_boxes
=
input_dict
[
"dt_boxes"
]
dt_boxes
=
input_dict
[
"dt_boxes"
]
dt_boxes
=
self
.
sorted_boxes
(
dt_boxes
)
dt_boxes
=
self
.
sorted_boxes
(
dt_boxes
)
feed_list
=
[]
feed_list
=
[]
...
@@ -99,7 +102,7 @@ class RecOp(Op):
...
@@ -99,7 +102,7 @@ class RecOp(Op):
"""
"""
## Many mini-batchs, the type of feed_data is list.
## Many mini-batchs, the type of feed_data is list.
max_batch_size
=
6
#
len(dt_boxes)
max_batch_size
=
len
(
dt_boxes
)
# If max_batch_size is 0, skipping predict stage
# If max_batch_size is 0, skipping predict stage
if
max_batch_size
==
0
:
if
max_batch_size
==
0
:
...
...
python/pipeline/channel.py
浏览文件 @
7f8e1139
...
@@ -122,6 +122,17 @@ class ChannelData(object):
...
@@ -122,6 +122,17 @@ class ChannelData(object):
self
.
client_need_profile
=
client_need_profile
self
.
client_need_profile
=
client_need_profile
self
.
profile_data_set
=
set
()
self
.
profile_data_set
=
set
()
def
get_size
(
self
):
size
=
0
dict_data
=
None
if
isinstance
(
self
.
dictdata
,
dict
):
for
k
in
self
.
dictdata
:
size
+=
sys
.
getsizeof
(
self
.
dictdata
[
k
])
+
sys
.
getsizeof
(
k
)
if
isinstance
(
self
.
npdata
,
dict
):
for
k
in
self
.
npdata
:
size
+=
sys
.
getsizeof
(
self
.
npdata
[
k
])
+
sys
.
getsizeof
(
k
)
return
size
def
add_profile
(
self
,
profile_set
):
def
add_profile
(
self
,
profile_set
):
if
self
.
client_need_profile
is
False
:
if
self
.
client_need_profile
is
False
:
self
.
client_need_profile
=
True
self
.
client_need_profile
=
True
...
@@ -213,10 +224,10 @@ class ChannelData(object):
...
@@ -213,10 +224,10 @@ class ChannelData(object):
else
:
else
:
return
1
return
1
def
__str__
(
self
):
def
get_all_data
(
self
):
return
"type[{}], error_code[{}], data_id[{}], log_id[{}], dict_
data
[{}]"
.
format
(
return
"type[{}], error_code[{}], data_id[{}], log_id[{}], dict_
size
[{}]"
.
format
(
ChannelDataType
(
self
.
datatype
).
name
,
self
.
error_code
,
self
.
id
,
ChannelDataType
(
self
.
datatype
).
name
,
self
.
error_code
,
self
.
id
,
self
.
log_id
,
s
tr
(
self
.
dictdata
))
self
.
log_id
,
s
elf
.
get_size
(
))
class
ProcessChannel
(
object
):
class
ProcessChannel
(
object
):
...
@@ -313,8 +324,10 @@ class ProcessChannel(object):
...
@@ -313,8 +324,10 @@ class ProcessChannel(object):
def
push
(
self
,
channeldata
,
op_name
=
None
):
def
push
(
self
,
channeldata
,
op_name
=
None
):
_LOGGER
.
debug
(
_LOGGER
.
debug
(
self
.
_log
(
"(data_id={} log_id={}) Op({}) Enter channel::push"
.
self
.
_log
(
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
"(data_id={} log_id={}) Op({}) Enter channel::push producers:{}"
.
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
,
len
(
self
.
_producers
))))
if
len
(
self
.
_producers
)
==
0
:
if
len
(
self
.
_producers
)
==
0
:
_LOGGER
.
critical
(
_LOGGER
.
critical
(
self
.
_log
(
self
.
_log
(
...
@@ -323,19 +336,30 @@ class ProcessChannel(object):
...
@@ -323,19 +336,30 @@ class ProcessChannel(object):
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
format
(
channeldata
.
id
,
channeldata
.
log_id
,
op_name
)))
os
.
_exit
(
-
1
)
os
.
_exit
(
-
1
)
elif
len
(
self
.
_producers
)
==
1
:
elif
len
(
self
.
_producers
)
==
1
:
start_time
=
_time
()
with
self
.
_cv
:
with
self
.
_cv
:
enter_cv_time
=
_time
()
push_que_time
=
enter_cv_time
while
self
.
_stop
.
value
==
0
:
while
self
.
_stop
.
value
==
0
:
try
:
try
:
self
.
_que
.
put
((
channeldata
.
id
,
{
self
.
_que
.
put
((
channeldata
.
id
,
{
op_name
:
channeldata
op_name
:
channeldata
}),
}),
timeout
=
0
)
timeout
=
0
)
push_que_time
=
_time
()
break
break
except
Queue
.
Full
:
except
Queue
.
Full
:
self
.
_cv
.
wait
()
self
.
_cv
.
wait
()
if
self
.
_stop
.
value
==
1
:
if
self
.
_stop
.
value
==
1
:
raise
ChannelStopError
()
raise
ChannelStopError
()
self
.
_cv
.
notify_all
()
self
.
_cv
.
notify_all
()
notify_all_time
=
_time
()
_LOGGER
.
debug
(
"(data_id={}) Op({}) channel push cost! enter_cv:{} ms, push_que:{} ms, notify:{} ms, data_size:{}"
.
format
(
channeldata
.
id
,
op_name
,
(
enter_cv_time
-
start_time
)
*
1000
,
(
push_que_time
-
enter_cv_time
)
*
1000
,
(
notify_all_time
-
push_que_time
)
*
1000
,
channeldata
.
get_size
()))
_LOGGER
.
debug
(
_LOGGER
.
debug
(
self
.
_log
(
self
.
_log
(
"(data_id={} log_id={}) Op({}) Pushed data into internal queue."
.
"(data_id={} log_id={}) Op({}) Pushed data into internal queue."
.
...
@@ -414,10 +438,15 @@ class ProcessChannel(object):
...
@@ -414,10 +438,15 @@ class ProcessChannel(object):
os
.
_exit
(
-
1
)
os
.
_exit
(
-
1
)
elif
len
(
self
.
_consumer_cursors
)
==
1
:
elif
len
(
self
.
_consumer_cursors
)
==
1
:
resp
=
None
resp
=
None
time_1
=
int
(
round
(
_time
()
*
1000000
))
time_2
=
time_1
time_3
=
time_2
with
self
.
_cv
:
with
self
.
_cv
:
time_2
=
int
(
round
(
_time
()
*
1000000
))
while
self
.
_stop
.
value
==
0
and
resp
is
None
:
while
self
.
_stop
.
value
==
0
and
resp
is
None
:
try
:
try
:
resp
=
self
.
_que
.
get
(
timeout
=
0
)[
1
]
resp
=
self
.
_que
.
get
(
timeout
=
0
)[
1
]
time_3
=
int
(
round
(
_time
()
*
1000000
))
break
break
except
Queue
.
Empty
:
except
Queue
.
Empty
:
if
timeout
is
not
None
:
if
timeout
is
not
None
:
...
@@ -432,7 +461,12 @@ class ProcessChannel(object):
...
@@ -432,7 +461,12 @@ class ProcessChannel(object):
self
.
_cv
.
wait
()
self
.
_cv
.
wait
()
if
self
.
_stop
.
value
==
1
:
if
self
.
_stop
.
value
==
1
:
raise
ChannelStopError
()
raise
ChannelStopError
()
key
=
list
(
resp
.
keys
())[
0
]
data_id
=
resp
[
key
].
id
_LOGGER
.
debug
(
"(data_id={}) op({}) front cost enter_cv:{} ms, queue_get:{} ms"
.
format
(
data_id
,
op_name
,
(
time_2
-
time_1
)
/
1000.0
,
(
time_3
-
time_2
)
/
1000.0
))
if
resp
is
not
None
:
if
resp
is
not
None
:
list_values
=
list
(
resp
.
values
())
list_values
=
list
(
resp
.
values
())
_LOGGER
.
debug
(
_LOGGER
.
debug
(
...
@@ -485,6 +519,7 @@ class ProcessChannel(object):
...
@@ -485,6 +519,7 @@ class ProcessChannel(object):
if
self
.
_stop
.
value
==
1
:
if
self
.
_stop
.
value
==
1
:
raise
ChannelStopError
()
raise
ChannelStopError
()
time_1
=
int
(
round
(
_time
()
*
1000000
))
consumer_cursor
=
self
.
_consumer_cursors
[
op_name
]
consumer_cursor
=
self
.
_consumer_cursors
[
op_name
]
base_cursor
=
self
.
_base_cursor
.
value
base_cursor
=
self
.
_base_cursor
.
value
data_idx
=
consumer_cursor
-
base_cursor
data_idx
=
consumer_cursor
-
base_cursor
...
@@ -519,6 +554,8 @@ class ProcessChannel(object):
...
@@ -519,6 +554,8 @@ class ProcessChannel(object):
self
.
_cursor_count
[
new_consumer_cursor
]
+=
1
self
.
_cursor_count
[
new_consumer_cursor
]
+=
1
self
.
_cv
.
notify_all
()
self
.
_cv
.
notify_all
()
time_2
=
int
(
round
(
_time
()
*
1000000
))
#_LOGGER.warning("self._cv logic cost:{}".format(time2 - time1))
if
resp
is
not
None
:
if
resp
is
not
None
:
list_values
=
list
(
resp
.
values
())
list_values
=
list
(
resp
.
values
())
...
...
python/pipeline/operator.py
浏览文件 @
7f8e1139
...
@@ -123,7 +123,7 @@ class Op(object):
...
@@ -123,7 +123,7 @@ class Op(object):
if
self
.
_auto_batching_timeout
is
None
:
if
self
.
_auto_batching_timeout
is
None
:
self
.
_auto_batching_timeout
=
conf
[
"auto_batching_timeout"
]
self
.
_auto_batching_timeout
=
conf
[
"auto_batching_timeout"
]
if
self
.
_auto_batching_timeout
<=
0
or
self
.
_batch_size
==
1
:
if
self
.
_auto_batching_timeout
<=
0
or
self
.
_batch_size
==
1
:
_LOGGER
.
warnin
g
(
_LOGGER
.
debu
g
(
self
.
_log
(
self
.
_log
(
"Because auto_batching_timeout <= 0 or batch_size == 1,"
"Because auto_batching_timeout <= 0 or batch_size == 1,"
" set auto_batching_timeout to None."
))
" set auto_batching_timeout to None."
))
...
@@ -1005,6 +1005,7 @@ class Op(object):
...
@@ -1005,6 +1005,7 @@ class Op(object):
for
idx
in
range
(
batch_size
):
for
idx
in
range
(
batch_size
):
try
:
try
:
channeldata_dict
=
None
channeldata_dict
=
None
front_start_time
=
int
(
round
(
_time
()
*
1000000
))
if
timeout
is
not
None
:
if
timeout
is
not
None
:
remaining
=
endtime
-
_time
()
remaining
=
endtime
-
_time
()
if
remaining
<=
0.0
:
if
remaining
<=
0.0
:
...
@@ -1017,8 +1018,8 @@ class Op(object):
...
@@ -1017,8 +1018,8 @@ class Op(object):
channeldata_dict
=
input_channel
.
front
(
op_name
)
channeldata_dict
=
input_channel
.
front
(
op_name
)
batch
.
append
(
channeldata_dict
)
batch
.
append
(
channeldata_dict
)
_LOGGER
.
debug
(
_LOGGER
.
debug
(
"_auto_batching_generator get {} channeldata from op:{} in
to batch, batch_size:
{}"
.
"_auto_batching_generator get {} channeldata from op:{} in
put channel. time=
{}"
.
format
(
idx
,
op_name
,
batch_siz
e
))
format
(
idx
,
op_name
,
front_start_tim
e
))
except
ChannelTimeoutError
:
except
ChannelTimeoutError
:
_LOGGER
.
debug
(
"{} Failed to generate batch: "
_LOGGER
.
debug
(
"{} Failed to generate batch: "
"timeout"
.
format
(
op_info_prefix
))
"timeout"
.
format
(
op_info_prefix
))
...
@@ -1152,6 +1153,13 @@ class Op(object):
...
@@ -1152,6 +1153,13 @@ class Op(object):
# data in the whole batch is all error data
# data in the whole batch is all error data
continue
continue
# print
front_cost
=
int
(
round
(
_time
()
*
1000000
))
-
start
for
data_id
,
parsed_data
in
parsed_data_dict
.
items
():
_LOGGER
.
debug
(
"(data_id={}) POP INPUT CHANNEL! op:{}, cost:{} ms"
.
format
(
data_id
,
self
.
name
,
front_cost
/
1000.0
))
# preprecess
# preprecess
start
=
profiler
.
record
(
"prep#{}_0"
.
format
(
op_info_prefix
))
start
=
profiler
.
record
(
"prep#{}_0"
.
format
(
op_info_prefix
))
preped_data_dict
,
err_channeldata_dict
,
skip_process_dict
\
preped_data_dict
,
err_channeldata_dict
,
skip_process_dict
\
...
@@ -1199,6 +1207,7 @@ class Op(object):
...
@@ -1199,6 +1207,7 @@ class Op(object):
=
self
.
_run_postprocess
(
parsed_data_dict
,
midped_data_dict
,
op_info_prefix
,
logid_dict
)
=
self
.
_run_postprocess
(
parsed_data_dict
,
midped_data_dict
,
op_info_prefix
,
logid_dict
)
end
=
profiler
.
record
(
"postp#{}_1"
.
format
(
op_info_prefix
))
end
=
profiler
.
record
(
"postp#{}_1"
.
format
(
op_info_prefix
))
postp_time
=
end
-
start
postp_time
=
end
-
start
after_postp_time
=
_time
()
try
:
try
:
for
data_id
,
err_channeldata
in
err_channeldata_dict
.
items
():
for
data_id
,
err_channeldata
in
err_channeldata_dict
.
items
():
self
.
_push_to_output_channels
(
self
.
_push_to_output_channels
(
...
@@ -1212,7 +1221,6 @@ class Op(object):
...
@@ -1212,7 +1221,6 @@ class Op(object):
break
break
if
len
(
postped_data_dict
)
==
0
:
if
len
(
postped_data_dict
)
==
0
:
continue
continue
# push data to channel (if run succ)
# push data to channel (if run succ)
start
=
int
(
round
(
_time
()
*
1000000
))
start
=
int
(
round
(
_time
()
*
1000000
))
try
:
try
:
...
@@ -1226,12 +1234,21 @@ class Op(object):
...
@@ -1226,12 +1234,21 @@ class Op(object):
profile_str
=
profile_str
,
profile_str
=
profile_str
,
client_need_profile
=
need_profile_dict
[
data_id
],
client_need_profile
=
need_profile_dict
[
data_id
],
profile_set
=
profile_dict
[
data_id
])
profile_set
=
profile_dict
[
data_id
])
after_outchannel_time
=
_time
()
_LOGGER
.
debug
(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push cost:{} ms"
.
format
(
data_id
,
self
.
name
,
(
after_outchannel_time
-
after_postp_time
)
*
1000
))
_LOGGER
.
debug
(
"(data_id={}) PUSH OUTPUT CHANNEL! op:{} push data:{}"
.
format
(
data_id
,
self
.
name
,
postped_data
.
get_all_data
()))
except
ChannelStopError
:
except
ChannelStopError
:
_LOGGER
.
debug
(
"{} Stop."
.
format
(
op_info_prefix
))
_LOGGER
.
debug
(
"{} Stop."
.
format
(
op_info_prefix
))
self
.
_finalize
(
is_thread_op
)
self
.
_finalize
(
is_thread_op
)
break
break
end
=
int
(
round
(
_time
()
*
1000000
))
end
=
int
(
round
(
_time
()
*
1000000
))
out_time
=
end
-
start
out_time
=
end
-
start
after_outchannel_time
=
int
(
round
(
_time
()
*
1000000
))
if
trace_buffer
is
not
None
:
if
trace_buffer
is
not
None
:
trace_que
.
append
({
trace_que
.
append
({
"name"
:
self
.
name
,
"name"
:
self
.
name
,
...
@@ -1345,14 +1362,7 @@ class RequestOp(Op):
...
@@ -1345,14 +1362,7 @@ class RequestOp(Op):
raise
ValueError
(
"request is None"
)
raise
ValueError
(
"request is None"
)
for
idx
,
key
in
enumerate
(
request
.
key
):
for
idx
,
key
in
enumerate
(
request
.
key
):
data
=
request
.
value
[
idx
]
dict_data
[
key
]
=
request
.
value
[
idx
]
try
:
evaled_data
=
eval
(
data
)
if
isinstance
(
evaled_data
,
np
.
ndarray
):
data
=
evaled_data
except
Exception
as
e
:
pass
dict_data
[
key
]
=
data
log_id
=
request
.
logid
log_id
=
request
.
logid
_LOGGER
.
info
(
"RequestOp unpack one request. log_id:{}, clientip:{}
\
_LOGGER
.
info
(
"RequestOp unpack one request. log_id:{}, clientip:{}
\
name:{}, method:{}"
.
format
(
log_id
,
request
.
clientip
,
request
.
name
,
name:{}, method:{}"
.
format
(
log_id
,
request
.
clientip
,
request
.
name
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录