提交 89e7e2af 编写于 作者: R root

fix with open bug and add java pipeline client

上级 fcc97c73
......@@ -27,7 +27,7 @@ mvn compile
mvn install
```
### Start the server
### Start the server(not pipeline)
Take the fit_a_line model as an example, the server starts
......@@ -59,6 +59,48 @@ Client prediction
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PaddleServingClientExample yolov4 ../../../python/examples/yolov4/000000570688.jpg
# The case of yolov4 needs to specify a picture as input
```
### Start the server(pipeline)
as for input data type = string,take IMDB model ensemble as an example,the server starts
```
cd ../../python/examples/pipeline/imdb_model_ensemble
sh get_data.sh
python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
python test_pipeline_server.py &>pipeline.log &
```
Client prediction(Synchronous)
```
cd ../../../java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PipelineClientExample string_imdb_predict
```
Client prediction(Asynchronous)
```
cd ../../../java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PipelineClientExample asyn_predict
```
as for input data type = INDArray,take uci_housing_model as an example,the server starts
```
cd ../../python/examples/pipeline/simple_web_service
sh get_data.sh
python web_service_forJava.py &>log.txt &
```
Client prediction(Synchronous)
```
cd ../../../java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PipelineClientExample indarray_predict
```
### Customization guidance
......@@ -70,6 +112,9 @@ The second is to deploy GPU Serving and Java Client separately. If they are on t
**It should be noted that in the example, all models need to use `--use_multilang` to start GRPC multi-programming language support, and the port number is 9393. If you need another port, you need to modify it in the java file**
**Currently Serving has launched the Pipeline mode (see [Pipeline Serving](../doc/PIPELINE_SERVING.md) for details). The next version (0.4.1) of the Pipeline Serving Client for Java will be released. **
**Currently Serving has launched the Pipeline mode (see [Pipeline Serving](../doc/PIPELINE_SERVING.md) for details). Pipeline Serving Client for Java is released, the next version multi-thread java client example will be released**
**It should be noted that in the example, Java Pipeline Client code is in path /Java/Examples and /Java/src/main, and the Pipeline server code is in path /python/examples/pipeline/**
......@@ -27,7 +27,7 @@ mvn compile
mvn install
```
### 启动服务端
### 启动服务端(非pipeline方式)
以fit_a_line模型为例,服务端启动
......@@ -58,6 +58,49 @@ python -m paddle_serving_server_gpu.serve --model yolov4_model --port 9393 --gpu
# in /Serving/java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PaddleServingClientExample yolov4 ../../../python/examples/yolov4/000000570688.jpg
# yolov4的案例需要指定一个图片作为输入
```
### 启动服务端(Pipeline方式)
对于input data type = string类型,以IMDB model ensemble模型为例,服务端启动
```
cd ../../python/examples/pipeline/imdb_model_ensemble
sh get_data.sh
python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
python test_pipeline_server.py &>pipeline.log &
```
客户端预测(同步)
```
cd ../../../java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PipelineClientExample string_imdb_predict
```
客户端预测(异步)
```
cd ../../../java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PipelineClientExample asyn_predict
```
对于input data type = INDArray类型,以Simple Pipeline WebService中的uci_housing_model模型为例,服务端启动
```
cd ../../python/examples/pipeline/simple_web_service
sh get_data.sh
python web_service_forJava.py &>log.txt &
```
客户端预测(同步)
```
cd ../../../java/examples/target
java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar PipelineClientExample indarray_predict
```
### 二次开发指导
......@@ -70,6 +113,9 @@ java -cp paddle-serving-sdk-java-examples-0.0.1-jar-with-dependencies.jar Paddle
**需要注意的是,在示例中,所有模型都需要使用`--use_multilang`来启动GRPC多编程语言支持,以及端口号都是9393,如果需要别的端口,需要在java文件里修改**
**目前Serving已推出Pipeline模式(详见[Pipeline Serving](../doc/PIPELINE_SERVING_CN.md)),下个版本(0.4.1)面向Java的Pipeline Serving Client将会发布,敬请期待。**
**目前Serving已推出Pipeline模式(详见[Pipeline Serving](../doc/PIPELINE_SERVING_CN.md)),面向Java的Pipeline Serving Client已发布,下个更新会发布Java版本的多线程用例敬请期待。**
**需要注意的是,Java Pipeline Client相关示例在/Java/Examples和/Java/src/main中,对应的Pipeline server在/python/examples/pipeline/中**
import io.paddle.serving.pipelineclient.*;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import org.nd4j.linalg.api.iter.NdIndexIterator;
import org.nd4j.linalg.api.ndarray.INDArray;
import org.datavec.image.loader.NativeImageLoader;
import org.nd4j.linalg.api.ops.CustomOp;
import org.nd4j.linalg.api.ops.DynamicCustomOp;
import org.nd4j.linalg.factory.Nd4j;
import java.util.*;
public class PipelineClientExample {
boolean string_imdb_predict() {
HashMap<String, String> feed_data
= new HashMap<String, String>() {{
put("words", "i am very sad | 0");
}};
System.out.println(feed_data);
List<String> fetch = Arrays.asList("prediction");
System.out.println(fetch);
PipelineClient client = new PipelineClient();
String target = "172.17.0.2:18070";
boolean succ = client.connect(target);
if (succ != true) {
System.out.println("connect failed.");
return false;
}
HashMap<String,String> result = client.predict(feed_data, fetch,false,0);
if (result == null) {
return false;
}
System.out.println(result);
return true;
}
boolean asyn_predict() {
HashMap<String, String> feed_data
= new HashMap<String, String>() {{
put("words", "i am very sad | 0");
}};
System.out.println(feed_data);
List<String> fetch = Arrays.asList("prediction");
System.out.println(fetch);
PipelineClient client = new PipelineClient();
String target = "172.17.0.2:18070";
boolean succ = client.connect(target);
if (succ != true) {
System.out.println("connect failed.");
return false;
}
PipelineFuture future = client.asyn_predict(feed_data, fetch,false,0);
HashMap<String,String> result = future.get();
if (result == null) {
return false;
}
System.out.println(result);
return true;
}
boolean indarray_predict() {
float[] data = {0.0137f, -0.1136f, 0.2553f, -0.0692f, 0.0582f, -0.0727f, -0.1583f, -0.0584f, 0.6283f, 0.4919f, 0.1856f, 0.0795f, -0.0332f};
INDArray npdata = Nd4j.createFromArray(data);
HashMap<String, String> feed_data
= new HashMap<String, String>() {{
put("x", "array("+npdata.toString()+")");
}};
List<String> fetch = Arrays.asList("prediction");
PipelineClient client = new PipelineClient();
String target = "172.17.0.2:9998";
boolean succ = client.connect(target);
if (succ != true) {
System.out.println("connect failed.");
return false;
}
HashMap<String,String> result = client.predict(feed_data, fetch,false,0);
if (result == null) {
return false;
}
System.out.println(result);
return true;
}
public static void main( String[] args ) {
PipelineClientExample e = new PipelineClientExample();
boolean succ = false;
if (args.length < 1) {
System.out.println("Usage: java -cp <jar> PaddleServingClientExample <test-type>.");
System.out.println("<test-type>: fit_a_line bert model_ensemble asyn_predict batch_predict cube_local cube_quant yolov4");
return;
}
String testType = args[0];
System.out.format("[Example] %s\n", testType);
if ("string_imdb_predict".equals(testType)) {
succ = e.string_imdb_predict();
}else if ("asyn_predict".equals(testType)) {
succ = e.asyn_predict();
}else if ("indarray_predict".equals(testType)) {
succ = e.indarray_predict();
} else {
System.out.format("test-type(%s) not match.\n", testType);
return;
}
if (succ == true) {
System.out.println("[Example] succ.");
} else {
System.out.println("[Example] fail.");
}
}
}
//if list or array or matrix,please Convert to INDArray,for example:
//INDArray npdata = Nd4j.createFromArray(data);
//INDArray Convert to String,for example:
//string value = "array("+npdata.toString()+")"
package io.paddle.serving.pipelineclient;
import java.util.*;
import java.util.function.Function;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import com.google.protobuf.ByteString;
import com.google.common.util.concurrent.ListenableFuture;
import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.api.iter.NdIndexIterator;
import org.nd4j.linalg.factory.Nd4j;
import io.paddle.serving.pipelineproto.*;
import io.paddle.serving.pipelineclient.PipelineFuture;
public class PipelineClient {
private ManagedChannel channel_;
private PipelineServiceGrpc.PipelineServiceBlockingStub blockingStub_;
private PipelineServiceGrpc.PipelineServiceFutureStub futureStub_;
private String clientip;
private String _profile_key;
private String _profile_value;
public PipelineClient() {
channel_ = null;
blockingStub_ = null;
futureStub_ = null;
boolean is_profile = false;
clientip = null;
_profile_value = "1";
_profile_key = "pipeline.profile";
}
public boolean connect(String target) {
try {
String[] temp = target.split(":");
this.clientip = temp[0] == "localhost"?"127.0.0.1":temp[0];
channel_ = ManagedChannelBuilder.forTarget(target)
.defaultLoadBalancingPolicy("round_robin")
.maxInboundMessageSize(Integer.MAX_VALUE)
.usePlaintext()
.build();
blockingStub_ = PipelineServiceGrpc.newBlockingStub(channel_);
futureStub_ = PipelineServiceGrpc.newFutureStub(channel_);
} catch (Exception e) {
System.out.format("Connect failed: %s\n", e.toString());
return false;
}
return true;
}
private Request _packInferenceRequest(
HashMap<String, String> feed_dict,
boolean profile,
int logid) throws IllegalArgumentException {
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
long[] flattened_shape = {-1};
Request.Builder req_builder = Request.newBuilder()
.setClientip(this.clientip)
.setLogid(logid);
for (Map.Entry<String, String> entry : feed_dict.entrySet()) {
keys.add(entry.getKey());
values.add(entry.getValue());
}
if(profile){
keys.add(_profile_key);
values.add(_profile_value);
}
req_builder.addAllKey(keys);
req_builder.addAllValue(values);
return req_builder.build();
}
private HashMap<String,String> _unpackResponse(Response resp) throws IllegalArgumentException{
return PipelineClient._staitcUnpackResponse(resp);
}
private static HashMap<String,String> _staitcUnpackResponse(Response resp) {
HashMap<String,String> ret_Map = new HashMap<String,String>();
int err_no = resp.getErrNo();
if ( err_no!= 0) {
return null;
}
List<String> keys = resp.getKeyList();
List<String> values= resp.getValueList();
for (int i = 0;i<keys.size();i++) {
ret_Map.put(keys.get(i),values.get(i));
}
return ret_Map;
}
public HashMap<String,String> predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch,
boolean profile,
int logid) {
try {
Request req = _packInferenceRequest(
feed_batch, profile,logid);
Response resp = blockingStub_.inference(req);
return _unpackResponse(resp);
} catch (StatusRuntimeException e) {
System.out.format("Failed to predict: %s\n", e.toString());
return null;
}
}
public HashMap<String,String> predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch) {
return predict(feed_batch,fetch,false,0);
}
public HashMap<String,String> predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch,
boolean profile) {
return predict(feed_batch,fetch,profile,0);
}
public HashMap<String,String> predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch,
int logid) {
return predict(feed_batch,fetch,false,logid);
}
public PipelineFuture asyn_predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch,
boolean profile,
int logid) {
Request req = _packInferenceRequest(
feed_batch, profile, logid);
ListenableFuture<Response> future = futureStub_.inference(req);
PipelineFuture predict_future = new PipelineFuture(future,
(Response resp) -> {
return PipelineClient._staitcUnpackResponse(resp);
}
);
return predict_future;
}
public PipelineFuture asyn_predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch) {
return asyn_predict(feed_batch,fetch,false,0);
}
public PipelineFuture asyn_predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch,
boolean profile) {
return asyn_predict(feed_batch,fetch,profile,0);
}
public PipelineFuture asyn_predict(
HashMap<String, String> feed_batch,
Iterable<String> fetch,
int logid) {
return asyn_predict(feed_batch,fetch,false,logid);
}
}
package io.paddle.serving.pipelineclient;
import java.util.*;
import java.util.function.Function;
import io.grpc.StatusRuntimeException;
import com.google.common.util.concurrent.ListenableFuture;
import org.nd4j.linalg.api.ndarray.INDArray;
import io.paddle.serving.pipelineclient.PipelineClient;
import io.paddle.serving.pipelineproto.*;
public class PipelineFuture {
private ListenableFuture<Response> callFuture_;
private Function<Response,
HashMap<String,String> > callBackFunc_;
PipelineFuture(ListenableFuture<Response> call_future,
Function<Response,
HashMap<String,String> > call_back_func) {
callFuture_ = call_future;
callBackFunc_ = call_back_func;
}
public HashMap<String,String> get() {
Response resp = null;
try {
resp = callFuture_.get();
} catch (Exception e) {
System.out.format("predict failed: %s\n", e.toString());
return null;
}
HashMap<String,String> result
= callBackFunc_.apply(resp);
return result;
}
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
option java_multiple_files = true;
option java_package = "io.paddle.serving.pipelineproto";
option java_outer_classname = "PipelineProto";
package baidu.paddle_serving.pipeline_serving;
message Request {
repeated string key = 1;
repeated string value = 2;
optional string name = 3;
optional string method = 4;
optional int64 logid = 5;
optional string clientip = 6;
};
message Response {
optional int32 err_no = 1;
optional string err_msg = 2;
repeated string key = 3;
repeated string value = 4;
};
service PipelineService {
rpc inference(Request) returns (Response) {}
};
......@@ -3,6 +3,7 @@
worker_num: 1
#http端口, rpc_port和http_port不允许同时为空。当rpc_port可用且http_port为空时,不自动生成http_port
rpc_port: 9998
http_port: 18082
dag:
......@@ -20,7 +21,7 @@ op:
model_config: uci_housing_model
#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "0" # "0,1"
devices: "" # "0,1"
#client类型,包括brpc, grpc和local_predictor.local_predictor不启动Serving服务,进程内预测
client_type: local_predictor
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
from paddle_serving_server.web_service import WebService, Op
except ImportError:
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
from numpy import array
import sys
import base64
_LOGGER = logging.getLogger()
np.set_printoptions(threshold=sys.maxsize)
class UciOp(Op):
def init_op(self):
self.separator = ","
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
_LOGGER.error("UciOp::preprocess >>> log_id:{}, input:{}".format(
log_id, input_dict))
proc_dict = {}
x_value = input_dict["x"]
input_dict["x"] = x_value.reshape(1,13)
return input_dict, False, None, ""
def postprocess(self, input_dicts, fetch_dict, log_id):
_LOGGER.info("UciOp::postprocess >>> log_id:{}, fetch_dict:{}".format(
log_id, fetch_dict))
fetch_dict["price"] = str(fetch_dict["price"][0][0])
return fetch_dict, None, ""
class UciService(WebService):
def get_pipeline_response(self, read_op):
uci_op = UciOp(name="uci", input_ops=[read_op])
return uci_op
uci_service = UciService(name="uci")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
......@@ -21,6 +21,7 @@ import contextlib
from contextlib import closing
import multiprocessing
import yaml
import io
from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from . import operator
......@@ -333,7 +334,7 @@ class ServerYamlConfChecker(object):
raise SystemExit("Failed to prepare_server: only one of yml_file"
" or yml_dict can be selected as the parameter.")
if yml_file is not None:
with open(yml_file, encoding='utf-8') as f:
with io.open(yml_file, encoding='utf-8') as f:
conf = yaml.load(f.read())
elif yml_dict is not None:
conf = yml_dict
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册