提交 dff1447b 编写于 作者: M MRXLT

fix conflict

...@@ -256,6 +256,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv ...@@ -256,6 +256,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv
### Developers ### Developers
- [How to config Serving native operators on server side?](doc/SERVER_DAG.md) - [How to config Serving native operators on server side?](doc/SERVER_DAG.md)
- [How to develop a new Serving operator?](doc/NEW_OPERATOR.md) - [How to develop a new Serving operator?](doc/NEW_OPERATOR.md)
- [How to develop a new Web Service?](doc/NEW_WEB_SERVICE.md)
- [Golang client](doc/IMDB_GO_CLIENT.md) - [Golang client](doc/IMDB_GO_CLIENT.md)
- [Compile from source code](doc/COMPILE.md) - [Compile from source code](doc/COMPILE.md)
......
...@@ -262,6 +262,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv ...@@ -262,6 +262,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv
### 开发者教程 ### 开发者教程
- [如何配置Server端的计算图?](doc/SERVER_DAG_CN.md) - [如何配置Server端的计算图?](doc/SERVER_DAG_CN.md)
- [如何开发一个新的General Op?](doc/NEW_OPERATOR_CN.md) - [如何开发一个新的General Op?](doc/NEW_OPERATOR_CN.md)
- [如何开发一个新的Web Service?](doc/NEW_WEB_SERVICE_CN.md)
- [如何在Paddle Serving使用Go Client?](doc/IMDB_GO_CLIENT_CN.md) - [如何在Paddle Serving使用Go Client?](doc/IMDB_GO_CLIENT_CN.md)
- [如何编译PaddleServing?](doc/COMPILE_CN.md) - [如何编译PaddleServing?](doc/COMPILE_CN.md)
......
...@@ -119,7 +119,7 @@ int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) { ...@@ -119,7 +119,7 @@ int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) {
LOG(ERROR) << "Predictor Creation Failed"; LOG(ERROR) << "Predictor Creation Failed";
return -1; return -1;
} }
_api.thrd_initialize(); // _api.thrd_initialize();
return 0; return 0;
} }
...@@ -130,7 +130,7 @@ int PredictorClient::create_predictor() { ...@@ -130,7 +130,7 @@ int PredictorClient::create_predictor() {
LOG(ERROR) << "Predictor Creation Failed"; LOG(ERROR) << "Predictor Creation Failed";
return -1; return -1;
} }
_api.thrd_initialize(); // _api.thrd_initialize();
return 0; return 0;
} }
...@@ -152,7 +152,7 @@ int PredictorClient::batch_predict( ...@@ -152,7 +152,7 @@ int PredictorClient::batch_predict(
int fetch_name_num = fetch_name.size(); int fetch_name_num = fetch_name.size();
_api.thrd_clear(); _api.thrd_initialize();
std::string variant_tag; std::string variant_tag;
_predictor = _api.fetch_predictor("general_model", &variant_tag); _predictor = _api.fetch_predictor("general_model", &variant_tag);
predict_res_batch.set_variant_tag(variant_tag); predict_res_batch.set_variant_tag(variant_tag);
...@@ -247,8 +247,9 @@ int PredictorClient::batch_predict( ...@@ -247,8 +247,9 @@ int PredictorClient::batch_predict(
} else { } else {
client_infer_end = timeline.TimeStampUS(); client_infer_end = timeline.TimeStampUS();
postprocess_start = client_infer_end; postprocess_start = client_infer_end;
VLOG(2) << "get model output num";
uint32_t model_num = res.outputs_size(); uint32_t model_num = res.outputs_size();
VLOG(2) << "model num: " << model_num;
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) { for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx; VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx); auto output = res.outputs(m_idx);
...@@ -326,6 +327,8 @@ int PredictorClient::batch_predict( ...@@ -326,6 +327,8 @@ int PredictorClient::batch_predict(
fprintf(stderr, "%s\n", oss.str().c_str()); fprintf(stderr, "%s\n", oss.str().c_str());
} }
_api.thrd_clear();
return 0; return 0;
} }
......
...@@ -78,7 +78,6 @@ PYBIND11_MODULE(serving_client, m) { ...@@ -78,7 +78,6 @@ PYBIND11_MODULE(serving_client, m) {
[](PredictorClient &self) { self.create_predictor(); }) [](PredictorClient &self) { self.create_predictor(); })
.def("destroy_predictor", .def("destroy_predictor",
[](PredictorClient &self) { self.destroy_predictor(); }) [](PredictorClient &self) { self.destroy_predictor(); })
.def("batch_predict", .def("batch_predict",
[](PredictorClient &self, [](PredictorClient &self,
const std::vector<std::vector<std::vector<float>>> const std::vector<std::vector<std::vector<float>>>
......
...@@ -27,9 +27,9 @@ namespace predictor { ...@@ -27,9 +27,9 @@ namespace predictor {
} }
#endif #endif
#ifdef WITH_GPU // #ifdef WITH_GPU
#define USE_PTHREAD // #define USE_PTHREAD
#endif // #endif
#ifdef USE_PTHREAD #ifdef USE_PTHREAD
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "core/sdk-cpp/include/common.h"
namespace baidu {
namespace paddle_serving {
namespace sdk_cpp {
#ifndef CATCH_ANY_AND_RET
#define CATCH_ANY_AND_RET(errno) \
catch (...) { \
LOG(ERROR) << "exception catched"; \
return errno; \
}
#endif
#define USE_PTHREAD
#ifdef USE_PTHREAD
#define THREAD_T pthread_t
#define THREAD_KEY_T pthread_key_t
#define THREAD_MUTEX_T pthread_mutex_t
#define THREAD_KEY_CREATE pthread_key_create
#define THREAD_SETSPECIFIC pthread_setspecific
#define THREAD_GETSPECIFIC pthread_getspecific
#define THREAD_CREATE pthread_create
#define THREAD_CANCEL pthread_cancel
#define THREAD_JOIN pthread_join
#define THREAD_KEY_DELETE pthread_key_delete
#define THREAD_MUTEX_INIT pthread_mutex_init
#define THREAD_MUTEX_LOCK pthread_mutex_lock
#define THREAD_MUTEX_UNLOCK pthread_mutex_unlock
#define THREAD_MUTEX_DESTROY pthread_mutex_destroy
#define THREAD_COND_T pthread_cond_t
#define THREAD_COND_INIT pthread_cond_init
#define THREAD_COND_SIGNAL pthread_cond_signal
#define THREAD_COND_WAIT pthread_cond_wait
#define THREAD_COND_DESTROY pthread_cond_destroy
#else
#define THREAD_T bthread_t
#define THREAD_KEY_T bthread_key_t
#define THREAD_MUTEX_T bthread_mutex_t
#define THREAD_KEY_CREATE bthread_key_create
#define THREAD_SETSPECIFIC bthread_setspecific
#define THREAD_GETSPECIFIC bthread_getspecific
#define THREAD_CREATE bthread_start_background
#define THREAD_CANCEL bthread_stop
#define THREAD_JOIN bthread_join
#define THREAD_KEY_DELETE bthread_key_delete
#define THREAD_MUTEX_INIT bthread_mutex_init
#define THREAD_MUTEX_LOCK bthread_mutex_lock
#define THREAD_MUTEX_UNLOCK bthread_mutex_unlock
#define THREAD_MUTEX_DESTROY bthread_mutex_destroy
#define THREAD_COND_T bthread_cond_t
#define THREAD_COND_INIT bthread_cond_init
#define THREAD_COND_SIGNAL bthread_cond_signal
#define THREAD_COND_WAIT bthread_cond_wait
#define THREAD_COND_DESTROY bthread_cond_destroy
#endif
} // namespace sdk_cpp
} // namespace paddle_serving
} // namespace baidu
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <vector> #include <vector>
#include "core/sdk-cpp/include/common.h" #include "core/sdk-cpp/include/common.h"
#include "core/sdk-cpp/include/endpoint_config.h" #include "core/sdk-cpp/include/endpoint_config.h"
#include "core/sdk-cpp/include/macros.h"
#include "core/sdk-cpp/include/predictor.h" #include "core/sdk-cpp/include/predictor.h"
#include "core/sdk-cpp/include/stub.h" #include "core/sdk-cpp/include/stub.h"
...@@ -245,7 +246,7 @@ class StubImpl : public Stub { ...@@ -245,7 +246,7 @@ class StubImpl : public Stub {
const brpc::ChannelOptions& options); const brpc::ChannelOptions& options);
StubTLS* get_tls() { StubTLS* get_tls() {
return static_cast<StubTLS*>(bthread_getspecific(_bthread_key)); return static_cast<StubTLS*>(THREAD_GETSPECIFIC(_bthread_key));
} }
private: private:
...@@ -262,7 +263,8 @@ class StubImpl : public Stub { ...@@ -262,7 +263,8 @@ class StubImpl : public Stub {
uint32_t _package_size; uint32_t _package_size;
// tls handlers // tls handlers
bthread_key_t _bthread_key; // bthread_key_t _bthread_key;
THREAD_KEY_T _bthread_key;
// bvar variables // bvar variables
std::map<std::string, BvarWrapper*> _ltc_bvars; std::map<std::string, BvarWrapper*> _ltc_bvars;
......
...@@ -70,7 +70,7 @@ int StubImpl<T, C, R, I, O>::initialize(const VariantInfo& var, ...@@ -70,7 +70,7 @@ int StubImpl<T, C, R, I, O>::initialize(const VariantInfo& var,
_endpoint = ep; _endpoint = ep;
if (bthread_key_create(&_bthread_key, NULL) != 0) { if (THREAD_KEY_CREATE(&_bthread_key, NULL) != 0) {
LOG(FATAL) << "Failed create key for stub tls"; LOG(FATAL) << "Failed create key for stub tls";
return -1; return -1;
} }
...@@ -132,13 +132,13 @@ int StubImpl<T, C, R, I, O>::initialize(const VariantInfo& var, ...@@ -132,13 +132,13 @@ int StubImpl<T, C, R, I, O>::initialize(const VariantInfo& var,
template <typename T, typename C, typename R, typename I, typename O> template <typename T, typename C, typename R, typename I, typename O>
int StubImpl<T, C, R, I, O>::thrd_initialize() { int StubImpl<T, C, R, I, O>::thrd_initialize() {
if (bthread_getspecific(_bthread_key) != NULL) { if (THREAD_GETSPECIFIC(_bthread_key) != NULL) {
LOG(WARNING) << "Already thread initialized for stub"; LOG(WARNING) << "Already thread initialized for stub";
return 0; return 0;
} }
StubTLS* tls = new (std::nothrow) StubTLS(); StubTLS* tls = new (std::nothrow) StubTLS();
if (!tls || bthread_setspecific(_bthread_key, tls) != 0) { if (!tls || THREAD_SETSPECIFIC(_bthread_key, tls) != 0) {
LOG(FATAL) << "Failed binding tls data to bthread_key"; LOG(FATAL) << "Failed binding tls data to bthread_key";
return -1; return -1;
} }
......
...@@ -12,23 +12,20 @@ Paddle Serving支持基于Paddle进行训练的各种模型,并通过指定模 ...@@ -12,23 +12,20 @@ Paddle Serving支持基于Paddle进行训练的各种模型,并通过指定模
import paddlehub as hub import paddlehub as hub
model_name = "bert_chinese_L-12_H-768_A-12" model_name = "bert_chinese_L-12_H-768_A-12"
module = hub.Module(model_name) module = hub.Module(model_name)
inputs, outputs, program = module.context( inputs, outputs, program = module.context(trainable=True, max_seq_len=20)
trainable=True, max_seq_len=20) feed_keys = ["input_ids", "position_ids", "segment_ids", "input_mask", "pooled_output", "sequence_output"]
feed_keys = ["input_ids", "position_ids", "segment_ids",
"input_mask", "pooled_output", "sequence_output"]
fetch_keys = ["pooled_output", "sequence_output"] fetch_keys = ["pooled_output", "sequence_output"]
feed_dict = dict(zip(feed_keys, [inputs[x] for x in feed_keys])) feed_dict = dict(zip(feed_keys, [inputs[x] for x in feed_keys]))
fetch_dict = dict(zip(fetch_keys, [outputs[x]] for x in fetch_keys)) fetch_dict = dict(zip(fetch_keys, [outputs[x]] for x in fetch_keys))
import paddle_serving_client.io as serving_io import paddle_serving_client.io as serving_io
serving_io.save_model("bert_seq20_model", "bert_seq20_client", serving_io.save_model("bert_seq20_model", "bert_seq20_client", feed_dict, fetch_dict, program)
feed_dict, fetch_dict, program)
``` ```
#### Step2:启动服务 #### Step2:启动服务
``` shell ``` shell
python -m paddle_serving_server_gpu.serve --model bert_seq20_model --thread 10 --port 9292 --gpu_ids 0 python -m paddle_serving_server_gpu.serve --model bert_seq20_model --port 9292 --gpu_ids 0
``` ```
| 参数 | 含义 | | 参数 | 含义 |
...@@ -53,7 +50,6 @@ pip install paddle_serving_app ...@@ -53,7 +50,6 @@ pip install paddle_serving_app
客户端脚本 bert_client.py内容如下 客户端脚本 bert_client.py内容如下
``` python ``` python
import os
import sys import sys
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_app import ChineseBertReader from paddle_serving_app import ChineseBertReader
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
(简体中文|[English](./DESIGN.md)) (简体中文|[English](./DESIGN.md))
注意本页内容有已经过期,请查看:[设计文档](https://github.com/PaddlePaddle/Serving/blob/develop/doc/DESIGN_DOC_CN.md)
## 1. 项目背景 ## 1. 项目背景
PaddlePaddle是百度开源的机器学习框架,广泛支持各种深度学习模型的定制化开发; Paddle Serving是Paddle的在线预测部分,与Paddle模型训练环节无缝衔接,提供机器学习预测云服务。本文将从模型、服务、接入等层面,自底向上描述Paddle Serving设计方案。 PaddlePaddle是百度开源的机器学习框架,广泛支持各种深度学习模型的定制化开发; Paddle Serving是Paddle的在线预测部分,与Paddle模型训练环节无缝衔接,提供机器学习预测云服务。本文将从模型、服务、接入等层面,自底向上描述Paddle Serving设计方案。
......
...@@ -164,12 +164,26 @@ Distributed Sparse Parameter Indexing is commonly seen in advertising and recomm ...@@ -164,12 +164,26 @@ Distributed Sparse Parameter Indexing is commonly seen in advertising and recomm
<img src='cube_eng.png' width = "450" height = "230"> <img src='cube_eng.png' width = "450" height = "230">
<br> <br>
<p> <p>
Why do we need to support distributed sparse parameter indexing in Paddle Serving? 1) In some recommendation scenarios, the number of features can be up to hundreds of billions that a single node can not hold the parameters within random access memory. 2) Paddle Serving supports distributed sparse parameter indexing that can couple with paddle inference. Users do not need to do extra work to have a low latency inference engine with hundreds of billions of parameters. Why do we need to support distributed sparse parameter indexing in Paddle Serving? 1) In some recommendation scenarios, the number of features can be up to hundreds of billions that a single node can not hold the parameters within random access memory. 2) Paddle Serving supports distributed sparse parameter indexing that can couple with paddle inference. Users do not need to do extra work to have a low latency inference engine with hundreds of billions of parameters.
### 3.2 Model Management, online A/B test, Model Online Reloading ### 3.2 Online A/B test
After sufficient offline evaluation of the model, online A/B test is usually needed to decide whether to enable the service on a large scale. The following figure shows the basic structure of A/B test with Paddle Serving. After the client is configured with the corresponding configuration, the traffic will be automatically distributed to different servers to achieve A/B test. Please refer to [ABTEST in Paddle Serving](ABTEST_IN_PADDLE_SERVING.md) for specific examples.
<p align="center">
<br>
<img src='abtest.png' width = "345" height = "230">
<br>
<p>
### 3.3 Model Online Reloading
In order to ensure the availability of services, the model needs to be hot loaded without service interruption. Paddle Serving supports this feature and provides a tool for monitoring output models to update local models. Please refer to [Hot loading in Paddle Serving](HOT_LOADING_IN_SERVING.md) for specific examples.
### 3.4 Model Management
Paddle Serving's C++ engine supports model management, online A/B test and model online reloading. Currently, python API is not released yet, please wait for the next release. Paddle Serving's C++ engine supports model management. Currently, python API is not released yet, please wait for the next release.
## 4. User Types ## 4. User Types
Paddle Serving provides RPC and HTTP protocol for users. For HTTP service, we recommend users with median or small traffic services to use, and the latency is not a strict requirement. For RPC protocol, we recommend high traffic services and low latency required services to use. For users who use distributed sparse parameter indexing built-in service, it is not necessary to care about the underlying details of communication. The following figure gives out several scenarios that user may want to use Paddle Serving. Paddle Serving provides RPC and HTTP protocol for users. For HTTP service, we recommend users with median or small traffic services to use, and the latency is not a strict requirement. For RPC protocol, we recommend high traffic services and low latency required services to use. For users who use distributed sparse parameter indexing built-in service, it is not necessary to care about the underlying details of communication. The following figure gives out several scenarios that user may want to use Paddle Serving.
......
...@@ -162,11 +162,27 @@ Paddle Serving的核心执行引擎是一个有向无环图,图中的每个节 ...@@ -162,11 +162,27 @@ Paddle Serving的核心执行引擎是一个有向无环图,图中的每个节
为什么要使用Paddle Serving提供的分布式稀疏参数索引服务?1)在一些推荐场景中,模型的输入特征规模通常可以达到上千亿,单台机器无法支撑T级别模型在内存的保存,因此需要进行分布式存储。2)Paddle Serving提供的分布式稀疏参数索引服务,具有并发请求多个节点的能力,从而以较低的延时完成预估服务。 为什么要使用Paddle Serving提供的分布式稀疏参数索引服务?1)在一些推荐场景中,模型的输入特征规模通常可以达到上千亿,单台机器无法支撑T级别模型在内存的保存,因此需要进行分布式存储。2)Paddle Serving提供的分布式稀疏参数索引服务,具有并发请求多个节点的能力,从而以较低的延时完成预估服务。
### 3.2 模型管理、在线A/B流量测试、模型热加载 ### 3.2 在线A/B流量测试
Paddle Serving的C++引擎支持模型管理、在线A/B流量测试、模型热加载等功能,当前在Python API还有没完全开放这部分功能的配置,敬请期待。 在对模型进行充分的离线评估后,通常需要进行在线A/B测试,来决定是否大规模上线服务。下图为使用Paddle Serving做A/B测试的基本结构,Client端做好相应的配置后,自动将流量分发给不同的Server,从而完成A/B测试。具体例子请参考[如何使用Paddle Serving做ABTEST](ABTEST_IN_PADDLE_SERVING_CN.md)
<p align="center">
<br>
<img src='abtest.png' width = "345" height = "230">
<br>
<p>
### 3.3 模型热加载
为了保证服务的可用性,需要在服务不中断的情况下对模型进行热加载。Paddle Serving对该特性进行了支持,并提供了一个监控产出模型更新本地模型的工具,具体例子请参考[Paddle Serving中的模型热加载](HOT_LOADING_IN_SERVING_CN.md)
### 3.4 模型管理
Paddle Serving的C++引擎支持模型管理功能,当前在Python API还有没完全开放这部分功能的配置,敬请期待。
## 4. 用户类型 ## 4. 用户类型
Paddle Serving面向的用户提供RPC和HTTP两种访问协议。对于HTTP协议,我们更倾向于流量中小型的服务使用,并且对延时没有严格要求的AI服务开发者。对于RPC协议,我们面向流量较大,对延时要求更高的用户,此外RPC的客户端可能也处在一个大系统的服务中,这种情况下非常适合使用Paddle Serving提供的RPC服务。对于使用分布式稀疏参数索引服务而言,Paddle Serving的用户不需要关心底层的细节,其调用本质也是通过RPC服务再调用RPC服务。下图给出了当前设计的Paddle Serving可能会使用Serving服务的几种场景。 Paddle Serving面向的用户提供RPC和HTTP两种访问协议。对于HTTP协议,我们更倾向于流量中小型的服务使用,并且对延时没有严格要求的AI服务开发者。对于RPC协议,我们面向流量较大,对延时要求更高的用户,此外RPC的客户端可能也处在一个大系统的服务中,这种情况下非常适合使用Paddle Serving提供的RPC服务。对于使用分布式稀疏参数索引服务而言,Paddle Serving的用户不需要关心底层的细节,其调用本质也是通过RPC服务再调用RPC服务。下图给出了当前设计的Paddle Serving可能会使用Serving服务的几种场景。
<p align="center"> <p align="center">
......
# How to develop a new Web service?
([简体中文](NEW_WEB_SERVICE_CN.md)|English)
This document will take the image classification service based on the Imagenet data set as an example to introduce how to develop a new web service. The complete code can be visited at [here](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imagenet/image_classification_service.py).
## WebService base class
Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `preprocess` and `postprocess` method. The default implementation is as follows:
```python
class WebService(object):
def preprocess(self, feed={}, fetch=[]):
return feed, fetch
def postprocess(self, feed={}, fetch=[], fetch_map=None):
return fetch_map
```
### preprocess
The preprocess method has two input parameters, `feed` and `fetch`. For an HTTP request `request`:
- The value of `feed` is request data `request.json`
- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data
The return values are the feed and fetch values used in the prediction.
### postprocess
The postprocess method has three input parameters, `feed`, `fetch` and `fetch_map`:
- The value of `feed` is request data `request.json`
- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data
- The value of `fetch_map` is the model output value.
The return value will be processed as `{"reslut": fetch_map}` as the return of the HTTP request.
## Develop ImageService class
```python
class ImageService(WebService):
def preprocess(self, feed={}, fetch=[]):
reader = ImageReader()
if "image" not in feed:
raise ("feed data error!")
if isinstance(feed["image"], list):
feed_batch = []
for image in feed["image"]:
sample = base64.b64decode(image)
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
feed_batch.append(res_feed)
return feed_batch, fetch
else:
sample = base64.b64decode(feed["image"])
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
return res_feed, fetch
```
For the above `ImageService`, only the `preprocess` method is rewritten to process the image data in Base64 format into the data format required by prediction.
# 如何开发一个新的Web Service?
(简体中文|[English](NEW_WEB_SERVICE.md))
本文档将以Imagenet图像分类服务为例,来介绍如何开发一个新的Web Service。您可以在[这里](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imagenet/image_classification_service.py)查阅完整的代码。
## WebService基类
Paddle Serving实现了[WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23)基类,您需要重写它的`preprocess`方法和`postprocess`方法,默认实现如下:
```python
class WebService(object):
def preprocess(self, feed={}, fetch=[]):
return feed, fetch
def postprocess(self, feed={}, fetch=[], fetch_map=None):
return fetch_map
```
### preprocess方法
preprocess方法有两个输入参数,`feed``fetch`。对于一个HTTP请求`request`
- `feed`的值为请求数据`request.json`
- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]`
返回值分别是预测过程中用到的feed和fetch值。
### postprocess方法
postprocess方法有三个输入参数,`feed``fetch``fetch_map`
- `feed`的值为请求数据`request.json`
- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]`
- `fetch_map`的值为fetch到的模型输出值
返回值将会被处理成`{"reslut": fetch_map}`作为HTTP请求的返回。
## 开发ImageService类
```python
class ImageService(WebService):
def preprocess(self, feed={}, fetch=[]):
reader = ImageReader()
if "image" not in feed:
raise ("feed data error!")
if isinstance(feed["image"], list):
feed_batch = []
for image in feed["image"]:
sample = base64.b64decode(image)
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
feed_batch.append(res_feed)
return feed_batch, fetch
else:
sample = base64.b64decode(feed["image"])
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
return res_feed, fetch
```
对于上述的`ImageService`,只重写了前处理方法,将base64格式的图片数据处理成模型预测需要的数据格式。
doc/abtest.png

291.5 KB | W: | H:

doc/abtest.png

295.1 KB | W: | H:

doc/abtest.png
doc/abtest.png
doc/abtest.png
doc/abtest.png
  • 2-up
  • Swipe
  • Onion skin
...@@ -36,3 +36,4 @@ bert_service.set_gpus(gpu_ids) ...@@ -36,3 +36,4 @@ bert_service.set_gpus(gpu_ids)
bert_service.prepare_server( bert_service.prepare_server(
workdir="workdir", port=int(sys.argv[2]), device="gpu") workdir="workdir", port=int(sys.argv[2]), device="gpu")
bert_service.run_server() bert_service.run_server()
bert_service.run_flask()
# Faster RCNN model on Paddle Serving
([简体中文](./README_CN.md)|English)
### Get The Faster RCNN Model
```
wget https://paddle-serving.bj.bcebos.com/pddet_demo/faster_rcnn_model.tar.gz
wget https://paddle-serving.bj.bcebos.com/pddet_demo/infer_cfg.yml
```
If you want to have more detection models, please refer to [Paddle Detection Model Zoo](https://github.com/PaddlePaddle/PaddleDetection/blob/release/0.2/docs/MODEL_ZOO_cn.md)
### Start the service
```
tar xf faster_rcnn_model.tar.gz
mv faster_rcnn_model/pddet *.
GLOG_v=2 python -m paddle_serving_server_gpu.serve --model pddet_serving_model --port 9494 --gpu_id 0
```
### Perform prediction
```
python test_client.py pddet_client_conf/serving_client_conf.prototxt infer_cfg.yml 000000570688.jpg
```
## 3. Result analysis
<p align = "center">
    <br>
<img src = '000000570688.jpg'>
    <br>
<p>
This is the input picture
  
<p align = "center">
    <br>
<img src = '000000570688_bbox.jpg'>
    <br>
<p>
This is the picture after adding bbox. You can see that the client has done post-processing for the picture. In addition, the output/bbox.json also has the number and coordinate information of each box.
# 使用Paddle Serving部署Faster RCNN模型
(简体中文|[English](./README.md))
## 获得Faster RCNN模型
```
wget https://paddle-serving.bj.bcebos.com/pddet_demo/faster_rcnn_model.tar.gz
wget https://paddle-serving.bj.bcebos.com/pddet_demo/infer_cfg.yml
```
如果你想要更多的检测模型,请参考[Paddle检测模型库](https://github.com/PaddlePaddle/PaddleDetection/blob/release/0.2/docs/MODEL_ZOO_cn.md)
### 启动服务
```
tar xf faster_rcnn_model.tar.gz
mv faster_rcnn_model/pddet* .
GLOG_v=2 python -m paddle_serving_server_gpu.serve --model pddet_serving_model --port 9494 --gpu_id 0
```
### 执行预测
```
python test_client.py pddet_client_conf/serving_client_conf.prototxt infer_cfg.yml 000000570688.jpg
```
## 3. 结果分析
<p align="center">
<br>
<img src='000000570688.jpg' >
<br>
<p>
这是输入图片
<p align="center">
<br>
<img src='000000570688_bbox.jpg' >
<br>
<p>
这是实现添加了bbox之后的图片,可以看到客户端已经为图片做好了后处理,此外在output/bbox.json也有各个框的编号和坐标信息。
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
import sys
import os
import time
from paddle_serving_app.reader.pddet import Detection
import numpy as np
py_version = sys.version_info[0]
feed_var_names = ['image', 'im_shape', 'im_info']
fetch_var_names = ['multiclass_nms']
pddet = Detection(config_path=sys.argv[2], output_dir="./output")
feed_dict = pddet.preprocess(feed_var_names, sys.argv[3])
client = Client()
client.load_client_config(sys.argv[1])
client.connect(['127.0.0.1:9494'])
fetch_map = client.predict(feed=feed_dict, fetch=fetch_var_names)
outs = fetch_map.values()
pddet.postprocess(fetch_map, fetch_var_names)
...@@ -31,14 +31,14 @@ class ImageService(WebService): ...@@ -31,14 +31,14 @@ class ImageService(WebService):
sample = base64.b64decode(image) sample = base64.b64decode(image)
img = reader.process_image(sample) img = reader.process_image(sample)
res_feed = {} res_feed = {}
res_feed["image"] = img.reshape(-1) res_feed["image"] = img
feed_batch.append(res_feed) feed_batch.append(res_feed)
return feed_batch, fetch return feed_batch, fetch
else: else:
sample = base64.b64decode(feed["image"]) sample = base64.b64decode(feed["image"])
img = reader.process_image(sample) img = reader.process_image(sample)
res_feed = {} res_feed = {}
res_feed["image"] = img.reshape(-1) res_feed["image"] = img
return res_feed, fetch return res_feed, fetch
...@@ -47,3 +47,4 @@ image_service.load_model_config(sys.argv[1]) ...@@ -47,3 +47,4 @@ image_service.load_model_config(sys.argv[1])
image_service.prepare_server( image_service.prepare_server(
workdir=sys.argv[2], port=int(sys.argv[3]), device="cpu") workdir=sys.argv[2], port=int(sys.argv[3]), device="cpu")
image_service.run_server() image_service.run_server()
image_service.run_flask()
...@@ -12,12 +12,12 @@ ...@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from paddle_serving_server_gpu.web_service import WebService
import sys import sys
import cv2 import cv2
import base64 import base64
import numpy as np import numpy as np
from paddle_serving_app import ImageReader from paddle_serving_app import ImageReader
from paddle_serving_server_gpu.web_service import WebService
class ImageService(WebService): class ImageService(WebService):
...@@ -32,14 +32,14 @@ class ImageService(WebService): ...@@ -32,14 +32,14 @@ class ImageService(WebService):
sample = base64.b64decode(image) sample = base64.b64decode(image)
img = reader.process_image(sample) img = reader.process_image(sample)
res_feed = {} res_feed = {}
res_feed["image"] = img.reshape(-1) res_feed["image"] = img
feed_batch.append(res_feed) feed_batch.append(res_feed)
return feed_batch, fetch return feed_batch, fetch
else: else:
sample = base64.b64decode(feed["image"]) sample = base64.b64decode(feed["image"])
img = reader.process_image(sample) img = reader.process_image(sample)
res_feed = {} res_feed = {}
res_feed["image"] = img.reshape(-1) res_feed["image"] = img
return res_feed, fetch return res_feed, fetch
...@@ -49,3 +49,4 @@ image_service.set_gpus("0,1") ...@@ -49,3 +49,4 @@ image_service.set_gpus("0,1")
image_service.prepare_server( image_service.prepare_server(
workdir=sys.argv[2], port=int(sys.argv[3]), device="gpu") workdir=sys.argv[2], port=int(sys.argv[3]), device="gpu")
image_service.run_server() image_service.run_server()
image_service.run_flask()
...@@ -31,7 +31,7 @@ def predict(image_path, server): ...@@ -31,7 +31,7 @@ def predict(image_path, server):
r = requests.post( r = requests.post(
server, data=req, headers={"Content-Type": "application/json"}) server, data=req, headers={"Content-Type": "application/json"})
try: try:
print(r.json()["score"][0]) print(r.json()["result"]["score"])
except ValueError: except ValueError:
print(r.text) print(r.text)
return r return r
......
...@@ -26,7 +26,7 @@ start = time.time() ...@@ -26,7 +26,7 @@ start = time.time()
for i in range(1000): for i in range(1000):
with open("./data/n01440764_10026.JPEG", "rb") as f: with open("./data/n01440764_10026.JPEG", "rb") as f:
img = f.read() img = f.read()
img = reader.process_image(img).reshape(-1) img = reader.process_image(img)
fetch_map = client.predict(feed={"image": img}, fetch=["score"]) fetch_map = client.predict(feed={"image": img}, fetch=["score"])
end = time.time() end = time.time()
print(end - start) print(end - start)
......
...@@ -39,3 +39,4 @@ imdb_service.prepare_server( ...@@ -39,3 +39,4 @@ imdb_service.prepare_server(
workdir=sys.argv[2], port=int(sys.argv[3]), device="cpu") workdir=sys.argv[2], port=int(sys.argv[3]), device="cpu")
imdb_service.prepare_dict({"dict_file_path": sys.argv[4]}) imdb_service.prepare_dict({"dict_file_path": sys.argv[4]})
imdb_service.run_server() imdb_service.run_server()
imdb_service.run_flask()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import argparse
from .image_tool import Resize, Detection
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import numpy as np
from PIL import Image, ImageDraw
import cv2
import yaml
import copy
import argparse
import logging
import paddle.fluid as fluid
import json
FORMAT = '%(asctime)s-%(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)
precision_map = {
'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
}
class Resize(object):
def __init__(self,
target_size,
max_size=0,
interp=cv2.INTER_LINEAR,
use_cv2=True,
image_shape=None):
super(Resize, self).__init__()
self.target_size = target_size
self.max_size = max_size
self.interp = interp
self.use_cv2 = use_cv2
self.image_shape = image_shape
def __call__(self, im):
origin_shape = im.shape[:2]
im_c = im.shape[2]
if self.max_size != 0:
im_size_min = np.min(origin_shape[0:2])
im_size_max = np.max(origin_shape[0:2])
im_scale = float(self.target_size) / float(im_size_min)
if np.round(im_scale * im_size_max) > self.max_size:
im_scale = float(self.max_size) / float(im_size_max)
im_scale_x = im_scale
im_scale_y = im_scale
resize_w = int(im_scale_x * float(origin_shape[1]))
resize_h = int(im_scale_y * float(origin_shape[0]))
else:
im_scale_x = float(self.target_size) / float(origin_shape[1])
im_scale_y = float(self.target_size) / float(origin_shape[0])
resize_w = self.target_size
resize_h = self.target_size
if self.use_cv2:
im = cv2.resize(
im,
None,
None,
fx=im_scale_x,
fy=im_scale_y,
interpolation=self.interp)
else:
if self.max_size != 0:
raise TypeError(
'If you set max_size to cap the maximum size of image,'
'please set use_cv2 to True to resize the image.')
im = im.astype('uint8')
im = Image.fromarray(im)
im = im.resize((int(resize_w), int(resize_h)), self.interp)
im = np.array(im)
# padding im
if self.max_size != 0 and self.image_shape is not None:
padding_im = np.zeros(
(self.max_size, self.max_size, im_c), dtype=np.float32)
im_h, im_w = im.shape[:2]
padding_im[:im_h, :im_w, :] = im
im = padding_im
return im, im_scale_x
class Normalize(object):
def __init__(self, mean, std, is_scale=True, is_channel_first=False):
super(Normalize, self).__init__()
self.mean = mean
self.std = std
self.is_scale = is_scale
self.is_channel_first = is_channel_first
def __call__(self, im):
im = im.astype(np.float32, copy=False)
if self.is_channel_first:
mean = np.array(self.mean)[:, np.newaxis, np.newaxis]
std = np.array(self.std)[:, np.newaxis, np.newaxis]
else:
mean = np.array(self.mean)[np.newaxis, np.newaxis, :]
std = np.array(self.std)[np.newaxis, np.newaxis, :]
if self.is_scale:
im = im / 255.0
im -= mean
im /= std
return im
class Permute(object):
def __init__(self, to_bgr=False, channel_first=True):
self.to_bgr = to_bgr
self.channel_first = channel_first
def __call__(self, im):
if self.channel_first:
im = im.transpose((2, 0, 1))
if self.to_bgr:
im = im[[2, 1, 0], :, :]
return im.copy()
class PadStride(object):
def __init__(self, stride=0):
assert stride >= 0, "Unsupported stride: {},"
" the stride in PadStride must be greater "
"or equal to 0".format(stride)
self.coarsest_stride = stride
def __call__(self, im):
coarsest_stride = self.coarsest_stride
if coarsest_stride == 0:
return im
im_c, im_h, im_w = im.shape
pad_h = int(np.ceil(float(im_h) / coarsest_stride) * coarsest_stride)
pad_w = int(np.ceil(float(im_w) / coarsest_stride) * coarsest_stride)
padding_im = np.zeros((im_c, pad_h, pad_w), dtype=np.float32)
padding_im[:, :im_h, :im_w] = im
return padding_im
class Detection():
def __init__(self, config_path, output_dir):
self.config_path = config_path
self.if_visualize = True
self.if_dump_result = True
self.output_dir = output_dir
def DecodeImage(self, im_path):
assert os.path.exists(im_path), "Image path {} can not be found".format(
im_path)
with open(im_path, 'rb') as f:
im = f.read()
data = np.frombuffer(im, dtype='uint8')
im = cv2.imdecode(data, 1) # BGR mode, but need RGB mode
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
return im
def Preprocess(self, img_path, arch, config):
img = self.DecodeImage(img_path)
orig_shape = img.shape
scale = 1.
data = []
data_config = copy.deepcopy(config)
for data_aug_conf in data_config:
obj = data_aug_conf.pop('type')
preprocess = eval(obj)(**data_aug_conf)
if obj == 'Resize':
img, scale = preprocess(img)
else:
img = preprocess(img)
img = img[np.newaxis, :] # N, C, H, W
data.append(img)
extra_info = self.get_extra_info(img, arch, orig_shape, scale)
data += extra_info
return data
def expand_boxes(self, boxes, scale):
"""
Expand an array of boxes by a given scale.
"""
w_half = (boxes[:, 2] - boxes[:, 0]) * .5
h_half = (boxes[:, 3] - boxes[:, 1]) * .5
x_c = (boxes[:, 2] + boxes[:, 0]) * .5
y_c = (boxes[:, 3] + boxes[:, 1]) * .5
w_half *= scale
h_half *= scale
boxes_exp = np.zeros(boxes.shape)
boxes_exp[:, 0] = x_c - w_half
boxes_exp[:, 2] = x_c + w_half
boxes_exp[:, 1] = y_c - h_half
boxes_exp[:, 3] = y_c + h_half
return boxes_exp
def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5):
import pycocotools.mask as mask_util
scale = (resolution + 2.0) / resolution
segm_res = []
for t in results:
bboxes = t['bbox'][0]
lengths = t['bbox'][1][0]
if bboxes.shape == (1, 1) or bboxes is None:
continue
if len(bboxes.tolist()) == 0:
continue
masks = t['mask'][0]
s = 0
# for each sample
for i in range(len(lengths)):
num = lengths[i]
im_shape = t['im_shape'][i]
bbox = bboxes[s:s + num][:, 2:]
clsid_scores = bboxes[s:s + num][:, 0:2]
mask = masks[s:s + num]
s += num
im_h = int(im_shape[0])
im_w = int(im_shape[1])
expand_bbox = expand_boxes(bbox, scale)
expand_bbox = expand_bbox.astype(np.int32)
padded_mask = np.zeros(
(resolution + 2, resolution + 2), dtype=np.float32)
for j in range(num):
xmin, ymin, xmax, ymax = expand_bbox[j].tolist()
clsid, score = clsid_scores[j].tolist()
clsid = int(clsid)
padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :]
catid = clsid2catid[clsid]
w = xmax - xmin + 1
h = ymax - ymin + 1
w = np.maximum(w, 1)
h = np.maximum(h, 1)
resized_mask = cv2.resize(padded_mask, (w, h))
resized_mask = np.array(
resized_mask > thresh_binarize, dtype=np.uint8)
im_mask = np.zeros((im_h, im_w), dtype=np.uint8)
x0 = min(max(xmin, 0), im_w)
x1 = min(max(xmax + 1, 0), im_w)
y0 = min(max(ymin, 0), im_h)
y1 = min(max(ymax + 1, 0), im_h)
im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):(
y1 - ymin), (x0 - xmin):(x1 - xmin)]
segm = mask_util.encode(
np.array(
im_mask[:, :, np.newaxis], order='F'))[0]
catid = clsid2catid[clsid]
segm['counts'] = segm['counts'].decode('utf8')
coco_res = {
'category_id': catid,
'segmentation': segm,
'score': score
}
segm_res.append(coco_res)
return segm_res
def draw_bbox(self, image, catid2name, bboxes, threshold, color_list):
"""
draw bbox on image
"""
draw = ImageDraw.Draw(image)
for dt in np.array(bboxes):
catid, bbox, score = dt['category_id'], dt['bbox'], dt['score']
if score < threshold:
continue
xmin, ymin, w, h = bbox
xmax = xmin + w
ymax = ymin + h
color = tuple(color_list[catid])
# draw bbox
draw.line(
[(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin),
(xmin, ymin)],
width=2,
fill=color)
# draw label
text = "{} {:.2f}".format(catid2name[catid], score)
tw, th = draw.textsize(text)
draw.rectangle(
[(xmin + 1, ymin - th), (xmin + tw + 1, ymin)], fill=color)
draw.text((xmin + 1, ymin - th), text, fill=(255, 255, 255))
return image
def draw_mask(self, image, masks, threshold, color_list, alpha=0.7):
"""
Draw mask on image
"""
mask_color_id = 0
w_ratio = .4
img_array = np.array(image).astype('float32')
for dt in np.array(masks):
segm, score = dt['segmentation'], dt['score']
if score < threshold:
continue
import pycocotools.mask as mask_util
mask = mask_util.decode(segm) * 255
color_mask = color_list[mask_color_id % len(color_list), 0:3]
mask_color_id += 1
for c in range(3):
color_mask[c] = color_mask[c] * (1 - w_ratio) + w_ratio * 255
idx = np.nonzero(mask)
img_array[idx[0], idx[1], :] *= 1.0 - alpha
img_array[idx[0], idx[1], :] += alpha * color_mask
return Image.fromarray(img_array.astype('uint8'))
def get_extra_info(self, im, arch, shape, scale):
info = []
input_shape = []
im_shape = []
logger.info('The architecture is {}'.format(arch))
if 'YOLO' in arch:
im_size = np.array([shape[:2]]).astype('int32')
logger.info('Extra info: im_size')
info.append(im_size)
elif 'SSD' in arch:
im_shape = np.array([shape[:2]]).astype('int32')
logger.info('Extra info: im_shape')
info.append([im_shape])
elif 'RetinaNet' in arch:
input_shape.extend(im.shape[2:])
im_info = np.array([input_shape + [scale]]).astype('float32')
logger.info('Extra info: im_info')
info.append(im_info)
elif 'RCNN' in arch:
input_shape.extend(im.shape[2:])
im_shape.extend(shape[:2])
im_info = np.array([input_shape + [scale]]).astype('float32')
im_shape = np.array([im_shape + [1.]]).astype('float32')
logger.info('Extra info: im_info, im_shape')
info.append(im_info)
info.append(im_shape)
else:
logger.error(
"Unsupported arch: {}, expect YOLO, SSD, RetinaNet and RCNN".
format(arch))
return info
def offset_to_lengths(self, lod):
offset = lod[0]
lengths = [offset[i + 1] - offset[i] for i in range(len(offset) - 1)]
return [lengths]
def bbox2out(self, results, clsid2catid, is_bbox_normalized=False):
"""
Args:
results: request a dict, should include: `bbox`, `im_id`,
if is_bbox_normalized=True, also need `im_shape`.
clsid2catid: class id to category id map of COCO2017 dataset.
is_bbox_normalized: whether or not bbox is normalized.
"""
xywh_res = []
for t in results:
bboxes = t['bbox'][0]
lengths = t['bbox'][1][0]
if bboxes.shape == (1, 1) or bboxes is None:
continue
k = 0
for i in range(len(lengths)):
num = lengths[i]
for j in range(num):
dt = bboxes[k]
clsid, score, xmin, ymin, xmax, ymax = dt.tolist()
catid = (clsid2catid[int(clsid)])
if is_bbox_normalized:
xmin, ymin, xmax, ymax = \
self.clip_bbox([xmin, ymin, xmax, ymax])
w = xmax - xmin
h = ymax - ymin
im_shape = t['im_shape'][0][i].tolist()
im_height, im_width = int(im_shape[0]), int(im_shape[1])
xmin *= im_width
ymin *= im_height
w *= im_width
h *= im_height
else:
w = xmax - xmin + 1
h = ymax - ymin + 1
bbox = [xmin, ymin, w, h]
coco_res = {
'category_id': catid,
'bbox': bbox,
'score': score
}
xywh_res.append(coco_res)
k += 1
return xywh_res
def get_bbox_result(self, fetch_map, fetch_name, result, conf, clsid2catid):
is_bbox_normalized = True if 'SSD' in conf['arch'] else False
output = fetch_map[fetch_name]
lod = [fetch_map[fetch_name + '.lod']]
lengths = self.offset_to_lengths(lod)
np_data = np.array(output)
result['bbox'] = (np_data, lengths)
result['im_id'] = np.array([[0]])
bbox_results = self.bbox2out([result], clsid2catid, is_bbox_normalized)
return bbox_results
def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5):
import pycocotools.mask as mask_util
scale = (resolution + 2.0) / resolution
segm_res = []
for t in results:
bboxes = t['bbox'][0]
lengths = t['bbox'][1][0]
if bboxes.shape == (1, 1) or bboxes is None:
continue
if len(bboxes.tolist()) == 0:
continue
masks = t['mask'][0]
s = 0
# for each sample
for i in range(len(lengths)):
num = lengths[i]
im_shape = t['im_shape'][i]
bbox = bboxes[s:s + num][:, 2:]
clsid_scores = bboxes[s:s + num][:, 0:2]
mask = masks[s:s + num]
s += num
im_h = int(im_shape[0])
im_w = int(im_shape[1])
expand_bbox = expand_boxes(bbox, scale)
expand_bbox = expand_bbox.astype(np.int32)
padded_mask = np.zeros(
(resolution + 2, resolution + 2), dtype=np.float32)
for j in range(num):
xmin, ymin, xmax, ymax = expand_bbox[j].tolist()
clsid, score = clsid_scores[j].tolist()
clsid = int(clsid)
padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :]
catid = clsid2catid[clsid]
w = xmax - xmin + 1
h = ymax - ymin + 1
w = np.maximum(w, 1)
h = np.maximum(h, 1)
resized_mask = cv2.resize(padded_mask, (w, h))
resized_mask = np.array(
resized_mask > thresh_binarize, dtype=np.uint8)
im_mask = np.zeros((im_h, im_w), dtype=np.uint8)
x0 = min(max(xmin, 0), im_w)
x1 = min(max(xmax + 1, 0), im_w)
y0 = min(max(ymin, 0), im_h)
y1 = min(max(ymax + 1, 0), im_h)
im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):(
y1 - ymin), (x0 - xmin):(x1 - xmin)]
segm = mask_util.encode(
np.array(
im_mask[:, :, np.newaxis], order='F'))[0]
catid = clsid2catid[clsid]
segm['counts'] = segm['counts'].decode('utf8')
coco_res = {
'category_id': catid,
'segmentation': segm,
'score': score
}
segm_res.append(coco_res)
return segm_res
def get_mask_result(self, fetch_map, fetch_var_names, result, conf,
clsid2catid):
resolution = conf['mask_resolution']
bbox_out, mask_out = fetch_map[fetch_var_names]
lengths = self.offset_to_lengths(bbox_out.lod())
bbox = np.array(bbox_out)
mask = np.array(mask_out)
result['bbox'] = (bbox, lengths)
result['mask'] = (mask, lengths)
mask_results = self.mask2out([result], clsid2catid,
conf['mask_resolution'])
return mask_results
def get_category_info(self, with_background, label_list):
if label_list[0] != 'background' and with_background:
label_list.insert(0, 'background')
if label_list[0] == 'background' and not with_background:
label_list = label_list[1:]
clsid2catid = {i: i for i in range(len(label_list))}
catid2name = {i: name for i, name in enumerate(label_list)}
return clsid2catid, catid2name
def color_map(self, num_classes):
color_map = num_classes * [0, 0, 0]
for i in range(0, num_classes):
j = 0
lab = i
while lab:
color_map[i * 3] |= (((lab >> 0) & 1) << (7 - j))
color_map[i * 3 + 1] |= (((lab >> 1) & 1) << (7 - j))
color_map[i * 3 + 2] |= (((lab >> 2) & 1) << (7 - j))
j += 1
lab >>= 3
color_map = np.array(color_map).reshape(-1, 3)
return color_map
def visualize(self,
bbox_results,
catid2name,
num_classes,
mask_results=None):
image = Image.open(self.infer_img).convert('RGB')
color_list = self.color_map(num_classes)
image = self.draw_bbox(image, catid2name, bbox_results, 0.5, color_list)
if mask_results is not None:
image = self.draw_mask(image, mask_results, 0.5, color_list)
image_path = os.path.split(self.infer_img)[-1]
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
out_path = os.path.join(self.output_dir, image_path)
image.save(out_path, quality=95)
logger.info('Save visualize result to {}'.format(out_path))
def preprocess(self, feed_var_names, image_file):
self.infer_img = image_file
config_path = self.config_path
res = {}
assert config_path is not None, "Config path: {} des not exist!".format(
model_path)
with open(config_path) as f:
conf = yaml.safe_load(f)
img_data = self.Preprocess(image_file, conf['arch'], conf['Preprocess'])
if 'SSD' in conf['arch']:
img_data, res['im_shape'] = img_data
img_data = [img_data]
if len(feed_var_names) != len(img_data):
raise ValueError(
'the length of feed vars does not equals the length of preprocess of img data, please check your feed dict'
)
def processImg(v):
np_data = np.array(v[0])
res = np_data
return res
feed_dict = {k: processImg(v) for k, v in zip(feed_var_names, img_data)}
return feed_dict
def postprocess(self, fetch_map, fetch_var_names):
config_path = self.config_path
res = {}
with open(config_path) as f:
conf = yaml.safe_load(f)
if 'SSD' in conf['arch']:
img_data, res['im_shape'] = img_data
img_data = [img_data]
clsid2catid, catid2name = self.get_category_info(
conf['with_background'], conf['label_list'])
bbox_result = self.get_bbox_result(fetch_map, fetch_var_names[0], res,
conf, clsid2catid)
mask_result = None
if 'mask_resolution' in conf:
res['im_shape'] = img_data[-1]
mask_result = self.get_mask_result(fetch_map, fetch_var_names, res,
conf, clsid2catid)
if self.if_visualize:
if os.path.isdir(self.output_dir) is False:
os.mkdir(self.output_dir)
self.visualize(bbox_result, catid2name,
len(conf['label_list']), mask_result)
if self.if_dump_result:
if os.path.isdir(self.output_dir) is False:
os.mkdir(self.output_dir)
bbox_file = os.path.join(self.output_dir, 'bbox.json')
logger.info('dump bbox to {}'.format(bbox_file))
with open(bbox_file, 'w') as f:
json.dump(bbox_result, f, indent=4)
if mask_result is not None:
mask_file = os.path.join(flags.output_dir, 'mask.json')
logger.info('dump mask to {}'.format(mask_file))
with open(mask_file, 'w') as f:
json.dump(mask_result, f, indent=4)
...@@ -26,6 +26,34 @@ int_type = 0 ...@@ -26,6 +26,34 @@ int_type = 0
float_type = 1 float_type = 1
class _NOPProfiler(object):
def record(self, name):
pass
def print_profile(self):
pass
class _TimeProfiler(object):
def __init__(self):
self.pid = os.getpid()
self.print_head = 'PROFILE\tpid:{}\t'.format(self.pid)
self.time_record = [self.print_head]
def record(self, name):
self.time_record.append('{}:{} '.format(
name, int(round(time.time() * 1000000))))
def print_profile(self):
self.time_record.append('\n')
sys.stderr.write(''.join(self.time_record))
self.time_record = [self.print_head]
_is_profile = int(os.environ.get('FLAGS_profile_client', 0))
_Profiler = _TimeProfiler if _is_profile else _NOPProfiler
class SDKConfig(object): class SDKConfig(object):
def __init__(self): def __init__(self):
self.sdk_desc = sdk.SDKConf() self.sdk_desc = sdk.SDKConf()
...@@ -89,6 +117,7 @@ class Client(object): ...@@ -89,6 +117,7 @@ class Client(object):
self.predictor_sdk_ = None self.predictor_sdk_ = None
self.producers = [] self.producers = []
self.consumer = None self.consumer = None
self.profile_ = _Profiler()
def rpath(self): def rpath(self):
lib_path = os.path.dirname(paddle_serving_client.__file__) lib_path = os.path.dirname(paddle_serving_client.__file__)
...@@ -184,6 +213,8 @@ class Client(object): ...@@ -184,6 +213,8 @@ class Client(object):
key)) key))
def predict(self, feed=None, fetch=None, need_variant_tag=False): def predict(self, feed=None, fetch=None, need_variant_tag=False):
self.profile_.record('py_prepro_0')
if feed is None or fetch is None: if feed is None or fetch is None:
raise ValueError("You should specify feed and fetch for prediction") raise ValueError("You should specify feed and fetch for prediction")
...@@ -256,11 +287,17 @@ class Client(object): ...@@ -256,11 +287,17 @@ class Client(object):
int_slot_batch.append(int_slot) int_slot_batch.append(int_slot)
float_slot_batch.append(float_slot) float_slot_batch.append(float_slot)
self.profile_.record('py_prepro_1')
self.profile_.record('py_client_infer_0')
result_batch = self.result_handle_ result_batch = self.result_handle_
res = self.client_handle_.batch_predict( res = self.client_handle_.batch_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch, float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch, self.pid) int_feed_names, int_shape, fetch_names, result_batch, self.pid)
self.profile_.record('py_client_infer_1')
self.profile_.record('py_postpro_0')
if res == -1: if res == -1:
return None return None
...@@ -273,7 +310,7 @@ class Client(object): ...@@ -273,7 +310,7 @@ class Client(object):
if self.fetch_names_to_type_[name] == int_type: if self.fetch_names_to_type_[name] == int_type:
result_map[name] = result_batch.get_int64_by_name(mi, name) result_map[name] = result_batch.get_int64_by_name(mi, name)
shape = result_batch.get_shape(mi, name) shape = result_batch.get_shape(mi, name)
result_map[name] = np.array(result_map[name]) result_map[name] = np.array(result_map[name], dtype='int64')
result_map[name].shape = shape result_map[name].shape = shape
if name in self.lod_tensor_set: if name in self.lod_tensor_set:
result_map["{}.lod".format(name)] = np.array( result_map["{}.lod".format(name)] = np.array(
...@@ -281,7 +318,8 @@ class Client(object): ...@@ -281,7 +318,8 @@ class Client(object):
elif self.fetch_names_to_type_[name] == float_type: elif self.fetch_names_to_type_[name] == float_type:
result_map[name] = result_batch.get_float_by_name(mi, name) result_map[name] = result_batch.get_float_by_name(mi, name)
shape = result_batch.get_shape(mi, name) shape = result_batch.get_shape(mi, name)
result_map[name] = np.array(result_map[name]) result_map[name] = np.array(
result_map[name], dtype='float32')
result_map[name].shape = shape result_map[name].shape = shape
if name in self.lod_tensor_set: if name in self.lod_tensor_set:
result_map["{}.lod".format(name)] = np.array( result_map["{}.lod".format(name)] = np.array(
...@@ -297,6 +335,10 @@ class Client(object): ...@@ -297,6 +335,10 @@ class Client(object):
engine_name: multi_result_map[mi] engine_name: multi_result_map[mi]
for mi, engine_name in enumerate(model_engine_names) for mi, engine_name in enumerate(model_engine_names)
} }
self.profile_.record('py_postpro_1')
self.profile_.print_profile()
# When using the A/B test, the tag of variant needs to be returned # When using the A/B test, the tag of variant needs to be returned
return ret if not need_variant_tag else [ return ret if not need_variant_tag else [
ret, self.result_handle_.variant_tag() ret, self.result_handle_.variant_tag()
......
...@@ -351,6 +351,7 @@ class Server(object): ...@@ -351,6 +351,7 @@ class Server(object):
self._prepare_resource(workdir) self._prepare_resource(workdir)
self._prepare_engine(self.model_config_paths, device) self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port) self._prepare_infer_service(port)
self.port = port
self.workdir = workdir self.workdir = workdir
infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn) infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn)
......
...@@ -18,6 +18,8 @@ from flask import Flask, request, abort ...@@ -18,6 +18,8 @@ from flask import Flask, request, abort
from multiprocessing import Pool, Process from multiprocessing import Pool, Process
from paddle_serving_server import OpMaker, OpSeqMaker, Server from paddle_serving_server import OpMaker, OpSeqMaker, Server
from paddle_serving_client import Client from paddle_serving_client import Client
from contextlib import closing
import socket
class WebService(object): class WebService(object):
...@@ -41,19 +43,34 @@ class WebService(object): ...@@ -41,19 +43,34 @@ class WebService(object):
server.set_num_threads(16) server.set_num_threads(16)
server.load_model_config(self.model_config) server.load_model_config(self.model_config)
server.prepare_server( server.prepare_server(
workdir=self.workdir, port=self.port + 1, device=self.device) workdir=self.workdir, port=self.port_list[0], device=self.device)
server.run_server() server.run_server()
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def prepare_server(self, workdir="", port=9393, device="cpu"): def prepare_server(self, workdir="", port=9393, device="cpu"):
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
default_port = 12000
self.port_list = []
for i in range(1000):
if self.port_is_available(default_port + i):
self.port_list.append(default_port + i)
break
def _launch_web_service(self): def _launch_web_service(self):
self.client_service = Client() self.client = Client()
self.client_service.load_client_config( self.client.load_client_config("{}/serving_server_conf.prototxt".format(
"{}/serving_server_conf.prototxt".format(self.model_config)) self.model_config))
self.client_service.connect(["0.0.0.0:{}".format(self.port + 1)]) self.client.connect(["0.0.0.0:{}".format(self.port_list[0])])
def get_prediction(self, request): def get_prediction(self, request):
if not request.json: if not request.json:
...@@ -64,12 +81,12 @@ class WebService(object): ...@@ -64,12 +81,12 @@ class WebService(object):
feed, fetch = self.preprocess(request.json, request.json["fetch"]) feed, fetch = self.preprocess(request.json, request.json["fetch"])
if isinstance(feed, dict) and "fetch" in feed: if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"] del feed["fetch"]
fetch_map = self.client_service.predict(feed=feed, fetch=fetch) fetch_map = self.client.predict(feed=feed, fetch=fetch)
fetch_map = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map)
for key in fetch_map: for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist() fetch_map[key] = fetch_map[key].tolist()
result = self.postprocess( result = {"result": fetch_map}
feed=request.json, fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError: except ValueError:
result = {"result": "Request Value Error"} result = {"result": "Request Value Error"}
return result return result
...@@ -83,6 +100,24 @@ class WebService(object): ...@@ -83,6 +100,24 @@ class WebService(object):
p_rpc = Process(target=self._launch_rpc_service) p_rpc = Process(target=self._launch_rpc_service)
p_rpc.start() p_rpc.start()
def run_flask(self):
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_web_service()
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
app_instance.run(host="0.0.0.0",
port=self.port,
threaded=False,
processes=4)
def preprocess(self, feed={}, fetch=[]): def preprocess(self, feed={}, fetch=[]):
return feed, fetch return feed, fetch
......
...@@ -14,14 +14,15 @@ ...@@ -14,14 +14,15 @@
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from flask import Flask, request, abort from flask import Flask, request, abort
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server from contextlib import closing
import paddle_serving_server_gpu as serving
from multiprocessing import Pool, Process, Queue from multiprocessing import Pool, Process, Queue
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
from paddle_serving_server_gpu.serve import start_multi_card from paddle_serving_server_gpu.serve import start_multi_card
import socket
import sys import sys
import numpy as np import numpy as np
import paddle_serving_server_gpu as serving
class WebService(object): class WebService(object):
...@@ -67,22 +68,39 @@ class WebService(object): ...@@ -67,22 +68,39 @@ class WebService(object):
def _launch_rpc_service(self, service_idx): def _launch_rpc_service(self, service_idx):
self.rpc_service_list[service_idx].run_server() self.rpc_service_list[service_idx].run_server()
def port_is_available(self, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
return True
else:
return False
def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0): def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
self.workdir = workdir self.workdir = workdir
self.port = port self.port = port
self.device = device self.device = device
self.gpuid = gpuid self.gpuid = gpuid
self.port_list = []
default_port = 12000
for i in range(1000):
if self.port_is_available(default_port + i):
self.port_list.append(default_port + i)
if len(self.port_list) > len(self.gpus):
break
if len(self.gpus) == 0: if len(self.gpus) == 0:
# init cpu service # init cpu service
self.rpc_service_list.append( self.rpc_service_list.append(
self.default_rpc_service( self.default_rpc_service(
self.workdir, self.port + 1, -1, thread_num=10)) self.workdir, self.port_list[0], -1, thread_num=10))
else: else:
for i, gpuid in enumerate(self.gpus): for i, gpuid in enumerate(self.gpus):
self.rpc_service_list.append( self.rpc_service_list.append(
self.default_rpc_service( self.default_rpc_service(
"{}_{}".format(self.workdir, i), "{}_{}".format(self.workdir, i),
self.port + 1 + i, self.port_list[i],
gpuid, gpuid,
thread_num=10)) thread_num=10))
...@@ -94,9 +112,9 @@ class WebService(object): ...@@ -94,9 +112,9 @@ class WebService(object):
endpoints = "" endpoints = ""
if gpu_num > 0: if gpu_num > 0:
for i in range(gpu_num): for i in range(gpu_num):
endpoints += "127.0.0.1:{},".format(self.port + i + 1) endpoints += "127.0.0.1:{},".format(self.port_list[i])
else: else:
endpoints = "127.0.0.1:{}".format(self.port + 1) endpoints = "127.0.0.1:{}".format(self.port_list[0])
self.client.connect([endpoints]) self.client.connect([endpoints])
def get_prediction(self, request): def get_prediction(self, request):
...@@ -109,11 +127,11 @@ class WebService(object): ...@@ -109,11 +127,11 @@ class WebService(object):
if isinstance(feed, dict) and "fetch" in feed: if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"] del feed["fetch"]
fetch_map = self.client.predict(feed=feed, fetch=fetch) fetch_map = self.client.predict(feed=feed, fetch=fetch)
for key in fetch_map: fetch_map = self.postprocess(
fetch_map[key] = fetch_map[key][0].tolist()
result = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map) feed=request.json, fetch=fetch, fetch_map=fetch_map)
result = {"result": result} for key in fetch_map:
fetch_map[key] = fetch_map[key].tolist()
result = {"result": fetch_map}
except ValueError: except ValueError:
result = {"result": "Request Value Error"} result = {"result": "Request Value Error"}
return result return result
...@@ -131,6 +149,24 @@ class WebService(object): ...@@ -131,6 +149,24 @@ class WebService(object):
for p in server_pros: for p in server_pros:
p.start() p.start()
def run_flask(self):
app_instance = Flask(__name__)
@app_instance.before_first_request
def init():
self._launch_web_service()
service_name = "/" + self.name + "/prediction"
@app_instance.route(service_name, methods=["POST"])
def run():
return self.get_prediction(request)
app_instance.run(host="0.0.0.0",
port=self.port,
threaded=False,
processes=4)
def preprocess(self, feed={}, fetch=[]): def preprocess(self, feed={}, fetch=[]):
return feed, fetch return feed, fetch
......
...@@ -47,7 +47,8 @@ REQUIRED_PACKAGES = [ ...@@ -47,7 +47,8 @@ REQUIRED_PACKAGES = [
packages=['paddle_serving_app', packages=['paddle_serving_app',
'paddle_serving_app.reader', 'paddle_serving_app.reader',
'paddle_serving_app.utils'] 'paddle_serving_app.utils',
'paddle_serving_app.reader.pddet']
package_data={} package_data={}
package_dir={'paddle_serving_app': package_dir={'paddle_serving_app':
...@@ -55,7 +56,9 @@ package_dir={'paddle_serving_app': ...@@ -55,7 +56,9 @@ package_dir={'paddle_serving_app':
'paddle_serving_app.reader': 'paddle_serving_app.reader':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader',
'paddle_serving_app.utils': 'paddle_serving_app.utils':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/utils',} '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/utils',
'paddle_serving_app.reader.pddet':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader/pddet',}
setup( setup(
name='paddle-serving-app', name='paddle-serving-app',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册