diff --git a/README.md b/README.md index c867298e7e1a8c6bbc06aaba2ad2dc8f73518ae4..34e22a726282f9167e70a30cb792335ad171b34f 100644 --- a/README.md +++ b/README.md @@ -184,11 +184,6 @@ Here, `client.predict` function has two arguments. `feed` is a `python dict` wit

Community

-### User Group in China - - -

     

-

PaddleServing交流QQ群               PaddleServing微信群

### Slack diff --git a/core/general-server/op/general_response_op.cpp b/core/general-server/op/general_response_op.cpp index 935ef85d847cc17c2d5b76255de0936f0e08a890..b2d918bef0f3c715aa69f52a65edd48cdcc5e87b 100644 --- a/core/general-server/op/general_response_op.cpp +++ b/core/general-server/op/general_response_op.cpp @@ -114,70 +114,48 @@ int GeneralResponseOp::inference() { for (int j = 0; j < in->at(idx).shape.size(); ++j) { cap *= in->at(idx).shape[j]; } - if (in->at(idx).dtype == paddle::PaddleDType::INT64) { + + FetchInst *fetch_p = output->mutable_insts(0); + auto dtype = in->at(idx).dtype; + + if (dtype == paddle::PaddleDType::INT64) { VLOG(2) << "Prepare int64 var [" << model_config->_fetch_name[idx] << "]."; int64_t *data_ptr = static_cast(in->at(idx).data.data()); - if (model_config->_is_lod_fetch[idx]) { - FetchInst *fetch_p = output->mutable_insts(0); - for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_lod( - in->at(idx).lod[0][j]); - } - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[j]); - } - } else { - FetchInst *fetch_p = output->mutable_insts(0); - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[j]); - } - } - VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready"; - var_idx++; - } else if (in->at(idx).dtype == paddle::PaddleDType::FLOAT32) { + // from + // https://stackoverflow.com/questions/15499641/copy-a-stdvector-to-a-repeated-field-from-protobuf-with-memcpy + // `Swap` method is faster than `{}` method. + google::protobuf::RepeatedField tmp_data(data_ptr, + data_ptr + cap); + fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap( + &tmp_data); + } else if (dtype == paddle::PaddleDType::FLOAT32) { VLOG(2) << "Prepare float var [" << model_config->_fetch_name[idx] << "]."; float *data_ptr = static_cast(in->at(idx).data.data()); - if (model_config->_is_lod_fetch[idx]) { - FetchInst *fetch_p = output->mutable_insts(0); - for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_lod( - in->at(idx).lod[0][j]); - } - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); - } - } else { - FetchInst *fetch_p = output->mutable_insts(0); - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); - } - } - VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready"; - var_idx++; - } else if (in->at(idx).dtype == paddle::PaddleDType::INT32) { + google::protobuf::RepeatedField tmp_data(data_ptr, + data_ptr + cap); + fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap( + &tmp_data); + } else if (dtype == paddle::PaddleDType::INT32) { VLOG(2) << "Prepare int32 var [" << model_config->_fetch_name[idx] << "]."; int32_t *data_ptr = static_cast(in->at(idx).data.data()); - if (model_config->_is_lod_fetch[idx]) { - FetchInst *fetch_p = output->mutable_insts(0); - for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_lod( - in->at(idx).lod[0][j]); - } - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_int_data(data_ptr[j]); - } - } else { - FetchInst *fetch_p = output->mutable_insts(0); - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_int_data(data_ptr[j]); - } + google::protobuf::RepeatedField tmp_data(data_ptr, + data_ptr + cap); + fetch_p->mutable_tensor_array(var_idx)->mutable_int_data()->Swap( + &tmp_data); + } + + if (model_config->_is_lod_fetch[idx]) { + for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_lod( + in->at(idx).lod[0][j]); } - VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready"; - var_idx++; } + + VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready"; + var_idx++; } } diff --git a/doc/README.md b/doc/README.md deleted file mode 100644 index 2d51eba9e2a2902685f9385c83542f32b98e5b4f..0000000000000000000000000000000000000000 --- a/doc/README.md +++ /dev/null @@ -1,119 +0,0 @@ -# Paddle Serving - -([简体中文](./README_CN.md)|English) - -Paddle Serving is PaddlePaddle's online estimation service framework, which can help developers easily implement remote prediction services that call deep learning models from mobile and server ends. At present, Paddle Serving is mainly based on models that support PaddlePaddle training. It can be used in conjunction with the Paddle training framework to quickly deploy inference services. Paddle Serving is designed around common industrial-level deep learning model deployment scenarios. Some common functions include multi-model management, model hot loading, [Baidu-rpc](https://github.com/apache/incubator-brpc)-based high-concurrency low-latency response capabilities, and online model A/B tests. The API that cooperates with the Paddle training framework can enable users to seamlessly transition between training and remote deployment, improving the landing efficiency of deep learning models. - ------------- - -## Quick Start - -Paddle Serving's current develop version supports lightweight Python API for fast predictions, and training with Paddle can get through. We take the most classic Boston house price prediction as an example to fully explain the process of model training on a single machine and model deployment using Paddle Serving. - -#### Install - -It is highly recommended that you build Paddle Serving inside Docker, please read [How to run PaddleServing in Docker](RUN_IN_DOCKER.md) - -``` -pip install paddle-serving-client -pip install paddle-serving-server -``` - -#### Training Script -``` python -import sys -import paddle -import paddle.fluid as fluid - -train_reader = paddle.batch(paddle.reader.shuffle( - paddle.dataset.uci_housing.train(), buf_size=500), batch_size=16) - -test_reader = paddle.batch(paddle.reader.shuffle( - paddle.dataset.uci_housing.test(), buf_size=500), batch_size=16) - -x = fluid.data(name='x', shape=[None, 13], dtype='float32') -y = fluid.data(name='y', shape=[None, 1], dtype='float32') - -y_predict = fluid.layers.fc(input=x, size=1, act=None) -cost = fluid.layers.square_error_cost(input=y_predict, label=y) -avg_loss = fluid.layers.mean(cost) -sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01) -sgd_optimizer.minimize(avg_loss) - -place = fluid.CPUPlace() -feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) -exe = fluid.Executor(place) -exe.run(fluid.default_startup_program()) - -import paddle_serving_client.io as serving_io - -for pass_id in range(30): - for data_train in train_reader(): - avg_loss_value, = exe.run( - fluid.default_main_program(), - feed=feeder.feed(data_train), - fetch_list=[avg_loss]) - -serving_io.save_model( - "serving_server_model", "serving_client_conf", - {"x": x}, {"y": y_predict}, fluid.default_main_program()) -``` - -#### Server Side Code -``` python -import sys -from paddle_serving.serving_server import OpMaker -from paddle_serving.serving_server import OpSeqMaker -from paddle_serving.serving_server import Server - -op_maker = OpMaker() -read_op = op_maker.create('general_reader') -general_infer_op = op_maker.create('general_infer') - -op_seq_maker = OpSeqMaker() -op_seq_maker.add_op(read_op) -op_seq_maker.add_op(general_infer_op) - -server = Server() -server.set_op_sequence(op_seq_maker.get_op_sequence()) -server.load_model_config(sys.argv[1]) -server.prepare_server(workdir="work_dir1", port=9393, device="cpu") -server.run_server() -``` - -#### Launch Server End -``` shell -python test_server.py serving_server_model -``` - -#### Client Prediction -``` python -from paddle_serving_client import Client -import paddle -import sys - -client = Client() -client.load_client_config(sys.argv[1]) -client.connect(["127.0.0.1:9292"]) - -test_reader = paddle.batch(paddle.reader.shuffle( - paddle.dataset.uci_housing.test(), buf_size=500), batch_size=1) - -for data in test_reader(): - fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["y"]) - print("{} {}".format(fetch_map["y"][0], data[0][1][0])) - -``` - -### Document - -[Design Doc](DESIGN.md) - -[FAQ](./deprecated/FAQ.md) - -### Senior Developer Guildlines - -[Compile Tutorial](COMPILE.md) - -## Contribution -If you want to make contributions to Paddle Serving Please refer to [CONRTIBUTE](CONTRIBUTE.md) diff --git a/doc/README_CN.md b/doc/README_CN.md deleted file mode 100644 index da5641cad333518ded9fbae4438f05ae20e30ddd..0000000000000000000000000000000000000000 --- a/doc/README_CN.md +++ /dev/null @@ -1,119 +0,0 @@ -# Paddle Serving - -(简体中文|[English](./README.md)) - -Paddle Serving是PaddlePaddle的在线预估服务框架,能够帮助开发者轻松实现从移动端、服务器端调用深度学习模型的远程预测服务。当前Paddle Serving以支持PaddlePaddle训练的模型为主,可以与Paddle训练框架联合使用,快速部署预估服务。Paddle Serving围绕常见的工业级深度学习模型部署场景进行设计,一些常见的功能包括多模型管理、模型热加载、基于[Baidu-rpc](https://github.com/apache/incubator-brpc)的高并发低延迟响应能力、在线模型A/B实验等。与Paddle训练框架互相配合的API可以使用户在训练与远程部署之间无缝过度,提升深度学习模型的落地效率。 - ------------- - -## 快速上手指南 - -Paddle Serving当前的develop版本支持轻量级Python API进行快速预测,并且与Paddle的训练可以打通。我们以最经典的波士顿房价预测为示例,完整说明在单机进行模型训练以及使用Paddle Serving进行模型部署的过程。 - -#### 安装 - -强烈建议您在Docker内构建Paddle Serving,请查看[如何在Docker中运行PaddleServing](RUN_IN_DOCKER_CN.md) - -``` -pip install paddle-serving-client -pip install paddle-serving-server -``` - -#### 训练脚本 -``` python -import sys -import paddle -import paddle.fluid as fluid - -train_reader = paddle.batch(paddle.reader.shuffle( - paddle.dataset.uci_housing.train(), buf_size=500), batch_size=16) - -test_reader = paddle.batch(paddle.reader.shuffle( - paddle.dataset.uci_housing.test(), buf_size=500), batch_size=16) - -x = fluid.data(name='x', shape=[None, 13], dtype='float32') -y = fluid.data(name='y', shape=[None, 1], dtype='float32') - -y_predict = fluid.layers.fc(input=x, size=1, act=None) -cost = fluid.layers.square_error_cost(input=y_predict, label=y) -avg_loss = fluid.layers.mean(cost) -sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01) -sgd_optimizer.minimize(avg_loss) - -place = fluid.CPUPlace() -feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) -exe = fluid.Executor(place) -exe.run(fluid.default_startup_program()) - -import paddle_serving_client.io as serving_io - -for pass_id in range(30): - for data_train in train_reader(): - avg_loss_value, = exe.run( - fluid.default_main_program(), - feed=feeder.feed(data_train), - fetch_list=[avg_loss]) - -serving_io.save_model( - "serving_server_model", "serving_client_conf", - {"x": x}, {"y": y_predict}, fluid.default_main_program()) -``` - -#### 服务器端代码 -``` python -import sys -from paddle_serving.serving_server import OpMaker -from paddle_serving.serving_server import OpSeqMaker -from paddle_serving.serving_server import Server - -op_maker = OpMaker() -read_op = op_maker.create('general_reader') -general_infer_op = op_maker.create('general_infer') - -op_seq_maker = OpSeqMaker() -op_seq_maker.add_op(read_op) -op_seq_maker.add_op(general_infer_op) - -server = Server() -server.set_op_sequence(op_seq_maker.get_op_sequence()) -server.load_model_config(sys.argv[1]) -server.prepare_server(workdir="work_dir1", port=9393, device="cpu") -server.run_server() -``` - -#### 服务器端启动 -``` shell -python test_server.py serving_server_model -``` - -#### 客户端预测 -``` python -from paddle_serving_client import Client -import paddle -import sys - -client = Client() -client.load_client_config(sys.argv[1]) -client.connect(["127.0.0.1:9292"]) - -test_reader = paddle.batch(paddle.reader.shuffle( - paddle.dataset.uci_housing.test(), buf_size=500), batch_size=1) - -for data in test_reader(): - fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["y"]) - print("{} {}".format(fetch_map["y"][0], data[0][1][0])) - -``` - -### 文档 - -[设计文档](DESIGN_CN.md) - -[FAQ](./deprecated/FAQ.md) - -### 资深开发者使用指南 - -[编译指南](COMPILE_CN.md) - -## 贡献 -如果你想要给Paddle Serving做贡献,请参考[贡献指南](CONTRIBUTE.md) diff --git a/python/examples/bert/benchmark.py b/python/examples/bert/benchmark.py index 3ac9d07625e881b43550578c4a6346e4ac874063..c177d4b8c25eb8a79c9a851399f530f197499964 100644 --- a/python/examples/bert/benchmark.py +++ b/python/examples/bert/benchmark.py @@ -116,8 +116,10 @@ def single_func(idx, resource): if __name__ == '__main__': multi_thread_runner = MultiThreadRunner() - endpoint_list = ["127.0.0.1:9292"] - turns = 10 + endpoint_list = [ + "127.0.0.1:9292", "127.0.0.1:9293", "127.0.0.1:9294", "127.0.0.1:9295" + ] + turns = 100 start = time.time() result = multi_thread_runner.run( single_func, args.thread, {"endpoint": endpoint_list, @@ -130,9 +132,9 @@ if __name__ == '__main__': avg_cost += result[0][i] avg_cost = avg_cost / args.thread - print("total cost :{} s".format(total_cost)) - print("each thread cost :{} s. ".format(avg_cost)) - print("qps :{} samples/s".format(args.batch_size * args.thread * turns / - total_cost)) + print("total cost: {}s".format(total_cost)) + print("each thread cost: {}s. ".format(avg_cost)) + print("qps: {}samples/s".format(args.batch_size * args.thread * turns / + total_cost)) if os.getenv("FLAGS_serving_latency"): show_latency(result[1]) diff --git a/python/examples/bert/benchmark.sh b/python/examples/bert/benchmark.sh index 7ee5f32e9e5d89a836f8962a256bcdf7bf0b62e2..96c9cd97d15bf1133210feb0bcabf95fdcde9d37 100644 --- a/python/examples/bert/benchmark.sh +++ b/python/examples/bert/benchmark.sh @@ -4,8 +4,9 @@ export FLAGS_profile_server=1 export FLAGS_profile_client=1 export FLAGS_serving_latency=1 python3 -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 --mem_optim False --ir_optim True 2> elog > stdlog & - +hostname=`echo $(hostname)|awk -F '.baidu.com' '{print $1}'` sleep 5 +gpu_id=0 #warm up python3 benchmark.py --thread 8 --batch_size 1 --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1 @@ -14,14 +15,24 @@ for thread_num in 4 8 16 do for batch_size in 1 4 16 64 256 do + job_bt=`date '+%Y%m%d%H%M%S'` + nvidia-smi --id=$gpu_id --query-compute-apps=used_memory --format=csv -lms 100 > gpu_use.log 2>&1 & + gpu_memory_pid=$! python3 benchmark.py --thread $thread_num --batch_size $batch_size --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1 - echo "model name :" $1 - echo "thread num :" $thread_num - echo "batch size :" $batch_size + kill ${gpu_memory_pid} + echo "model_name:" $1 + echo "thread_num:" $thread_num + echo "batch_size:" $batch_size echo "=================Done====================" - echo "model name :$1" >> profile_log_$1 - echo "batch size :$batch_size" >> profile_log_$1 - python3 ../util/show_profile.py profile $thread_num >> profile_log_$1 + echo "model_name:$1" >> profile_log_$1 + echo "batch_size:$batch_size" >> profile_log_$1 + job_et=`date '+%Y%m%d%H%M%S'` + awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "MAX_GPU_MEMORY_USE:", max}' gpu_use.log >> profile_log_$1 + monquery -n ${hostname} -i GPU_AVERAGE_UTILIZATION -s $job_bt -e $job_et -d 10 > gpu_log_file_${job_bt} + monquery -n ${hostname} -i CPU_USER -s $job_bt -e $job_et -d 10 > cpu_log_file_${job_bt} + cpu_num=$(cat /proc/cpuinfo | grep processor | wc -l) + gpu_num=$(nvidia-smi -L|wc -l) + python ../util/show_profile.py profile $thread_num >> profile_log_$1 tail -n 8 profile >> profile_log_$1 echo "" >> profile_log_$1 done diff --git a/python/examples/blazeface/README.md b/python/examples/blazeface/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f569841ce4a3ae69b1ff16041f7fb7d4617177f7 --- /dev/null +++ b/python/examples/blazeface/README.md @@ -0,0 +1,23 @@ +# Blazeface + +## Get Model +``` +python -m paddle_serving_app.package --get_model blazeface +tar -xzvf blazeface.tar.gz +``` + +## RPC Service + +### Start Service + +``` +python -m paddle_serving_server.serve --model serving_server --port 9494 +``` + +### Client Prediction + +``` +python test_client.py serving_client/serving_client_conf.prototxt test.jpg +``` + +the result is in `output` folder, including a json file and image file with bounding boxes. diff --git a/python/examples/ocr/test_ocr_rec_client.py b/python/examples/blazeface/test_client.py similarity index 53% rename from python/examples/ocr/test_ocr_rec_client.py rename to python/examples/blazeface/test_client.py index b61256d03202374ada5b0d50a075fef156eca2ea..27eb185ea90ce72641cef44d9066c46945ad2629 100644 --- a/python/examples/ocr/test_ocr_rec_client.py +++ b/python/examples/blazeface/test_client.py @@ -13,19 +13,26 @@ # limitations under the License. from paddle_serving_client import Client -from paddle_serving_app.reader import OCRReader -import cv2 +from paddle_serving_app.reader import * +import sys +import numpy as np +preprocess = Sequential([ + File2Image(), + Normalize([104, 117, 123], [127.502231, 127.502231, 127.502231], False) +]) + +postprocess = BlazeFacePostprocess("label_list.txt", "output") client = Client() -client.load_client_config("ocr_rec_client/serving_client_conf.prototxt") -client.connect(["127.0.0.1:9292"]) -image_file_list = ["./test_rec.jpg"] -img = cv2.imread(image_file_list[0]) -ocr_reader = OCRReader() -feed = {"image": ocr_reader.preprocess([img])} -fetch = ["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"] -fetch_map = client.predict(feed=feed, fetch=fetch) -rec_res = ocr_reader.postprocess(fetch_map) -print(image_file_list[0]) -print(rec_res[0][0]) +client.load_client_config(sys.argv[1]) +client.connect(['127.0.0.1:9494']) + +im_0 = preprocess(sys.argv[2]) +tmp = Transpose((2, 0, 1)) +im = tmp(im_0) +fetch_map = client.predict( + feed={"image": im}, fetch=["detection_output_0.tmp_0"]) +fetch_map["image"] = sys.argv[2] +fetch_map["im_shape"] = im_0.shape +postprocess(fetch_map) diff --git a/python/examples/criteo_ctr_with_cube/benchmark.py b/python/examples/criteo_ctr_with_cube/benchmark.py index e5bde9f996fccc41027fa6d255ca227cba212e22..a850d244b0a5a1a01e98a6207fa9674b6ea0af1a 100755 --- a/python/examples/criteo_ctr_with_cube/benchmark.py +++ b/python/examples/criteo_ctr_with_cube/benchmark.py @@ -29,6 +29,7 @@ args = benchmark_args() def single_func(idx, resource): client = Client() + print([resource["endpoint"][idx % len(resource["endpoint"])]]) client.load_client_config('ctr_client_conf/serving_client_conf.prototxt') client.connect(['127.0.0.1:9292']) batch = 1 @@ -40,27 +41,29 @@ def single_func(idx, resource): ] reader = dataset.infer_reader(test_filelists[len(test_filelists) - 40:], batch, buf_size) - args.batch_size = 1 if args.request == "rpc": fetch = ["prob"] - print("Start Time") start = time.time() itr = 1000 for ei in range(itr): - if args.batch_size == 1: - data = reader().next() - feed_dict = {} - feed_dict['dense_input'] = data[0][0] - for i in range(1, 27): - feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i] - result = client.predict(feed=feed_dict, fetch=fetch) + if args.batch_size > 0: + feed_batch = [] + for bi in range(args.batch_size): + data = reader().next() + feed_dict = {} + feed_dict['dense_input'] = data[0][0] + for i in range(1, 27): + feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][ + i] + feed_batch.append(feed_dict) + result = client.predict(feed=feed_batch, fetch=fetch) else: print("unsupport batch size {}".format(args.batch_size)) elif args.request == "http": raise ("Not support http service.") end = time.time() - qps = itr / (end - start) + qps = itr * args.batch_size / (end - start) return [[end - start, qps]] @@ -70,6 +73,7 @@ if __name__ == '__main__': #result = single_func(0, {"endpoint": endpoint_list}) result = multi_thread_runner.run(single_func, args.thread, {"endpoint": endpoint_list}) + print(result) avg_cost = 0 qps = 0 for i in range(args.thread): diff --git a/python/examples/criteo_ctr_with_cube/benchmark.sh b/python/examples/criteo_ctr_with_cube/benchmark.sh index 4bea258a5cfa4e12ed6848c61270fe44bbc7ba44..35b19b637d9e8dec10fd3b59224c5c17e3ba5f53 100755 --- a/python/examples/criteo_ctr_with_cube/benchmark.sh +++ b/python/examples/criteo_ctr_with_cube/benchmark.sh @@ -1,10 +1,16 @@ rm profile_log -batch_size=1 +export FLAGS_profile_client=1 +export FLAGS_profile_server=1 for thread_num in 1 2 4 8 16 do - $PYTHONROOT/bin/python benchmark.py --thread $thread_num --model ctr_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1 +for batch_size in 1 4 16 64 256 +do + $PYTHONROOT/bin/python benchmark.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1 + echo "batch size : $batch_size" + echo "thread num : $thread_num" echo "========================================" echo "batch size : $batch_size" >> profile_log $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log tail -n 2 profile >> profile_log done +done diff --git a/python/examples/criteo_ctr_with_cube/benchmark_batch.py b/python/examples/criteo_ctr_with_cube/benchmark_batch.py deleted file mode 100755 index df5c6b90badb36fd7e349555973ccbd7ea0a8b70..0000000000000000000000000000000000000000 --- a/python/examples/criteo_ctr_with_cube/benchmark_batch.py +++ /dev/null @@ -1,84 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# pylint: disable=doc-string-missing - -from paddle_serving_client import Client -import sys -import os -import criteo as criteo -import time -from paddle_serving_client.utils import MultiThreadRunner -from paddle_serving_client.utils import benchmark_args -from paddle_serving_client.metric import auc - -args = benchmark_args() - - -def single_func(idx, resource): - client = Client() - print([resource["endpoint"][idx % len(resource["endpoint"])]]) - client.load_client_config('ctr_client_conf/serving_client_conf.prototxt') - client.connect(['127.0.0.1:9292']) - batch = 1 - buf_size = 100 - dataset = criteo.CriteoDataset() - dataset.setup(1000001) - test_filelists = [ - "./raw_data/part-%d" % x for x in range(len(os.listdir("./raw_data"))) - ] - reader = dataset.infer_reader(test_filelists[len(test_filelists) - 40:], - batch, buf_size) - if args.request == "rpc": - fetch = ["prob"] - start = time.time() - itr = 1000 - for ei in range(itr): - if args.batch_size > 1: - feed_batch = [] - for bi in range(args.batch_size): - data = reader().next() - feed_dict = {} - feed_dict['dense_input'] = data[0][0] - for i in range(1, 27): - feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][ - i] - feed_batch.append(feed_dict) - result = client.predict(feed=feed_batch, fetch=fetch) - else: - print("unsupport batch size {}".format(args.batch_size)) - - elif args.request == "http": - raise ("Not support http service.") - end = time.time() - qps = itr * args.batch_size / (end - start) - return [[end - start, qps]] - - -if __name__ == '__main__': - multi_thread_runner = MultiThreadRunner() - endpoint_list = ["127.0.0.1:9292"] - #result = single_func(0, {"endpoint": endpoint_list}) - result = multi_thread_runner.run(single_func, args.thread, - {"endpoint": endpoint_list}) - print(result) - avg_cost = 0 - qps = 0 - for i in range(args.thread): - avg_cost += result[0][i * 2 + 0] - qps += result[0][i * 2 + 1] - avg_cost = avg_cost / args.thread - print("average total cost {} s.".format(avg_cost)) - print("qps {} ins/s".format(qps)) diff --git a/python/examples/criteo_ctr_with_cube/benchmark_batch.sh b/python/examples/criteo_ctr_with_cube/benchmark_batch.sh deleted file mode 100755 index 3a51c0de68bf47fb798c165d2fb34868056ddab6..0000000000000000000000000000000000000000 --- a/python/examples/criteo_ctr_with_cube/benchmark_batch.sh +++ /dev/null @@ -1,12 +0,0 @@ -rm profile_log -for thread_num in 1 2 4 8 16 -do -for batch_size in 1 2 4 8 16 32 64 128 256 512 -do - $PYTHONROOT/bin/python benchmark_batch.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1 - echo "========================================" - echo "batch size : $batch_size" >> profile_log - $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log - tail -n 2 profile >> profile_log -done -done diff --git a/python/examples/criteo_ctr_with_cube/cube_prepare.sh b/python/examples/criteo_ctr_with_cube/cube_prepare.sh index 1417254a54e2194ab3a0194f2ec970f480787acd..773baba4d91b02b244e766cd8ebf899cc740dbbc 100755 --- a/python/examples/criteo_ctr_with_cube/cube_prepare.sh +++ b/python/examples/criteo_ctr_with_cube/cube_prepare.sh @@ -16,7 +16,5 @@ mkdir -p cube_model mkdir -p cube/data -./seq_generator ctr_serving_model/SparseFeatFactors ./cube_model/feature ./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=${PWD}/cube/data -shard_num=1 -only_build=false -mv ./cube/data/0_0/test_dict_part0/* ./cube/data/ -cd cube && ./cube +cd cube && ./cube diff --git a/python/examples/criteo_ctr_with_cube/test_client.py b/python/examples/criteo_ctr_with_cube/test_client.py index ca752b763e067b6a73e28c1d2ab9f58b9b98ba5d..8518db55572196e470da014a02797ae9e200c988 100755 --- a/python/examples/criteo_ctr_with_cube/test_client.py +++ b/python/examples/criteo_ctr_with_cube/test_client.py @@ -20,6 +20,8 @@ import criteo as criteo import time from paddle_serving_client.metric import auc +py_version = sys.version_info[0] + client = Client() client.load_client_config(sys.argv[1]) client.connect(["127.0.0.1:9292"]) @@ -34,7 +36,10 @@ label_list = [] prob_list = [] start = time.time() for ei in range(10000): - data = reader().next() + if py_version == 2: + data = reader().next() + else: + data = reader().__next__() feed_dict = {} feed_dict['dense_input'] = data[0][0] for i in range(1, 27): diff --git a/python/examples/imagenet/benchmark.py b/python/examples/imagenet/benchmark.py index 5c4c44cc1bd091af6c4d343d2b7f0f436cca2e7e..f4a7b083300be727ba81e880c41791bf36bfd6f7 100644 --- a/python/examples/imagenet/benchmark.py +++ b/python/examples/imagenet/benchmark.py @@ -25,36 +25,36 @@ import base64 from paddle_serving_client import Client from paddle_serving_client.utils import MultiThreadRunner from paddle_serving_client.utils import benchmark_args -from paddle_serving_app.reader import Sequential, URL2Image, Resize +from paddle_serving_app.reader import Sequential, File2Image, Resize from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize args = benchmark_args() seq_preprocess = Sequential([ - URL2Image(), Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)), + File2Image(), Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)), Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225], True) ]) def single_func(idx, resource): file_list = [] + turns = 10 for file_name in os.listdir("./image_data/n01440764"): file_list.append(file_name) img_list = [] for i in range(1000): - img_list.append(open("./image_data/n01440764/" + file_list[i]).read()) + img_list.append("./image_data/n01440764/" + file_list[i]) profile_flags = False if "FLAGS_profile_client" in os.environ and os.environ[ "FLAGS_profile_client"]: profile_flags = True if args.request == "rpc": - reader = ImageReader() fetch = ["score"] client = Client() client.load_client_config(args.model) client.connect([resource["endpoint"][idx % len(resource["endpoint"])]]) start = time.time() - for i in range(1000): + for i in range(turns): if args.batch_size >= 1: feed_batch = [] i_start = time.time() @@ -77,7 +77,7 @@ def single_func(idx, resource): server = "http://" + resource["endpoint"][idx % len(resource[ "endpoint"])] + "/image/prediction" start = time.time() - for i in range(1000): + for i in range(turns): if py_version == 2: image = base64.b64encode( open("./image_data/n01440764/" + file_list[i]).read()) @@ -93,8 +93,9 @@ def single_func(idx, resource): if __name__ == '__main__': multi_thread_runner = MultiThreadRunner() - endpoint_list = ["127.0.0.1:9393"] - #endpoint_list = endpoint_list + endpoint_list + endpoint_list + endpoint_list = [ + "127.0.0.1:9292", "127.0.0.1:9293", "127.0.0.1:9294", "127.0.0.1:9295" + ] result = multi_thread_runner.run(single_func, args.thread, {"endpoint": endpoint_list}) #result = single_func(0, {"endpoint": endpoint_list}) diff --git a/python/examples/imagenet/benchmark.sh b/python/examples/imagenet/benchmark.sh index 84885908fa89d050b3ca71386fe2a21533ce0809..d7eb89fa9b0b68e5e442d15bdf16f431c91ba94d 100644 --- a/python/examples/imagenet/benchmark.sh +++ b/python/examples/imagenet/benchmark.sh @@ -11,7 +11,7 @@ $PYTHONROOT/bin/python benchmark.py --thread 8 --batch_size 1 --model $2/serving for thread_num in 4 8 16 do -for batch_size in 1 4 16 64 256 +for batch_size in 1 4 16 64 do $PYTHONROOT/bin/python benchmark.py --thread $thread_num --batch_size $batch_size --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1 echo "model name :" $1 diff --git a/python/examples/imdb/benchmark.py b/python/examples/imdb/benchmark.py index 632d336ebf20363e257e6e60f08d773cea659a74..d226efbfbc5317db81039bc6a778498cdf853854 100644 --- a/python/examples/imdb/benchmark.py +++ b/python/examples/imdb/benchmark.py @@ -13,13 +13,14 @@ # limitations under the License. # pylint: disable=doc-string-missing +import os import sys import time import requests from paddle_serving_app.reader import IMDBDataset from paddle_serving_client import Client from paddle_serving_client.utils import MultiThreadRunner -from paddle_serving_client.utils import benchmark_args +from paddle_serving_client.utils import MultiThreadRunner, benchmark_args, show_latency args = benchmark_args() @@ -31,6 +32,13 @@ def single_func(idx, resource): with open("./test_data/part-0") as fin: for line in fin: dataset.append(line.strip()) + profile_flags = False + latency_flags = False + if os.getenv("FLAGS_profile_client"): + profile_flags = True + if os.getenv("FLAGS_serving_latency"): + latency_flags = True + latency_list = [] start = time.time() if args.request == "rpc": client = Client() @@ -67,9 +75,26 @@ def single_func(idx, resource): return [[end - start]] -multi_thread_runner = MultiThreadRunner() -result = multi_thread_runner.run(single_func, args.thread, {}) -avg_cost = 0 -for cost in result[0]: - avg_cost += cost -print("total cost {} s of each thread".format(avg_cost / args.thread)) +if __name__ == '__main__': + multi_thread_runner = MultiThreadRunner() + endpoint_list = [ + "127.0.0.1:9292", "127.0.0.1:9293", "127.0.0.1:9294", "127.0.0.1:9295" + ] + turns = 100 + start = time.time() + result = multi_thread_runner.run( + single_func, args.thread, {"endpoint": endpoint_list, + "turns": turns}) + end = time.time() + total_cost = end - start + avg_cost = 0 + for i in range(args.thread): + avg_cost += result[0][i] + avg_cost = avg_cost / args.thread + + print("total cost: {}".format(total_cost)) + print("each thread cost: {}".format(avg_cost)) + print("qps: {}samples/s".format(args.batch_size * args.thread * turns / + total_cost)) + if os.getenv("FLAGS_serving_latency"): + show_latency(result[0]) diff --git a/python/examples/imdb/benchmark.sh b/python/examples/imdb/benchmark.sh index 93dbf830c84bd38f72dd0d8a32139ad6098dc6f8..2b2d91c8192e66f6b3eee19b59fc7f5dc9339aa6 100644 --- a/python/examples/imdb/benchmark.sh +++ b/python/examples/imdb/benchmark.sh @@ -1,12 +1,35 @@ rm profile_log -for thread_num in 1 2 4 8 16 +export CUDA_VISIBLE_DEVICES=0,1,2,3 +export FLAGS_profile_server=1 +export FLAGS_profile_client=1 +export FLAGS_serving_latency=1 +python -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 --mem_optim --ir_optim 2> elog > stdlog & +hostname=`echo $(hostname)|awk -F '.baidu.com' '{print $1}'` + +sleep 5 + +for thread_num in 4 8 16 do -for batch_size in 1 2 4 8 16 32 64 128 256 512 +for batch_size in 1 4 16 64 256 do - $PYTHONROOT/bin/python benchmark.py --thread $thread_num --batch_size $batch_size --model imdb_bow_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1 - echo "========================================" - echo "batch size : $batch_size" >> profile_log - $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log - tail -n 1 profile >> profile_log + job_bt=`date '+%Y%m%d%H%M%S'` + python benchmark.py --thread $thread_num --batch_size $batch_size --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1 + echo "model_name:" $1 + echo "thread_num:" $thread_num + echo "batch_size:" $batch_size + echo "=================Done====================" + echo "model_name:$1" >> profile_log_$1 + echo "batch_size:$batch_size" >> profile_log_$1 + job_et=`date '+%Y%m%d%H%M%S'` + awk 'BEGIN {max = 0} {if(NR>1){if ($1 > max) max=$1}} END {print "MAX_GPU_MEMORY_USE:", max}' gpu_use.log >> profile_log_$1 + monquery -n ${hostname} -i GPU_AVERAGE_UTILIZATION -s $job_bt -e $job_et -d 10 > gpu_log_file_${job_bt} + monquery -n ${hostname} -i CPU_USER -s $job_bt -e $job_et -d 10 > cpu_log_file_${job_bt} + cpu_num=$(cat /proc/cpuinfo | grep processor | wc -l) + gpu_num=$(nvidia-smi -L|wc -l) + python ../util/show_profile.py profile $thread_num >> profile_log_$1 + tail -n 8 profile >> profile_log_$1 + echo "" >> profile_log_$1 done done + +ps -ef|grep 'serving'|grep -v grep|cut -c 9-15 | xargs kill -9 diff --git a/python/examples/ocr/README.md b/python/examples/ocr/README.md index 04c4fd3eaa304e55d980a2cf4fc34dda50f5009c..3535ed80eb27291aa4da4bb2683923c9e4082acf 100644 --- a/python/examples/ocr/README.md +++ b/python/examples/ocr/README.md @@ -4,18 +4,42 @@ ``` python -m paddle_serving_app.package --get_model ocr_rec tar -xzvf ocr_rec.tar.gz +python -m paddle_serving_app.package --get_model ocr_det +tar -xzvf ocr_det.tar.gz ``` ## RPC Service ### Start Service +For the following two code block, please check your devices and pick one +for GPU device +``` +python -m paddle_serving_server_gpu.serve --model ocr_rec_model --port 9292 --gpu_id 0 +python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 9293 --gpu_id 0 +``` +for CPU device ``` python -m paddle_serving_server.serve --model ocr_rec_model --port 9292 +python -m paddle_serving_server.serve --model ocr_det_model --port 9293 ``` ### Client Prediction ``` -python test_ocr_rec_client.py +python ocr_rpc_client.py +``` + +## Web Service + +### Start Service + +``` +python -m paddle_serving_server_gpu.serve --model ocr_det_model --port 9293 --gpu_id 0 +python ocr_web_server.py +``` + +### Client Prediction +``` +sh ocr_web_client.sh ``` diff --git a/python/examples/ocr/ocr_rpc_client.py b/python/examples/ocr/ocr_rpc_client.py new file mode 100644 index 0000000000000000000000000000000000000000..212d46c2b226f91bcb0582e76e31ca2acdc8b948 --- /dev/null +++ b/python/examples/ocr/ocr_rpc_client.py @@ -0,0 +1,193 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle_serving_client import Client +from paddle_serving_app.reader import OCRReader +import cv2 +import sys +import numpy as np +import os +from paddle_serving_client import Client +from paddle_serving_app.reader import Sequential, File2Image, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes +import time +import re + + +def sorted_boxes(dt_boxes): + """ + Sort text boxes in order from top to bottom, left to right + args: + dt_boxes(array):detected text boxes with shape [4, 2] + return: + sorted boxes(array) with shape [4, 2] + """ + num_boxes = dt_boxes.shape[0] + sorted_boxes = sorted(dt_boxes, key=lambda x: (x[0][1], x[0][0])) + _boxes = list(sorted_boxes) + + for i in range(num_boxes - 1): + if abs(_boxes[i+1][0][1] - _boxes[i][0][1]) < 10 and \ + (_boxes[i + 1][0][0] < _boxes[i][0][0]): + tmp = _boxes[i] + _boxes[i] = _boxes[i + 1] + _boxes[i + 1] = tmp + return _boxes + + +def get_rotate_crop_image(img, points): + #img = cv2.imread(img) + img_height, img_width = img.shape[0:2] + left = int(np.min(points[:, 0])) + right = int(np.max(points[:, 0])) + top = int(np.min(points[:, 1])) + bottom = int(np.max(points[:, 1])) + img_crop = img[top:bottom, left:right, :].copy() + points[:, 0] = points[:, 0] - left + points[:, 1] = points[:, 1] - top + img_crop_width = int(np.linalg.norm(points[0] - points[1])) + img_crop_height = int(np.linalg.norm(points[0] - points[3])) + pts_std = np.float32([[0, 0], [img_crop_width, 0], \ + [img_crop_width, img_crop_height], [0, img_crop_height]]) + M = cv2.getPerspectiveTransform(points, pts_std) + dst_img = cv2.warpPerspective( + img_crop, + M, (img_crop_width, img_crop_height), + borderMode=cv2.BORDER_REPLICATE) + dst_img_height, dst_img_width = dst_img.shape[0:2] + if dst_img_height * 1.0 / dst_img_width >= 1.5: + dst_img = np.rot90(dst_img) + return dst_img + + +def read_det_box_file(filename): + with open(filename, 'r') as f: + line = f.readline() + a, b, c = int(line.split(' ')[0]), int(line.split(' ')[1]), int( + line.split(' ')[2]) + dt_boxes = np.zeros((a, b, c)).astype(np.float32) + line = f.readline() + for i in range(a): + for j in range(b): + line = f.readline() + dt_boxes[i, j, 0], dt_boxes[i, j, 1] = float( + line.split(' ')[0]), float(line.split(' ')[1]) + line = f.readline() + + +def resize_norm_img(img, max_wh_ratio): + import math + imgC, imgH, imgW = 3, 32, 320 + imgW = int(32 * max_wh_ratio) + h = img.shape[0] + w = img.shape[1] + ratio = w / float(h) + if math.ceil(imgH * ratio) > imgW: + resized_w = imgW + else: + resized_w = int(math.ceil(imgH * ratio)) + resized_image = cv2.resize(img, (resized_w, imgH)) + resized_image = resized_image.astype('float32') + resized_image = resized_image.transpose((2, 0, 1)) / 255 + resized_image -= 0.5 + resized_image /= 0.5 + padding_im = np.zeros((imgC, imgH, imgW), dtype=np.float32) + padding_im[:, :, 0:resized_w] = resized_image + return padding_im + + +def main(): + client1 = Client() + client1.load_client_config("ocr_det_client/serving_client_conf.prototxt") + client1.connect(["127.0.0.1:9293"]) + + client2 = Client() + client2.load_client_config("ocr_rec_client/serving_client_conf.prototxt") + client2.connect(["127.0.0.1:9292"]) + + read_image_file = File2Image() + preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + + filter_func = FilterBoxes(10, 10) + ocr_reader = OCRReader() + files = [ + "./imgs/{}".format(f) for f in os.listdir('./imgs') + if re.match(r'[0-9]+.*\.jpg|[0-9]+.*\.png', f) + ] + #files = ["2.jpg"]*30 + #files = ["rctw/rctw/train/images/image_{}.jpg".format(i) for i in range(500)] + time_all = 0 + time_det_all = 0 + time_rec_all = 0 + for name in files: + #print(name) + im = read_image_file(name) + ori_h, ori_w, _ = im.shape + time1 = time.time() + img = preprocess(im) + _, new_h, new_w = img.shape + ratio_list = [float(new_h) / ori_h, float(new_w) / ori_w] + #print(new_h, new_w, ori_h, ori_w) + time_before_det = time.time() + outputs = client1.predict(feed={"image": img}, fetch=["concat_1.tmp_0"]) + time_after_det = time.time() + time_det_all += (time_after_det - time_before_det) + #print(outputs) + dt_boxes_list = post_func(outputs["concat_1.tmp_0"], [ratio_list]) + dt_boxes = filter_func(dt_boxes_list[0], [ori_h, ori_w]) + dt_boxes = sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = resize_norm_img(img, max_wh_ratio) + #norm_img = norm_img[np.newaxis, :] + feed = {"image": norm_img} + feed_list.append(feed) + #fetch = ["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"] + fetch = ["ctc_greedy_decoder_0.tmp_0"] + time_before_rec = time.time() + if len(feed_list) == 0: + continue + fetch_map = client2.predict(feed=feed_list, fetch=fetch) + time_after_rec = time.time() + time_rec_all += (time_after_rec - time_before_rec) + rec_res = ocr_reader.postprocess(fetch_map) + #for res in rec_res: + # print(res[0].encode("utf-8")) + time2 = time.time() + time_all += (time2 - time1) + print("rpc+det time: {}".format(time_all / len(files))) + + +if __name__ == '__main__': + main() diff --git a/python/examples/ocr/ocr_web_client.sh b/python/examples/ocr/ocr_web_client.sh new file mode 100644 index 0000000000000000000000000000000000000000..5f4f1d7d1fb00dc63b3235533850f56f998a647f --- /dev/null +++ b/python/examples/ocr/ocr_web_client.sh @@ -0,0 +1 @@ + curl -H "Content-Type:application/json" -X POST -d '{"feed":[{"image": "https://paddle-serving.bj.bcebos.com/others/1.jpg"}], "fetch": ["res"]}' http://127.0.0.1:9292/ocr/prediction diff --git a/python/examples/ocr/ocr_web_server.py b/python/examples/ocr/ocr_web_server.py new file mode 100644 index 0000000000000000000000000000000000000000..b55027d84252f8590f1e62839ad8cbd25e56c8fe --- /dev/null +++ b/python/examples/ocr/ocr_web_server.py @@ -0,0 +1,158 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle_serving_client import Client +from paddle_serving_app.reader import OCRReader +import cv2 +import sys +import numpy as np +import os +from paddle_serving_client import Client +from paddle_serving_app.reader import Sequential, URL2Image, ResizeByFactor +from paddle_serving_app.reader import Div, Normalize, Transpose +from paddle_serving_app.reader import DBPostProcess, FilterBoxes +from paddle_serving_server_gpu.web_service import WebService +import time +import re + + +class OCRService(WebService): + def init_det_client(self, det_port, det_client_config): + self.det_preprocess = Sequential([ + ResizeByFactor(32, 960), Div(255), + Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( + (2, 0, 1)) + ]) + self.det_client = Client() + self.det_client.load_client_config(det_client_config) + self.det_client.connect(["127.0.0.1:{}".format(det_port)]) + + def preprocess(self, feed=[], fetch=[]): + img_url = feed[0]["image"] + #print(feed, img_url) + read_from_url = URL2Image() + im = read_from_url(img_url) + ori_h, ori_w, _ = im.shape + det_img = self.det_preprocess(im) + #print("det_img", det_img, det_img.shape) + det_out = self.det_client.predict( + feed={"image": det_img}, fetch=["concat_1.tmp_0"]) + + #print("det_out", det_out) + def sorted_boxes(dt_boxes): + num_boxes = dt_boxes.shape[0] + sorted_boxes = sorted(dt_boxes, key=lambda x: (x[0][1], x[0][0])) + _boxes = list(sorted_boxes) + for i in range(num_boxes - 1): + if abs(_boxes[i+1][0][1] - _boxes[i][0][1]) < 10 and \ + (_boxes[i + 1][0][0] < _boxes[i][0][0]): + tmp = _boxes[i] + _boxes[i] = _boxes[i + 1] + _boxes[i + 1] = tmp + return _boxes + + def get_rotate_crop_image(img, points): + img_height, img_width = img.shape[0:2] + left = int(np.min(points[:, 0])) + right = int(np.max(points[:, 0])) + top = int(np.min(points[:, 1])) + bottom = int(np.max(points[:, 1])) + img_crop = img[top:bottom, left:right, :].copy() + points[:, 0] = points[:, 0] - left + points[:, 1] = points[:, 1] - top + img_crop_width = int(np.linalg.norm(points[0] - points[1])) + img_crop_height = int(np.linalg.norm(points[0] - points[3])) + pts_std = np.float32([[0, 0], [img_crop_width, 0], \ + [img_crop_width, img_crop_height], [0, img_crop_height]]) + M = cv2.getPerspectiveTransform(points, pts_std) + dst_img = cv2.warpPerspective( + img_crop, + M, (img_crop_width, img_crop_height), + borderMode=cv2.BORDER_REPLICATE) + dst_img_height, dst_img_width = dst_img.shape[0:2] + if dst_img_height * 1.0 / dst_img_width >= 1.5: + dst_img = np.rot90(dst_img) + return dst_img + + def resize_norm_img(img, max_wh_ratio): + import math + imgC, imgH, imgW = 3, 32, 320 + imgW = int(32 * max_wh_ratio) + h = img.shape[0] + w = img.shape[1] + ratio = w / float(h) + if math.ceil(imgH * ratio) > imgW: + resized_w = imgW + else: + resized_w = int(math.ceil(imgH * ratio)) + resized_image = cv2.resize(img, (resized_w, imgH)) + resized_image = resized_image.astype('float32') + resized_image = resized_image.transpose((2, 0, 1)) / 255 + resized_image -= 0.5 + resized_image /= 0.5 + padding_im = np.zeros((imgC, imgH, imgW), dtype=np.float32) + padding_im[:, :, 0:resized_w] = resized_image + return padding_im + + _, new_h, new_w = det_img.shape + filter_func = FilterBoxes(10, 10) + post_func = DBPostProcess({ + "thresh": 0.3, + "box_thresh": 0.5, + "max_candidates": 1000, + "unclip_ratio": 1.5, + "min_size": 3 + }) + ratio_list = [float(new_h) / ori_h, float(new_w) / ori_w] + dt_boxes_list = post_func(det_out["concat_1.tmp_0"], [ratio_list]) + dt_boxes = filter_func(dt_boxes_list[0], [ori_h, ori_w]) + dt_boxes = sorted_boxes(dt_boxes) + feed_list = [] + img_list = [] + max_wh_ratio = 0 + for i, dtbox in enumerate(dt_boxes): + boximg = get_rotate_crop_image(im, dt_boxes[i]) + img_list.append(boximg) + h, w = boximg.shape[0:2] + wh_ratio = w * 1.0 / h + max_wh_ratio = max(max_wh_ratio, wh_ratio) + for img in img_list: + norm_img = resize_norm_img(img, max_wh_ratio) + feed = {"image": norm_img} + feed_list.append(feed) + fetch = ["ctc_greedy_decoder_0.tmp_0"] + #print("feed_list", feed_list) + return feed_list, fetch + + def postprocess(self, feed={}, fetch=[], fetch_map=None): + #print(fetch_map) + ocr_reader = OCRReader() + rec_res = ocr_reader.postprocess(fetch_map) + res_lst = [] + for res in rec_res: + res_lst.append(res[0]) + fetch_map["res"] = res_lst + del fetch_map["ctc_greedy_decoder_0.tmp_0"] + del fetch_map["ctc_greedy_decoder_0.tmp_0.lod"] + return fetch_map + + +ocr_service = OCRService(name="ocr") +ocr_service.load_model_config("ocr_rec_model") +ocr_service.prepare_server(workdir="workdir", port=9292) +ocr_service.init_det_client( + det_port=9293, + det_client_config="ocr_det_client/serving_client_conf.prototxt") +ocr_service.run_rpc_service() +ocr_service.run_web_service() diff --git a/python/examples/ocr/test_rec.jpg b/python/examples/ocr/test_rec.jpg deleted file mode 100644 index 2c34cd33eac5766a072fde041fa6c9b1d612f1db..0000000000000000000000000000000000000000 Binary files a/python/examples/ocr/test_rec.jpg and /dev/null differ diff --git a/python/examples/util/show_profile.py b/python/examples/util/show_profile.py index 1581dda19bb0abefe6eb21592bda7fc97d8fb7cd..3815ad9ec943329a29767ca8f4217943f0d84e4b 100644 --- a/python/examples/util/show_profile.py +++ b/python/examples/util/show_profile.py @@ -31,7 +31,7 @@ with open(profile_file) as f: if line[0] == "PROFILE": prase(line[2]) -print("thread num :{}".format(thread_num)) +print("thread_num: {}".format(thread_num)) for name in time_dict: - print("{} cost :{} s in each thread ".format(name, time_dict[name] / ( + print("{} cost: {}s in each thread ".format(name, time_dict[name] / ( 1000000.0 * float(thread_num)))) diff --git a/python/paddle_serving_app/models/model_list.py b/python/paddle_serving_app/models/model_list.py index 79b3f91bd6584d17ddbc4124584cf40bd586b965..3b0c3cb9c4927df7ba55830657318073b1a3a7cc 100644 --- a/python/paddle_serving_app/models/model_list.py +++ b/python/paddle_serving_app/models/model_list.py @@ -24,14 +24,15 @@ class ServingModels(object): "SentimentAnalysis"] = ["senta_bilstm", "senta_bow", "senta_cnn"] self.model_dict["SemanticRepresentation"] = ["ernie"] self.model_dict["ChineseWordSegmentation"] = ["lac"] - self.model_dict["ObjectDetection"] = ["faster_rcnn", "yolov4"] + self.model_dict[ + "ObjectDetection"] = ["faster_rcnn", "yolov4", "blazeface"] self.model_dict["ImageSegmentation"] = [ "unet", "deeplabv3", "deeplabv3+cityscapes" ] self.model_dict["ImageClassification"] = [ "resnet_v2_50_imagenet", "mobilenet_v2_imagenet" ] - self.model_dict["TextDetection"] = ["ocr_detection"] + self.model_dict["TextDetection"] = ["ocr_det"] self.model_dict["OCR"] = ["ocr_rec"] image_class_url = "https://paddle-serving.bj.bcebos.com/paddle_hub_models/image/ImageClassification/" diff --git a/python/paddle_serving_app/reader/functional.py b/python/paddle_serving_app/reader/functional.py index 4240641dd99fceb278ff60a5ba1dbb5275e534aa..7bab279c7f1aa71a2d55a8cb7b12bcb38607eb70 100644 --- a/python/paddle_serving_app/reader/functional.py +++ b/python/paddle_serving_app/reader/functional.py @@ -29,6 +29,7 @@ def normalize(img, mean, std, channel_first): else: img_mean = np.array(mean).reshape((1, 1, 3)) img_std = np.array(std).reshape((1, 1, 3)) + img = np.array(img).astype("float32") img -= img_mean img /= img_std return img diff --git a/python/paddle_serving_app/reader/image_reader.py b/python/paddle_serving_app/reader/image_reader.py index a44ca5de84da2bafce9b4cea37fb88095debabc6..096f46549af137cb04a87e26a3b28c8d42e33daa 100644 --- a/python/paddle_serving_app/reader/image_reader.py +++ b/python/paddle_serving_app/reader/image_reader.py @@ -440,6 +440,30 @@ class RCNNPostprocess(object): self.label_file, self.output_dir) +class BlazeFacePostprocess(RCNNPostprocess): + def clip_bbox(self, bbox, im_size=None): + h = 1. if im_size is None else im_size[0] + w = 1. if im_size is None else im_size[1] + xmin = max(min(bbox[0], w), 0.) + ymin = max(min(bbox[1], h), 0.) + xmax = max(min(bbox[2], w), 0.) + ymax = max(min(bbox[3], h), 0.) + return xmin, ymin, xmax, ymax + + def _get_bbox_result(self, fetch_map, fetch_name, clsid2catid): + result = {} + is_bbox_normalized = True #for blaze face, set true here + output = fetch_map[fetch_name] + lod = [fetch_map[fetch_name + '.lod']] + lengths = self._offset_to_lengths(lod) + np_data = np.array(output) + result['bbox'] = (np_data, lengths) + result['im_id'] = np.array([[0]]) + result["im_shape"] = np.array(fetch_map["im_shape"]).astype(np.int32) + bbox_results = self._bbox2out([result], clsid2catid, is_bbox_normalized) + return bbox_results + + class Sequential(object): """ Args: diff --git a/python/paddle_serving_app/reader/ocr_reader.py b/python/paddle_serving_app/reader/ocr_reader.py index e5dc88482bd5e0a7a26873fd5cb60c43dc5104c9..72a2918f89a8ccc913894f3f46fab08f51cf9460 100644 --- a/python/paddle_serving_app/reader/ocr_reader.py +++ b/python/paddle_serving_app/reader/ocr_reader.py @@ -182,22 +182,26 @@ class OCRReader(object): return norm_img_batch[0] - def postprocess(self, outputs): + def postprocess(self, outputs, with_score=False): rec_res = [] rec_idx_lod = outputs["ctc_greedy_decoder_0.tmp_0.lod"] - predict_lod = outputs["softmax_0.tmp_0.lod"] rec_idx_batch = outputs["ctc_greedy_decoder_0.tmp_0"] + if with_score: + predict_lod = outputs["softmax_0.tmp_0.lod"] for rno in range(len(rec_idx_lod) - 1): beg = rec_idx_lod[rno] end = rec_idx_lod[rno + 1] rec_idx_tmp = rec_idx_batch[beg:end, 0] preds_text = self.char_ops.decode(rec_idx_tmp) - beg = predict_lod[rno] - end = predict_lod[rno + 1] - probs = outputs["softmax_0.tmp_0"][beg:end, :] - ind = np.argmax(probs, axis=1) - blank = probs.shape[1] - valid_ind = np.where(ind != (blank - 1))[0] - score = np.mean(probs[valid_ind, ind[valid_ind]]) - rec_res.append([preds_text, score]) + if with_score: + beg = predict_lod[rno] + end = predict_lod[rno + 1] + probs = outputs["softmax_0.tmp_0"][beg:end, :] + ind = np.argmax(probs, axis=1) + blank = probs.shape[1] + valid_ind = np.where(ind != (blank - 1))[0] + score = np.mean(probs[valid_ind, ind[valid_ind]]) + rec_res.append([preds_text, score]) + else: + rec_res.append([preds_text]) return rec_res diff --git a/python/paddle_serving_client/utils/__init__.py b/python/paddle_serving_client/utils/__init__.py index 53f40726fbf21a0607b47bb29a20aa6ff50b6221..8af434cc7d08ca14aef7df2329e8656da930c0ce 100644 --- a/python/paddle_serving_client/utils/__init__.py +++ b/python/paddle_serving_client/utils/__init__.py @@ -39,11 +39,11 @@ def benchmark_args(): def show_latency(latency_list): latency_array = np.array(latency_list) info = "latency:\n" - info += "mean :{} ms\n".format(np.mean(latency_array)) - info += "median :{} ms\n".format(np.median(latency_array)) - info += "80 percent :{} ms\n".format(np.percentile(latency_array, 80)) - info += "90 percent :{} ms\n".format(np.percentile(latency_array, 90)) - info += "99 percent :{} ms\n".format(np.percentile(latency_array, 99)) + info += "mean: {}ms\n".format(np.mean(latency_array)) + info += "median: {}ms\n".format(np.median(latency_array)) + info += "80 percent: {}ms\n".format(np.percentile(latency_array, 80)) + info += "90 percent: {}ms\n".format(np.percentile(latency_array, 90)) + info += "99 percent: {}ms\n".format(np.percentile(latency_array, 99)) sys.stderr.write(info) diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index ce55b187e66ae02916d04a57732391de01f4ece5..953f378ae306bc8174c30224b85763b44ee2c811 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -27,7 +27,7 @@ import logging import enum import copy -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger() class ChannelDataEcode(enum.Enum): @@ -92,7 +92,16 @@ class ChannelData(object): def check_dictdata(dictdata): ecode = ChannelDataEcode.OK.value error_info = None - if not isinstance(dictdata, dict): + if isinstance(dictdata, list): + # batch data + for sample in dictdata: + if not isinstance(sample, dict): + ecode = ChannelDataEcode.TYPE_ERROR.value + error_info = "the value of data must " \ + "be dict, but get {}.".format(type(sample)) + break + elif not isinstance(dictdata, dict): + # batch size = 1 ecode = ChannelDataEcode.TYPE_ERROR.value error_info = "the value of data must " \ "be dict, but get {}.".format(type(dictdata)) @@ -102,12 +111,32 @@ class ChannelData(object): def check_npdata(npdata): ecode = ChannelDataEcode.OK.value error_info = None - for _, value in npdata.items(): - if not isinstance(value, np.ndarray): - ecode = ChannelDataEcode.TYPE_ERROR.value - error_info = "the value of data must " \ - "be np.ndarray, but get {}.".format(type(value)) - break + if isinstance(npdata, list): + # batch data + for sample in npdata: + if not isinstance(sample, dict): + ecode = ChannelDataEcode.TYPE_ERROR.value + error_info = "the value of data must " \ + "be dict, but get {}.".format(type(sample)) + break + for _, value in sample.items(): + if not isinstance(value, np.ndarray): + ecode = ChannelDataEcode.TYPE_ERROR.value + error_info = "the value of data must " \ + "be np.ndarray, but get {}.".format(type(value)) + return ecode, error_info + elif isinstance(npdata, dict): + # batch_size = 1 + for _, value in npdata.items(): + if not isinstance(value, np.ndarray): + ecode = ChannelDataEcode.TYPE_ERROR.value + error_info = "the value of data must " \ + "be np.ndarray, but get {}.".format(type(value)) + break + else: + ecode = ChannelDataEcode.TYPE_ERROR.value + error_info = "the value of data must " \ + "be dict, but get {}.".format(type(npdata)) return ecode, error_info def parse(self): diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index d82cac888298f83a1c8412f742adbf7de3932471..d2323f265c7fac65bc97d9b8d9a3dea8afe4cf2e 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -19,13 +19,14 @@ from paddle_serving_client import MultiLangClient, Client from concurrent import futures import logging import func_timeout +import os from numpy import * from .proto import pipeline_service_pb2 from .channel import ThreadChannel, ProcessChannel, ChannelDataEcode, ChannelData, ChannelDataType from .util import NameGenerator -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger() _op_name_gen = NameGenerator("Op") @@ -59,6 +60,10 @@ class Op(object): self._outputs = [] self._profiler = None + # only for multithread + self._for_init_op_lock = threading.Lock() + self._succ_init_op = False + def init_profiler(self, profiler): self._profiler = profiler @@ -71,18 +76,19 @@ class Op(object): fetch_names): if self.with_serving == False: _LOGGER.debug("{} no client".format(self.name)) - return + return None _LOGGER.debug("{} client_config: {}".format(self.name, client_config)) _LOGGER.debug("{} fetch_names: {}".format(self.name, fetch_names)) if client_type == 'brpc': - self._client = Client() - self._client.load_client_config(client_config) + client = Client() + client.load_client_config(client_config) elif client_type == 'grpc': - self._client = MultiLangClient() + client = MultiLangClient() else: raise ValueError("unknow client type: {}".format(client_type)) - self._client.connect(server_endpoints) + client.connect(server_endpoints) self._fetch_names = fetch_names + return client def _get_input_channel(self): return self._input @@ -130,19 +136,17 @@ class Op(object): (_, input_dict), = input_dicts.items() return input_dict - def process(self, feed_dict): + def process(self, client_predict_handler, feed_dict): err, err_info = ChannelData.check_npdata(feed_dict) if err != 0: raise NotImplementedError( "{} Please override preprocess func.".format(err_info)) - _LOGGER.debug(self._log('feed_dict: {}'.format(feed_dict))) - _LOGGER.debug(self._log('fetch: {}'.format(self._fetch_names))) - call_result = self._client.predict( + call_result = client_predict_handler( feed=feed_dict, fetch=self._fetch_names) _LOGGER.debug(self._log("get call_result")) return call_result - def postprocess(self, fetch_dict): + def postprocess(self, input_dict, fetch_dict): return fetch_dict def stop(self): @@ -174,7 +178,7 @@ class Op(object): p = multiprocessing.Process( target=self._run, args=(concurrency_idx, self._get_input_channel(), - self._get_output_channels(), client_type)) + self._get_output_channels(), client_type, False)) p.start() proces.append(p) return proces @@ -185,12 +189,12 @@ class Op(object): t = threading.Thread( target=self._run, args=(concurrency_idx, self._get_input_channel(), - self._get_output_channels(), client_type)) + self._get_output_channels(), client_type, True)) t.start() threads.append(t) return threads - def load_user_resources(self): + def init_op(self): pass def _run_preprocess(self, parsed_data, data_id, log_func): @@ -222,13 +226,15 @@ class Op(object): data_id=data_id) return preped_data, error_channeldata - def _run_process(self, preped_data, data_id, log_func): + def _run_process(self, client_predict_handler, preped_data, data_id, + log_func): midped_data, error_channeldata = None, None if self.with_serving: ecode = ChannelDataEcode.OK.value if self._timeout <= 0: try: - midped_data = self.process(preped_data) + midped_data = self.process(client_predict_handler, + preped_data) except Exception as e: ecode = ChannelDataEcode.UNKNOW.value error_info = log_func(e) @@ -237,7 +243,11 @@ class Op(object): for i in range(self._retry): try: midped_data = func_timeout.func_timeout( - self._timeout, self.process, args=(preped_data, )) + self._timeout, + self.process, + args=( + client_predict_handler, + preped_data, )) except func_timeout.FunctionTimedOut as e: if i + 1 >= self._retry: ecode = ChannelDataEcode.TIMEOUT.value @@ -267,10 +277,10 @@ class Op(object): midped_data = preped_data return midped_data, error_channeldata - def _run_postprocess(self, midped_data, data_id, log_func): + def _run_postprocess(self, input_dict, midped_data, data_id, log_func): output_data, error_channeldata = None, None try: - postped_data = self.postprocess(midped_data) + postped_data = self.postprocess(input_dict, midped_data) except Exception as e: error_info = log_func(e) _LOGGER.error(error_info) @@ -303,8 +313,8 @@ class Op(object): data_id=data_id) return output_data, error_channeldata - def _run(self, concurrency_idx, input_channel, output_channels, - client_type): + def _run(self, concurrency_idx, input_channel, output_channels, client_type, + use_multithread): def get_log_func(op_info_prefix): def log_func(info_str): return "{} {}".format(op_info_prefix, info_str) @@ -315,12 +325,30 @@ class Op(object): log = get_log_func(op_info_prefix) tid = threading.current_thread().ident + client = None + client_predict_handler = None # create client based on client_type - self.init_client(client_type, self._client_config, - self._server_endpoints, self._fetch_names) + try: + client = self.init_client(client_type, self._client_config, + self._server_endpoints, self._fetch_names) + if client is not None: + client_predict_handler = client.predict + except Exception as e: + _LOGGER.error(log(e)) + os._exit(-1) # load user resources - self.load_user_resources() + try: + if use_multithread: + with self._for_init_op_lock: + if not self._succ_init_op: + self.init_op() + self._succ_init_op = True + else: + self.init_op() + except Exception as e: + _LOGGER.error(log(e)) + os._exit(-1) self._is_run = True while self._is_run: @@ -349,8 +377,8 @@ class Op(object): # process self._profiler_record("{}-midp#{}_0".format(op_info_prefix, tid)) - midped_data, error_channeldata = self._run_process(preped_data, - data_id, log) + midped_data, error_channeldata = self._run_process( + client_predict_handler, preped_data, data_id, log) self._profiler_record("{}-midp#{}_1".format(op_info_prefix, tid)) if error_channeldata is not None: self._push_to_output_channels(error_channeldata, @@ -359,8 +387,8 @@ class Op(object): # postprocess self._profiler_record("{}-postp#{}_0".format(op_info_prefix, tid)) - output_data, error_channeldata = self._run_postprocess(midped_data, - data_id, log) + output_data, error_channeldata = self._run_postprocess( + parsed_data, midped_data, data_id, log) self._profiler_record("{}-postp#{}_1".format(op_info_prefix, tid)) if error_channeldata is not None: self._push_to_output_channels(error_channeldata, @@ -384,7 +412,11 @@ class RequestOp(Op): super(RequestOp, self).__init__( name="#G", input_ops=[], concurrency=concurrency) # load user resources - self.load_user_resources() + try: + self.init_op() + except Exception as e: + _LOGGER.error(e) + os._exit(-1) def unpack_request_package(self, request): dictdata = {} @@ -405,7 +437,11 @@ class ResponseOp(Op): super(ResponseOp, self).__init__( name="#R", input_ops=input_ops, concurrency=concurrency) # load user resources - self.load_user_resources() + try: + self.init_op() + except Exception as e: + _LOGGER.error(e) + os._exit(-1) def pack_response_package(self, channeldata): resp = pipeline_service_pb2.Response() @@ -450,17 +486,26 @@ class VirtualOp(Op): def add_virtual_pred_op(self, op): self._virtual_pred_ops.append(op) + def _actual_pred_op_names(self, op): + if not isinstance(op, VirtualOp): + return [op.name] + names = [] + for x in op._virtual_pred_ops: + names.extend(self._actual_pred_op_names(x)) + return names + def add_output_channel(self, channel): if not isinstance(channel, (ThreadChannel, ProcessChannel)): raise TypeError( self._log('output channel must be Channel type, not {}'.format( type(channel)))) for op in self._virtual_pred_ops: - channel.add_producer(op.name) + for op_name in self._actual_pred_op_names(op): + channel.add_producer(op_name) self._outputs.append(channel) - def _run(self, concurrency_idx, input_channel, output_channels, - client_type): + def _run(self, concurrency_idx, input_channel, output_channels, client_type, + use_multithread): def get_log_func(op_info_prefix): def log_func(info_str): return "{} {}".format(op_info_prefix, info_str) diff --git a/python/pipeline/pipeline_client.py b/python/pipeline/pipeline_client.py index 4ad05b5a953d5084ffda360c0a1ac561463898a4..891a2b2a2fe759d0f392ad043dcc6f9173bd0e3a 100644 --- a/python/pipeline/pipeline_client.py +++ b/python/pipeline/pipeline_client.py @@ -20,7 +20,7 @@ import functools from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2_grpc -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger() class PipelineClient(object): @@ -52,7 +52,7 @@ class PipelineClient(object): return {"ecode": resp.ecode, "error_info": resp.error_info} fetch_map = {"ecode": resp.ecode} for idx, key in enumerate(resp.key): - if key not in fetch: + if fetch is not None and key not in fetch: continue data = resp.value[idx] try: @@ -62,16 +62,16 @@ class PipelineClient(object): fetch_map[key] = data return fetch_map - def predict(self, feed_dict, fetch, asyn=False): + def predict(self, feed_dict, fetch=None, asyn=False): if not isinstance(feed_dict, dict): raise TypeError( "feed must be dict type with format: {name: value}.") - if not isinstance(fetch, list): + if fetch is not None and not isinstance(fetch, list): raise TypeError("fetch must be list type with format: [name].") req = self._pack_request_package(feed_dict) if not asyn: resp = self._stub.inference(req) - return self._unpack_response_package(resp) + return self._unpack_response_package(resp, fetch) else: call_future = self._stub.inference.future(req) return PipelinePredictFuture( diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 55289eeca42e02bb979d4a21791fdde44e0aff02..2c1c19bd6aa23214ab8fc385869cc29c8e86b37e 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -45,7 +45,7 @@ from .channel import ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcod from .profiler import TimeProfiler from .util import NameGenerator -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger() _profiler = TimeProfiler() @@ -235,6 +235,10 @@ class PipelineServer(object): return use_ops, succ_ops_of_use_op use_ops, out_degree_ops = get_use_ops(response_op) + _LOGGER.info("================= use op ==================") + for op in use_ops: + _LOGGER.info(op.name) + _LOGGER.info("===========================================") if len(use_ops) <= 1: raise Exception( "Besides RequestOp and ResponseOp, there should be at least one Op in DAG." diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py index 146203f7c184b506bb8fd70dadac1d89166a2de9..49eabf5b318789823073154dd3f8ad38e18638a1 100644 --- a/python/pipeline/profiler.py +++ b/python/pipeline/profiler.py @@ -24,7 +24,7 @@ else: raise Exception("Error Python version") import time -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger() class TimeProfiler(object): @@ -58,7 +58,7 @@ class TimeProfiler(object): print_str += "{}_{}:{} ".format(name, tag, timestamp) else: tmp[name] = (tag, timestamp) - print_str += "\n" + print_str = "\n{}\n".format(print_str) sys.stderr.write(print_str) for name, item in tmp.items(): tag, timestamp = item