未验证 提交 592fe770 编写于 作者: J Jiawei Wang 提交者: GitHub

Merge pull request #1083 from wangjiawei04/develop

pipeline benchmark
import sys
import os
import yaml
import requests
import time
import json
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
2021-03-16 10:26:01,832 ==================== TRACER ======================
2021-03-16 10:26:01,838 Op(bert):
2021-03-16 10:26:01,838 in[5.7833 ms]
2021-03-16 10:26:01,838 prep[8.2001 ms]
2021-03-16 10:26:01,838 midp[198.79853333333332 ms]
2021-03-16 10:26:01,839 postp[0.8411 ms]
2021-03-16 10:26:01,839 out[0.9440666666666667 ms]
2021-03-16 10:26:01,839 idle[0.03135320683677345]
2021-03-16 10:26:01,839 DAGExecutor:
2021-03-16 10:26:01,839 Query count[30]
2021-03-16 10:26:01,839 QPS[3.0 q/s]
2021-03-16 10:26:01,839 Succ[1.0]
2021-03-16 10:26:01,839 Error req[]
2021-03-16 10:26:01,839 Latency:
2021-03-16 10:26:01,839 ave[237.85519999999997 ms]
2021-03-16 10:26:01,839 .50[179.937 ms]
2021-03-16 10:26:01,839 .60[179.994 ms]
2021-03-16 10:26:01,839 .70[180.515 ms]
2021-03-16 10:26:01,840 .80[180.735 ms]
2021-03-16 10:26:01,840 .90[182.275 ms]
2021-03-16 10:26:01,840 .95[182.789 ms]
2021-03-16 10:26:01,840 .99[1921.33 ms]
2021-03-16 10:26:01,840 Channel (server worker num[1]):
2021-03-16 10:26:01,840 chl0(In: ['@DAGExecutor'], Out: ['bert']) size[0/0]
2021-03-16 10:26:01,841 chl1(In: ['bert'], Out: ['@DAGExecutor']) size[0/0]
def parse_benchmark(filein, fileout):
with open(filein, "r") as fin:
res = yaml.load(fin)
del_list = []
for key in res["DAG"].keys():
if "call" in key:
for key in del_list:
del res["DAG"][key]
with open(fileout, "w") as fout:
yaml.dump(res, fout, default_flow_style=False)
def gen_yml(device):
fin = open("config.yml", "r")
config = yaml.load(fin)
config["dag"]["tracer"] = {"interval_s": 10}
if device == "gpu":
config["op"]["bert"]["local_service_conf"]["device_type"] = 1
config["op"]["bert"]["local_service_conf"]["devices"] = "2"
with open("config2.yml", "w") as fout:
yaml.dump(config, fout, default_flow_style=False)
def run_http(idx, batch_size):
print("start thread ({})".format(idx))
url = ""
start = time.time()
with open("data-c.txt", 'r') as fin:
start = time.time()
lines = fin.readlines()
start_idx = 0
while start_idx < len(lines):
end_idx = min(len(lines), start_idx + batch_size)
feed = {}
for i in range(start_idx, end_idx):
feed[str(i - start_idx)] = lines[i]
keys = list(feed.keys())
values = [feed[x] for x in keys]
data = {"key": keys, "value": values}
r = requests.post(url=url, data=json.dumps(data))
start_idx += batch_size
if start_idx > 2000:
end = time.time()
return [[end - start]]
def multithread_http(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_http , thread, batch_size)
def run_rpc(thread, batch_size):
client = PipelineClient()
with open("data-c.txt", 'r') as fin:
start = time.time()
lines = fin.readlines()
start_idx = 0
while start_idx < len(lines):
end_idx = min(len(lines), start_idx + batch_size)
feed = {}
for i in range(start_idx, end_idx):
feed[str(i - start_idx)] = lines[i]
ret = client.predict(feed_dict=feed, fetch=["res"])
start_idx += batch_size
if start_idx > 1000:
end = time.time()
return [[end - start]]
def multithread_rpc(thraed, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_rpc , thread, batch_size)
if __name__ == "__main__":
if sys.argv[1] == "yaml":
mode = sys.argv[2] # brpc/ local predictor
thread = int(sys.argv[3])
device = sys.argv[4]
elif sys.argv[1] == "run":
mode = sys.argv[2] # http/ rpc
thread = int(sys.argv[3])
batch_size = int(sys.argv[4])
if mode == "http":
multithread_http(thread, batch_size)
elif mode == "rpc":
multithread_rpc(thread, batch_size)
elif sys.argv[1] == "dump":
filein = sys.argv[2]
fileout = sys.argv[3]
parse_benchmark(filein, fileout)
export FLAGS_profile_pipeline=1
alias python3="python3.7"
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
for thread_num in 1 8 16
for batch_size in 1 10 100
echo "----Bert thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run http $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
cat benchmark.log >> profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
python3 benchmark.py yaml local_predictor 1 gpu
for thread_num in 1 8 16
for batch_size in 1 10 100
echo "----Bert thread num: $thread_num batch size: $batch_size mode:rpc ----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run rpc $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
cat benchmark.log >> profile_log_$modelname
is_thread_op: false
interval_s: 10
http_port: 18082
client_type: local_predictor
concurrency: 2
device_type: 1
devices: '2'
- pooled_output
model_config: bert_seq128_model/
rpc_port: 9998
worker_num: 20
wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
tar -xzf bert_chinese_L-12_H-768_A-12.tar.gz
mv bert_chinese_L-12_H-768_A-12_model bert_seq128_model
mv bert_chinese_L-12_H-768_A-12_client bert_seq128_client
wget https://paddle-serving.bj.bcebos.com/bert_example/data-c.txt --no-check-certificate
wget https://paddle-serving.bj.bcebos.com/bert_example/vocab.txt --no-check-certificate
import sys
import os
import yaml
import requests
import time
import json
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
client = PipelineClient()
batch_size = 101
with open("data-c.txt", 'r') as fin:
lines = fin.readlines()
start_idx = 0
while start_idx < len(lines):
end_idx = min(len(lines), start_idx + batch_size)
feed = {}
for i in range(start_idx, end_idx):
feed[str(i - start_idx)] = lines[i]
ret = client.predict(feed_dict=feed, fetch=["res"])
start_idx += batch_size
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import sys
from paddle_serving_app.reader import ChineseBertReader
_LOGGER = logging.getLogger()
class BertOp(Op):
def init_op(self):
self.reader = ChineseBertReader({
"vocab_file": "vocab.txt",
"max_seq_len": 128
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
print("input dict", input_dict)
batch_size = len(input_dict.keys())
feed_res = []
for i in range(batch_size):
feed_dict = self.reader.process(input_dict[str(i)].encode("utf-8"))
for key in feed_dict.keys():
feed_dict[key] = np.array(feed_dict[key]).reshape((1, len(feed_dict[key]), 1))
feed_dict = {}
for key in feed_res[0].keys():
feed_dict[key] = np.concatenate([x[key] for x in feed_res], axis=0)
print(key, feed_dict[key].shape)
return feed_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
fetch_dict["pooled_output"] = str(fetch_dict["pooled_output"])
return fetch_dict, None, ""
class BertService(WebService):
def get_pipeline_response(self, read_op):
bert_op = BertOp(name="bert", input_ops=[read_op])
return bert_op
bert_service = BertService(name="bert")
import sys
import os
import base64
import yaml
import requests
import time
import json
from paddle_serving_server_gpu.pipeline import PipelineClient
except ImportError:
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
def parse_benchmark(filein, fileout):
with open(filein, "r") as fin:
res = yaml.load(fin)
del_list = []
for key in res["DAG"].keys():
if "call" in key:
for key in del_list:
del res["DAG"][key]
with open(fileout, "w") as fout:
yaml.dump(res, fout, default_flow_style=False)
def gen_yml(device):
fin = open("config.yml", "r")
config = yaml.load(fin)
config["dag"]["tracer"] = {"interval_s": 10}
if device == "gpu":
config["op"]["det"]["local_service_conf"]["device_type"] = 1
config["op"]["det"]["local_service_conf"]["devices"] = "2"
config["op"]["rec"]["local_service_conf"]["device_type"] = 1
config["op"]["rec"]["local_service_conf"]["devices"] = "2"
with open("config2.yml", "w") as fout:
yaml.dump(config, fout, default_flow_style=False)
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
def run_http(idx, batch_size):
print("start thread ({})".format(idx))
url = ""
start = time.time()
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
data = {"key": ["image"], "value": [image]}
for i in range(100):
r = requests.post(url=url, data=json.dumps(data))
end = time.time()
return [[end - start]]
def multithread_http(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_http , thread, batch_size)
def run_rpc(thread, batch_size):
client = PipelineClient()
start = time.time()
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(100):
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
end = time.time()
return [[end - start]]
def multithread_rpc(thraed, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_rpc , thread, batch_size)
if __name__ == "__main__":
if sys.argv[1] == "yaml":
mode = sys.argv[2] # brpc/ local predictor
thread = int(sys.argv[3])
device = sys.argv[4]
elif sys.argv[1] == "run":
mode = sys.argv[2] # http/ rpc
thread = int(sys.argv[3])
batch_size = int(sys.argv[4])
if mode == "http":
multithread_http(thread, batch_size)
elif mode == "rpc":
multithread_rpc(thread, batch_size)
elif sys.argv[1] == "dump":
filein = sys.argv[2]
fileout = sys.argv[3]
parse_benchmark(filein, fileout)
export FLAGS_profile_pipeline=1
alias python3="python3.7"
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
for thread_num in 1 8 16
for batch_size in 1
echo "----Bert thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run http $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
cat benchmark.log >> profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
python3 benchmark.py yaml local_predictor 1 gpu
for thread_num in 1 8 16
for batch_size in 1
echo "----Bert thread num: $thread_num batch size: $batch_size mode:rpc ----" >>profile_log_$modelname
rm -rf PipelineServingLogs
rm -rf cpu_utilization.py
python3 web_service.py >web.log 2>&1 &
sleep 3
nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
python3 benchmark.py run rpc $thread_num $batch_size
python3 cpu_utilization.py >>profile_log_$modelname
ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
python3 benchmark.py dump benchmark.log benchmark.tmp
mv benchmark.tmp benchmark.log
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
#rm -rf gpu_use.log gpu_utilization.log
cat benchmark.log >> profile_log_$modelname
......@@ -6,7 +6,7 @@ http_port: 9999
#worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG
worker_num: 1
worker_num: 5
#build_dag_each_worker, False,框架在进程内创建一条DAG;True,框架会每个进程内创建多个独立的DAG
build_dag_each_worker: false
......@@ -20,6 +20,9 @@ dag:
#使用性能分析, True,生成Timeline性能数据,对性能有一定影响;False为不使用
use_profile: false
interval_s: 10
......@@ -37,7 +40,7 @@ op:
fetch_list: ["concat_1.tmp_0"]
#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "0"
devices: "2"
concurrency: 2
......@@ -61,4 +64,4 @@ op:
fetch_list: ["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"]
#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "0"
devices: "2"
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server.web_service import WebService, Op
from paddle_serving_server_gpu.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
......@@ -45,16 +45,19 @@ class DetOp(Op):
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
data = base64.b64decode(input_dict["image"].encode('utf8'))
data = np.fromstring(data, np.uint8)
# Note: class variables(self.var) can only be used in process op mode
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img[np.newaxis, :].copy()}, False, None, ""
imgs = []
for key in input_dict.keys():
data = base64.b64decode(input_dict[key].encode('utf8'))
data = np.fromstring(data, np.uint8)
self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
imgs.append(det_img[np.newaxis, :].copy())
return {"image": np.concatenate(imgs, axis=0)}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
# print(fetch_dict)
det_out = fetch_dict["concat_1.tmp_0"]
ratio_list = [
float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
......@@ -62,7 +65,6 @@ class DetOp(Op):
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
print("out dict", out_dict)
return out_dict, None, ""
......@@ -112,5 +114,5 @@ class OcrService(WebService):
uci_service = OcrService(name="ocr")
......@@ -26,10 +26,11 @@ from time import time as _time
import time
import threading
import multiprocessing
import copy
_LOGGER = logging.getLogger(__name__)
_LOGGER.propagate = False
_is_profile = int(os.environ.get('FLAGS_profile_pipeline', 0))
class PerformanceTracer(object):
def __init__(self, is_thread_mode, interval_s, server_worker_num):
......@@ -48,6 +49,8 @@ class PerformanceTracer(object):
self._channels = []
# The size of data in Channel will not exceed server_worker_num
self._server_worker_num = server_worker_num
if _is_profile:
self.profile_dict = {}
def data_buffer(self):
return self._data_buffer
......@@ -82,7 +85,7 @@ class PerformanceTracer(object):
item = self._data_buffer.get_nowait()
name = item["name"]
actions = item["actions"]
if name == "DAG":
succ = item["succ"]
req_id = item["id"]
......@@ -106,9 +109,9 @@ class PerformanceTracer(object):
for action, costs in op_cost[name].items():
op_cost[name][action] = sum(costs) / (1e3 * len(costs))
tot_cost += op_cost[name][action]
if name != "DAG":
for action in all_actions:
if action in op_cost[name]:
_LOGGER.info("\t{}[{} ms]".format(
......@@ -118,7 +121,9 @@ class PerformanceTracer(object):
calcu_cost += op_cost[name][action]
_LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
if _is_profile:
self.profile_dict = copy.deepcopy(op_cost)
if "DAG" in op_cost:
calls = list(op_cost["DAG"].values())
......@@ -137,7 +142,17 @@ class PerformanceTracer(object):
for latency in latencys:
_LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
tot * latency / 100.0)]))
if _is_profile:
self.profile_dict["DAG"]["query_count"] = tot
self.profile_dict["DAG"]["qps"] = qps
self.profile_dict["DAG"]["succ"] = 1 - 1.0 * err_count / tot
self.profile_dict["DAG"]["avg"] = ave_cost
for latency in latencys:
self.profile_dict["DAG"][str(latency)] = calls[int(tot * latency / 100.0)]
if _is_profile:
import yaml
with open("benchmark.log", "w") as fout:
yaml.dump(self.profile_dict, fout, default_flow_style=False)
# channel
_LOGGER.info("Channel (server worker num[{}]):".format(
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册