diff --git a/python/examples/pipeline/bert/benchmark.py b/python/examples/pipeline/bert/benchmark.py
new file mode 100644
index 0000000000000000000000000000000000000000..3dece4914d6a606753c2b91db2a6d759e0ec6897
--- /dev/null
+++ b/python/examples/pipeline/bert/benchmark.py
@@ -0,0 +1,133 @@
+import sys
+import os
+import yaml
+import requests
+import time
+import json
+try:
+    from paddle_serving_server_gpu.pipeline import PipelineClient
+except ImportError:
+    from paddle_serving_server.pipeline import PipelineClient
+import numpy as np
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args, show_latency
+'''
+2021-03-16 10:26:01,832 ==================== TRACER ======================
+2021-03-16 10:26:01,838 Op(bert):
+2021-03-16 10:26:01,838         in[5.7833 ms]
+2021-03-16 10:26:01,838         prep[8.2001 ms]
+2021-03-16 10:26:01,838         midp[198.79853333333332 ms]
+2021-03-16 10:26:01,839         postp[0.8411 ms]
+2021-03-16 10:26:01,839         out[0.9440666666666667 ms]
+2021-03-16 10:26:01,839         idle[0.03135320683677345]
+2021-03-16 10:26:01,839 DAGExecutor:
+2021-03-16 10:26:01,839         Query count[30]
+2021-03-16 10:26:01,839         QPS[3.0 q/s]
+2021-03-16 10:26:01,839         Succ[1.0]
+2021-03-16 10:26:01,839         Error req[]
+2021-03-16 10:26:01,839         Latency:
+2021-03-16 10:26:01,839                 ave[237.85519999999997 ms]
+2021-03-16 10:26:01,839                 .50[179.937 ms]
+2021-03-16 10:26:01,839                 .60[179.994 ms]
+2021-03-16 10:26:01,839                 .70[180.515 ms]
+2021-03-16 10:26:01,840                 .80[180.735 ms]
+2021-03-16 10:26:01,840                 .90[182.275 ms]
+2021-03-16 10:26:01,840                 .95[182.789 ms]
+2021-03-16 10:26:01,840                 .99[1921.33 ms]
+2021-03-16 10:26:01,840 Channel (server worker num[1]):
+2021-03-16 10:26:01,840         chl0(In: ['@DAGExecutor'], Out: ['bert']) size[0/0]
+2021-03-16 10:26:01,841         chl1(In: ['bert'], Out: ['@DAGExecutor']) size[0/0]
+'''
+def parse_benchmark(filein, fileout):
+    with open(filein, "r") as fin:
+        res = yaml.load(fin)
+        del_list = []
+        for key in res["DAG"].keys():
+            if "call" in key:
+                del_list.append(key)
+        for key in del_list:
+            del res["DAG"][key]
+    with open(fileout, "w") as fout:
+        yaml.dump(res, fout, default_flow_style=False)
+
+def gen_yml(device):
+    fin = open("config.yml", "r")
+    config = yaml.load(fin)
+    fin.close()
+    config["dag"]["tracer"] = {"interval_s": 10}
+    if device == "gpu":
+        config["op"]["bert"]["local_service_conf"]["device_type"] = 1
+        config["op"]["bert"]["local_service_conf"]["devices"] = "2"
+    with open("config2.yml", "w") as fout:
+        yaml.dump(config, fout, default_flow_style=False)
+
+def run_http(idx, batch_size):
+    print("start thread ({})".format(idx))
+    url = "http://127.0.0.1:18082/bert/prediction"
+    start = time.time()
+    with open("data-c.txt", 'r') as fin:
+        start = time.time()
+        lines = fin.readlines()
+        start_idx = 0
+        while start_idx < len(lines):
+            end_idx = min(len(lines), start_idx + batch_size)
+            feed = {}
+            for i in range(start_idx, end_idx):
+                feed[str(i - start_idx)] = lines[i]
+            keys = list(feed.keys())
+            values = [feed[x] for x in keys]
+            data = {"key": keys, "value": values}
+            r = requests.post(url=url, data=json.dumps(data))
+            start_idx += batch_size
+            if start_idx > 2000:
+                break
+    end = time.time()
+    return [[end - start]]
+
+def multithread_http(thread, batch_size):
+    multi_thread_runner = MultiThreadRunner()
+    result = multi_thread_runner.run(run_http, thread, batch_size)
+
+def run_rpc(thread, batch_size):
+    client = PipelineClient()
+    client.connect(['127.0.0.1:9998'])
+    with open("data-c.txt", 'r') as fin:
+        start = time.time()
+        lines = fin.readlines()
+        start_idx = 0
+        while start_idx < len(lines):
+            end_idx = min(len(lines), start_idx + batch_size)
+            feed = {}
+            for i in range(start_idx, end_idx):
+                feed[str(i - start_idx)] = lines[i]
+            ret = client.predict(feed_dict=feed, fetch=["res"])
+            start_idx += batch_size
+            if start_idx > 1000:
+                break
+    end = time.time()
+    return [[end - start]]
+
+
+def multithread_rpc(thread, batch_size):
+    multi_thread_runner = MultiThreadRunner()
+    result = multi_thread_runner.run(run_rpc, thread, batch_size)
+
+if __name__ == "__main__":
+    if sys.argv[1] == "yaml":
+        mode = sys.argv[2]  # brpc/ local predictor
+        thread = int(sys.argv[3])
+        device = sys.argv[4]
+        gen_yml(device)
+    elif sys.argv[1] == "run":
+        mode = sys.argv[2]  # http/ rpc
+        thread = int(sys.argv[3])
+        batch_size = int(sys.argv[4])
+        if mode == "http":
+            multithread_http(thread, batch_size)
+        elif mode == "rpc":
+            multithread_rpc(thread, batch_size)
+    elif sys.argv[1] == "dump":
+        filein = sys.argv[2]
+        fileout = sys.argv[3]
+        parse_benchmark(filein, fileout)
+
diff --git a/python/examples/pipeline/bert/benchmark.sh b/python/examples/pipeline/bert/benchmark.sh
new file mode 100644
index 0000000000000000000000000000000000000000..bff0fd8e4eb429efd37f7267d959f5e0f600d8ce
--- /dev/null
+++ b/python/examples/pipeline/bert/benchmark.sh
@@ -0,0 +1,59 @@
+export FLAGS_profile_pipeline=1
+alias python3="python3.7"
+modelname="bert"
+# HTTP
+ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+sleep 3
+python3 benchmark.py yaml local_predictor 1 gpu
+rm -rf profile_log_$modelname
+for thread_num in 1 8 16
+do
+    for batch_size in 1 10 100
+    do
+        echo "----Bert thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname
+        rm -rf PipelineServingLogs
+        rm -rf cpu_utilization.py
+        python3 web_service.py >web.log 2>&1 &
+        sleep 3
+        nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
+        nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
+        echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
+        python3 benchmark.py run http $thread_num $batch_size
+        python3 cpu_utilization.py >>profile_log_$modelname
+        ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+        python3 benchmark.py dump benchmark.log benchmark.tmp
+        mv benchmark.tmp benchmark.log
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
+        cat benchmark.log >> profile_log_$modelname
+        #rm -rf gpu_use.log gpu_utilization.log
+    done
+done
+# RPC
+ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+sleep 3
+python3 benchmark.py yaml local_predictor 1 gpu
+
+for thread_num in 1 8 16
+do
+    for batch_size in 1 10 100
+    do
+        echo "----Bert thread num: $thread_num batch size: $batch_size mode:rpc ----" >>profile_log_$modelname
+        rm -rf PipelineServingLogs
+        rm -rf cpu_utilization.py
+        python3 web_service.py >web.log 2>&1 &
+        sleep 3
+        nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
+        nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
+        echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
+        python3 benchmark.py run rpc $thread_num $batch_size
+        python3 cpu_utilization.py >>profile_log_$modelname
+        ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+        python3 benchmark.py dump benchmark.log benchmark.tmp
+        mv benchmark.tmp benchmark.log
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
+        #rm -rf gpu_use.log gpu_utilization.log
+        cat benchmark.log >> profile_log_$modelname
+    done
+done
diff --git a/python/examples/pipeline/bert/config.yml b/python/examples/pipeline/bert/config.yml
new file mode 100644
index 0000000000000000000000000000000000000000..6896ef5cb774cfeda4582d4542af9c75d5204651
--- /dev/null
+++ b/python/examples/pipeline/bert/config.yml
@@ -0,0 +1,17 @@
+dag:
+  is_thread_op: false
+  tracer:
+    interval_s: 10
+http_port: 18082
+op:
+  bert:
+    local_service_conf:
+      client_type: local_predictor
+      concurrency: 2
+      device_type: 1
+      devices: '2'
+      fetch_list:
+      - pooled_output
+      model_config: bert_seq128_model/
+rpc_port: 9998
+worker_num: 20
diff --git a/python/examples/pipeline/bert/get_data.sh b/python/examples/pipeline/bert/get_data.sh
new file mode 100644
index 0000000000000000000000000000000000000000..b10a69afdf4cc22898c8081d3a0e1e1ccd0bfdfe
--- /dev/null
+++ b/python/examples/pipeline/bert/get_data.sh
@@ -0,0 +1,6 @@
+wget https://paddle-serving.bj.bcebos.com/paddle_hub_models/text/SemanticModel/bert_chinese_L-12_H-768_A-12.tar.gz
+tar -xzf bert_chinese_L-12_H-768_A-12.tar.gz
+mv bert_chinese_L-12_H-768_A-12_model bert_seq128_model
+mv bert_chinese_L-12_H-768_A-12_client bert_seq128_client
+wget https://paddle-serving.bj.bcebos.com/bert_example/data-c.txt --no-check-certificate
+wget https://paddle-serving.bj.bcebos.com/bert_example/vocab.txt --no-check-certificate
diff --git a/python/examples/pipeline/bert/pipeline_rpc_client.py b/python/examples/pipeline/bert/pipeline_rpc_client.py
new file mode 100644
index 0000000000000000000000000000000000000000..11bdee54dd48500b4d8c7d674318ceb8909afd91
--- /dev/null
+++ b/python/examples/pipeline/bert/pipeline_rpc_client.py
@@ -0,0 +1,27 @@
+import sys
+import os
+import yaml
+import requests
+import time
+import json
+try:
+    from paddle_serving_server_gpu.pipeline import PipelineClient
+except ImportError:
+    from paddle_serving_server.pipeline import PipelineClient
+import numpy as np
+
+
+client = PipelineClient()
+client.connect(['127.0.0.1:9998'])
+batch_size = 101
+with open("data-c.txt", 'r') as fin:
+    lines = fin.readlines()
+    start_idx = 0
+    while start_idx < len(lines):
+        end_idx = min(len(lines), start_idx + batch_size)
+        feed = {}
+        for i in range(start_idx, end_idx):
+            feed[str(i - start_idx)] = lines[i]
+        ret = client.predict(feed_dict=feed, fetch=["res"])
+        print(ret)
+        start_idx += batch_size
diff --git a/python/examples/pipeline/bert/web_service.py b/python/examples/pipeline/bert/web_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..7dca62a8fcfa9566b856fb3d025f045942499ed4
--- /dev/null
+++ b/python/examples/pipeline/bert/web_service.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+try:
+    from paddle_serving_server_gpu.web_service import WebService, Op
+except ImportError:
+    from paddle_serving_server.web_service import WebService, Op
+import logging
+import numpy as np
+import sys
+from paddle_serving_app.reader import ChineseBertReader
+_LOGGER = logging.getLogger()
+
+
+class BertOp(Op):
+    def init_op(self):
+        self.reader = ChineseBertReader({
+            "vocab_file": "vocab.txt",
+            "max_seq_len": 128
+        })
+
+    def preprocess(self, input_dicts, data_id, log_id):
+        (_, input_dict), = input_dicts.items()
+        print("input dict", input_dict)
+        batch_size = len(input_dict.keys())
+        feed_res = []
+        for i in range(batch_size):
+            feed_dict = self.reader.process(input_dict[str(i)].encode("utf-8"))
+            for key in feed_dict.keys():
+                feed_dict[key] = np.array(feed_dict[key]).reshape((1, len(feed_dict[key]), 1))
+            feed_res.append(feed_dict)
+        feed_dict = {}
+        for key in feed_res[0].keys():
+            feed_dict[key] = np.concatenate([x[key] for x in feed_res], axis=0)
+            print(key, feed_dict[key].shape)
+        return feed_dict, False, None, ""
+
+    def postprocess(self, input_dicts, fetch_dict, log_id):
+        fetch_dict["pooled_output"] = str(fetch_dict["pooled_output"])
+        return fetch_dict, None, ""
+
+
+class BertService(WebService):
+    def get_pipeline_response(self, read_op):
+        bert_op = BertOp(name="bert", input_ops=[read_op])
+        return bert_op
+
+
+bert_service = BertService(name="bert")
+bert_service.prepare_pipeline_config("config2.yml")
+bert_service.run_service()
diff --git a/python/examples/pipeline/ocr/benchmark.py b/python/examples/pipeline/ocr/benchmark.py
new file mode 100644
index 0000000000000000000000000000000000000000..af1caae42a37866f889317c39c6d0b20cb9c9747
--- /dev/null
+++ b/python/examples/pipeline/ocr/benchmark.py
@@ -0,0 +1,101 @@
+import sys
+import os
+import base64
+import yaml
+import requests
+import time
+import json
+try:
+    from paddle_serving_server_gpu.pipeline import PipelineClient
+except ImportError:
+    from paddle_serving_server.pipeline import PipelineClient
+import numpy as np
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args, show_latency
+def parse_benchmark(filein, fileout):
+    with open(filein, "r") as fin:
+        res = yaml.load(fin)
+        del_list = []
+        for key in res["DAG"].keys():
+            if "call" in key:
+                del_list.append(key)
+        for key in del_list:
+            del res["DAG"][key]
+    with open(fileout, "w") as fout:
+        yaml.dump(res, fout, default_flow_style=False)
+
+def gen_yml(device):
+    fin = open("config.yml", "r")
+    config = yaml.load(fin)
+    fin.close()
+    config["dag"]["tracer"] = {"interval_s": 10}
+    if device == "gpu":
+        config["op"]["det"]["local_service_conf"]["device_type"] = 1
+        config["op"]["det"]["local_service_conf"]["devices"] = "2"
+        config["op"]["rec"]["local_service_conf"]["device_type"] = 1
+        config["op"]["rec"]["local_service_conf"]["devices"] = "2"
+    with open("config2.yml", "w") as fout:
+        yaml.dump(config, fout, default_flow_style=False)
+
+def cv2_to_base64(image):
+    return base64.b64encode(image).decode('utf8')
+
+def run_http(idx, batch_size):
+    print("start thread ({})".format(idx))
+    url = "http://127.0.0.1:9999/ocr/prediction"
+    start = time.time()
+
+    test_img_dir = "imgs/"
+    for img_file in os.listdir(test_img_dir):
+        with open(os.path.join(test_img_dir, img_file), 'rb') as file:
+            image_data1 = file.read()
+        image = cv2_to_base64(image_data1)
+        data = {"key": ["image"], "value": [image]}
+        for i in range(100):
+            r = requests.post(url=url, data=json.dumps(data))
+    end = time.time()
+    return [[end - start]]
+
+def multithread_http(thread, batch_size):
+    multi_thread_runner = MultiThreadRunner()
+    result = multi_thread_runner.run(run_http, thread, batch_size)
+
+def run_rpc(thread, batch_size):
+    client = PipelineClient()
+    client.connect(['127.0.0.1:18090'])
+    start = time.time()
+    test_img_dir = "imgs/"
+    for img_file in os.listdir(test_img_dir):
+        with open(os.path.join(test_img_dir, img_file), 'rb') as file:
+            image_data = file.read()
+        image = cv2_to_base64(image_data)
+
+        for i in range(100):
+            ret = client.predict(feed_dict={"image": image}, fetch=["res"])
+    end = time.time()
+    return [[end - start]]
+
+
+def multithread_rpc(thread, batch_size):
+    multi_thread_runner = MultiThreadRunner()
+    result = multi_thread_runner.run(run_rpc, thread, batch_size)
+
+if __name__ == "__main__":
+    if sys.argv[1] == "yaml":
+        mode = sys.argv[2]  # brpc/ local predictor
+        thread = int(sys.argv[3])
+        device = sys.argv[4]
+        gen_yml(device)
+    elif sys.argv[1] == "run":
+        mode = sys.argv[2]  # http/ rpc
+        thread = int(sys.argv[3])
+        batch_size = int(sys.argv[4])
+        if mode == "http":
+            multithread_http(thread, batch_size)
+        elif mode == "rpc":
+            multithread_rpc(thread, batch_size)
+    elif sys.argv[1] == "dump":
+        filein = sys.argv[2]
+        fileout = sys.argv[3]
+        parse_benchmark(filein, fileout)
+
diff --git a/python/examples/pipeline/ocr/benchmark.sh b/python/examples/pipeline/ocr/benchmark.sh
new file mode 100644
index 0000000000000000000000000000000000000000..d789b94b98d03c87bfcc1ee7520e13336abdca2f
--- /dev/null
+++ b/python/examples/pipeline/ocr/benchmark.sh
@@ -0,0 +1,59 @@
+export FLAGS_profile_pipeline=1
+alias python3="python3.7"
+modelname="ocr"
+# HTTP
+ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+sleep 3
+python3 benchmark.py yaml local_predictor 1 gpu
+rm -rf profile_log_$modelname
+for thread_num in 1 8 16
+do
+    for batch_size in 1
+    do
+        echo "----OCR thread num: $thread_num batch size: $batch_size mode:http ----" >>profile_log_$modelname
+        rm -rf PipelineServingLogs
+        rm -rf cpu_utilization.py
+        python3 web_service.py >web.log 2>&1 &
+        sleep 3
+        nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
+        nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
+        echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
+        python3 benchmark.py run http $thread_num $batch_size
+        python3 cpu_utilization.py >>profile_log_$modelname
+        ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+        python3 benchmark.py dump benchmark.log benchmark.tmp
+        mv benchmark.tmp benchmark.log
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
+        cat benchmark.log >> profile_log_$modelname
+        #rm -rf gpu_use.log gpu_utilization.log
+    done
+done
+# RPC
+ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+sleep 3
+python3 benchmark.py yaml local_predictor 1 gpu
+
+for thread_num in 1 8 16
+do
+    for batch_size in 1
+    do
+        echo "----OCR thread num: $thread_num batch size: $batch_size mode:rpc ----" >>profile_log_$modelname
+        rm -rf PipelineServingLogs
+        rm -rf cpu_utilization.py
+        python3 web_service.py >web.log 2>&1 &
+        sleep 3
+        nvidia-smi --id=2 --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 &
+        nvidia-smi --id=2 --query-gpu=utilization.gpu --format=csv -lms 100 > gpu_utilization.log 2>&1 &
+        echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
+        python3 benchmark.py run rpc $thread_num $batch_size
+        python3 cpu_utilization.py >>profile_log_$modelname
+        ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
+        python3 benchmark.py dump benchmark.log benchmark.tmp
+        mv benchmark.tmp benchmark.log
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
+        awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "GPU_UTILIZATION:", max}' gpu_utilization.log >> profile_log_$modelname
+        #rm -rf gpu_use.log gpu_utilization.log
+        cat benchmark.log >> profile_log_$modelname
+    done
+done
diff --git a/python/examples/pipeline/ocr/config.yml b/python/examples/pipeline/ocr/config.yml
index 92149e0d8ac471de104f3d527d709dc384ee3c2c..251c2e2b21bb4fd17506752b6b922941c8be4105 100644
--- a/python/examples/pipeline/ocr/config.yml
+++ b/python/examples/pipeline/ocr/config.yml
@@ -6,7 +6,7 @@ http_port: 9999
 
 #worker_num, the maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each of which builds a grpc server and a DAG
 ##When build_dag_each_worker=False, the framework sets max_workers=worker_num for the grpc thread pool of the main thread
-worker_num: 1
+worker_num: 5
 
 #build_dag_each_worker, False: the framework creates one DAG inside the process; True: the framework creates multiple independent DAGs in each process
 build_dag_each_worker: false
@@ -20,6 +20,9 @@ dag:
    #use_profile, True: generate Timeline profiling data (with some performance impact); False: do not profile
    use_profile: false
 
+    tracer:
+        interval_s: 10
+
 op:
    det:
        #concurrency; thread concurrency when is_thread_op=True, otherwise process concurrency
@@ -37,7 +40,7 @@
            fetch_list: ["concat_1.tmp_0"]
 
            #device IDs for computation; CPU prediction when devices is "" or unset, GPU prediction when devices is "0" or "0,1,2", giving the GPU cards to use
-            devices: "0"
+            devices: "2"
    rec:
        #concurrency; thread concurrency when is_thread_op=True, otherwise process concurrency
        concurrency: 2
@@ -61,4 +64,4 @@
            fetch_list: ["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"]
 
            #device IDs for computation; CPU prediction when devices is "" or unset, GPU prediction when devices is "0" or "0,1,2", giving the GPU cards to use
-            devices: "0"
+            devices: "2"
diff --git a/python/examples/pipeline/ocr/web_service.py b/python/examples/pipeline/ocr/web_service.py
index 7e9dd3141abacbed16048b33225b785fea00b3d2..d1d8c675216820abeaec15d5bad9ddc6a1acf3f5 100644
--- a/python/examples/pipeline/ocr/web_service.py
+++ b/python/examples/pipeline/ocr/web_service.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 try:
-    from paddle_serving_server.web_service import WebService, Op
+    from paddle_serving_server_gpu.web_service import WebService, Op
 except ImportError:
     from paddle_serving_server_gpu.web_service import WebService, Op
 import logging
@@ -45,16 +45,19 @@
 
     def preprocess(self, input_dicts, data_id, log_id):
         (_, input_dict), = input_dicts.items()
-        data = base64.b64decode(input_dict["image"].encode('utf8'))
-        data = np.fromstring(data, np.uint8)
-        # Note: class variables(self.var) can only be used in process op mode
-        self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
-        self.ori_h, self.ori_w, _ = self.im.shape
-        det_img = self.det_preprocess(self.im)
-        _, self.new_h, self.new_w = det_img.shape
-        return {"image": det_img[np.newaxis, :].copy()}, False, None, ""
+        imgs = []
+        for key in input_dict.keys():
+            data = base64.b64decode(input_dict[key].encode('utf8'))
+            data = np.fromstring(data, np.uint8)
+            self.im = cv2.imdecode(data, cv2.IMREAD_COLOR)
+            self.ori_h, self.ori_w, _ = self.im.shape
+            det_img = self.det_preprocess(self.im)
+            _, self.new_h, self.new_w = det_img.shape
+            imgs.append(det_img[np.newaxis, :].copy())
+        return {"image": np.concatenate(imgs, axis=0)}, False, None, ""
 
     def postprocess(self, input_dicts, fetch_dict, log_id):
+        # print(fetch_dict)
         det_out = fetch_dict["concat_1.tmp_0"]
         ratio_list = [
             float(self.new_h) / self.ori_h, float(self.new_w) / self.ori_w
@@ -62,7 +65,6 @@
         dt_boxes_list = self.post_func(det_out, [ratio_list])
         dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
         out_dict = {"dt_boxes": dt_boxes, "image": self.im}
-        print("out dict", out_dict)
         return out_dict, None, ""
 
 
@@ -112,5 +114,5 @@
 
 uci_service = OcrService(name="ocr")
-uci_service.prepare_pipeline_config("config.yml")
+uci_service.prepare_pipeline_config("config2.yml")
 uci_service.run_service()
diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py
index 18845a7578b3e435f86dbe4c85e559b29cb65ddd..6e38cb313d20962883cda192504e146ee31b1c95 100644
--- a/python/pipeline/profiler.py
+++ b/python/pipeline/profiler.py
@@ -26,10 +26,11 @@
 from time import time as _time
 import time
 import threading
 import multiprocessing
-
+import copy
 _LOGGER = logging.getLogger(__name__)
 _LOGGER.propagate = False
 
+_is_profile = int(os.environ.get('FLAGS_profile_pipeline', 0))
 class PerformanceTracer(object):
     def __init__(self, is_thread_mode, interval_s, server_worker_num):
@@ -48,6 +49,8 @@
         self._channels = []
         # The size of data in Channel will not exceed server_worker_num
         self._server_worker_num = server_worker_num
+        if _is_profile:
+            self.profile_dict = {}
 
     def data_buffer(self):
         return self._data_buffer
@@ -82,7 +85,7 @@
                 item = self._data_buffer.get_nowait()
                 name = item["name"]
                 actions = item["actions"]
-
+
                 if name == "DAG":
                     succ = item["succ"]
                     req_id = item["id"]
@@ -106,9 +109,9 @@
                 for action, costs in op_cost[name].items():
                     op_cost[name][action] = sum(costs) / (1e3 * len(costs))
                     tot_cost += op_cost[name][action]
-
                 if name != "DAG":
                     _LOGGER.info("Op({}):".format(name))
+
                     for action in all_actions:
                         if action in op_cost[name]:
                             _LOGGER.info("\t{}[{} ms]".format(
@@ -118,7 +121,9 @@
                             calcu_cost += op_cost[name][action]
                     _LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost / tot_cost))
-
+            if _is_profile:
+                self.profile_dict = copy.deepcopy(op_cost)
+
             if "DAG" in op_cost:
                 calls = list(op_cost["DAG"].values())
                 calls.sort()
@@ -137,7 +142,17 @@
                 for latency in latencys:
                     _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
                         tot * latency / 100.0)]))
-
+            if _is_profile:
+                self.profile_dict["DAG"]["query_count"] = tot
+                self.profile_dict["DAG"]["qps"] = qps
+                self.profile_dict["DAG"]["succ"] = 1 - 1.0 * err_count / tot
+                self.profile_dict["DAG"]["avg"] = ave_cost
+                for latency in latencys:
+                    self.profile_dict["DAG"][str(latency)] = calls[int(tot * latency / 100.0)]
+        if _is_profile:
+            import yaml
+            with open("benchmark.log", "w") as fout:
+                yaml.dump(self.profile_dict, fout, default_flow_style=False)
             # channel
             _LOGGER.info("Channel (server worker num[{}]):".format(
                 self._server_worker_num))
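For reference, a minimal sketch of how these pieces are meant to be chained, using only commands that already appear in benchmark.sh above; the thread count 8 and batch size 10 are arbitrary example values, and the surrounding cleanup/monitoring steps of the script are omitted:

    export FLAGS_profile_pipeline=1                         # make the tracer dump its stats to benchmark.log
    python3 benchmark.py yaml local_predictor 1 gpu         # rewrite config.yml into config2.yml with GPU settings
    python3 web_service.py >web.log 2>&1 &                  # start the pipeline service
    python3 benchmark.py run http 8 10                      # drive the HTTP endpoint with 8 client threads, batch size 10
    python3 benchmark.py dump benchmark.log benchmark.tmp   # condense the tracer dump into a cleaned YAML summary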