提交 134cd16e 编写于 作者: X xulongteng

Merge branch 'develop' of https://github.com/MRXLT/Serving into develop

update from origin
......@@ -15,3 +15,5 @@
add_subdirectory(cube-server)
add_subdirectory(cube-api)
add_subdirectory(cube-builder)
add_subdirectory(cube-transfer)
add_subdirectory(cube-agent)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
project(cube-agent Go)
include(cmake/golang.cmake)
ExternalGoProject_Add(agent-docopt-go github.com/docopt/docopt-go)
ExternalGoProject_Add(agent-logex github.com/Badangel/logex)
ExternalGoProject_Add(agent-pipeline github.com/Badangel/pipeline)
add_subdirectory(src)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
if(NOT CMAKE_Go_COMPILER)
if(NOT $ENV{GO_COMPILER} STREQUAL "")
get_filename_component(CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT)
if(CMAKE_Go_FLAGS_ENV_INIT)
set(CMAKE_Go_COMPILER_ARG1 "${CMAKE_Go_FLAGS_ENV_INIT}" CACHE STRING "First argument to Go compiler")
endif()
if(NOT EXISTS ${CMAKE_Go_COMPILER_INIT})
message(SEND_ERROR "Could not find compiler set in environment variable GO_COMPILER:\n$ENV{GO_COMPILER}.")
endif()
endif()
set(Go_BIN_PATH
$ENV{GOPATH}
$ENV{GOROOT}
$ENV{GOROOT}/../bin
$ENV{GO_COMPILER}
/usr/bin
/usr/local/bin
)
if(CMAKE_Go_COMPILER_INIT)
set(CMAKE_Go_COMPILER ${CMAKE_Go_COMPILER_INIT} CACHE PATH "Go Compiler")
else()
find_program(CMAKE_Go_COMPILER
NAMES go
PATHS ${Go_BIN_PATH}
)
EXEC_PROGRAM(${CMAKE_Go_COMPILER} ARGS version OUTPUT_VARIABLE GOLANG_VERSION)
STRING(REGEX MATCH "go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*" VERSION "${GOLANG_VERSION}")
message("-- The Golang compiler identification is ${VERSION}")
message("-- Check for working Golang compiler: ${CMAKE_Go_COMPILER}")
endif()
endif()
mark_as_advanced(CMAKE_Go_COMPILER)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in
${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY)
set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER")
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@")
set(CMAKE_Go_COMPILER_LOADED 1)
set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go)
set(CMAKE_Go_LINKER_PREFERENCE 40)
set(CMAKE_Go_OUTPUT_EXTENSION .o)
set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1)
set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER")
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
if(NOT CMAKE_Go_COMPILE_OBJECT)
set(CMAKE_Go_COMPILE_OBJECT "go tool compile -l -N -o <OBJECT> <SOURCE> ")
endif()
if(NOT CMAKE_Go_LINK_EXECUTABLE)
set(CMAKE_Go_LINK_EXECUTABLE "go tool link -o <TARGET> <OBJECTS> ")
endif()
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL "")
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(GOPATH "${CMAKE_CURRENT_LIST_DIR}/../")
file(MAKE_DIRECTORY ${GOPATH})
function(ExternalGoProject_Add TARG)
add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN})
endfunction(ExternalGoProject_Add)
function(add_go_executable NAME)
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build
-o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}"
${CMAKE_GO_FLAGS} ${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR})
add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN})
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
endfunction(add_go_executable)
function(ADD_GO_LIBRARY NAME BUILD_TYPE)
if(BUILD_TYPE STREQUAL "STATIC")
set(BUILD_MODE -buildmode=c-archive)
set(LIB_NAME "lib${NAME}.a")
else()
set(BUILD_MODE -buildmode=c-shared)
if(APPLE)
set(LIB_NAME "lib${NAME}.dylib")
else()
set(LIB_NAME "lib${NAME}.so")
endif()
endif()
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE}
-o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
${CMAKE_GO_FLAGS} ${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR})
add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN})
if(NOT BUILD_TYPE STREQUAL "STATIC")
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
endif()
endfunction(ADD_GO_LIBRARY)
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(SOURCE_FILE cube-agent.go)
add_go_executable(cube-agent ${SOURCE_FILE})
add_dependencies(cube-agent agent-docopt-go)
add_dependencies(cube-agent agent-logex)
add_dependencies(cube-agent agent-pipeline)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
_ "github.com/Badangel/logex"
"github.com/Badangel/pipeline"
"os/exec"
"strconv"
"strings"
"sync"
)
var (
Dir string
WorkerNum int
QueueCapacity int32
MasterHost []string
MasterPort []string
TestHostname string
TestIdc string
ShardLock sync.RWMutex
CmdWorkPool *WorkPool
CmdWorkFilter sync.Map
)
type (
Status struct {
Status string `json:"status"`
Version string `json:"version"`
}
MasterResp struct {
Success string `json:"success"`
Message string `json:"message"`
Data string `json:"data"`
}
ShardInfo struct {
DictName string
ShardSeq int
SlotIdList string
DataDir string
Service string `json:"service,omitempty"`
Libcube string `json:"libcube,omitempty"`
}
CubeResp struct {
Status int `json:"status"`
CurVersion string `json:"cur_version"`
BgVersion string `json:"bg_version"`
}
)
var BUILTIN_STATUS = Status{"RUNNING", "3.0.0.1"}
var ShardInfoMap map[string]map[string]*ShardInfo
var disks []string
func GetMaster(master string) (host, port string, err error) {
if len(ShardInfoMap) < 1 {
return "", "", errors.New("empty master list.")
}
if master == "" {
return MasterHost[0], MasterPort[0], nil
}
if _, ok := ShardInfoMap[master]; ok {
m := strings.Split(master, ":")
if len(m) != 2 {
return MasterHost[0], MasterPort[0], nil
}
return m[0], m[1], nil
} else {
return MasterHost[0], MasterPort[0], nil
}
}
func init() {
dfCmd := "df -h | grep -E '/home|/ssd'"
stdout, _, err := pipeline.Run(exec.Command("/bin/bash", "-c", dfCmd))
if err == nil && stdout.String() != "" {
t := strings.TrimSpace(stdout.String())
diskLi := strings.Split(t, "\n")
for _, diskStr := range diskLi {
disk := strings.Fields(diskStr)
usedPercent, _ := strconv.Atoi(strings.TrimRight(disk[4], "%"))
if usedPercent <= 40 {
disks = append(disks, disk[5])
}
}
}
if len(disks) == 0 {
disks = append(disks, "/home")
}
//logex.Debugf("available disks found: (%+v)", disks)
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/Badangel/logex"
)
type handlerFunc func(subpath string, m map[string]string, b []byte) (string, string, error)
var ( // key = subpath; eg: path="/checker/job", key="job"
getHandler map[string]handlerFunc
putHandler map[string]handlerFunc
deleteHandler map[string]handlerFunc
postHandler map[string]handlerFunc
)
func StartHttp(addr string) error {
// init handlers:
initGetHandlers()
initPostHandlers()
http.HandleFunc("/agent/", handleRest)
logex.Notice("start http ", addr)
return http.ListenAndServe(addr, nil)
}
func handleRest(w http.ResponseWriter, r *http.Request) {
var (
req_log string
status int32
)
time_begin := time.Now()
cont_type := make([]string, 1, 1)
cont_type[0] = "application/json"
header := w.Header()
header["Content-Type"] = cont_type
w.Header().Add("Access-Control-Allow-Origin", "*")
m := parseHttpKv(r)
b, _ := ioutil.ReadAll(r.Body)
req_log = fmt.Sprintf("handle %v %v %v from %v, len(m)=%v, m=%+v",
r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr, len(m), m)
api := r.URL.Path
var showHandler map[string]handlerFunc
switch r.Method {
case "GET":
showHandler = getHandler
case "POST": // create
showHandler = postHandler
case "PUT": // update
showHandler = putHandler
case "DELETE":
showHandler = deleteHandler
default:
logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method)
}
handler, ok := showHandler[api]
if !ok {
key_list := make([]string, 0, len(showHandler))
for key := range showHandler {
key_list = append(key_list, key)
}
status = 2
fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`,
status, r.Method, api, key_list)
logex.Noticef(`%v, time=%v, status=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, status)
return
}
var s string
rst, handle_log, err := handler(api, m, b)
if err == nil {
status = 0
s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst)
} else {
status = 255
s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`,
status, quote(err.Error()), rst)
}
if isJsonDict(s) {
fmt.Fprintln(w, s)
} else {
logex.Fatalf("invalid json: %v", s)
}
if err == nil {
logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000,
status, quote(handle_log))
} else {
logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000,
status, quote(err.Error()), quote(handle_log))
}
}
func parseHttpKv(r *http.Request) map[string]string {
r.ParseForm()
m := make(map[string]string)
for k, v := range r.Form {
switch k {
case "user": // remove @baidu.com for user
m[k] = strings.Split(v[0], "@")[0]
default:
m[k] = v[0]
}
}
// allow passing hostname for debug
if _, ok := m["hostname"]; !ok {
ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")]
m["hostname"], _ = getHostname(ip)
}
return m
}
// restReq sends a restful request to requrl and returns response body.
func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) {
logex.Debug("####restReq####")
logex.Debug(*kv)
data := url.Values{}
if kv != nil {
for k, v := range *kv {
logex.Trace("req set:", k, v)
data.Set(k, v)
}
}
if method == "GET" || method == "DELETE" {
requrl = requrl + "?" + data.Encode()
data = url.Values{}
}
logex.Notice(method, requrl)
req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode()))
if err != nil {
logex.Warning("NewRequest failed:", err)
return "", err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
}
client := &http.Client{}
client.Timeout = time.Duration(timeout) * time.Second
resp, err := client.Do(req)
if err != nil {
logex.Warning("Do failed:", err)
return "", err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logex.Warning("resp status: " + resp.Status)
return "", errors.New("resp status: " + resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
return string(body), err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"encoding/json"
"fmt"
)
func initGetHandlers() {
getHandler = map[string]handlerFunc{
"/agent/status": GetStatus,
}
}
func GetStatus(subpath string, m map[string]string, b []byte) (string, string, error) {
b, err := json.Marshal(BUILTIN_STATUS)
if err != nil {
return quote(""), "", fmt.Errorf("json marshal failed, %v", err)
}
return string(b), "", err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"encoding/json"
"fmt"
"github.com/Badangel/logex"
)
func initPostHandlers() {
postHandler = map[string]handlerFunc{
"/agent/cmd": PostCmd,
}
}
func PostCmd(subpath string, m map[string]string, b []byte) (string, string, error) {
var work Work
err := json.Unmarshal(b, &work)
if err != nil {
logex.Warningf("Unmarshal from %s error (+%v)", string(b), err)
return quote(""), "", fmt.Errorf("Work json unmarshal work failed, %v", err)
}
if _, ok := CmdWorkFilter.Load(work.Token()); ok {
logex.Warningf("Another work with same token is doing. Token(%s)", work.Token())
return quote(""), "", fmt.Errorf("Another work with same key is doing.", err)
}
CmdWorkFilter.Store(work.Token(), true)
err = work.DoWork()
CmdWorkFilter.Delete(work.Token())
if err != nil {
return quote(""), "", fmt.Errorf("Do work failed.", err)
}
return quote(""), "", err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/Badangel/logex"
)
// restReq sends a restful request to requrl and returns response body.
func RestReq(method, requrl string, timeout int, kv *map[string]string) (string, error) {
data := url.Values{}
if kv != nil {
for k, v := range *kv {
//logex.Trace("req set:", k, v)
data.Set(k, v)
}
}
if method == "GET" || method == "DELETE" {
requrl = requrl + "?" + data.Encode()
data = url.Values{}
}
//logex.Notice(method, requrl)
req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode()))
if err != nil {
logex.Warning("NewRequest failed:", err)
return "", err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
}
client := &http.Client{}
client.Timeout = time.Duration(timeout) * time.Second
resp, err := client.Do(req)
if err != nil {
logex.Warning("Do failed:", err)
return "", err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logex.Warning("resp status: " + resp.Status)
return "", errors.New("resp status: " + resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
return string(body), err
}
// restReq sends a restful request to requrl and returns response body as json.
func JsonReq(method, requrl string, timeout int, kv *map[string]string,
out interface{}) error {
s, err := RestReq(method, requrl, timeout, kv)
logex.Debugf("json request method:[%v], requrl:[%s], timeout:[%v], map[%v], out_str:[%s]", method, requrl, timeout, kv, s)
if err != nil {
return err
}
return json.Unmarshal([]byte(s), out)
}
func GetHdfsMeta(src string) (master, ugi, path string, err error) {
//src = "hdfs://root:rootpasst@st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar"
//src = "hdfs://st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar"
ugiBegin := strings.Index(src, "//")
ugiPos := strings.LastIndex(src, "@")
if ugiPos != -1 && ugiBegin != -1 {
ugi = src[ugiBegin+2 : ugiPos]
}
src1 := strings.Replace(strings.Replace(src, "hdfs://", "", 1), ugi, "", 1)
if ugi != "" {
src1 = src1[1:]
}
pos := strings.Index(src1, "/")
if pos != -1 {
master = src1[0:pos]
path = src1[pos:]
} else {
logex.Warningf("failed to get the master or path for (%s)", src)
err = errors.New("invalid master or path found")
}
logex.Debugf("parse the (%s) succ, master is %s, ugi is (%s), path is %s", src, master, ugi, path)
return
}
func getHostIp() (string, error) {
if addrs, err := net.InterfaceAddrs(); err == nil {
for _, addr := range addrs {
ips := addr.String()
logex.Debugf("get host ip: %v", ips)
if strings.HasPrefix(ips, "127") {
continue
} else {
list := strings.Split(ips, "/")
if len(list) != 2 {
continue
}
return list[0], nil
}
}
}
return "unkown ip", errors.New("get host ip failed")
}
func getHostname(ip string) (hostname string, err error) {
if hostnames, err := net.LookupAddr(ip); err != nil {
hostname = ip
//logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err)
} else {
if len(hostnames) > 0 {
hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")]
} else {
hostname = ip
}
}
return hostname, err
}
func GetLocalHostname() (hostname string, err error) {
if ip, err := getHostIp(); err == nil {
return getHostname(ip)
} else {
return "unkown ip", err
}
}
func GetLocalHostnameCmd() (hostname string, err error) {
cmd := "hostname"
stdout, _, err := RetryCmd(cmd, RETRY_TIMES)
if stdout != "" && err == nil {
hostname := strings.TrimSpace(stdout)
index := strings.LastIndex(hostname, ".baidu.com")
if index > 0 {
return hostname[:strings.LastIndex(hostname, ".baidu.com")], nil
} else {
return hostname, nil
}
} else {
logex.Debugf("using hostname cmd failed. err:%v", err)
return GetLocalHostname()
}
}
// quote quotes string for json output. eg: s="123", quote(s)=`"123"`
func quote(s string) string {
return fmt.Sprintf("%q", s)
}
// quoteb quotes byte array for json output.
func quoteb(b []byte) string {
return quote(string(b))
}
// quotea quotes string array for json output
func quotea(a []string) string {
b, _ := json.Marshal(a)
return string(b)
}
func isJsonDict(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
}
此差异已折叠。
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
"fmt"
"sync"
"sync/atomic"
)
type (
workType struct {
poolWorker PoolWorker
resultChannel chan error
}
WorkPool struct {
queueChannel chan workType
workChannel chan PoolWorker
queuedWorkNum int32
activeWorkerNum int32
queueCapacity int32
workFilter sync.Map
}
)
type PoolWorker interface {
Token() string
DoWork()
}
func NewWorkPool(workerNum int, queueCapacity int32) *WorkPool {
workPool := WorkPool{
queueChannel: make(chan workType),
workChannel: make(chan PoolWorker, queueCapacity),
queuedWorkNum: 0,
activeWorkerNum: 0,
queueCapacity: queueCapacity,
}
for i := 0; i < workerNum; i++ {
go workPool.startWorkRoutine()
}
go workPool.startQueueRoutine()
return &workPool
}
func (workPool *WorkPool) startWorkRoutine() {
for {
select {
case work := <-workPool.workChannel:
workPool.doWork(work)
break
}
}
}
func (workPool *WorkPool) startQueueRoutine() {
for {
select {
case queueItem := <-workPool.queueChannel:
if atomic.AddInt32(&workPool.queuedWorkNum, 0) == workPool.queueCapacity {
queueItem.resultChannel <- fmt.Errorf("work pool fulled with %v pending works", QueueCapacity)
continue
}
atomic.AddInt32(&workPool.queuedWorkNum, 1)
workPool.workChannel <- queueItem.poolWorker
queueItem.resultChannel <- nil
break
}
}
}
func (workPool *WorkPool) doWork(poolWorker PoolWorker) {
defer atomic.AddInt32(&workPool.activeWorkerNum, -1)
defer workPool.workFilter.Delete(poolWorker.Token())
atomic.AddInt32(&workPool.queuedWorkNum, -1)
atomic.AddInt32(&workPool.activeWorkerNum, 1)
poolWorker.DoWork()
}
func (workPool *WorkPool) PostWorkWithToken(poolWorker PoolWorker) (err error) {
if _, ok := workPool.workFilter.Load(poolWorker.Token()); ok {
return errors.New("another work with same key is doing.")
}
workPool.workFilter.Store(poolWorker.Token(), true)
return workPool.PostWork(poolWorker)
}
func (workPool *WorkPool) PostWork(poolWorker PoolWorker) (err error) {
work := workType{poolWorker, make(chan error)}
defer close(work.resultChannel)
workPool.queueChannel <- work
err = <-work.resultChannel
return err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"agent"
"fmt"
"github.com/Badangel/logex"
"github.com/docopt/docopt-go"
"os"
"path/filepath"
"runtime"
"strconv"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
agent.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0]))
usage := fmt.Sprintf(`Usage: ./m_master [options]
Options:
-n WORKERNUM set worker num.
-q QUEUENUM set queue num.
-P LISTEN_PORT agent listen port
Log options:
-l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 16]
--log_dir=DIR set log output dir. [default: ./log]
--log_name=NAME set log name. [default: m_agent]`, agent.Dir)
opts, err := docopt.Parse(usage, nil, true, "Cube Agent Checker 1.0.0", false)
if err != nil {
fmt.Println("ERROR:", err)
os.Exit(1)
}
log_level, _ := strconv.Atoi(opts["-l"].(string))
log_name := opts["--log_name"].(string)
log_dir := opts["--log_dir"].(string)
logex.SetLevel(getLogLevel(log_level))
if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil {
fmt.Println("ERROR:", err)
}
logex.Notice("--- NEW SESSION -------------------------")
logex.Notice(">>> log_level:", log_level)
agent.WorkerNum = 10
if opts["-n"] != nil {
n, err := strconv.Atoi(opts["-n"].(string))
if err == nil {
agent.WorkerNum = n
}
}
agent.QueueCapacity = 20
if opts["-q"] != nil {
q, err := strconv.Atoi(opts["-q"].(string))
if err == nil {
agent.QueueCapacity = int32(q)
}
}
agent.CmdWorkPool = agent.NewWorkPool(agent.WorkerNum, agent.QueueCapacity)
if opts["-P"] == nil {
logex.Fatalf("ERROR: -P LISTEN PORT must be set!")
os.Exit(255)
}
agentPort := opts["-P"].(string)
logex.Notice(">>> starting server...")
addr := ":" + agentPort
if agent.StartHttp(addr) != nil {
logex.Noticef("cant start http(addr=%v). quit.", addr)
os.Exit(0)
}
}
func getLogLevel(log_level int) logex.Level {
switch log_level {
case 16:
return logex.DEBUG
case 8:
return logex.TRACE
case 4:
return logex.NOTICE
case 2:
return logex.WARNING
case 1:
return logex.FATAL
case 0:
return logex.NONE
}
return logex.DEBUG
}
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
project(cube-transfer Go)
include(cmake/golang.cmake)
ExternalGoProject_Add(docopt-go github.com/docopt/docopt-go)
ExternalGoProject_Add(rfw github.com/mipearson/rfw)
ExternalGoProject_Add(logex github.com/Badangel/logex)
add_subdirectory(src)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR})
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
if(NOT CMAKE_Go_COMPILER)
if(NOT $ENV{GO_COMPILER} STREQUAL "")
get_filename_component(CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT)
if(CMAKE_Go_FLAGS_ENV_INIT)
set(CMAKE_Go_COMPILER_ARG1 "${CMAKE_Go_FLAGS_ENV_INIT}" CACHE STRING "First argument to Go compiler")
endif()
if(NOT EXISTS ${CMAKE_Go_COMPILER_INIT})
message(SEND_ERROR "Could not find compiler set in environment variable GO_COMPILER:\n$ENV{GO_COMPILER}.")
endif()
endif()
set(Go_BIN_PATH
$ENV{GOPATH}
$ENV{GOROOT}
$ENV{GOROOT}/../bin
$ENV{GO_COMPILER}
/usr/bin
/usr/local/bin
)
if(CMAKE_Go_COMPILER_INIT)
set(CMAKE_Go_COMPILER ${CMAKE_Go_COMPILER_INIT} CACHE PATH "Go Compiler")
else()
find_program(CMAKE_Go_COMPILER
NAMES go
PATHS ${Go_BIN_PATH}
)
EXEC_PROGRAM(${CMAKE_Go_COMPILER} ARGS version OUTPUT_VARIABLE GOLANG_VERSION)
STRING(REGEX MATCH "go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*" VERSION "${GOLANG_VERSION}")
message("-- The Golang compiler identification is ${VERSION}")
message("-- Check for working Golang compiler: ${CMAKE_Go_COMPILER}")
endif()
endif()
mark_as_advanced(CMAKE_Go_COMPILER)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in
${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY)
set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER")
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@")
set(CMAKE_Go_COMPILER_LOADED 1)
set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go)
set(CMAKE_Go_LINKER_PREFERENCE 40)
set(CMAKE_Go_OUTPUT_EXTENSION .o)
set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1)
set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER")
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
if(NOT CMAKE_Go_COMPILE_OBJECT)
set(CMAKE_Go_COMPILE_OBJECT "go tool compile -l -N -o <OBJECT> <SOURCE> ")
endif()
if(NOT CMAKE_Go_LINK_EXECUTABLE)
set(CMAKE_Go_LINK_EXECUTABLE "go tool link -o <TARGET> <OBJECTS> ")
endif()
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL "")
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(GOPATH "${CMAKE_CURRENT_LIST_DIR}/../")
file(MAKE_DIRECTORY ${GOPATH})
function(ExternalGoProject_Add TARG)
add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN})
endfunction(ExternalGoProject_Add)
function(add_go_executable NAME)
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build
-o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}"
${CMAKE_GO_FLAGS} ${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR})
add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN})
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
endfunction(add_go_executable)
function(ADD_GO_LIBRARY NAME BUILD_TYPE)
if(BUILD_TYPE STREQUAL "STATIC")
set(BUILD_MODE -buildmode=c-archive)
set(LIB_NAME "lib${NAME}.a")
else()
set(BUILD_MODE -buildmode=c-shared)
if(APPLE)
set(LIB_NAME "lib${NAME}.dylib")
else()
set(LIB_NAME "lib${NAME}.so")
endif()
endif()
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE}
-o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
${CMAKE_GO_FLAGS} ${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR})
add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN})
if(NOT BUILD_TYPE STREQUAL "STATIC")
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin)
endif()
endfunction(ADD_GO_LIBRARY)
\ No newline at end of file
[default]
dict_name: test_dict
mode: base_only
storage_place: LOCAL
buildtool_local: /home/work/Serving/build/output/bin/cube-builder
donefile_address: http://127.0.0.1/home/work/dangyifei/donefile
output_address: /home/work/dangyifei/test-transfer/test_data/output
tmp_address: /home/work/dangyifei/test-transfer/test_data/tmp
shard_num: 1
copy_num: 2
deploy_path: /home/work/test_dict
transfer_address: 127.0.0.1
[cube_agent]
agent0_0: 127.0.0.1:8001
cube0_0: 127.0.0.1:8000:/ssd2/cube_open
agent0_1: 127.0.0.1:8001
cube0_1: 127.0.0.1:8000:/home/disk1/cube_open
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(SOURCE_FILE cube-transfer.go)
add_go_executable(cube-transfer ${SOURCE_FILE})
add_dependencies(cube-transfer docopt-go)
add_dependencies(cube-transfer rfw)
add_dependencies(cube-transfer logex)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"github.com/docopt/docopt-go"
"github.com/Badangel/logex"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"transfer"
"transfer/dict"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
transfer.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0]))
usage := fmt.Sprintf(`Usage: ./m_master [options]
Options:
-p PORT set listen port. [default: 8099]
--config=conf/transfer.conf set conf file. [defalut: ./conf/transfer.conf]
Log options:
-l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 4]
--log_dir=DIR set log output dir. [default: ./log]
--log_name=NAME set log name. [default: transfer]`,
transfer.Dir)
opts, err := docopt.Parse(usage, nil, true, "Cube Transfer", false)
if err != nil {
fmt.Println("ERROR:", err)
os.Exit(1)
}
log_level, _ := strconv.Atoi(opts["-l"].(string))
log_name := opts["--log_name"].(string)
log_dir := opts["--log_dir"].(string)
logex.SetLevel(getLogLevel(log_level))
if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil {
fmt.Println("ERROR:", err)
}
fmt.Printf("%v: Print stdout1 here...\n", time.Now())
os.Stderr.WriteString(fmt.Sprintf("%v: Print stderr here...\n", time.Now()))
logex.Notice("--- NEW SESSION -------------------------")
logex.Notice(">>> log_level:", log_level)
// settings:
if opts["-p"] == nil {
fmt.Fprintln(os.Stderr, "ERROR: -p PORT must be set!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
transfer.Port = opts["-p"].(string)
logex.Notice(">>> port:", transfer.Port)
if opts["--config"] == nil {
fmt.Fprintln(os.Stderr, "ERROR: --config config_file must be set!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
//read conf
var configMgr transfer.ConfigManager
configMgr.Init(opts["--config"].(string))
transfer.Dict.DictName = configMgr.Read("default", "dict_name")
if transfer.Dict.DictName == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DictName in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> DictName:", transfer.Dict.DictName)
transfer.Dict.DictMode = configMgr.Read("default", "mode")
if transfer.Dict.DictMode == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DictMode in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> Mode:", transfer.Dict.DictMode)
transfer.Dict.StoragePlace = configMgr.Read("default", "storage_place")
if transfer.Dict.StoragePlace == "" || transfer.Dict.StoragePlace != "LOCAL" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] StoragePlace in config_file! only support Local")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> StoragePlace:", transfer.Dict.StoragePlace)
transfer.BuildToolLocal = configMgr.Read("default", "buildtool_local")
if transfer.BuildToolLocal == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] BuildToolLocal in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> BuildToolLocal:", transfer.BuildToolLocal)
transfer.Dict.DonefileAddress = configMgr.Read("default", "donefile_address")
if transfer.Dict.DonefileAddress == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DonefileAddress in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> DonefileAddress:", transfer.Dict.DonefileAddress)
transfer.Dict.OutputAddress = configMgr.Read("default", "output_address")
if transfer.Dict.OutputAddress == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] OutputAddress in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> OutputAddress:", transfer.Dict.OutputAddress)
transfer.Dict.TmpAddress = configMgr.Read("default", "tmp_address")
if transfer.Dict.TmpAddress == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] TmpAddress in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> TmpAddress:", transfer.Dict.TmpAddress)
ShardNumStr := configMgr.Read("default", "shard_num")
if ShardNumStr == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] ShardNum in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
transfer.Dict.ShardNum, err = strconv.Atoi(ShardNumStr)
if err != nil {
logex.Fatal("ShardNum form is not right")
os.Exit(1)
}
logex.Notice(">>> ShardNum:", transfer.Dict.ShardNum)
CopyNumStr := configMgr.Read("default", "copy_num")
if CopyNumStr == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] CopyNum in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
transfer.Dict.CopyNum, err = strconv.Atoi(CopyNumStr)
if err != nil {
logex.Fatal("ShardNum form is not right")
os.Exit(1)
}
logex.Notice(">>> CopyNum:", transfer.Dict.CopyNum)
transfer.Dict.InstancesNum = transfer.Dict.ShardNum * transfer.Dict.CopyNum
transfer.Dict.DeployPath = configMgr.Read("default", "deploy_path")
if transfer.Dict.DeployPath == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] DeployPath in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> DeployPath:", transfer.Dict.DeployPath)
transfer.TransferAddr = configMgr.Read("default", "transfer_address")
if transfer.TransferAddr == "" {
fmt.Fprintln(os.Stderr, "ERROR: nead [default] TransferAddr in config_file!")
fmt.Fprintln(os.Stderr, usage)
os.Exit(1)
}
logex.Notice(">>> TransferAddr:", transfer.TransferAddr)
for i := 0; i < transfer.Dict.ShardNum; i++ {
for j := 0; j < transfer.Dict.CopyNum; j++ {
var instance dict.DictInstance
agentName := fmt.Sprintf("agent%d_%d", i, j)
agentInfo := configMgr.Read("cube_agent", agentName)
agentInfoSlice := strings.Split(agentInfo, ":")
cubeName := fmt.Sprintf("cube%d_%d", i, j)
cubeInfo := configMgr.Read("cube_agent", cubeName)
cubeInfoSlice := strings.Split(cubeInfo, ":")
instance.DictName = transfer.Dict.DictName
instance.AgentIp = agentInfoSlice[0]
instance.AgentPort, _ = strconv.Atoi(agentInfoSlice[1])
instance.IP = cubeInfoSlice[0]
instance.Port, _ = strconv.Atoi(cubeInfoSlice[1])
instance.DictName = transfer.Dict.DictName
instance.CreateTime = int(time.Now().Unix())
instance.Shard = i
instance.DeployPath = cubeInfoSlice[2]
transfer.Dict.Instances = append(transfer.Dict.Instances, instance)
}
}
logex.Noticef(">>> instance: %v", transfer.Dict.Instances)
transfer.Start()
fmt.Print("m-cube-transfer over!")
}
func getLogLevel(log_level int) logex.Level {
switch log_level {
case 16:
return logex.DEBUG
case 8:
return logex.TRACE
case 4:
return logex.NOTICE
case 2:
return logex.WARNING
case 1:
return logex.FATAL
case 0:
return logex.NONE
}
return logex.DEBUG
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"fmt"
"github.com/Badangel/logex"
"strconv"
"strings"
"time"
"transfer/dict"
)
func BuilderStart(versionInfo dict.DictVersionInfo) error {
fmt.Printf("[entry]start build\n")
logex.Noticef("[entry]start build")
if strings.HasPrefix(Dict.DonefileAddress, dict.FTP_HEADER) || strings.HasPrefix(Dict.DonefileAddress, dict.HTTP_HEADER) {
if strings.HasPrefix(Dict.DonefileAddress, dict.FTP_HEADER){
s1 := strings.Replace(Dict.DonefileAddress, dict.FTP_HEADER,"",1)
index:=strings.Index(s1,"/")
versionInfo.Input = dict.FTP_HEADER + s1[0 : index] + versionInfo.Input
}
if strings.HasPrefix(Dict.DonefileAddress, dict.HTTP_HEADER){
s1 := strings.Replace(Dict.DonefileAddress, dict.HTTP_HEADER,"",1)
index:=strings.Index(s1,"/")
versionInfo.Input = dict.HTTP_HEADER + s1[0 : index] + versionInfo.Input
}
localInputPath := Dict.TmpAddress + "/../input/" + Dict.DictName + "_" + strconv.Itoa(versionInfo.Version) + "_" + strconv.Itoa(versionInfo.Depend)
Wget(versionInfo.Input, localInputPath)
versionInfo.Input = localInputPath
}
Dict.WaitVersionInfo.Output = BuildIndex(versionInfo)
InitAllInstances()
return nil
}
func BuildIndex(versionInfo dict.DictVersionInfo) string {
versionStr := strconv.Itoa(versionInfo.Version)
dependStr := strconv.Itoa(versionInfo.Depend)
var params []string
params = append(params, "-dict_name="+Dict.DictName)
params = append(params, "-job_mode="+Dict.WaitVersionInfo.Mode)
curlen := len(Dict.CurrentVersionInfo)
lastVersion := "0"
if curlen > 0 && Dict.WaitVersionInfo.Key == Dict.CurrentVersionInfo[curlen-1].Key {
lastVersion = strconv.Itoa(Dict.CurrentVersionInfo[curlen-1].Version)
}
shardNum := strconv.Itoa(Dict.ShardNum)
params = append(params, "-last_version="+lastVersion)
params = append(params, "-cur_version="+versionStr)
params = append(params, "-depend_version="+dependStr)
params = append(params, "-input_path="+versionInfo.Input)
params = append(params, "-output_path="+Dict.OutputAddress)
params = append(params, "-shard_num="+shardNum)
params = append(params, "-master_address="+TransferAddr+":"+Port)
params = append(params, "-only_build=false")
err := ExeCommad(BuildToolLocal, params)
if err != nil {
fmt.Printf("build exe: %v\n", err)
logex.Noticef("[builder] exe cmd err: %v", err)
}
outPut := Dict.OutputAddress + "/" + dependStr + "_" + versionStr
return outPut
}
func InitAllInstances() {
for i, _ := range Dict.Instances {
Dict.Instances[i].Status = dict.Instance_Status_Init
Dict.Instances[i].BuildedTime = int(time.Now().Unix())
Dict.Instances[i].DownloadStartTime = 0
Dict.Instances[i].DownloadedTime = 0
Dict.Instances[i].ReloadStartTime = 0
Dict.Instances[i].ReloadedTime = 0
Dict.Instances[i].EnablStartTime = 0
Dict.Instances[i].EnabledTime = 0
}
Dict.DownloadSuccInsts = 0
Dict.ReloadSuccInsts = 0
Dict.EnableSuccInsts = 0
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"bufio"
"io"
"os"
"strings"
)
const middle = "->"
type ConfigManager struct {
Mymap map[string]string
strcet string
}
func (c *ConfigManager) Init(path string) {
c.Mymap = make(map[string]string)
f, err := os.Open(path)
if err != nil {
panic(err)
}
defer f.Close()
r := bufio.NewReader(f)
for {
b, _, err := r.ReadLine()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
s := strings.TrimSpace(string(b))
if strings.Index(s, "#") == 0 {
continue
}
n1 := strings.Index(s, "[")
n2 := strings.LastIndex(s, "]")
if n1 > -1 && n2 > -1 && n2 > n1+1 {
c.strcet = strings.TrimSpace(s[n1+1 : n2])
continue
}
if len(c.strcet) == 0 {
continue
}
index := strings.Index(s, ":")
if index < 0 {
continue
}
frist := strings.TrimSpace(s[:index])
if len(frist) == 0 {
continue
}
second := strings.TrimSpace(s[index+1:])
pos := strings.Index(second, "\t#")
if pos > -1 {
second = second[0:pos]
}
pos = strings.Index(second, " #")
if pos > -1 {
second = second[0:pos]
}
pos = strings.Index(second, "\t//")
if pos > -1 {
second = second[0:pos]
}
pos = strings.Index(second, " //")
if pos > -1 {
second = second[0:pos]
}
if len(second) == 0 {
continue
}
key := c.strcet + middle + frist
c.Mymap[key] = strings.TrimSpace(second)
}
}
func (c ConfigManager) Read(node, key string) string {
key = node + middle + key
v, found := c.Mymap[key]
if !found {
return ""
}
return v
}
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"fmt"
"github.com/Badangel/logex"
"strconv"
"time"
"transfer/dict"
)
func DeployStart(versionInfo dict.DictVersionInfo) error {
fmt.Printf("[entry]start deploy\n")
logex.Noticef("[entry]start deploy")
var err error
for {
switch Dict.WaitVersionInfo.Status {
case dict.Dict_Status_Deploying, dict.Dict_Status_Downloading:
CmdInstsDownload()
case dict.Dict_Status_Download_Succ, dict.Dict_Status_Reloading:
CmdInstsReload()
case dict.Dict_Status_Reload_Succ, dict.Dict_Status_Enabling:
CmdInstsEnable()
default:
logex.Noticef("dict status %v", Dict.WaitVersionInfo.Status)
}
if Dict.WaitVersionInfo.Status == dict.Dict_Status_Finished {
Dict.WaitVersionInfo.FinishTime = int(time.Now().Unix())
Dict.CurrentVersionInfo = append(Dict.CurrentVersionInfo, Dict.WaitVersionInfo)
fmt.Printf("[deploy finish]version:%v\n", Dict.WaitVersionInfo)
logex.Noticef("[deploy finish]version:%v", Dict.WaitVersionInfo)
var newVersionInfo dict.DictVersionInfo
Dict.WaitVersionInfo = newVersionInfo
WriteCurrentVersionInfoToFile()
WriteWaitVersionInfoToFile()
break
} else {
time.Sleep(time.Duration(10) * time.Second)
}
}
return err
}
func CmdInstsDownload() {
for {
chs := make([]chan error, len(Dict.Instances))
keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances))
for i, inst := range Dict.Instances {
if inst.Status != dict.Instance_Status_Download_Succ {
chs[i] = make(chan error)
var json_params dict.CubeAgentRequest
json_params.Command = dict.DOWNLOAD
json_params.DictName = inst.DictName
json_params.DeployPath = inst.DeployPath
json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version)
json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend)
json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id)
json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key)
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[download cmd]%v:%v", address, json_params)
go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i])
Dict.Instances[i].DownloadStartTime = int(time.Now().Unix())
Dict.Instances[i].Mode = Dict.WaitVersionInfo.Mode
}
}
for i, inst := range Dict.Instances {
err := <-chs[i]
logex.Noticef("[instance resp]download:%v", Dict.Instances)
if err != nil || keyAndRespSlice[i].Success != "0" {
logex.Warningf("cmd cube online downlaod of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard)
continue
}
if inst.Status < dict.Instance_Status_Download_Succ {
Dict.Instances[i].Status = dict.Instance_Status_Download_Succ
Dict.Instances[i].DownloadedTime = int(time.Now().Unix())
Dict.DownloadSuccInsts++
}
}
if Dict.DownloadSuccInsts == Dict.InstancesNum {
Dict.WaitVersionInfo.Status = dict.Dict_Status_Download_Succ
fmt.Printf("[all download ok]inst :%v\n", Dict.Instances)
logex.Noticef("[all download ok]inst :%v", Dict.Instances)
break
}
time.Sleep(5 * time.Second)
}
}
func CmdInstsReload() {
for {
chs := make([]chan error, len(Dict.Instances))
keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances))
for i, inst := range Dict.Instances {
if inst.Status != dict.Instance_Status_Reload_Succ {
chs[i] = make(chan error)
var json_params dict.CubeAgentRequest
json_params.Command = dict.RELOAD
json_params.DictName = inst.DictName
json_params.DeployPath = inst.DeployPath
json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version)
json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend)
json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id)
json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key)
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[reload cmd]%v:%v", address, json_params)
go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i])
Dict.Instances[i].ReloadStartTime = int(time.Now().Unix())
}
}
for i, inst := range Dict.Instances {
err := <-chs[i]
logex.Noticef("[instance resp]reload:%v", Dict.Instances)
if err != nil || keyAndRespSlice[i].Success != "0" {
logex.Warningf("cmd cube online reload of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard)
continue
}
if inst.Status < dict.Instance_Status_Reload_Succ {
Dict.Instances[i].Status = dict.Instance_Status_Reload_Succ
Dict.Instances[i].ReloadedTime = int(time.Now().Unix())
Dict.ReloadSuccInsts++
}
}
if Dict.ReloadSuccInsts == Dict.InstancesNum {
Dict.WaitVersionInfo.Status = dict.Dict_Status_Reload_Succ
fmt.Printf("[all reload ok]inst:%v\n", Dict.Instances)
logex.Noticef("[all reload ok]inst :%v", Dict.Instances)
break
}
time.Sleep(5 * time.Second)
}
}
func CmdInstsEnable() {
for {
chs := make([]chan error, len(Dict.Instances))
keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances))
for i, inst := range Dict.Instances {
if inst.Status != dict.Instance_Status_Enable_Succ {
chs[i] = make(chan error)
var json_params dict.CubeAgentRequest
json_params.Command = dict.ENABLE
json_params.DictName = inst.DictName
json_params.DeployPath = inst.DeployPath
json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version)
json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend)
json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id)
json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key)
json_params.Mode = Dict.WaitVersionInfo.Mode
json_params.ShardSeq = inst.Shard
json_params.Port = strconv.Itoa(inst.Port)
json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar"
var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort)
logex.Noticef("[enable cmd]%v:%v", address, json_params)
go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i])
Dict.Instances[i].EnablStartTime = int(time.Now().Unix())
}
}
for i, inst := range Dict.Instances {
err := <-chs[i]
logex.Noticef("[instance resp]enable:%v", Dict.Instances)
if err != nil || keyAndRespSlice[i].Success != "0" {
logex.Warningf("cmd cube online enable of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard)
continue
}
if inst.Status < dict.Instance_Status_Enable_Succ {
Dict.Instances[i].Status = dict.Instance_Status_Enable_Succ
Dict.Instances[i].EnabledTime = int(time.Now().Unix())
Dict.EnableSuccInsts++
}
}
if Dict.EnableSuccInsts == Dict.InstancesNum {
Dict.WaitVersionInfo.Status = dict.Dict_Status_Finished
fmt.Printf("[all enable ok]inst :%v\n", Dict.Instances)
logex.Noticef("[all enable ok]inst :%v", Dict.Instances)
break
}
time.Sleep(5 * time.Second)
}
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dict
type CubeAgentRequest struct {
Command string `json:"command"`
DictName string `json:"dict_name"`
DeployPath string `json:"deploy_path"`
Version string `json:"version"`
Depend string `json:"depend"`
Id string `json:"id"`
Key string `json:"key"`
Mode string `json:"mode"`
ShardSeq int `json:"shard_seq"`
Source string `json:"source"`
Service string `json:"service,omitempty"`
SlotIdList string `json:"slot_id_list,omitempty"`
ActiveVersionList string `json:"active_version_list,omitempty"`
Port string `json:"port,omitempty"`
VersionSign string `json:"version_sign,omitempty"`
}
type CubeAgentResponse struct {
Success string `json:"success"`
Message string `json:"message"`
Data string `json:"data"`
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dict
import "errors"
var (
// Dict Mode
BASE_ONLY = "base_only"
BASR_DELTA = "base_delta"
// Deploy Mode
BASE = "base"
DELTA = "delta"
// Succ or Failed Status
SUCC = "succ"
FAILED = "failed"
//command
DOWNLOAD = "download"
RELOAD = "reload"
ENABLE = "enable"
FTP_HEADER = "ftp://"
HTTP_HEADER = "http://"
)
// Dict Status
type DictStatus int
const (
// Dict Status
//clear状态编码参考InstanceStatus
Dict_Status_Clearing DictStatus = 1
Dict_Status_Cleared DictStatus = 2
Dict_Status_Trigging DictStatus = 10
Dict_Status_Building DictStatus = 20
Dict_Status_Deploying DictStatus = 30
Dict_Status_Downloading DictStatus = 40
Dict_Status_Download_Succ DictStatus = 50
Dict_Status_Reloading DictStatus = 60
Dict_Status_Reload_Succ DictStatus = 70
Dict_Status_Enabling DictStatus = 80
Dict_Status_Finished DictStatus = 90
Dict_Status_Restarting DictStatus = 100
)
func (this DictStatus) String() DictStatusStr {
switch this {
case Dict_Status_Trigging:
return Dict_Status_Trigging_Str
case Dict_Status_Building:
return Dict_Status_Building_Str
case Dict_Status_Deploying:
return Dict_Status_Deploying_Str
case Dict_Status_Downloading:
return Dict_Status_Downloading_Str
case Dict_Status_Download_Succ:
return Dict_Status_Download_Succ_Str
case Dict_Status_Reloading:
return Dict_Status_Reloading_Str
case Dict_Status_Reload_Succ:
return Dict_Status_Reload_Succ_Str
case Dict_Status_Enabling:
return Dict_Status_Enabling_Str
case Dict_Status_Finished:
return Dict_Status_Finished_Str
case Dict_Status_Restarting:
return Dict_Status_Restarting_Str
case Dict_Status_Clearing:
return Dict_Status_Clearing_Str
case Dict_Status_Cleared:
return Dict_Status_Cleared_Str
default:
return ""
}
}
type DictStatusStr string
const (
// Dict Status
Dict_Status_Trigging_Str DictStatusStr = "Trigging"
Dict_Status_Building_Str DictStatusStr = "Building"
Dict_Status_Deploying_Str DictStatusStr = "deploying"
Dict_Status_Downloading_Str DictStatusStr = "downloading"
Dict_Status_Download_Succ_Str DictStatusStr = "download_succ"
Dict_Status_Reloading_Str DictStatusStr = "reloading"
Dict_Status_Reload_Succ_Str DictStatusStr = "reload_succ"
Dict_Status_Enabling_Str DictStatusStr = "enabling"
Dict_Status_Finished_Str DictStatusStr = "finished"
Dict_Status_Restarting_Str DictStatusStr = "restarting"
Dict_Status_Clearing_Str DictStatusStr = "clearing"
Dict_Status_Cleared_Str DictStatusStr = "cleared"
)
func (this DictStatusStr) Int() (DictStatus, error) {
switch this {
case Dict_Status_Trigging_Str:
return Dict_Status_Trigging, nil
case Dict_Status_Building_Str:
return Dict_Status_Building, nil
case Dict_Status_Deploying_Str:
return Dict_Status_Deploying, nil
case Dict_Status_Downloading_Str:
return Dict_Status_Downloading, nil
case Dict_Status_Download_Succ_Str:
return Dict_Status_Download_Succ, nil
case Dict_Status_Reloading_Str:
return Dict_Status_Reloading, nil
case Dict_Status_Reload_Succ_Str:
return Dict_Status_Reload_Succ, nil
case Dict_Status_Enabling_Str:
return Dict_Status_Enabling, nil
case Dict_Status_Finished_Str:
return Dict_Status_Finished, nil
case Dict_Status_Restarting_Str:
return Dict_Status_Restarting, nil
case Dict_Status_Clearing_Str:
return Dict_Status_Clearing, nil
case Dict_Status_Cleared_Str:
return Dict_Status_Cleared, nil
default:
return 0, errors.New("invalid dict status")
}
}
// Instance Status:
type InstanceStatus int
const (
//各种状态都有可能进入clear状态,因此clear相关的状态都小于init状态
Instance_Status_Clear InstanceStatus = 1
Instance_Status_Clearing InstanceStatus = 2
Instance_Status_Clear_Failed InstanceStatus = 3
Instance_Status_Clear_Succ InstanceStatus = 4
Instance_Status_Init InstanceStatus = 10
Instance_Status_Downloading InstanceStatus = 20
Instance_Status_Download_Failed InstanceStatus = 30
Instance_Status_Download_Succ InstanceStatus = 40
Instance_Status_Reloading InstanceStatus = 50
Instance_Status_Reload_Failed InstanceStatus = 60
Instance_Status_Reload_Succ InstanceStatus = 70
Instance_Status_Enabling InstanceStatus = 80
Instance_Status_Enable_Failed InstanceStatus = 90
Instance_Status_Enable_Succ InstanceStatus = 100
Instance_Status_Poping InstanceStatus = 110
Instance_Status_Pop_Failed InstanceStatus = 120
Instance_Status_Pop_Succ InstanceStatus = 130
Instance_Status_Dead InstanceStatus = 250
)
func (this InstanceStatus) String() InstanceStatusStr {
switch this {
case Instance_Status_Init:
return Instance_Status_Init_Str
case Instance_Status_Downloading:
return Instance_Status_Downloading_Str
case Instance_Status_Download_Failed:
return Instance_Status_Download_Failed_Str
case Instance_Status_Download_Succ:
return Instance_Status_Download_Succ_Str
case Instance_Status_Reloading:
return Instance_Status_Reloading_Str
case Instance_Status_Reload_Failed:
return Instance_Status_Reload_Failed_Str
case Instance_Status_Reload_Succ:
return Instance_Status_Reload_Succ_Str
case Instance_Status_Enabling:
return Instance_Status_Enabling_Str
case Instance_Status_Enable_Failed:
return Instance_Status_Enable_Failed_Str
case Instance_Status_Enable_Succ:
return Instance_Status_Enable_Succ_Str
case Instance_Status_Dead:
return Instance_Status_Dead_Str
case Instance_Status_Clear:
return Instance_Status_Clear_Str
case Instance_Status_Clearing:
return Instance_Status_Clearing_Str
case Instance_Status_Clear_Failed:
return Instance_Status_Clear_Failed_Str
case Instance_Status_Clear_Succ:
return Instance_Status_Clear_Succ_Str
case Instance_Status_Poping:
return Instance_Status_Poping_Str
case Instance_Status_Pop_Failed:
return Instance_Status_Pop_Failed_Str
case Instance_Status_Pop_Succ:
return Instance_Status_Pop_Succ_Str
default:
return ""
}
}
type InstanceStatusStr string
const (
Instance_Status_Init_Str InstanceStatusStr = "init"
Instance_Status_Downloading_Str InstanceStatusStr = "downloading"
Instance_Status_Download_Failed_Str InstanceStatusStr = "download_failed"
Instance_Status_Download_Succ_Str InstanceStatusStr = "download_succ"
Instance_Status_Reloading_Str InstanceStatusStr = "reloading"
Instance_Status_Reload_Failed_Str InstanceStatusStr = "finish_reload_failed"
Instance_Status_Reload_Succ_Str InstanceStatusStr = "finish_reload_succ"
Instance_Status_Enabling_Str InstanceStatusStr = "enabling"
Instance_Status_Enable_Failed_Str InstanceStatusStr = "enable_failed"
Instance_Status_Enable_Succ_Str InstanceStatusStr = "enable_succ"
Instance_Status_Dead_Str InstanceStatusStr = "dead"
Instance_Status_Clear_Str InstanceStatusStr = "clear"
Instance_Status_Clearing_Str InstanceStatusStr = "clearing"
Instance_Status_Clear_Failed_Str InstanceStatusStr = "clear_failed"
Instance_Status_Clear_Succ_Str InstanceStatusStr = "clear_succ"
Instance_Status_Poping_Str InstanceStatusStr = "poping"
Instance_Status_Pop_Failed_Str InstanceStatusStr = "pop_failed"
Instance_Status_Pop_Succ_Str InstanceStatusStr = "pop_succ"
)
func (this InstanceStatusStr) Int() (InstanceStatus, error) {
switch this {
case Instance_Status_Init_Str:
return Instance_Status_Init, nil
case Instance_Status_Downloading_Str:
return Instance_Status_Downloading, nil
case Instance_Status_Download_Failed_Str:
return Instance_Status_Download_Failed, nil
case Instance_Status_Download_Succ_Str:
return Instance_Status_Download_Succ, nil
case Instance_Status_Reloading_Str:
return Instance_Status_Reloading, nil
case Instance_Status_Reload_Failed_Str:
return Instance_Status_Reload_Failed, nil
case Instance_Status_Reload_Succ_Str:
return Instance_Status_Reload_Succ, nil
case Instance_Status_Enabling_Str:
return Instance_Status_Enabling, nil
case Instance_Status_Enable_Failed_Str:
return Instance_Status_Enable_Failed, nil
case Instance_Status_Enable_Succ_Str:
return Instance_Status_Enable_Succ, nil
case Instance_Status_Dead_Str:
return Instance_Status_Dead, nil
case Instance_Status_Clear_Str:
return Instance_Status_Clear, nil
case Instance_Status_Clearing_Str:
return Instance_Status_Clearing, nil
case Instance_Status_Clear_Failed_Str:
return Instance_Status_Clear_Failed, nil
case Instance_Status_Clear_Succ_Str:
return Instance_Status_Clear_Succ, nil
case Instance_Status_Poping_Str:
return Instance_Status_Poping, nil
case Instance_Status_Pop_Failed_Str:
return Instance_Status_Pop_Failed, nil
case Instance_Status_Pop_Succ_Str:
return Instance_Status_Pop_Succ, nil
default:
return 0, errors.New("invalid instance status")
}
}
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dict
type DictInfo struct {
DictName string `json:"dict_name"`
DictMode string `json:"dict_mode"`
ShardNum int `json:"shard_num"`
CopyNum int `json:"copy_num"`
InstancesNum int `json:"inst_num"`
DeployPath string `json:"deploy_path"`
DonefileAddress string `json:"donefile_addr"`
OutputAddress string `json:"output_addr"`
TmpAddress string `json:"tmp_addr"`
StoragePlace string `json:"storage_place"`
DownloadSuccInsts int `json:"download_inst"`
ReloadSuccInsts int `json:"reload_insts"`
EnableSuccInsts int `json:"enable_insts"`
Instances []DictInstance `json:"instances"`
WaitVersionInfo DictVersionInfo `json:"wait_version_info"`
CurrentVersionInfo []DictVersionInfo `json:"current_version_info"`
}
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dict
type DictInstance struct {
DictName string `json:"dict_name"`
Mode string `json:"mode"`
Version int `json:"version"`
Depend int `json:"depend"`
Id int `json:"id"`
Key int `json:"key"`
Shard int `json:"shard"`
Source string `json:"source"`
DeployPath string `json:"deploy_path"`
IP string `json:"ip"`
Port int `json:"port"`
AgentIp string `json:"agent_ip"`
AgentPort int `json:"agent_port"`
Status InstanceStatus `json:"status_id"`
StatusStr InstanceStatusStr `json:"status"`
BuildedTime int `json:"builded_time"`
DownloadStartTime int `json:"download_start_time"`
DownloadedTime int `json:"downloaded_time"`
ReloadStartTime int `json:"reload_start_time"`
ReloadedTime int `json:"reloaded_time"`
EnablStartTime int `json:"enable_start_time"`
EnabledTime int `json:"enabled_time"`
CreateTime int `json:"create_time"`
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dict
import (
"strconv"
)
type DictShardInfo struct {
Name string `json:"name"`
Version string `json:"version"`
Depend string `json:"depend"`
Id string `json:"id"`
Key string `json:"key"`
Shard int `json:"shard"`
Mode string `json:"mode"`
DictMode string `json:"dict_mode,omitempty"`
Source string `json:"data_source"`
Service string `json:"service,omitempty"`
DeltaInfo string `json:"delta_info,omitempty"`
BuildedTime int `json:"builded_time,omitempty"`
BuildedTimeStr string `json:"build_finish_time,omitempty"`
CreateTime int `json:"create_time,omitempty"`
IsActive bool `json:"is_active,omitempty"`
}
func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace string, transferaddr string)(info DictShardInfo){
info.Name = dictVersionInfo.DictName
info.Version = strconv.Itoa(dictVersionInfo.Version)
info.Depend = strconv.Itoa(dictVersionInfo.Depend)
info.Id = strconv.Itoa(dictVersionInfo.Id)
info.Key = strconv.Itoa(dictVersionInfo.Key)
info.Mode = dictVersionInfo.Mode
info.Shard = shard
info.Source = GetFileHead(storagePlace, transferaddr) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar"
return
}
func GetFileHead(storagePlace string, transferaddr string) string {
if storagePlace == "LOCAL"{
return "ftp://" + transferaddr
} else {
return ""
}
}
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dict
type (
DictVersionInfo struct {
DictName string `json:"dict_name"`
Version int `json:"version"`
Depend int `json:"depend"`
Id int `json:"id"`
Key int `json:"key"`
Mode string `json:"mode"`
Input string `json:"input"`
Output string `json:"output"`
Status DictStatus `json:"status"`
StatusStr DictStatusStr `json:"status_str"`
FinishTime int `json:"finish_time"`
CreateTime int `json:"create_time"`
MetaInfos map[int]string `json:"meta_infos"`
}
DonefileInfo struct {
Id string `json:"id"`
Key string `json:"key"`
Input string `json:"input"`
}
DictShardMetaInfo struct {
Name string `json:"name"`
Version int `json:"version"`
Depend int `json:"depend"`
Shard int `json:"shard"`
Split int `json:"split"`
Meta string `json:"meta"`
}
MetaInfo struct {
IndexTotalCount string `json:"index_total_count"`
IndexLenList []string `json:"index_len_list"`
DataLenList []string `json:"data_len_list"`
}
)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import "transfer/dict"
var (
Dir string
Port string
Dict dict.DictInfo
BuildToolLocal string
TransferAddr string
//DictName string
//DictMode string
//DonefileAddress string
//OutputAddress string
//TmpAddress string
//ShardNum int
//CopyNum int
//DeployPath string
//Instances []dict.DictInstance
//StoragePlace string
//WaitVersionInfo dict.DictVersionInfo
//CurrentVersionInfo []dict.DictVersionInfo
//InstancesNum int
//DownloadSuccInsts int
//ReloadSuccInsts int
//EnableSuccInsts int
)
type LocalAddress string
type HDFSAddress string
const (
STATUS_OK int = 0
STATUS_WRONG_API int = 2
STATUS_RETRY_LATER int = 10
STATUS_FAIL int = 255
)
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/Badangel/logex"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
type handlerFunc func(subpath string, m map[string]string) (string, string, int, error)
var ( // key = subpath; eg: path="/checker/job", key="job"
getHandler map[string]handlerFunc
putHandler map[string]handlerFunc
deleteHandler map[string]handlerFunc
postHandler map[string]handlerFunc
)
func startHttp(addr string) error {
// init handlers:
initGetHandlers()
initPostHandlers()
http.HandleFunc("/dict/", handleRest)
http.HandleFunc("/instance/", handleRest)
logex.Notice("start http:", addr)
return http.ListenAndServe(addr, nil)
}
func handleRest(w http.ResponseWriter, r *http.Request) {
var (
req_log string
)
time_begin := time.Now()
cont_type := make([]string, 1, 1)
cont_type[0] = "application/json"
header := w.Header()
header["Content-Type"] = cont_type
w.Header().Add("Access-Control-Allow-Origin", "*")
m := parseHttpKv(r)
req_log = fmt.Sprintf("handle %v %v %v from %v",
r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr)
api := r.URL.Path
var showHandler map[string]handlerFunc
switch r.Method {
case "GET":
showHandler = getHandler
case "POST": // create
showHandler = postHandler
case "PUT": // update
showHandler = putHandler
case "DELETE":
showHandler = deleteHandler
default:
logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method)
}
handler, ok := showHandler[api]
if !ok {
key_list := make([]string, 0, len(showHandler))
for key := range showHandler {
key_list = append(key_list, key)
}
fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`,
STATUS_WRONG_API, r.Method, api, key_list)
logex.Noticef(`%v, time=%v, status=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, STATUS_WRONG_API)
return
}
var s string
rst, handle_log, status, err := handler(api, m)
if status == STATUS_OK {
s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst)
} else {
s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`,
status, quote(err.Error()), rst)
}
if isJsonDict(s) {
fmt.Fprintln(w, s)
} else {
logex.Fatalf("invalid json: %v", s)
}
if err == nil {
logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000,
status, quote(handle_log))
} else {
logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`,
req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000,
status, quote(err.Error()), quote(handle_log))
}
}
func parseHttpKv(r *http.Request) map[string]string {
r.ParseForm()
m := make(map[string]string)
for k, v := range r.Form {
switch k {
case "user": // remove @baidu.com for user
m[k] = strings.Split(v[0], "@")[0]
default:
m[k] = v[0]
}
}
// allow passing hostname for debug
//if _, ok := m["hostname"]; !ok {
// ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")]
// m["hostname"], _ = getHostname(ip)
//}
return m
}
// restReq sends a restful request to requrl and returns response body.
func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) {
logex.Debug("####restReq####")
logex.Debug(*kv)
data := url.Values{}
if kv != nil {
for k, v := range *kv {
logex.Trace("req set:", k, v)
data.Set(k, v)
}
}
if method == "GET" || method == "DELETE" {
requrl = requrl + "?" + data.Encode()
data = url.Values{}
}
logex.Notice(method, requrl)
req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode()))
if err != nil {
logex.Warning("NewRequest failed:", err)
return "", err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
}
client := &http.Client{}
client.Timeout = time.Duration(timeout) * time.Second
resp, err := client.Do(req)
if err != nil {
logex.Warning("Do failed:", err)
return "", err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logex.Warning("resp status: " + resp.Status)
return "", errors.New("resp status: " + resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
return string(body), err
}
//func jsonRestReq(method, requrl string, timeout int, json_params *CheckerRequest) (string, error) {
func jsonRestReq(method, requrl string, timeout int, json_params interface{}) (int, string, error) {
logex.Debug("####jsonRestReq####")
if method == "POST2" {
method = "POST"
}
//b, err := json.Marshal(*json_params)
b, err := json.Marshal(json_params)
if err != nil {
logex.Warning("json_params marshal failed:", err)
return 0, "", err
}
logex.Notice(method, requrl)
req, err := http.NewRequest(method, requrl, bytes.NewBufferString(string(b)))
if err != nil {
logex.Warning("NewRequest failed:", err)
return 0, "", err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", "application/json")
//req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(string(b))))
}
client := &http.Client{}
client.Timeout = time.Duration(timeout) * time.Second
resp, err := client.Do(req)
if err != nil {
logex.Warning("Do failed:", err)
return 0, "", err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logex.Warning("resp status: " + resp.Status)
return 0, "", errors.New("resp status: " + resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
return resp.StatusCode, string(body), err
}
// quote quotes string for json output. eg: s="123", quote(s)=`"123"`
func quote(s string) string {
return fmt.Sprintf("%q", s)
}
func isJsonDict(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
}
func getHostname(ip string) (hostname string, err error) {
if hostnames, err := net.LookupAddr(ip); err != nil {
hostname = ip
//logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err)
} else {
if len(hostnames) > 0 {
hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")]
} else {
hostname = ip
}
}
return hostname, err
}
func nonBlockSendJsonReq(method, requrl string, timeout int, json_params interface{},
out interface{}, ch chan error) {
_, body_str, err := jsonRestReq(method, requrl, timeout, json_params)
logex.Noticef("json request method:[%v], requrl:[%s], timeout:[%v], params[%v], resbody[%v], err[%v]", method, requrl, timeout, json_params, body_str, err)
if err != nil {
ch <- err
}
err = json.Unmarshal([]byte(body_str), out)
if err != nil {
logex.Warningf("json unmarshal failed error (%v) for : %v", err, body_str)
}
ch <- err
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"transfer/dict"
)
func initGetHandlers() {
getHandler = map[string]handlerFunc{
"/dict/info": GetDictInfo,
"/instance/status": GetInstanceStatus,
"/dict/scaler": GetDictScaler,
"/dict/meta_info": GetDictShardMetaInfo,
"/dict/deploy/history": GetDictDeployHistory,
}
}
func GetDictInfo(subpath string, m map[string]string) (string, string, int, error) {
b, err := json.Marshal(Dict)
if err != nil {
return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err)
}
return string(b), "", STATUS_OK, nil
}
func GetInstanceStatus(subpath string, m map[string]string) (string, string, int, error) {
b, err := json.Marshal(Dict.Instances)
if err != nil {
return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err)
}
return string(b), "", STATUS_OK, nil
}
func GetDictScaler(subpath string, m map[string]string) (string, string, int, error) {
var (
shard int
err error
infos []dict.DictShardInfo
)
shardStr, ok := m["shard"]
if !ok {
shard = 0
} else {
shard, err = strconv.Atoi(shardStr)
if err != nil {
return quote(""), "", STATUS_FAIL, errors.New("invalid arg: shard should be int")
}
}
for _, version := range Dict.CurrentVersionInfo {
info := dict.GetDictShardScaler(shard, version, Dict.StoragePlace, TransferAddr)
infos = append(infos, info)
}
if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying {
info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.StoragePlace, TransferAddr)
infos = append(infos, info)
}
b, err := json.Marshal(infos)
if err != nil {
return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err)
}
return quote(string(b)), "", STATUS_OK, nil
}
func GetDictShardMetaInfo(subpath string, m map[string]string) (string, string, int, error) {
name, ok := m["name"]
if !ok {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: name")
}
version, err := strconv.Atoi(m["version"])
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: version, version should be int")
}
depend, err := strconv.Atoi(m["depend"])
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: depend, depend should be int")
}
shard, err := strconv.Atoi(m["shard"])
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: shard, shard should be int")
}
split := 1
split_str, ok := m["split"]
if ok {
split, err = strconv.Atoi(split_str)
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("arg: split, should be int")
}
}
meta_info, err := GetDictShardMetaInfos(name, version, depend, shard, split)
if err != nil {
return quote("failed"), "", STATUS_FAIL, fmt.Errorf("info error: %v", err)
}
b, err := json.Marshal(meta_info)
if err != nil {
return quote("failed"), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err)
}
return string(b), "", STATUS_OK, nil
}
func GetDictDeployHistory(subpath string, m map[string]string) (string, string, int, error) {
var deployVerisonInfos []dict.DictVersionInfo
deployVerisonInfos = Dict.CurrentVersionInfo
if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying {
deployVerisonInfos = append(deployVerisonInfos, Dict.WaitVersionInfo)
}
b, err := json.Marshal(deployVerisonInfos)
if err != nil {
return quote("failed"), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err)
}
return string(b), "", STATUS_OK, nil
}
func GetDictShardMetaInfos(dictName string, version int, depend int,
shard int, split int) (dict.DictShardMetaInfo, error) {
var info dict.DictShardMetaInfo
for _, v := range Dict.CurrentVersionInfo {
if v.Version == version && v.Depend == depend {
if meta, ok := v.MetaInfos[shard]; ok {
info.Name = dictName;
info.Version = version
info.Depend = depend
info.Shard = shard
info.Split = split
info.Meta = meta
return info, nil
} else {
return info, fmt.Errorf("shard not right")
}
}
}
return info, fmt.Errorf("info not right")
}
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"transfer/dict"
)
func initPostHandlers() {
postHandler = map[string]handlerFunc{
"/dict/meta_info/register": PostDictShardMetaInfoRegister,
}
}
func PostDictShardMetaInfoRegister(subpath string, m map[string]string) (string, string, int, error) {
var (
shardMetaInfo dict.DictShardMetaInfo
ok bool
err error
metaInfo dict.MetaInfo
)
shardMetaInfo.Name, ok = m["name"]
if !ok {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: name")
}
shardMetaInfo.Version, err = strconv.Atoi(m["version"])
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: version, should be int")
}
shardMetaInfo.Depend, err = strconv.Atoi(m["depend"])
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: depend, should be int")
}
shardMetaInfo.Shard, err = strconv.Atoi(m["shard"])
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: shard, should be int")
}
shardMetaInfo.Split = 1
split_str, ok := m["split"]
if ok {
shardMetaInfo.Split, err = strconv.Atoi(split_str)
if err != nil {
return quote("failed"), "", STATUS_FAIL, errors.New("arg: split, should be int")
}
}
// check dict shard_num and status
if shardMetaInfo.Shard < 0 || shardMetaInfo.Shard >= Dict.ShardNum {
return quote("failed"), "", STATUS_FAIL, fmt.Errorf("shard value invalid, dict shard is :%v", Dict.ShardNum)
}
shardMetaInfo.Meta, ok = m["meta"]
if !ok {
return quote("failed"), "", STATUS_FAIL, errors.New("need arg: meta")
}
if err = json.Unmarshal([]byte(shardMetaInfo.Meta), &metaInfo); err != nil {
return quote("failed"), "", STATUS_FAIL, fmt.Errorf("meta string bad formatted: %v", err)
}
if reflect.DeepEqual(metaInfo, (dict.MetaInfo{})) {
return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted")
}
if 0 == len(metaInfo.IndexLenList) {
return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted, index_len_list is null")
}
if 0 == len(metaInfo.DataLenList) {
return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted, data_len_list is null")
}
fmt.Printf("update meta shardMetaInfo:%v metaInfo:%v\n", shardMetaInfo,metaInfo)
if err = UpdateDictShardMetaInfo(shardMetaInfo); err != nil {
return quote("failed"), "", STATUS_FAIL, fmt.Errorf("update dict_shard_meta_info failed, %v", err)
}
fmt.Printf("update meta2\n")
return quote("ok"), "", STATUS_OK, nil
}
func UpdateDictShardMetaInfo(shardMetaInfo dict.DictShardMetaInfo) error {
Dict.WaitVersionInfo.MetaInfos[shardMetaInfo.Shard] = shardMetaInfo.Meta
WriteWaitVersionInfoToFile()
return nil
}
\ No newline at end of file
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"fmt"
"github.com/Badangel/logex"
"os"
"time"
"transfer/dict"
)
func Start() {
go BackupTransfer()
logex.Notice(">>> starting server...")
addr := ":" + Port
err := startHttp(addr)
if err != nil {
logex.Fatalf("start http(addr=%v) failed: %v", addr, err)
os.Exit(255)
}
logex.Notice(">>> start server succ")
}
func BackupTransfer() {
for {
//trigger
version, err := TriggerStart(Dict.DonefileAddress)
if err != nil {
logex.Fatalf("[trigger err]trigger err:%v ", err)
fmt.Printf("[error]trigger err:%v \n", err)
break
}
logex.Noticef("[trigger] get version:%v \n", version)
if version.Id == 0 {
logex.Noticef("[sleep]no new version, sleep 5 min")
fmt.Printf("[sleep]no new version, wait 5 min\n")
time.Sleep(5 * time.Minute)
continue
}
Dict.WaitVersionInfo = version
logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
WriteWaitVersionInfoToFile()
//builder
Dict.WaitVersionInfo.Status = dict.Dict_Status_Building
Dict.WaitVersionInfo.MetaInfos = make(map[int]string)
WriteWaitVersionInfoToFile()
if err = BuilderStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("builder err:%v \n", err)
}
if Dict.WaitVersionInfo.Mode == dict.BASE {
var newCurrentVersion []dict.DictVersionInfo
Dict.CurrentVersionInfo = newCurrentVersion
WriteCurrentVersionInfoToFile()
}
logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo)
//deployer
Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying
WriteWaitVersionInfoToFile()
if err = DeployStart(Dict.WaitVersionInfo); err != nil {
logex.Fatalf("deploy err:%v \n", err)
}
logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo)
}
fmt.Print("transfer over!")
logex.Noticef("[transfer]status machine exit!")
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"encoding/json"
"fmt"
"github.com/Badangel/logex"
"io/ioutil"
"strconv"
"strings"
"time"
"transfer/dict"
)
func TriggerStart(addr string) (version dict.DictVersionInfo, err error) {
return GetDoneFileInfo(addr)
}
func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
fmt.Printf("[entry]start trigger\n")
logex.Noticef("[entry]start trigger")
//if donefile is in ftp, first download to local
if strings.HasPrefix(addr, dict.FTP_HEADER) || strings.HasPrefix(addr, dict.HTTP_HEADER) {
donefileAddr := Dict.TmpAddress + "/../donefile"
Wget(addr, donefileAddr)
addr = donefileAddr
}
baseDonefile := addr + "/base.txt"
fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile)
logex.Noticef("[trigrer]base donefile path:%v", baseDonefile)
contents, err := ioutil.ReadFile(baseDonefile)
VersionLen := len(Dict.CurrentVersionInfo)
version.DictName = Dict.DictName
if err != nil {
fmt.Printf("[trigrer]read files err:%v \n", err)
logex.Fatalf("[trigrer]read files err:%v ", err)
return
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
index := len(lines) - 2
var donefileInfo dict.DonefileInfo
fmt.Printf("line %v: %v\n", index, lines[index])
if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil {
return
}
logex.Noticef("[trigrer]donfile info:%v", donefileInfo)
newId, _ := strconv.Atoi(donefileInfo.Id)
if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Key {
version.Id = newId
version.Key, _ = strconv.Atoi(donefileInfo.Key)
version.Input = donefileInfo.Input
deployVersion := int(time.Now().Unix())
version.CreateTime = deployVersion
version.Version = deployVersion
version.Depend = deployVersion
version.Mode = dict.BASE
return
}
}
if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 {
patchDonefile := addr + "/patch.txt"
fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile)
logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile)
contents, err = ioutil.ReadFile(patchDonefile)
if err != nil {
fmt.Printf("read files err:%v \n", err)
return
} else {
contentss := string(contents)
lines := strings.Split(contentss, "\n")
for index := 0; index < len(lines)-1; index++ {
var donefileInfo dict.DonefileInfo
if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil {
return
}
logex.Noticef("[trigrer]donfile info:%v", donefileInfo)
newId, _ := strconv.Atoi(donefileInfo.Id)
newKey, _ := strconv.Atoi(donefileInfo.Key)
if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key {
version.Id = newId
version.Key, _ = strconv.Atoi(donefileInfo.Key)
version.Input = donefileInfo.Input
deployVersion := int(time.Now().Unix())
version.CreateTime = deployVersion
version.Version = deployVersion
version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend
version.Mode = dict.DELTA
return
}
}
}
}
return
}
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"bufio"
"encoding/json"
"fmt"
"github.com/Badangel/logex"
"io"
"io/ioutil"
"os/exec"
)
func WriteWaitVersionInfoToFile(){
waitFilePath := Dict.TmpAddress + "/" + "WaitVersionInfo.json"
b, err := json.Marshal(Dict.WaitVersionInfo)
fmt.Printf("wait version %v\n", Dict.WaitVersionInfo)
if err != nil {
logex.Fatalf("WaitVersionInfo format error : %v", err)
}
err = ioutil.WriteFile(waitFilePath, b, 0644)
if err != nil {
logex.Fatalf("write wait verison info error : %v", err)
return
}
return
}
func WriteCurrentVersionInfoToFile(){
currentFilePath := Dict.TmpAddress + "/" + "CurrentVersionInfo.json"
b, err := json.Marshal(Dict.CurrentVersionInfo)
if err != nil {
logex.Fatalf(" CurrentVersionInfo format error : %v", err)
}
err = ioutil.WriteFile(currentFilePath, b, 0644)
if err != nil {
logex.Fatalf("write current verison info error : %v", err)
return
}
return
}
func ExeCommad(files string, params []string) (err error) {
cmd := exec.Command(files, params...)
fmt.Println(cmd.Args)
/*if stdout, err := cmd.StdoutPipe(); err != nil {
logex.Fatalf("%v", err)
return
}*/
stdout, err := cmd.StdoutPipe()
defer stdout.Close()
if err != nil {
fmt.Println(err)
return
}
if err := cmd.Start(); err != nil {
logex.Fatalf("%v",err)
}
reader := bufio.NewReader(stdout)
/*buf, err := cmd.Output()
fmt.Printf("output: %s\n",buf)
fmt.Printf("err: %v\n",err)*/
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
fmt.Printf("[cmd]>%s",line)
}
err = cmd.Wait()
if nil != err {
fmt.Println(err)
}
return nil
}
func Wget(ftpPath string, downPath string) {
var params []string
params = append(params, "-P")
params = append(params, downPath)
params = append(params, "-r")
params = append(params, "-N")
params = append(params, "-np")
params = append(params, "-nd")
params = append(params, "-R")
params = append(params, "index.html")
params = append(params, ftpPath)
err := ExeCommad("wget", params)
if err != nil {
fmt.Printf("wget exe: %v\n", err)
}
}
\ No newline at end of file
......@@ -4,11 +4,16 @@
OS: Linux
CMake: 3.2
CMake: (验证过的版本:3.2)
python
C++编译器 (验证过的版本:GCC 4.8.2/5.4.0)
python (验证过的版本:2.7)
Go编译器 (验证过的版本:1.9.2)
## 编译
```shell
$ git clone https://github.com/PaddlePaddle/serving.git
$ cd serving
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册