提交 2d46b7d7 编写于 作者: G guru4elephant

move general_infer_op and general_reader_op

上级 cef6f52f
......@@ -134,7 +134,4 @@ add_subdirectory(paddle_inference)
endif()
add_subdirectory(python)
set(PYTHON_INCLUDE_DIR ${PYTHON_INCLUDE})
set(PYTHON_LIBRARIES ${PYTHON_LIB})
#add_subdirectory(examples)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(SOURCE_FILE cube-agent.go)
add_go_executable(cube-agent ${SOURCE_FILE})
add_dependencies(cube-agent agent-docopt-go)
add_dependencies(cube-agent agent-logex)
add_dependencies(cube-agent agent-pipeline)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
_ "github.com/Badangel/logex"
"strings"
"sync"
)
var (
Dir string
WorkerNum int
QueueCapacity int32
MasterHost []string
MasterPort []string
TestHostname string
TestIdc string
ShardLock sync.RWMutex
CmdWorkPool *WorkPool
CmdWorkFilter sync.Map
)
type (
Status struct {
Status string `json:"status"`
Version string `json:"version"`
}
MasterResp struct {
Success string `json:"success"`
Message string `json:"message"`
Data string `json:"data"`
}
ShardInfo struct {
DictName string
ShardSeq int
SlotIdList string
DataDir string
Service string `json:"service,omitempty"`
Libcube string `json:"libcube,omitempty"`
}
CubeResp struct {
Status int `json:"status"`
CurVersion string `json:"cur_version"`
BgVersion string `json:"bg_version"`
}
)
var BUILTIN_STATUS = Status{"RUNNING", "3.0.0.1"}
var ShardInfoMap map[string]map[string]*ShardInfo
var disks []string
func GetMaster(master string) (host, port string, err error) {
if len(ShardInfoMap) < 1 {
return "", "", errors.New("empty master list.")
}
if master == "" {
return MasterHost[0], MasterPort[0], nil
}
if _, ok := ShardInfoMap[master]; ok {
m := strings.Split(master, ":")
if len(m) != 2 {
return MasterHost[0], MasterPort[0], nil
}
return m[0], m[1], nil
} else {
return MasterHost[0], MasterPort[0], nil
}
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/Badangel/logex"
)
type handlerFunc func(subpath string, m map[string]string, b []byte) (string, string, error)
var ( // key = subpath; eg: path="/checker/job", key="job"
getHandler map[string]handlerFunc
putHandler map[string]handlerFunc
deleteHandler map[string]handlerFunc
postHandler map[string]handlerFunc
)
func StartHttp(addr string) error {
// init handlers:
initGetHandlers()
initPostHandlers()
http.HandleFunc("/agent/", handleRest)
logex.Notice("start http ", addr)
return http.ListenAndServe(addr, nil)
}
func handleRest(w http.ResponseWriter, r *http.Request) {
var (
req_log string
status int32
)
time_begin := time.Now()
cont_type := make([]string, 1, 1)
cont_type[0] = "application/json"
header := w.Header()
header["Content-Type"] = cont_type
w.Header().Add("Access-Control-Allow-Origin", "*")
m := parseHttpKv(r)
b, _ := ioutil.ReadAll(r.Body)
req_log = fmt.Sprintf("handle %v %v %v from %v, len(m)=%v, m=%+v",
r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr, len(m), m)
api := r.URL.Path
var showHandler map[string]handlerFunc
switch r.Method {
case "GET":
showHandler = getHandler
case "POST": // create
showHandler = postHandler
case "PUT": // update
showHandler = putHandler
case "DELETE":
showHandler = deleteHandler
default:
logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method)
}
handler, ok := showHandler[api]
if !ok {
key_list := make([]string, 0, len(showHandler))
for key := range showHandler {
key_list = append(key_list, key)
}
status = 2
fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`,
status, r.Method, api, key_list)
logex.Noticef(`%v, time=%v, status=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, status)
return
}
var s string
rst, handle_log, err := handler(api, m, b)
if err == nil {
status = 0
s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst)
} else {
status = 255
s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`,
status, quote(err.Error()), rst)
}
if isJsonDict(s) {
fmt.Fprintln(w, s)
} else {
logex.Fatalf("invalid json: %v", s)
}
if err == nil {
logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000,
status, quote(handle_log))
} else {
logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000,
status, quote(err.Error()), quote(handle_log))
}
}
func parseHttpKv(r *http.Request) map[string]string {
r.ParseForm()
m := make(map[string]string)
for k, v := range r.Form {
switch k {
case "user": // remove @baidu.com for user
m[k] = strings.Split(v[0], "@")[0]
default:
m[k] = v[0]
}
}
// allow passing hostname for debug
if _, ok := m["hostname"]; !ok {
ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")]
m["hostname"], _ = getHostname(ip)
}
return m
}
// restReq sends a restful request to requrl and returns response body.
func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) {
logex.Debug("####restReq####")
logex.Debug(*kv)
data := url.Values{}
if kv != nil {
for k, v := range *kv {
logex.Trace("req set:", k, v)
data.Set(k, v)
}
}
if method == "GET" || method == "DELETE" {
requrl = requrl + "?" + data.Encode()
data = url.Values{}
}
logex.Notice(method, requrl)
req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode()))
if err != nil {
logex.Warning("NewRequest failed:", err)
return "", err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
}
client := &http.Client{}
client.Timeout = time.Duration(timeout) * time.Second
resp, err := client.Do(req)
if err != nil {
logex.Warning("Do failed:", err)
return "", err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logex.Warning("resp status: " + resp.Status)
return "", errors.New("resp status: " + resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
return string(body), err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"encoding/json"
"fmt"
)
func initGetHandlers() {
getHandler = map[string]handlerFunc{
"/agent/status": GetStatus,
}
}
func GetStatus(subpath string, m map[string]string, b []byte) (string, string, error) {
b, err := json.Marshal(BUILTIN_STATUS)
if err != nil {
return quote(""), "", fmt.Errorf("json marshal failed, %v", err)
}
return string(b), "", err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"encoding/json"
"fmt"
"github.com/Badangel/logex"
)
func initPostHandlers() {
postHandler = map[string]handlerFunc{
"/agent/cmd": PostCmd,
}
}
func PostCmd(subpath string, m map[string]string, b []byte) (string, string, error) {
var work Work
err := json.Unmarshal(b, &work)
if err != nil {
logex.Warningf("Unmarshal from %s error (+%v)", string(b), err)
return quote(""), "", fmt.Errorf("Work json unmarshal work failed, %v", err)
}
if _, ok := CmdWorkFilter.Load(work.Token()); ok {
logex.Warningf("Another work with same token is doing. Token(%s)", work.Token())
return quote(""), "", fmt.Errorf("Another work with same key is doing.", err)
}
CmdWorkFilter.Store(work.Token(), true)
err = work.DoWork()
CmdWorkFilter.Delete(work.Token())
if err != nil {
return quote(""), "", fmt.Errorf("Do work failed.", err)
}
return quote(""), "", err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/Badangel/logex"
)
// restReq sends a restful request to requrl and returns response body.
func RestReq(method, requrl string, timeout int, kv *map[string]string) (string, error) {
data := url.Values{}
if kv != nil {
for k, v := range *kv {
//logex.Trace("req set:", k, v)
data.Set(k, v)
}
}
if method == "GET" || method == "DELETE" {
requrl = requrl + "?" + data.Encode()
data = url.Values{}
}
//logex.Notice(method, requrl)
req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode()))
if err != nil {
logex.Warning("NewRequest failed:", err)
return "", err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
}
client := &http.Client{}
client.Timeout = time.Duration(timeout) * time.Second
resp, err := client.Do(req)
if err != nil {
logex.Warning("Do failed:", err)
return "", err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logex.Warning("resp status: " + resp.Status)
return "", errors.New("resp status: " + resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
return string(body), err
}
// restReq sends a restful request to requrl and returns response body as json.
func JsonReq(method, requrl string, timeout int, kv *map[string]string,
out interface{}) error {
s, err := RestReq(method, requrl, timeout, kv)
logex.Debugf("json request method:[%v], requrl:[%s], timeout:[%v], map[%v], out_str:[%s]", method, requrl, timeout, kv, s)
if err != nil {
return err
}
return json.Unmarshal([]byte(s), out)
}
func GetHdfsMeta(src string) (master, ugi, path string, err error) {
//src = "hdfs://root:rootpasst@st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar"
//src = "hdfs://st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar"
ugiBegin := strings.Index(src, "//")
ugiPos := strings.LastIndex(src, "@")
if ugiPos != -1 && ugiBegin != -1 {
ugi = src[ugiBegin+2 : ugiPos]
}
src1 := strings.Replace(strings.Replace(src, "hdfs://", "", 1), ugi, "", 1)
if ugi != "" {
src1 = src1[1:]
}
pos := strings.Index(src1, "/")
if pos != -1 {
master = src1[0:pos]
path = src1[pos:]
} else {
logex.Warningf("failed to get the master or path for (%s)", src)
err = errors.New("invalid master or path found")
}
logex.Debugf("parse the (%s) succ, master is %s, ugi is (%s), path is %s", src, master, ugi, path)
return
}
func getHostIp() (string, error) {
if addrs, err := net.InterfaceAddrs(); err == nil {
for _, addr := range addrs {
ips := addr.String()
logex.Debugf("get host ip: %v", ips)
if strings.HasPrefix(ips, "127") {
continue
} else {
list := strings.Split(ips, "/")
if len(list) != 2 {
continue
}
return list[0], nil
}
}
}
return "unkown ip", errors.New("get host ip failed")
}
func getHostname(ip string) (hostname string, err error) {
if hostnames, err := net.LookupAddr(ip); err != nil {
hostname = ip
//logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err)
} else {
if len(hostnames) > 0 {
hostname = hostnames[0]
} else {
hostname = ip
}
}
return hostname, err
}
func GetLocalHostname() (hostname string, err error) {
if ip, err := getHostIp(); err == nil {
return getHostname(ip)
} else {
return "unkown ip", err
}
}
func GetLocalHostnameCmd() (hostname string, err error) {
cmd := "hostname"
stdout, _, err := RetryCmd(cmd, RETRY_TIMES)
if stdout != "" && err == nil {
hostname := strings.TrimSpace(stdout)
index := strings.LastIndex(hostname, ".baidu.com")
if index > 0 {
return hostname[:strings.LastIndex(hostname, ".baidu.com")], nil
} else {
return hostname, nil
}
} else {
logex.Debugf("using hostname cmd failed. err:%v", err)
return GetLocalHostname()
}
}
// quote quotes string for json output. eg: s="123", quote(s)=`"123"`
func quote(s string) string {
return fmt.Sprintf("%q", s)
}
// quoteb quotes byte array for json output.
func quoteb(b []byte) string {
return quote(string(b))
}
// quotea quotes string array for json output
func quotea(a []string) string {
b, _ := json.Marshal(a)
return string(b)
}
func isJsonDict(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
}
此差异已折叠。
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
"fmt"
"sync"
"sync/atomic"
)
type (
workType struct {
poolWorker PoolWorker
resultChannel chan error
}
WorkPool struct {
queueChannel chan workType
workChannel chan PoolWorker
queuedWorkNum int32
activeWorkerNum int32
queueCapacity int32
workFilter sync.Map
}
)
type PoolWorker interface {
Token() string
DoWork()
}
func NewWorkPool(workerNum int, queueCapacity int32) *WorkPool {
workPool := WorkPool{
queueChannel: make(chan workType),
workChannel: make(chan PoolWorker, queueCapacity),
queuedWorkNum: 0,
activeWorkerNum: 0,
queueCapacity: queueCapacity,
}
for i := 0; i < workerNum; i++ {
go workPool.startWorkRoutine()
}
go workPool.startQueueRoutine()
return &workPool
}
func (workPool *WorkPool) startWorkRoutine() {
for {
select {
case work := <-workPool.workChannel:
workPool.doWork(work)
break
}
}
}
func (workPool *WorkPool) startQueueRoutine() {
for {
select {
case queueItem := <-workPool.queueChannel:
if atomic.AddInt32(&workPool.queuedWorkNum, 0) == workPool.queueCapacity {
queueItem.resultChannel <- fmt.Errorf("work pool fulled with %v pending works", QueueCapacity)
continue
}
atomic.AddInt32(&workPool.queuedWorkNum, 1)
workPool.workChannel <- queueItem.poolWorker
queueItem.resultChannel <- nil
break
}
}
}
func (workPool *WorkPool) doWork(poolWorker PoolWorker) {
defer atomic.AddInt32(&workPool.activeWorkerNum, -1)
defer workPool.workFilter.Delete(poolWorker.Token())
atomic.AddInt32(&workPool.queuedWorkNum, -1)
atomic.AddInt32(&workPool.activeWorkerNum, 1)
poolWorker.DoWork()
}
func (workPool *WorkPool) PostWorkWithToken(poolWorker PoolWorker) (err error) {
if _, ok := workPool.workFilter.Load(poolWorker.Token()); ok {
return errors.New("another work with same key is doing.")
}
workPool.workFilter.Store(poolWorker.Token(), true)
return workPool.PostWork(poolWorker)
}
func (workPool *WorkPool) PostWork(poolWorker PoolWorker) (err error) {
work := workType{poolWorker, make(chan error)}
defer close(work.resultChannel)
workPool.queueChannel <- work
err = <-work.resultChannel
return err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"agent"
"fmt"
"github.com/Badangel/logex"
"github.com/docopt/docopt-go"
"os"
"path/filepath"
"runtime"
"strconv"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
agent.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0]))
usage := fmt.Sprintf(`Usage: ./m_master [options]
Options:
-n WORKERNUM set worker num.
-q QUEUENUM set queue num.
-P LISTEN_PORT agent listen port
Log options:
-l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 16]
--log_dir=DIR set log output dir. [default: ./log]
--log_name=NAME set log name. [default: m_agent]`, agent.Dir)
opts, err := docopt.Parse(usage, nil, true, "Cube Agent Checker 1.0.0", false)
if err != nil {
fmt.Println("ERROR:", err)
os.Exit(1)
}
log_level, _ := strconv.Atoi(opts["-l"].(string))
log_name := opts["--log_name"].(string)
log_dir := opts["--log_dir"].(string)
logex.SetLevel(getLogLevel(log_level))
if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil {
fmt.Println("ERROR:", err)
}
logex.Notice("--- NEW SESSION -------------------------")
logex.Notice(">>> log_level:", log_level)
agent.WorkerNum = 10
if opts["-n"] != nil {
n, err := strconv.Atoi(opts["-n"].(string))
if err == nil {
agent.WorkerNum = n
}
}
agent.QueueCapacity = 20
if opts["-q"] != nil {
q, err := strconv.Atoi(opts["-q"].(string))
if err == nil {
agent.QueueCapacity = int32(q)
}
}
agent.CmdWorkPool = agent.NewWorkPool(agent.WorkerNum, agent.QueueCapacity)
if opts["-P"] == nil {
logex.Fatalf("ERROR: -P LISTEN PORT must be set!")
os.Exit(255)
}
agentPort := opts["-P"].(string)
logex.Notice(">>> starting server...")
addr := ":" + agentPort
if agent.StartHttp(addr) != nil {
logex.Noticef("cant start http(addr=%v). quit.", addr)
os.Exit(0)
}
}
func getLogLevel(log_level int) logex.Level {
switch log_level {
case 16:
return logex.DEBUG
case 8:
return logex.TRACE
case 4:
return logex.NOTICE
case 2:
return logex.WARNING
case 1:
return logex.FATAL
case 0:
return logex.NONE
}
return logex.DEBUG
}
......@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "examples/demo-serving/op/general_infer_op.h"
#include <algorithm>
#include <iostream>
#include <memory>
#include <sstream>
#include "core/general-server/op/general_infer_op.h"
#include "core/general-server/op/general_reader_op.h"
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/resource.h"
#include "examples/demo-serving/op/general_reader_op.h"
namespace baidu {
namespace paddle_serving {
......
......@@ -23,7 +23,7 @@
#else
#include "paddle_inference_api.h" // NOLINT
#endif
#include "examples/demo-serving/general_model_service.pb.h"
#include "core/general-server/general_model_service.pb.h"
namespace baidu {
namespace paddle_serving {
......
......@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "examples/demo-serving/op/general_reader_op.h"
#include <algorithm>
#include <iostream>
#include <memory>
#include <sstream>
#include "core/general-server/op/general_reader_op.h"
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
......
......@@ -25,8 +25,8 @@
#endif
#include <string>
#include "core/predictor/framework/resource.h"
#include "examples/demo-serving/general_model_service.pb.h"
#include "examples/demo-serving/load_general_model_service.pb.h"
#include "core/general-server/general_model_service.pb.h"
#include "core/general-server/load_general_model_service.pb.h"
namespace baidu {
namespace paddle_serving {
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "examples/demo-serving/op/general_infer_op.h"
#include <algorithm>
#include <iostream>
#include <memory>
#include <sstream>
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/resource.h"
#include "examples/demo-serving/op/general_reader_op.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::InferManager;
int GeneralInferOp::inference() {
const GeneralReaderOutput *reader_out =
get_depend_argument<GeneralReaderOutput>("general_reader_op");
if (!reader_out) {
LOG(ERROR) << "Failed mutable depended argument, op:"
<< "general_reader_op";
return -1;
}
int reader_status = reader_out->reader_status;
if (reader_status != 0) {
LOG(ERROR) << "Read request wrong.";
return -1;
}
const TensorVector *in = &reader_out->tensor_vector;
TensorVector *out = butil::get_object<TensorVector>();
int batch_size = (*in)[0].shape[0];
// infer
if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
return -1;
}
Response *res = mutable_data<Response>();
for (int i = 0; i < batch_size; ++i) {
FetchInst *fetch_inst = res->add_insts();
for (int j = 0; j < out->size(); ++j) {
Tensor *tensor = fetch_inst->add_tensor_array();
tensor->set_elem_type(1);
if (out->at(j).lod.size() == 1) {
tensor->add_shape(-1);
} else {
for (int k = 1; k < out->at(j).shape.size(); ++k) {
tensor->add_shape(out->at(j).shape[k]);
}
}
}
}
for (int i = 0; i < out->size(); ++i) {
float *data_ptr = static_cast<float *>(out->at(i).data.data());
int cap = 1;
for (int j = 1; j < out->at(i).shape.size(); ++j) {
cap *= out->at(i).shape[j];
}
if (out->at(i).lod.size() == 1) {
for (int j = 0; j < batch_size; ++j) {
for (int k = out->at(i).lod[0][j]; k < out->at(i).lod[0][j + 1]; k++) {
res->mutable_insts(j)->mutable_tensor_array(i)->add_data(
reinterpret_cast<char *>(&(data_ptr[k])), sizeof(float));
}
}
} else {
for (int j = 0; j < batch_size; ++j) {
for (int k = j * cap; k < (j + 1) * cap; ++k) {
res->mutable_insts(j)->mutable_tensor_array(i)->add_data(
reinterpret_cast<char *>(&(data_ptr[k])), sizeof(float));
}
}
}
}
/*
for (size_t i = 0; i < in->size(); ++i) {
(*in)[i].shape.clear();
}
in->clear();
butil::return_object<TensorVector>(in);
for (size_t i = 0; i < out->size(); ++i) {
(*out)[i].shape.clear();
}
out->clear();
butil::return_object<TensorVector>(out);
}
*/
return 0;
}
DEFINE_OP(GeneralInferOp);
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#ifdef BCLOUD
#ifdef WITH_GPU
#include "paddle/paddle_inference_api.h"
#else
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#endif
#else
#include "paddle_inference_api.h" // NOLINT
#endif
#include "examples/demo-serving/general_model_service.pb.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
static const char* GENERAL_MODEL_NAME = "general_model";
class GeneralInferOp
: public baidu::paddle_serving::predictor::OpWithChannel<
baidu::paddle_serving::predictor::general_model::Response> {
public:
typedef std::vector<paddle::PaddleTensor> TensorVector;
DECLARE_OP(GeneralInferOp);
int inference();
};
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "examples/demo-serving/op/general_reader_op.h"
#include <algorithm>
#include <iostream>
#include <memory>
#include <sstream>
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int conf_check(const Request *req,
const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
int var_num = req->insts(0).tensor_array_size();
VLOG(2) << "var num: " << var_num;
if (var_num != model_config->_feed_type.size()) {
LOG(ERROR) << "feed var number not match.";
return -1;
}
VLOG(2) << "begin to checkout feed type";
for (int i = 0; i < var_num; ++i) {
VLOG(2) << "feed type[" << i << "]: " <<
model_config->_feed_type[i];
if (model_config->_feed_type[i] !=
req->insts(0).tensor_array(i).elem_type()) {
LOG(ERROR) << "feed type not match.";
return -1;
}
VLOG(2) << "feed shape size: " << model_config->_feed_shape[i].size();
if (model_config->_feed_shape[i].size() ==
req->insts(0).tensor_array(i).shape_size()) {
for (int j = 0; j < model_config->_feed_shape[i].size(); ++j) {
req->insts(0).tensor_array(i).shape(j);
if (model_config->_feed_shape[i][j] !=
req->insts(0).tensor_array(i).shape(j)) {
LOG(ERROR) << "feed shape not match.";
return -1;
}
}
} else {
LOG(ERROR) << "feed shape not match.";
return -1;
}
}
return 0;
}
int GeneralReaderOp::inference() {
// reade request from client
const Request *req = dynamic_cast<const Request *>(get_request_message());
int batch_size = req->insts_size();
int input_var_num = 0;
std::vector<int64_t> elem_type;
std::vector<int64_t> elem_size;
std::vector<int64_t> capacity;
GeneralReaderOutput *res = mutable_data<GeneralReaderOutput>();
TensorVector *in = &res->tensor_vector;
if (!res) {
LOG(ERROR) << "Failed get op tls reader object output";
}
if (batch_size <= 0) {
res->reader_status = -1;
return 0;
}
int var_num = req->insts(0).tensor_array_size();
VLOG(2) << "var num: " << var_num;
// read config
LOG(INFO) << "start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
LOG(INFO) << "get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
LOG(INFO) << "print general model config done.";
// check
res->reader_status = conf_check(req, model_config);
if (res->reader_status != 0) {
LOG(INFO) << "model conf of server:";
resource.print_general_model_config(model_config);
return 0;
}
// package tensor
elem_type.resize(var_num);
elem_size.resize(var_num);
capacity.resize(var_num);
paddle::PaddleTensor lod_tensor;
for (int i = 0; i < var_num; ++i) {
elem_type[i] = req->insts(0).tensor_array(i).elem_type();
VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
if (elem_type[i] == 0) { // int64
elem_size[i] = sizeof(int64_t);
lod_tensor.dtype = paddle::PaddleDType::INT64;
} else {
elem_size[i] = sizeof(float);
lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
}
if (req->insts(0).tensor_array(i).shape(0) == -1) {
lod_tensor.lod.resize(1);
lod_tensor.lod[0].push_back(0);
VLOG(2) << "var[" << i << "] is lod_tensor";
} else {
lod_tensor.shape.push_back(batch_size);
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
VLOG(2) << "shape for var[" << i << "]: " << dim;
capacity[i] *= dim;
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "var[" << i << "] is tensor, capacity: " << capacity[i];
}
if (i == 0) {
lod_tensor.name = "words";
} else {
lod_tensor.name = "label";
}
in->push_back(lod_tensor);
}
for (int i = 0; i < var_num; ++i) {
if (in->at(i).lod.size() == 1) {
for (int j = 0; j < batch_size; ++j) {
const Tensor &tensor = req->insts(j).tensor_array(i);
int data_len = tensor.data_size();
VLOG(2) << "tensor size for var[" << i << "]: " << tensor.data_size();
int cur_len = in->at(i).lod[0].back();
VLOG(2) << "current len: " << cur_len;
in->at(i).lod[0].push_back(cur_len + data_len);
VLOG(2) << "new len: " << cur_len + data_len;
}
in->at(i).data.Resize(in->at(i).lod[0].back() * elem_size[i]);
in->at(i).shape = {in->at(i).lod[0].back(), 1};
VLOG(2) << "var[" << i
<< "] is lod_tensor and len=" << in->at(i).lod[0].back();
} else {
in->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]);
VLOG(2) << "var[" << i
<< "] is tensor and capacity=" << batch_size * capacity[i];
}
}
for (int i = 0; i < var_num; ++i) {
if (elem_type[i] == 0) {
int64_t *dst_ptr = static_cast<int64_t *>(in->at(i).data.data());
int offset = 0;
for (int j = 0; j < batch_size; ++j) {
for (int k = 0; k < req->insts(j).tensor_array(i).data_size(); ++k) {
dst_ptr[offset + k] =
*(const int64_t *)req->insts(j).tensor_array(i).data(k).c_str();
}
if (in->at(i).lod.size() == 1) {
offset = in->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
}
} else {
float *dst_ptr = static_cast<float *>(in->at(i).data.data());
int offset = 0;
for (int j = 0; j < batch_size; ++j) {
for (int k = 0; k < req->insts(j).tensor_array(i).data_size(); ++k) {
dst_ptr[offset + k] =
*(const float *)req->insts(j).tensor_array(i).data(k).c_str();
}
if (in->at(i).lod.size() == 1) {
offset = in->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
}
}
}
VLOG(2) << "read data from client success";
// print request
std::ostringstream oss;
int64_t *example = reinterpret_cast<int64_t *>((*in)[0].data.data());
for (int i = 0; i < 10; i++) {
oss << *(example + i) << " ";
}
VLOG(2) << "head element of first feed var : " << oss.str();
//
return 0;
}
DEFINE_OP(GeneralReaderOp);
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#ifdef BCLOUD
#ifdef WITH_GPU
#include "paddle/paddle_inference_api.h"
#else
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#endif
#else
#include "paddle_inference_api.h" // NOLINT
#endif
#include <string>
#include "core/predictor/framework/resource.h"
#include "examples/demo-serving/general_model_service.pb.h"
#include "examples/demo-serving/load_general_model_service.pb.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
struct GeneralReaderOutput {
std::vector<paddle::PaddleTensor> tensor_vector;
int reader_status = 0;
void Clear() {
size_t tensor_count = tensor_vector.size();
for (size_t ti = 0; ti < tensor_count; ++ti) {
tensor_vector[ti].shape.clear();
}
tensor_vector.clear();
}
std::string ShortDebugString() const { return "Not implemented!"; }
};
class GeneralReaderOp : public baidu::paddle_serving::predictor::OpWithChannel<
GeneralReaderOutput> {
public:
typedef std::vector<paddle::PaddleTensor> TensorVector;
DECLARE_OP(GeneralReaderOp);
int inference();
};
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
......@@ -141,6 +141,9 @@ class Client(object):
result = self.client_handle_.predict(
float_slot, float_feed_names, int_slot, int_feed_names, fetch_names)
# TODO(guru4elephant): the order of fetch var name should be consistent with
# general_model_config, this is not friendly
# In the future, we need make the number of fetched variable changable
result_map = {}
for i, name in enumerate(fetch_names):
result_map[name] = result[i]
......
......@@ -18,7 +18,7 @@ from paddle.fluid.framework import core
from paddle.fluid.framework import default_main_program
from paddle.fluid.framework import Program
from paddle.fluid import CPUPlace
from paddle.fluid.io import save_persistables
from paddle.fluid.io import save_inference_model
from ..proto import general_model_config_pb2 as model_conf
import os
......@@ -27,19 +27,13 @@ def save_model(server_model_folder,
feed_var_dict,
fetch_var_dict,
main_program=None):
if main_program is None:
main_program = default_main_program()
elif isinstance(main_program, CompiledProgram):
main_program = main_program._program
if main_program is None:
raise TypeError("program should be as Program type or None")
if not isinstance(main_program, Program):
raise TypeError("program should be as Program type or None")
executor = Executor(place=CPUPlace())
save_persistables(executor, server_model_folder,
main_program)
feed_var_names = [feed_var_dict[x].name for x in feed_var_dict]
target_vars = fetch_var_dict.values()
save_inference_model(server_model_folder, feed_var_names,
target_vars, executor, main_program=main_program)
config = model_conf.GeneralModelConfig()
......@@ -71,10 +65,11 @@ def save_model(server_model_folder,
config.fetch_var.extend([fetch_var])
cmd = "mkdir -p {}".format(client_config_folder)
os.system(cmd)
with open("{}/serving_client_conf.prototxt", "w") as fout:
with open("{}/serving_client_conf.prototxt".format(client_config_folder), "w") as fout:
fout.write(str(config))
with open("{}/serving_server_conf.prototxt", "w") as fout:
with open("{}/serving_server_conf.prototxt".format(server_model_folder), "w") as fout:
fout.write(str(config))
......
......@@ -175,7 +175,7 @@ class Server(object):
def run_server(self):
# just run server with system command
# currently we do not load cube
command = "/home/users/dongdaxiang/github_develop/Serving/build_server/core/general-server" \
command = "/home/users/dongdaxiang/github_develop/Serving/build_server/core/general-server/serving" \
" -enable_model_toolkit " \
"-inferservice_path {} " \
"-inferservice_file {} " \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册