diff --git a/cube/CMakeLists.txt b/cube/CMakeLists.txt index 55fd966cf553e013ecbdf50eddc9766471a33075..07cf04977b618a515a2459f646c2dba298a5d58b 100644 --- a/cube/CMakeLists.txt +++ b/cube/CMakeLists.txt @@ -15,3 +15,5 @@ add_subdirectory(cube-server) add_subdirectory(cube-api) add_subdirectory(cube-builder) +add_subdirectory(cube-transfer) +add_subdirectory(cube-agent) diff --git a/cube/cube-agent/CMakeLists.txt b/cube/cube-agent/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..30158aa506e53ec8a37d10aef4f29bfcd5a60d06 --- /dev/null +++ b/cube/cube-agent/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +project(cube-agent Go) + +include(cmake/golang.cmake) + +ExternalGoProject_Add(agent-docopt-go github.com/docopt/docopt-go) +ExternalGoProject_Add(agent-logex github.com/Badangel/logex) +ExternalGoProject_Add(agent-pipeline github.com/Badangel/pipeline) + +add_subdirectory(src) diff --git a/cube/cube-agent/cmake/CMakeDetermineGoCompiler.cmake b/cube/cube-agent/cmake/CMakeDetermineGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..d17449f39f904c56c3d6969db75f8a44207fc9f4 --- /dev/null +++ b/cube/cube-agent/cmake/CMakeDetermineGoCompiler.cmake @@ -0,0 +1,58 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License + +if(NOT CMAKE_Go_COMPILER) + if(NOT $ENV{GO_COMPILER} STREQUAL "") + get_filename_component(CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT) + + if(CMAKE_Go_FLAGS_ENV_INIT) + set(CMAKE_Go_COMPILER_ARG1 "${CMAKE_Go_FLAGS_ENV_INIT}" CACHE STRING "First argument to Go compiler") + endif() + + if(NOT EXISTS ${CMAKE_Go_COMPILER_INIT}) + message(SEND_ERROR "Could not find compiler set in environment variable GO_COMPILER:\n$ENV{GO_COMPILER}.") + endif() + + endif() + + set(Go_BIN_PATH + $ENV{GOPATH} + $ENV{GOROOT} + $ENV{GOROOT}/../bin + $ENV{GO_COMPILER} + /usr/bin + /usr/local/bin + ) + + if(CMAKE_Go_COMPILER_INIT) + set(CMAKE_Go_COMPILER ${CMAKE_Go_COMPILER_INIT} CACHE PATH "Go Compiler") + else() + find_program(CMAKE_Go_COMPILER + NAMES go + PATHS ${Go_BIN_PATH} + ) + EXEC_PROGRAM(${CMAKE_Go_COMPILER} ARGS version OUTPUT_VARIABLE GOLANG_VERSION) + STRING(REGEX MATCH "go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*" VERSION "${GOLANG_VERSION}") + message("-- The Golang compiler identification is ${VERSION}") + message("-- Check for working Golang compiler: ${CMAKE_Go_COMPILER}") + endif() + +endif() + +mark_as_advanced(CMAKE_Go_COMPILER) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in + ${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY) + +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") \ No newline at end of file diff --git a/cube/cube-agent/cmake/CMakeGoCompiler.cmake.in b/cube/cube-agent/cmake/CMakeGoCompiler.cmake.in new file mode 100644 index 0000000000000000000000000000000000000000..4d54319b0cdd2cc0373858f1981cec0f7933a9cc --- /dev/null +++ b/cube/cube-agent/cmake/CMakeGoCompiler.cmake.in @@ -0,0 +1,22 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@") +set(CMAKE_Go_COMPILER_LOADED 1) + +set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go) +set(CMAKE_Go_LINKER_PREFERENCE 40) +set(CMAKE_Go_OUTPUT_EXTENSION .o) +set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1) +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") diff --git a/cube/cube-agent/cmake/CMakeGoInformation.cmake b/cube/cube-agent/cmake/CMakeGoInformation.cmake new file mode 100644 index 0000000000000000000000000000000000000000..4171072919e94c4547203a8e8bb2b6758436f001 --- /dev/null +++ b/cube/cube-agent/cmake/CMakeGoInformation.cmake @@ -0,0 +1,21 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License + +if(NOT CMAKE_Go_COMPILE_OBJECT) + set(CMAKE_Go_COMPILE_OBJECT "go tool compile -l -N -o <OBJECT> <SOURCE>") +endif() + +if(NOT CMAKE_Go_LINK_EXECUTABLE) + set(CMAKE_Go_LINK_EXECUTABLE "go tool link -o <TARGET> <OBJECTS>") +endif() \ No newline at end of file diff --git a/cube/cube-agent/cmake/CMakeTestGoCompiler.cmake b/cube/cube-agent/cmake/CMakeTestGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..e69831165e62e2e8091fc88ade83904426a4ac5f --- /dev/null +++ b/cube/cube-agent/cmake/CMakeTestGoCompiler.cmake @@ -0,0 +1,15 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL "") \ No newline at end of file diff --git a/cube/cube-agent/cmake/golang.cmake b/cube/cube-agent/cmake/golang.cmake new file mode 100644 index 0000000000000000000000000000000000000000..817d029d946bad8da4f4cf2785e68d062fc4cada --- /dev/null +++ b/cube/cube-agent/cmake/golang.cmake @@ -0,0 +1,60 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License + +set(GOPATH "${CMAKE_CURRENT_LIST_DIR}/../") +file(MAKE_DIRECTORY ${GOPATH}) + +function(ExternalGoProject_Add TARG) + add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN}) +endfunction(ExternalGoProject_Add) + +function(add_go_executable NAME) + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build + -o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) +endfunction(add_go_executable) + + +function(ADD_GO_LIBRARY NAME BUILD_TYPE) + if(BUILD_TYPE STREQUAL "STATIC") + set(BUILD_MODE -buildmode=c-archive) + set(LIB_NAME "lib${NAME}.a") + else() + set(BUILD_MODE -buildmode=c-shared) + if(APPLE) + set(LIB_NAME "lib${NAME}.dylib") + else() + set(LIB_NAME "lib${NAME}.so") + endif() + endif() + + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} + -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + + if(NOT BUILD_TYPE STREQUAL "STATIC") + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) + endif() +endfunction(ADD_GO_LIBRARY) \ No newline at end of file diff --git a/cube/cube-agent/src/CMakeLists.txt b/cube/cube-agent/src/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..eb192f0fd14969e9f25a71a0ba968ea244bca830 --- /dev/null +++ b/cube/cube-agent/src/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(SOURCE_FILE cube-agent.go) +add_go_executable(cube-agent ${SOURCE_FILE}) +add_dependencies(cube-agent agent-docopt-go) +add_dependencies(cube-agent agent-logex) +add_dependencies(cube-agent agent-pipeline) diff --git a/cube/cube-agent/src/agent/define.go b/cube/cube-agent/src/agent/define.go new file mode 100644 index 0000000000000000000000000000000000000000..9098a101679f31c7ed3b253c11548ff5d985c7ce --- /dev/null +++ b/cube/cube-agent/src/agent/define.go @@ -0,0 +1,114 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "errors" + _ "github.com/Badangel/logex" + "github.com/Badangel/pipeline" + "os/exec" + "strconv" + "strings" + "sync" +) + +var ( + Dir string + WorkerNum int + QueueCapacity int32 + MasterHost []string + MasterPort []string + + TestHostname string + TestIdc string + ShardLock sync.RWMutex + + CmdWorkPool *WorkPool + CmdWorkFilter sync.Map +) + +type ( + Status struct { + Status string `json:"status"` + Version string `json:"version"` + } + + MasterResp struct { + Success string `json:"success"` + Message string `json:"message"` + Data string `json:"data"` + } + + ShardInfo struct { + DictName string + ShardSeq int + SlotIdList string + DataDir string + Service string `json:"service,omitempty"` + Libcube string `json:"libcube,omitempty"` + } + + CubeResp struct { + Status int `json:"status"` + CurVersion string `json:"cur_version"` + BgVersion string `json:"bg_version"` + } +) + +var BUILTIN_STATUS = Status{"RUNNING", "3.0.0.1"} + +var ShardInfoMap map[string]map[string]*ShardInfo +var disks []string + +func GetMaster(master string) (host, port string, err error) { + if len(ShardInfoMap) < 1 { + return "", "", errors.New("empty master list.") + } + if master == "" { + return MasterHost[0], MasterPort[0], nil + } + if _, ok := ShardInfoMap[master]; ok { + m := strings.Split(master, ":") + if len(m) != 2 { + return MasterHost[0], MasterPort[0], nil + } + return m[0], m[1], nil + } else { + return MasterHost[0], MasterPort[0], nil + } +} + +func init() { + dfCmd := "df -h | grep -E '/home|/ssd'" + stdout, _, err := pipeline.Run(exec.Command("/bin/bash", "-c", dfCmd)) + + if err == nil && stdout.String() != "" { + t := strings.TrimSpace(stdout.String()) + diskLi := strings.Split(t, "\n") + for _, diskStr := range diskLi { + disk := strings.Fields(diskStr) + usedPercent, _ := strconv.Atoi(strings.TrimRight(disk[4], "%")) + if usedPercent <= 40 { + disks = append(disks, disk[5]) + } + } + } + + if len(disks) == 0 { + disks = append(disks, "/home") + } + + //logex.Debugf("available disks found: (%+v)", disks) +} diff --git a/cube/cube-agent/src/agent/http.go b/cube/cube-agent/src/agent/http.go new file mode 100755 index 0000000000000000000000000000000000000000..d548c5c4d424d3ebc2a3fbb141d79f6656e3c58b --- /dev/null +++ b/cube/cube-agent/src/agent/http.go @@ -0,0 +1,191 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package agent + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Badangel/logex" +) + +type handlerFunc func(subpath string, m map[string]string, b []byte) (string, string, error) + +var ( // key = subpath; eg: path="/checker/job", key="job" + getHandler map[string]handlerFunc + putHandler map[string]handlerFunc + deleteHandler map[string]handlerFunc + postHandler map[string]handlerFunc +) + +func StartHttp(addr string) error { + + // init handlers: + initGetHandlers() + initPostHandlers() + + http.HandleFunc("/agent/", handleRest) + logex.Notice("start http ", addr) + return http.ListenAndServe(addr, nil) +} + +func handleRest(w http.ResponseWriter, r *http.Request) { + var ( + req_log string + status int32 + ) + time_begin := time.Now() + + cont_type := make([]string, 1, 1) + cont_type[0] = "application/json" + header := w.Header() + header["Content-Type"] = cont_type + w.Header().Add("Access-Control-Allow-Origin", "*") + + m := parseHttpKv(r) + b, _ := ioutil.ReadAll(r.Body) + + req_log = fmt.Sprintf("handle %v %v %v from %v, len(m)=%v, m=%+v", + r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr, len(m), m) + + api := r.URL.Path + + var showHandler map[string]handlerFunc + switch r.Method { + case "GET": + showHandler = getHandler + case "POST": // create + showHandler = postHandler + case "PUT": // update + showHandler = putHandler + case "DELETE": + showHandler = deleteHandler + default: + logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method) + } + + handler, ok := showHandler[api] + + if !ok { + key_list := make([]string, 0, len(showHandler)) + for key := range showHandler { + key_list = append(key_list, key) + } + status = 2 + fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`, + status, r.Method, api, key_list) + + logex.Noticef(`%v, time=%v, status=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, status) + return + } + + var s string + rst, handle_log, err := handler(api, m, b) + if err == nil { + status = 0 + s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst) + } else { + status = 255 + s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`, + status, quote(err.Error()), rst) + } + + if isJsonDict(s) { + fmt.Fprintln(w, s) + } else { + logex.Fatalf("invalid json: %v", s) + } + + if err == nil { + logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(handle_log)) + } else { + logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(err.Error()), quote(handle_log)) + } +} + +func parseHttpKv(r *http.Request) map[string]string { + r.ParseForm() + m := make(map[string]string) + for k, v := range r.Form { + switch k { + case "user": // remove @baidu.com for user + m[k] = strings.Split(v[0], "@")[0] + default: + m[k] = v[0] + } + } + + // allow passing hostname for debug + if _, ok := m["hostname"]; !ok { + ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")] + m["hostname"], _ = getHostname(ip) + } + return m +} + +// restReq sends a restful request to requrl and returns response body. 
+func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) { + logex.Debug("####restReq####") + logex.Debug(*kv) + data := url.Values{} + if kv != nil { + for k, v := range *kv { + logex.Trace("req set:", k, v) + data.Set(k, v) + } + } + if method == "GET" || method == "DELETE" { + requrl = requrl + "?" + data.Encode() + data = url.Values{} + } + + logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode())) + if err != nil { + logex.Warning("NewRequest failed:", err) + return "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return string(body), err +} diff --git a/cube/cube-agent/src/agent/http_get.go b/cube/cube-agent/src/agent/http_get.go new file mode 100755 index 0000000000000000000000000000000000000000..86e394372f666d7ecce8b17ac9c66c22c824aad5 --- /dev/null +++ b/cube/cube-agent/src/agent/http_get.go @@ -0,0 +1,35 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "encoding/json" + "fmt" +) + +func initGetHandlers() { + getHandler = map[string]handlerFunc{ + "/agent/status": GetStatus, + } +} + +func GetStatus(subpath string, m map[string]string, b []byte) (string, string, error) { + b, err := json.Marshal(BUILTIN_STATUS) + if err != nil { + return quote(""), "", fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", err +} diff --git a/cube/cube-agent/src/agent/http_post.go b/cube/cube-agent/src/agent/http_post.go new file mode 100755 index 0000000000000000000000000000000000000000..921da6e7d6ef575ecbc4a22f159f54177546e18b --- /dev/null +++ b/cube/cube-agent/src/agent/http_post.go @@ -0,0 +1,50 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package agent + +import ( + "encoding/json" + "fmt" + "github.com/Badangel/logex" +) + +func initPostHandlers() { + postHandler = map[string]handlerFunc{ + "/agent/cmd": PostCmd, + } +} + +func PostCmd(subpath string, m map[string]string, b []byte) (string, string, error) { + var work Work + err := json.Unmarshal(b, &work) + if err != nil { + logex.Warningf("Unmarshal from %s error (%+v)", string(b), err) + return quote(""), "", fmt.Errorf("work json unmarshal failed, %v", err) + } + + if _, ok := CmdWorkFilter.Load(work.Token()); ok { + logex.Warningf("Another work with the same token is already running. Token(%s)", work.Token()) + return quote(""), "", fmt.Errorf("another work with the same token is already running") + } + + CmdWorkFilter.Store(work.Token(), true) + err = work.DoWork() + CmdWorkFilter.Delete(work.Token()) + if err != nil { + return quote(""), "", fmt.Errorf("do work failed, %v", err) + } + + return quote(""), "", err +} diff --git a/cube/cube-agent/src/agent/util.go b/cube/cube-agent/src/agent/util.go new file mode 100644 index 0000000000000000000000000000000000000000..c629c9c495c1857a0a2ba0c641466803a81b1c81 --- /dev/null +++ b/cube/cube-agent/src/agent/util.go @@ -0,0 +1,188 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Badangel/logex" +) + +// RestReq sends a restful request to requrl and returns the response body. +func RestReq(method, requrl string, timeout int, kv *map[string]string) (string, error) { + data := url.Values{} + if kv != nil { + for k, v := range *kv { + //logex.Trace("req set:", k, v) + data.Set(k, v) + } + } + if method == "GET" || method == "DELETE" { + requrl = requrl + "?" + data.Encode() + data = url.Values{} + } + + //logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode())) + if err != nil { + logex.Warning("NewRequest failed:", err) + return "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return string(body), err +} + +// JsonReq sends a restful request to requrl and unmarshals the JSON response body into out. 
+func JsonReq(method, requrl string, timeout int, kv *map[string]string, + out interface{}) error { + s, err := RestReq(method, requrl, timeout, kv) + logex.Debugf("json request method:[%v], requrl:[%s], timeout:[%v], map[%v], out_str:[%s]", method, requrl, timeout, kv, s) + if err != nil { + return err + } + return json.Unmarshal([]byte(s), out) +} + +func GetHdfsMeta(src string) (master, ugi, path string, err error) { + //src = "hdfs://root:rootpasst@st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar" + //src = "hdfs://st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar" + + ugiBegin := strings.Index(src, "//") + ugiPos := strings.LastIndex(src, "@") + if ugiPos != -1 && ugiBegin != -1 { + ugi = src[ugiBegin+2 : ugiPos] + } + src1 := strings.Replace(strings.Replace(src, "hdfs://", "", 1), ugi, "", 1) + if ugi != "" { + src1 = src1[1:] + } + pos := strings.Index(src1, "/") + if pos != -1 { + master = src1[0:pos] + path = src1[pos:] + } else { + logex.Warningf("failed to get the master or path for (%s)", src) + err = errors.New("invalid master or path found") + } + logex.Debugf("parse the (%s) succ, master is %s, ugi is (%s), path is %s", src, master, ugi, path) + return +} + +func getHostIp() (string, error) { + if addrs, err := net.InterfaceAddrs(); err == nil { + for _, addr := range addrs { + ips := addr.String() + logex.Debugf("get host ip: %v", ips) + if strings.HasPrefix(ips, "127") { + continue + } else { + list := strings.Split(ips, "/") + if len(list) != 2 { + continue + } + return list[0], nil + } + } + } + return "unkown ip", errors.New("get host ip failed") +} + +func getHostname(ip string) (hostname string, err error) { + if hostnames, err := net.LookupAddr(ip); err != nil { + hostname = ip + //logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err) + } else { + if len(hostnames) > 0 { + hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")] + } else { + hostname = ip + } + } + + return hostname, err +} + +func GetLocalHostname() (hostname string, err error) { + if ip, err := getHostIp(); err == nil { + return getHostname(ip) + } else { + return "unkown ip", err + } +} + +func GetLocalHostnameCmd() (hostname string, err error) { + cmd := "hostname" + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if stdout != "" && err == nil { + hostname := strings.TrimSpace(stdout) + index := strings.LastIndex(hostname, ".baidu.com") + if index > 0 { + return hostname[:strings.LastIndex(hostname, ".baidu.com")], nil + } else { + return hostname, nil + } + } else { + logex.Debugf("using hostname cmd failed. err:%v", err) + return GetLocalHostname() + } +} + +// quote quotes string for json output. eg: s="123", quote(s)=`"123"` +func quote(s string) string { + return fmt.Sprintf("%q", s) +} + +// quoteb quotes byte array for json output. 
+func quoteb(b []byte) string { + return quote(string(b)) +} + +// quotea quotes string array for json output +func quotea(a []string) string { + b, _ := json.Marshal(a) + return string(b) +} + +func isJsonDict(s string) bool { + var js map[string]interface{} + return json.Unmarshal([]byte(s), &js) == nil +} diff --git a/cube/cube-agent/src/agent/work.go b/cube/cube-agent/src/agent/work.go new file mode 100644 index 0000000000000000000000000000000000000000..992882c6698335a221a0ba802222d051de629f07 --- /dev/null +++ b/cube/cube-agent/src/agent/work.go @@ -0,0 +1,875 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "crypto/md5" + "encoding/json" + "errors" + "fmt" + "github.com/Badangel/logex" + "github.com/Badangel/pipeline" + "os" + "os/exec" + "path" + "path/filepath" + "strconv" + "strings" + "sync" + "time" +) + +const ( + COMMAND_DOWNLOAD = "download" + COMMAND_RELOAD = "reload" + COMMAND_SWITCH = "enable" + COMMAND_ROLLBACK = "rollback" + COMMAND_CHECK = "check" + COMMAND_CLEAR = "clear" + COMMAND_POP = "pop" + + RETRY_TIMES = 3 + REQUEST_MASTER_TIMEOUT_SECOND = 60 + MAX_DOWN_CO = 7 + + RELOAD_RETRY_TIMES = 3 + RELOAD_RETRY_INTERVAL_SECOND = 10 + + DOWNLOAD_DONE_MARK_FILE = ".download_done" +) + +type VersionInfo struct { + Version string + Depend string + Source string +} + +type Work struct { + DictName string `json:"dict_name"` + ShardSeq int `json:"shard_seq"` + DeployPath string `json:"deploy_path"` + Command string `json:"command"` + Version string `json:"version"` + Depend string `json:"depend"` + Source string `json:"source"` + Mode string `json:"mode"` + DictMode string `json:"dict_mode"` + Port string `json:"port"` + bRollback bool `json:"b_rollback"` + RollbackInfo []VersionInfo `json:"rollback_info"` + Status string `json:"status"` + FinishStatus string `json:"finish_status"` + Service string `json:"service,omitempty"` + VersionSign string `json:"version_sign,omitempty"` + MasterAddress string `json:"master_address,omitempty"` + ActiveVersionList string `json:"active_version_list,omitempty"` +} + +func (work *Work) Token() string { + return work.DictName + strconv.Itoa(work.ShardSeq) + work.Service +} + +func (work *Work) Valid() bool { + if work.Command == "" || + work.Version == "" || + work.Depend == "" { + return false + } + return true +} + +func (work *Work) DoWork() error { + var err error + if !work.Valid() { + err = errors.New("Work is invalid") + return err + } + switch work.Command { + case COMMAND_DOWNLOAD: + err = work.Download() + case COMMAND_RELOAD: + err = work.Reload() + case COMMAND_SWITCH: + err = work.Enable() + case COMMAND_CHECK: + err = work.Check() + case COMMAND_CLEAR: + err = work.Clear() + case COMMAND_POP: + err = work.Pop() + default: + logex.Debugf("Invalid command %s received", work.Command) + err = errors.New("Invalid command.") + } + return err +} + +func GetDownloadDirs(dictName, service, version, depend, deployPath string, shardSeq, 
+ split int) ([]string, error) { + dirs := make([]string, 0, split) + if deployPath == "" { + return dirs, errors.New("Invalid Deploy path") + } + parentDir := getParentDir(version, depend) + if split < 2 { + disk := path.Join(deployPath, "cube_data") + if service == "" { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), parentDir)) + } else { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), parentDir+"-"+dictName)) + } + } else { + for i := 0; i < split; i++ { + disk := path.Join(deployPath, "cube_data") + if service == "" { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), strconv.Itoa(i), parentDir)) + } else { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), + parentDir+"-"+dictName)) + } + } + } + + return dirs, nil +} + +func GetDataLinkDirs(dictName, service, version, depend, deployPath string, shardSeq, + split int) []string { + dirs := make([]string, 0, split) + parentDir := getParentDir(version, depend) + if split < 2 { + disk := path.Join(deployPath, "data") + if service == "" { + dirs = append(dirs, path.Join(disk, parentDir)) + } + } else { + for i := 0; i < split; i++ { + disk := path.Join(deployPath, "data") + if service == "" { + dirs = append(dirs, path.Join(disk, strconv.Itoa(i), parentDir)) + } + } + } + return dirs +} + +func (work *Work) Download() (err error) { + err = DoDownload(work.DictName, work.Service, work.Version, work.Depend, work.Mode, work.Source, + work.DeployPath, work.ShardSeq) + + if err != nil { + logex.Warningf("download error, failed to download %s, dir is %s, error is (+%v)", work.Source, work.DeployPath, err) + return + } + + if work.Service == "" { + err = UnTar(work.DictName, work.Service, work.Version, work.Depend, work.Source, + work.DeployPath, work.ShardSeq) + + if err == nil { + dataPath := path.Join(work.DeployPath, "data") + + // remove all old links + if work.Mode == "base" || len(work.RollbackInfo) != 0 { + cmd := fmt.Sprintf("ls -l %s | grep -E 'data.|index.' 
| awk '{print $9}'", dataPath) + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err == nil && stdout != "" { + fileNameLi := strings.Split(strings.TrimSpace(stdout), "\n") + for _, file := range fileNameLi { + err = os.Remove(path.Join(dataPath, file)) + logex.Debugf("os.Remove(%s) error (%+v) ", path.Join(dataPath, file), err) + } + } + } + + // create symbolic link to the version rollbacked + err = CreateSymlink(work.DictName, work.Service, work.Version, work.Depend, dataPath, + work.DeployPath, work.ShardSeq, len(strings.Split(work.Source, ";"))) + } else { + logex.Warningf("download error, failed to untar for %s, dir is %s, error is (+%v)", work.Source, work.DeployPath, err) + } + } + + if err == nil { + // clear history data + work.clearData() + work.clearLink() + } else { + logex.Warningf("create symlink failed, error is (+%v)", err) + } + + return +} + +func (work *Work) clearData() (err error) { + split := len(strings.Split(work.Source, ";")) + downloadDirs, err := GetDownloadDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + if err != nil { + logex.Warningf("clearData failed, error is (+%v)", err) + return + } + for _, downloadDir := range downloadDirs { + parentDir, _ := filepath.Split(downloadDir) + + cmd := fmt.Sprintf("ls -l %s | grep -v %s | awk '{print $9}'", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err != nil || stdout == "" || work.Service != "" { + cmd = fmt.Sprintf("find %s -type d -ctime +1 -print | xargs -i rm -rf {}", parentDir) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } else { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func (work *Work) clearPatchData() (err error) { + if work.Service != "" { + return + } + split := len(strings.Split(work.Source, ";")) + downloadDirs, err := GetDownloadDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + if err != nil { + logex.Warningf("clearPatchData failed, error is (+%v)", err) + return + } + for _, downloadDir := range downloadDirs { + parentDir, _ := filepath.Split(downloadDir) + cmd := fmt.Sprintf("ls -l %s | grep %s_ | awk '{print $9}'", parentDir, work.Depend) + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err == nil && stdout != "" { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func (work *Work) clearLink() (err error) { + if work.Service != "" { + return + } + split := len(strings.Split(work.Source, ";")) + dataLinkDirs := GetDataLinkDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + for _, linkDir := range dataLinkDirs { + parentDir, _ := filepath.Split(linkDir) + cmd := fmt.Sprintf("ls -l %s | grep -v %s | awk '{print $9}'", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err != nil || stdout == "" { + cmd = fmt.Sprintf("find %s -type d -ctime +1 -print | xargs -i rm -rf {}", parentDir) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } else { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + 
continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func (work *Work) clearPatchLink() (err error) { + if work.Service != "" { + return + } + split := len(strings.Split(work.Source, ";")) + dataLinkDirs := GetDataLinkDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + for _, linkDir := range dataLinkDirs { + parentDir, _ := filepath.Split(linkDir) + cmd := fmt.Sprintf("ls -l %s | grep %s_ | awk '{print $9}'", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err == nil && stdout != "" { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func UnTar(dictName, service, version, depend, source, deployPath string, shardSeq int) (err error) { + sources := strings.Split(source, ";") + downloadDirs, err := GetDownloadDirs(dictName, service, version, depend, deployPath, shardSeq, + len(sources)) + if err != nil { + logex.Warningf("UnTar failed, error is (+%v)", err) + return + } + for i := 0; i < len(sources); i++ { + fileName := GetFileName(sources[i]) + untarCmd := fmt.Sprintf("tar xvf %s -C %s", path.Join(downloadDirs[i], fileName), downloadDirs[i]) + _, _, err = RetryCmd(untarCmd, RETRY_TIMES) + } + + return +} + +func CreateSymlink(dictName, service, version, depend, dataPath, deployPath string, shardSeq, + split int) (err error) { + downloadDirs, err := GetDownloadDirs(dictName, service, version, depend, deployPath, shardSeq, split) + if err != nil { + logex.Warningf("CreateSymlink failed, error is (+%v)", err) + } + for i, downloadDir := range downloadDirs { + cmd := fmt.Sprintf("ls -l %s | grep -E 'data.|index.' | awk '{print $NF}'", downloadDir) + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + + if err == nil && stdout != "" { + fileNameLi := strings.Split(strings.TrimSpace(stdout), "\n") + versionDir := getParentDir(version, depend) + versionFile := path.Join(dataPath, "VERSION") + dataSubPath := "" + if split > 1 { + dataSubPath = path.Join(dataPath, strconv.Itoa(i), versionDir) + } else { + dataSubPath = path.Join(dataPath, versionDir) + } + if err = os.MkdirAll(dataSubPath, 0755); err != nil { + // return err + logex.Warningf("os.Mkdir %s failed, err:[%v]", dataSubPath, err) + } + if dataSubPath != "" { + cmd = fmt.Sprintf("find %s/.. 
-type d -ctime +5 -print | xargs -i rm -rf {}", dataSubPath) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + for _, file := range fileNameLi { + dataLink := "" + tempDataPath := "" + if split > 1 { + dataLink = path.Join(dataPath, strconv.Itoa(i), file) + tempDataPath = path.Join(dataPath, strconv.Itoa(i)) + } else { + dataLink = path.Join(dataPath, file) + tempDataPath = dataPath + } + cmd = fmt.Sprintf("rm -rf %s", dataLink) + _, stderr, _ := RetryCmd(cmd, RETRY_TIMES) + logex.Noticef("rm -rf %s, err:[%s]", dataLink, stderr) + + // create new symlink + err = os.Symlink(path.Join(downloadDir, file), dataLink) + logex.Noticef("os.Symlink %s %s return (%+v)", path.Join(downloadDir, file), dataLink, err) + fmt.Println("os.Symlink: ", path.Join(downloadDir, file), dataLink, err) + cmd = fmt.Sprintf("cp -d %s/index.* %s/", tempDataPath, dataSubPath) + _, stderr, _ = RetryCmd(cmd, RETRY_TIMES) + logex.Noticef("cp -d index Symlink to version dir %s, err:[%s]", dataSubPath, stderr) + cmd = fmt.Sprintf("cp -d %s/data.* %s/", tempDataPath, dataSubPath) + _, stderr, _ = RetryCmd(cmd, RETRY_TIMES) + logex.Noticef("cp -d data Symlink to version dir %s, err:[%s]", dataSubPath, stderr) + } + cmd = fmt.Sprintf("echo %s > %s", versionDir, versionFile) + if _, _, err = RetryCmd(cmd, RETRY_TIMES); err != nil { + return err + } + } + } + + return +} + +func (work *Work) CheckToReload() bool { + statusCmd := fmt.Sprintf("curl -s -d '{\"cmd\":\"status\"}' http://127.0.0.1:%s/ControlService/cmd", work.Port) + stdout, _, _ := RetryCmd(statusCmd, RETRY_TIMES) + var resp CubeResp + json.Unmarshal([]byte(stdout), &resp) + version := getParentDir(work.Version, work.Depend) + + if resp.CurVersion == "" && resp.BgVersion == "" { + logex.Noticef("cube version empty") + return true + } + if resp.CurVersion == version || resp.BgVersion == version { + logex.Noticef("cube version has matched. 
version: %s", version) + return false + } + return true +} + +func (work *Work) Reload() (err error) { + if work.Port == "" { + err = errors.New("Reload with invalid port.") + return + } + if !work.CheckToReload() { + work.writeStatus("finish_reload", "succ") + return + } + work.writeStatus("prepare_reload", "") + + var stdout string + versionPath := getParentDir(work.Version, work.Depend) + bgLoadCmd := "bg_load_base" + if work.Mode == "delta" { + bgLoadCmd = "bg_load_patch" + } + if work.ActiveVersionList == "" { + work.ActiveVersionList = "[]" + } + for i := 0; i < RELOAD_RETRY_TIMES; i++ { + reloadCmd := fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"%s\",\"version_path\":\"/%s\"}' http://127.0.0.1:%s/ControlService/cmd", bgLoadCmd, versionPath, work.Port) + fmt.Println("reload: ", reloadCmd) + stdout, _, _ = RetryCmd(reloadCmd, 1) + fmt.Println("reload stdout: ", stdout) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("bg_load_base return succ") + break + } else { + logex.Warning("bg_load_base return failed") + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + work.writeStatus("finish_reload", "succ") + } else { + work.writeStatus("finish_reload", "failed") + err = errors.New("reload failed.") + } + + return +} + +func (work *Work) Clear() (err error) { + work.Service = "" + + var stdout string + var clearCmd string + for i := 0; i < RETRY_TIMES; i++ { + clearCmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"clear\",\"table_name\":\"%s\"}' http://127.0.0.1:%s/NodeControlService/cmd", work.DictName, work.Port) + fmt.Println("clear: ", clearCmd) + stdout, _, _ = RetryCmd(clearCmd, 1) + fmt.Println("clear stdout: ", stdout) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("clear return succ") + break + } else { + logex.Warning("clear return failed") + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + err = work.writeStatus("succ", "") + } else { + err = work.writeStatus("failed", "") + } + + return +} + +func (work *Work) Check() (err error) { + if work.Service != "" || work.VersionSign == "" { + return + } + var dataLinkDirs []string + split := len(strings.Split(work.Source, ";")) + dataLinkDirs = GetDataLinkDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + + if _, t_err := os.Stat(work.DeployPath); os.IsNotExist(t_err) { + logex.Noticef("check DeployPath[%s] not exists.", work.DeployPath) + return + } + + check_succ := true + for _, linkDir := range dataLinkDirs { + parentDir, _ := filepath.Split(linkDir) + + cmd := fmt.Sprintf("ls -l %s | grep %s | awk '{print $9}' | grep -v data | grep -v index", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err != nil || stdout == "" { + check_succ = false + break + } else { + versionList := strings.Split(strings.TrimSpace(stdout), "\n") + logex.Noticef("calc ver_sign for [%v]", versionList) + + var version_sign string + var version string + for i := 0; i < len(versionList); i++ { + split_index := strings.Index(versionList[i], "_") + if split_index > 0 && split_index < len(versionList[i]) { + version = versionList[i][split_index+1:] + } else { + version = versionList[i] + } + if version_sign == "" { + version_sign = fmt.Sprintf("%x", md5.Sum([]byte(version))) + } else { + version_sign = fmt.Sprintf("%x", md5.Sum([]byte(version_sign))) + } + } + + if version_sign != work.VersionSign { + 
logex.Warningf("version_sign check failed. real[%v] expect[%v]", version_sign, work.VersionSign) + check_succ = false + break + } + } + } + + if !check_succ { + work.clearPatchData() + work.clearPatchLink() + master_host, master_port, _ := GetMaster(work.MasterAddress) + cmd := fmt.Sprintf("cd %s && export STRATEGY_DIR=%s && ./downloader -h %s -p %s -d %s -s %d", + work.DeployPath, work.DeployPath, master_host, master_port, work.DictName, work.ShardSeq) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + + return +} + +func (work *Work) Enable() (err error) { + if work.Port == "" { + err = errors.New("Enable with invalid port") + return + } + var stdout string + var cmd string + versionPath := getParentDir(work.Version, work.Depend) + for i := 0; i < RELOAD_RETRY_TIMES; i++ { + if work.Service != "" { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"reload_model\",\"version\":\"%s-%s\",\"dict_name\":\"%s\"}' http://127.0.0.1:%s/ControlService/cmd", + versionPath, work.DictName, work.DictName, work.Port) + } else { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"enable\",\"version\":\"%s\"}' http://127.0.0.1:%s/ControlService/cmd", versionPath, work.Port) + } + stdout, _, _ = RetryCmd(cmd, 1) + + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("enable return succ for %s, work dir is %s", work.Source, work.DeployPath) + break + } else { + logex.Warningf("enable return failed for %s, work dir is %s, error is (%+v)", work.Source, work.DeployPath, err) + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + err = work.writeStatus("succ", "") + } else { + err = work.writeStatus("failed", "") + } + + if work.Service == "" { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"bg_unload\"}' http://127.0.0.1:%s/ControlService/cmd", work.Port) + stdout, _, _ = RetryCmd(cmd, RETRY_TIMES) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("unload return succ") + } else { + logex.Warning("unload return failed") + } + } + + RemoveStateFile(work.DictName, work.ShardSeq, work.Service) + + return +} + +func (work *Work) Pop() (err error) { + var stdout string + var cmd string + if work.ActiveVersionList == "" { + work.ActiveVersionList = "[]" + } + for i := 0; i < RELOAD_RETRY_TIMES; i++ { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"pop\",\"table_name\":\"%s\",\"active_versions\":%v}' http://127.0.0.1:%s/NodeControlService/cmd", work.DictName, work.ActiveVersionList, work.Port) + fmt.Println("pop: ", cmd) + stdout, _, _ = RetryCmd(cmd, 1) + fmt.Println("pop stdout: ", stdout) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("pop return succ") + break + } else { + logex.Warning("pop return failed") + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + err = work.writeStatus("succ", "") + } else { + err = work.writeStatus("failed", "") + } + + RemoveStateFile(work.DictName, work.ShardSeq, work.Service) + return +} + +func writeStateFile(dictName string, shardSeq int, service, state string) { + stateFile := fmt.Sprintf(".state_%s_%d", dictName, shardSeq) + if service != "" { + stateFile = stateFile + "_" + service + } + + cmd := fmt.Sprintf("echo '%s' > %s/state/%s", state, Dir, stateFile) + if _, _, err := RetryCmd(cmd, RETRY_TIMES); err != nil { + logex.Warningf("%s error (%+v)", cmd, err) + } +} + +func RemoveStateFile(dictName string, shardSeq int, service string) { + stateFile 
:= fmt.Sprintf(".state_%s_%d", dictName, shardSeq) + if service != "" { + stateFile = stateFile + "_" + service + } + + cmd := fmt.Sprintf("rm -f %s/state/%s", Dir, stateFile) + if _, _, err := RetryCmd(cmd, RETRY_TIMES); err != nil { + logex.Warningf("%s error (%+v)", cmd, err) + } +} + +func (work *Work) writeStatus(status string, finishStatus string) (err error) { + work.Status = status + work.FinishStatus = finishStatus + state, _ := json.Marshal(work) + writeStateFile(work.DictName, work.ShardSeq, work.Service, string(state)) + return +} + +func DoDownloadIndividual(source, downloadDir string, isService bool, timeOut int, ch chan error, wg *sync.WaitGroup) { + err := errors.New("DoDownloadIndividual start") + for i := 0; i < RETRY_TIMES; i++ { + err = FtpDownload(source, downloadDir, timeOut) + if err == nil { + logex.Debugf("download %s to %s succ", source, downloadDir) + if !isService { + err = FtpDownload(source+".md5", downloadDir, timeOut) + } + } else { + logex.Warningf("download error , source %s, downloadDir %s, err (%+v)", source, downloadDir, err) + continue + } + + if err == nil && isService { + // touch download_succ file + cmd := fmt.Sprintf("touch %s", path.Join(downloadDir, DOWNLOAD_DONE_MARK_FILE)) + RetryCmd(cmd, RETRY_TIMES) + break + } + + // download md5 file succ, md5check + if err == nil { + // md5sum -c + fileName := GetFileName(source) + err = checkMd5(path.Join(downloadDir, fileName), path.Join(downloadDir, fileName+".md5")) + logex.Warningf("md5sum check %s %s return (%+v)", path.Join(downloadDir, fileName), path.Join(downloadDir, fileName+".md5"), err) + if err == nil { + // touch download_succ file + cmd := fmt.Sprintf("touch %s", path.Join(downloadDir, DOWNLOAD_DONE_MARK_FILE)) + RetryCmd(cmd, RETRY_TIMES) + logex.Debugf("md5sum ok, source is %s, dir is %s", source, downloadDir) + break + } else { + logex.Warningf("md5sum error, source is %s, dir is %s", source, downloadDir) + continue + } + } else { + logex.Warningf("download %s return (%+v)", source+".md5", err) + continue + } + } + + ch <- err + wg.Done() +} + +func checkSources(source string) ([]string, error) { + sources := strings.Split(source, ";") + for i := 0; i < len(sources); i++ { + if sources[i] == "" || !strings.HasPrefix(sources[i], "ftp://") { + return sources, errors.New("Invalid sources") + } + } + return sources, nil +} + +func DoDownload(dictName, service, version, depend, mode, source, deployPath string, + shardSeq int) (err error) { + sources, err := checkSources(source) + if err != nil { + logex.Warningf("checkSources %s return (%+v)", source, err) + return + } + downloadDirs, err := GetDownloadDirs(dictName, service, version, depend, deployPath, shardSeq, + len(sources)) + if err != nil { + logex.Warningf("GetDownloadDirs %s return (%+v)", source, err) + return + } + version_suffix := "" + if service != "" { + version_suffix = version_suffix + "-" + dictName + } + if !checkToDownload(downloadDirs) { + cmd := fmt.Sprintf("cd %s/cube_data && echo %s > VERSION && cp VERSION VERSION-%s", + deployPath, getParentDir(version, depend)+version_suffix, dictName) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + logex.Debugf("echo VERSION cmd:[%s] err:[%v]", cmd, err) + return + } + + ch := make(chan error, len(sources)) + wg := sync.WaitGroup{} + j := 0 + numCo := 0 + for ; j < len(sources); j++ { + if numCo >= MAX_DOWN_CO { + wg.Wait() + logex.Noticef("DoDownload co down.") + numCo = 0 + } + numCo += 1 + wg.Add(1) + time.Sleep(2000 * time.Millisecond) + timeOut := 900 + if mode == "base" { + 
timeOut = 3600 + } + go DoDownloadIndividual(sources[j], downloadDirs[j], (service != ""), timeOut, ch, &wg) + } + wg.Wait() + close(ch) + for err = range ch { + if err != nil { + return + } + } + cmd := fmt.Sprintf("cd %s/cube_data && echo %s > VERSION && cp VERSION VERSION-%s", + deployPath, getParentDir(version, depend)+version_suffix, dictName) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + logex.Debugf("echo VERSION cmd:[%s] err:[%v]", cmd, err) + return +} + +func FtpDownload(source string, dest string, timeOut int) (err error) { + dlCmd := fmt.Sprintf("wget --quiet --level=100 -P %s %s --limit-rate=10240k", dest, source) + fmt.Println(dlCmd) + + _, _, err = RetryCmdWithSleep(dlCmd, RETRY_TIMES) + return +} + +func checkToDownload(downloadDirs []string) bool { + for _, v := range downloadDirs { + if _, err := os.Stat(path.Join(v, DOWNLOAD_DONE_MARK_FILE)); err != nil { + logex.Noticef("check [%v] not exists.", v) + return true + } + } + + return false +} + +// simple hash +func getDownloadDisk(dictName string, shardSeq int) string { + index := len(dictName) * shardSeq % len(disks) + + return disks[index] +} + +func getParentDir(version string, depend string) (dir string) { + if version == depend { + dir = depend + } else { + dir = depend + "_" + version + } + + return +} + +func GetFileName(source string) (fileName string) { + s := strings.Split(source, "/") + fileName = s[len(s)-1] + + return +} + +func checkMd5(file string, fileMd5 string) (err error) { + md5Cmd := fmt.Sprintf("./scripts/md5_checker %s %s", file, fileMd5) + + _, _, err = pipeline.Run(exec.Command("/bin/sh", "-c", md5Cmd)) + + return +} + +func RetryCmd(cmd string, retryTimes int) (stdoutStr string, stderrStr string, err error) { + for i := 0; i < retryTimes; i++ { + stdout, stderr, e := pipeline.Run(exec.Command("/bin/sh", "-c", cmd)) + stdoutStr = stdout.String() + stderrStr = stderr.String() + err = e + + logex.Debugf("cmd %s, stdout: %s, stderr: %s, err: (%+v)", cmd, stdoutStr, stderrStr, err) + if err == nil { + break + } + } + + return +} + +func RetryCmdWithSleep(cmd string, retryTimes int) (stdoutStr string, stderrStr string, err error) { + for i := 0; i < retryTimes; i++ { + stdout, stderr, e := pipeline.Run(exec.Command("/bin/sh", "-c", cmd)) + stdoutStr = stdout.String() + stderrStr = stderr.String() + err = e + + logex.Debugf("cmd %s, stdout: %s, stderr: %s, err: (%+v)", cmd, stdoutStr, stderrStr, err) + if err == nil { + break + } + time.Sleep(10000 * time.Millisecond) + } + + return +} diff --git a/cube/cube-agent/src/agent/work_pool.go b/cube/cube-agent/src/agent/work_pool.go new file mode 100644 index 0000000000000000000000000000000000000000..aecf00cc645c5f94a49d30ed1abccb89ab20ce4c --- /dev/null +++ b/cube/cube-agent/src/agent/work_pool.go @@ -0,0 +1,121 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package agent + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" +) + +type ( + workType struct { + poolWorker PoolWorker + resultChannel chan error + } + + WorkPool struct { + queueChannel chan workType + workChannel chan PoolWorker + queuedWorkNum int32 + activeWorkerNum int32 + queueCapacity int32 + workFilter sync.Map + } +) + +type PoolWorker interface { + Token() string + DoWork() +} + +func NewWorkPool(workerNum int, queueCapacity int32) *WorkPool { + workPool := WorkPool{ + queueChannel: make(chan workType), + workChannel: make(chan PoolWorker, queueCapacity), + queuedWorkNum: 0, + activeWorkerNum: 0, + queueCapacity: queueCapacity, + } + + for i := 0; i < workerNum; i++ { + go workPool.startWorkRoutine() + } + + go workPool.startQueueRoutine() + + return &workPool +} + +func (workPool *WorkPool) startWorkRoutine() { + for { + select { + case work := <-workPool.workChannel: + workPool.doWork(work) + break + } + } +} + +func (workPool *WorkPool) startQueueRoutine() { + for { + select { + case queueItem := <-workPool.queueChannel: + if atomic.AddInt32(&workPool.queuedWorkNum, 0) == workPool.queueCapacity { + queueItem.resultChannel <- fmt.Errorf("work pool fulled with %v pending works", QueueCapacity) + continue + } + + atomic.AddInt32(&workPool.queuedWorkNum, 1) + + workPool.workChannel <- queueItem.poolWorker + + queueItem.resultChannel <- nil + + break + } + } +} + +func (workPool *WorkPool) doWork(poolWorker PoolWorker) { + defer atomic.AddInt32(&workPool.activeWorkerNum, -1) + defer workPool.workFilter.Delete(poolWorker.Token()) + + atomic.AddInt32(&workPool.queuedWorkNum, -1) + atomic.AddInt32(&workPool.activeWorkerNum, 1) + + poolWorker.DoWork() +} + +func (workPool *WorkPool) PostWorkWithToken(poolWorker PoolWorker) (err error) { + if _, ok := workPool.workFilter.Load(poolWorker.Token()); ok { + return errors.New("another work with same key is doing.") + } + workPool.workFilter.Store(poolWorker.Token(), true) + return workPool.PostWork(poolWorker) +} + +func (workPool *WorkPool) PostWork(poolWorker PoolWorker) (err error) { + work := workType{poolWorker, make(chan error)} + + defer close(work.resultChannel) + + workPool.queueChannel <- work + + err = <-work.resultChannel + + return err +} diff --git a/cube/cube-agent/src/cube-agent.go b/cube/cube-agent/src/cube-agent.go new file mode 100644 index 0000000000000000000000000000000000000000..34f74979001c49139ba0fc14df44f6d3210dcef3 --- /dev/null +++ b/cube/cube-agent/src/cube-agent.go @@ -0,0 +1,110 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "agent" + "fmt" + "github.com/Badangel/logex" + "github.com/docopt/docopt-go" + "os" + "path/filepath" + "runtime" + "strconv" +) + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + + agent.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0])) + usage := fmt.Sprintf(`Usage: ./m_master [options] + +Options: + -n WORKERNUM set worker num. + -q QUEUENUM set queue num. 
+ -P LISTEN_PORT agent listen port + +Log options: + -l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 16] + --log_dir=DIR set log output dir. [default: ./log] + --log_name=NAME set log name. [default: m_agent]`, agent.Dir) + + opts, err := docopt.Parse(usage, nil, true, "Cube Agent Checker 1.0.0", false) + if err != nil { + fmt.Println("ERROR:", err) + os.Exit(1) + } + + log_level, _ := strconv.Atoi(opts["-l"].(string)) + log_name := opts["--log_name"].(string) + log_dir := opts["--log_dir"].(string) + logex.SetLevel(getLogLevel(log_level)) + if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil { + fmt.Println("ERROR:", err) + } + + logex.Notice("--- NEW SESSION -------------------------") + logex.Notice(">>> log_level:", log_level) + + agent.WorkerNum = 10 + if opts["-n"] != nil { + n, err := strconv.Atoi(opts["-n"].(string)) + if err == nil { + agent.WorkerNum = n + } + } + + agent.QueueCapacity = 20 + if opts["-q"] != nil { + q, err := strconv.Atoi(opts["-q"].(string)) + if err == nil { + agent.QueueCapacity = int32(q) + } + } + + agent.CmdWorkPool = agent.NewWorkPool(agent.WorkerNum, agent.QueueCapacity) + + if opts["-P"] == nil { + logex.Fatalf("ERROR: -P LISTEN PORT must be set!") + os.Exit(255) + } + + agentPort := opts["-P"].(string) + logex.Notice(">>> starting server...") + addr := ":" + agentPort + + if agent.StartHttp(addr) != nil { + logex.Noticef("cant start http(addr=%v). quit.", addr) + os.Exit(0) + } +} + +func getLogLevel(log_level int) logex.Level { + switch log_level { + case 16: + return logex.DEBUG + case 8: + return logex.TRACE + case 4: + return logex.NOTICE + case 2: + return logex.WARNING + case 1: + return logex.FATAL + case 0: + return logex.NONE + } + return logex.DEBUG +} diff --git a/cube/cube-transfer/CMakeLists.txt b/cube/cube-transfer/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ab91c0f5f274d971d866ad33680a49103a641934 --- /dev/null +++ b/cube/cube-transfer/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +project(cube-transfer Go) + +include(cmake/golang.cmake) + +ExternalGoProject_Add(docopt-go github.com/docopt/docopt-go) +ExternalGoProject_Add(rfw github.com/mipearson/rfw) +ExternalGoProject_Add(logex github.com/Badangel/logex) + +add_subdirectory(src) +install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR}) diff --git a/cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake b/cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..d17449f39f904c56c3d6969db75f8a44207fc9f4 --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake @@ -0,0 +1,58 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +if(NOT CMAKE_Go_COMPILER) + if(NOT $ENV{GO_COMPILER} STREQUAL "") + get_filename_component(CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT) + + if(CMAKE_Go_FLAGS_ENV_INIT) + set(CMAKE_Go_COMPILER_ARG1 "${CMAKE_Go_FLAGS_ENV_INIT}" CACHE STRING "First argument to Go compiler") + endif() + + if(NOT EXISTS ${CMAKE_Go_COMPILER_INIT}) + message(SEND_ERROR "Could not find compiler set in environment variable GO_COMPILER:\n$ENV{GO_COMPILER}.") + endif() + + endif() + + set(Go_BIN_PATH + $ENV{GOPATH} + $ENV{GOROOT} + $ENV{GOROOT}/../bin + $ENV{GO_COMPILER} + /usr/bin + /usr/local/bin + ) + + if(CMAKE_Go_COMPILER_INIT) + set(CMAKE_Go_COMPILER ${CMAKE_Go_COMPILER_INIT} CACHE PATH "Go Compiler") + else() + find_program(CMAKE_Go_COMPILER + NAMES go + PATHS ${Go_BIN_PATH} + ) + EXEC_PROGRAM(${CMAKE_Go_COMPILER} ARGS version OUTPUT_VARIABLE GOLANG_VERSION) + STRING(REGEX MATCH "go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*" VERSION "${GOLANG_VERSION}") + message("-- The Golang compiler identification is ${VERSION}") + message("-- Check for working Golang compiler: ${CMAKE_Go_COMPILER}") + endif() + +endif() + +mark_as_advanced(CMAKE_Go_COMPILER) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in + ${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY) + +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") \ No newline at end of file diff --git a/cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in b/cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in new file mode 100644 index 0000000000000000000000000000000000000000..4d54319b0cdd2cc0373858f1981cec0f7933a9cc --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in @@ -0,0 +1,22 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@") +set(CMAKE_Go_COMPILER_LOADED 1) + +set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go) +set(CMAKE_Go_LINKER_PREFERENCE 40) +set(CMAKE_Go_OUTPUT_EXTENSION .o) +set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1) +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") diff --git a/cube/cube-transfer/cmake/CMakeGoInformation.cmake b/cube/cube-transfer/cmake/CMakeGoInformation.cmake new file mode 100644 index 0000000000000000000000000000000000000000..4171072919e94c4547203a8e8bb2b6758436f001 --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeGoInformation.cmake @@ -0,0 +1,21 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +if(NOT CMAKE_Go_COMPILE_OBJECT) + set(CMAKE_Go_COMPILE_OBJECT "go tool compile -l -N -o ") +endif() + +if(NOT CMAKE_Go_LINK_EXECUTABLE) + set(CMAKE_Go_LINK_EXECUTABLE "go tool link -o ") +endif() \ No newline at end of file diff --git a/cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake b/cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..e69831165e62e2e8091fc88ade83904426a4ac5f --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake @@ -0,0 +1,15 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL "") \ No newline at end of file diff --git a/cube/cube-transfer/cmake/golang.cmake b/cube/cube-transfer/cmake/golang.cmake new file mode 100644 index 0000000000000000000000000000000000000000..817d029d946bad8da4f4cf2785e68d062fc4cada --- /dev/null +++ b/cube/cube-transfer/cmake/golang.cmake @@ -0,0 +1,60 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License + +set(GOPATH "${CMAKE_CURRENT_LIST_DIR}/../") +file(MAKE_DIRECTORY ${GOPATH}) + +function(ExternalGoProject_Add TARG) + add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN}) +endfunction(ExternalGoProject_Add) + +function(add_go_executable NAME) + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build + -o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) +endfunction(add_go_executable) + + +function(ADD_GO_LIBRARY NAME BUILD_TYPE) + if(BUILD_TYPE STREQUAL "STATIC") + set(BUILD_MODE -buildmode=c-archive) + set(LIB_NAME "lib${NAME}.a") + else() + set(BUILD_MODE -buildmode=c-shared) + if(APPLE) + set(LIB_NAME "lib${NAME}.dylib") + else() + set(LIB_NAME "lib${NAME}.so") + endif() + endif() + + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} + -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + + if(NOT BUILD_TYPE STREQUAL "STATIC") + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) + endif() +endfunction(ADD_GO_LIBRARY) \ No newline at end of file diff --git a/cube/cube-transfer/conf/transfer.conf b/cube/cube-transfer/conf/transfer.conf new file mode 100755 index 0000000000000000000000000000000000000000..c3184231f95d259de1adcfa8a20e136dc756a3b9 --- /dev/null +++ b/cube/cube-transfer/conf/transfer.conf @@ -0,0 +1,18 @@ +[default] +dict_name: test_dict +mode: base_only +storage_place: LOCAL +buildtool_local: /home/work/Serving/build/output/bin/cube-builder +donefile_address: http://127.0.0.1/home/work/dangyifei/donefile +output_address: /home/work/dangyifei/test-transfer/test_data/output +tmp_address: /home/work/dangyifei/test-transfer/test_data/tmp +shard_num: 1 +copy_num: 2 +deploy_path: /home/work/test_dict +transfer_address: 127.0.0.1 + +[cube_agent] +agent0_0: 127.0.0.1:8001 +cube0_0: 127.0.0.1:8000:/ssd2/cube_open +agent0_1: 127.0.0.1:8001 +cube0_1: 127.0.0.1:8000:/home/disk1/cube_open diff --git a/cube/cube-transfer/src/CMakeLists.txt b/cube/cube-transfer/src/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..62d3f7ef7759a0d2a09eb4fe32a064694ece5408 --- /dev/null +++ b/cube/cube-transfer/src/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License + +set(SOURCE_FILE cube-transfer.go) +add_go_executable(cube-transfer ${SOURCE_FILE}) +add_dependencies(cube-transfer docopt-go) +add_dependencies(cube-transfer rfw) +add_dependencies(cube-transfer logex) diff --git a/cube/cube-transfer/src/cube-transfer.go b/cube/cube-transfer/src/cube-transfer.go new file mode 100755 index 0000000000000000000000000000000000000000..3e63d8bbcce6d41524eb53788ae069b9418d698b --- /dev/null +++ b/cube/cube-transfer/src/cube-transfer.go @@ -0,0 +1,226 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "github.com/docopt/docopt-go" + "github.com/Badangel/logex" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "time" + "transfer" + "transfer/dict" +) + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + transfer.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0])) + + usage := fmt.Sprintf(`Usage: ./m_master [options] + +Options: + -p PORT set listen port. [default: 8099] + --config=conf/transfer.conf set conf file. [defalut: ./conf/transfer.conf] + +Log options: + -l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 4] + --log_dir=DIR set log output dir. [default: ./log] + --log_name=NAME set log name. 
[default: transfer]`,
+		transfer.Dir)
+	opts, err := docopt.Parse(usage, nil, true, "Cube Transfer", false)
+	if err != nil {
+		fmt.Println("ERROR:", err)
+		os.Exit(1)
+	}
+	log_level, _ := strconv.Atoi(opts["-l"].(string))
+	log_name := opts["--log_name"].(string)
+	log_dir := opts["--log_dir"].(string)
+	logex.SetLevel(getLogLevel(log_level))
+	if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil {
+		fmt.Println("ERROR:", err)
+	}
+
+	fmt.Printf("%v: Print stdout1 here...\n", time.Now())
+	os.Stderr.WriteString(fmt.Sprintf("%v: Print stderr here...\n", time.Now()))
+
+	logex.Notice("--- NEW SESSION -------------------------")
+	logex.Notice(">>> log_level:", log_level)
+
+	// settings:
+	if opts["-p"] == nil {
+		fmt.Fprintln(os.Stderr, "ERROR: -p PORT must be set!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	transfer.Port = opts["-p"].(string)
+	logex.Notice(">>> port:", transfer.Port)
+
+	if opts["--config"] == nil {
+		fmt.Fprintln(os.Stderr, "ERROR: --config config_file must be set!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	//read conf
+	var configMgr transfer.ConfigManager
+	configMgr.Init(opts["--config"].(string))
+	transfer.Dict.DictName = configMgr.Read("default", "dict_name")
+	if transfer.Dict.DictName == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] DictName in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> DictName:", transfer.Dict.DictName)
+
+	transfer.Dict.DictMode = configMgr.Read("default", "mode")
+	if transfer.Dict.DictMode == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] DictMode in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> Mode:", transfer.Dict.DictMode)
+
+	transfer.Dict.StoragePlace = configMgr.Read("default", "storage_place")
+	if transfer.Dict.StoragePlace == "" || transfer.Dict.StoragePlace != "LOCAL" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] StoragePlace in config_file! only LOCAL is supported")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> StoragePlace:", transfer.Dict.StoragePlace)
+
+	transfer.BuildToolLocal = configMgr.Read("default", "buildtool_local")
+	if transfer.BuildToolLocal == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] BuildToolLocal in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> BuildToolLocal:", transfer.BuildToolLocal)
+
+	transfer.Dict.DonefileAddress = configMgr.Read("default", "donefile_address")
+	if transfer.Dict.DonefileAddress == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] DonefileAddress in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> DonefileAddress:", transfer.Dict.DonefileAddress)
+
+	transfer.Dict.OutputAddress = configMgr.Read("default", "output_address")
+	if transfer.Dict.OutputAddress == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] OutputAddress in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> OutputAddress:", transfer.Dict.OutputAddress)
+
+	transfer.Dict.TmpAddress = configMgr.Read("default", "tmp_address")
+	if transfer.Dict.TmpAddress == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] TmpAddress in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> TmpAddress:", transfer.Dict.TmpAddress)
+
+	ShardNumStr := configMgr.Read("default", "shard_num")
+	if ShardNumStr == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] ShardNum in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	transfer.Dict.ShardNum, err = strconv.Atoi(ShardNumStr)
+	if err != nil {
+		logex.Fatal("ShardNum is not a valid integer")
+		os.Exit(1)
+	}
+	logex.Notice(">>> ShardNum:", transfer.Dict.ShardNum)
+
+	CopyNumStr := configMgr.Read("default", "copy_num")
+	if CopyNumStr == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] CopyNum in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	transfer.Dict.CopyNum, err = strconv.Atoi(CopyNumStr)
+	if err != nil {
+		logex.Fatal("CopyNum is not a valid integer")
+		os.Exit(1)
+	}
+	logex.Notice(">>> CopyNum:", transfer.Dict.CopyNum)
+
+	transfer.Dict.InstancesNum = transfer.Dict.ShardNum * transfer.Dict.CopyNum
+
+	transfer.Dict.DeployPath = configMgr.Read("default", "deploy_path")
+	if transfer.Dict.DeployPath == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] DeployPath in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> DeployPath:", transfer.Dict.DeployPath)
+
+	transfer.TransferAddr = configMgr.Read("default", "transfer_address")
+	if transfer.TransferAddr == "" {
+		fmt.Fprintln(os.Stderr, "ERROR: need [default] TransferAddr in config_file!")
+		fmt.Fprintln(os.Stderr, usage)
+		os.Exit(1)
+	}
+	logex.Notice(">>> TransferAddr:", transfer.TransferAddr)
+
+	for i := 0; i < transfer.Dict.ShardNum; i++ {
+		for j := 0; j < transfer.Dict.CopyNum; j++ {
+			var instance dict.DictInstance
+			agentName := fmt.Sprintf("agent%d_%d", i, j)
+			agentInfo := configMgr.Read("cube_agent", agentName)
+			agentInfoSlice := strings.Split(agentInfo, ":")
+			cubeName := fmt.Sprintf("cube%d_%d", i, j)
+			cubeInfo := configMgr.Read("cube_agent", cubeName)
+			cubeInfoSlice := strings.Split(cubeInfo, ":")
+			instance.DictName = transfer.Dict.DictName
+			instance.AgentIp = agentInfoSlice[0]
+			instance.AgentPort, _ = strconv.Atoi(agentInfoSlice[1])
+			instance.IP = cubeInfoSlice[0]
+			instance.Port, _ = strconv.Atoi(cubeInfoSlice[1])
+			instance.DictName =
transfer.Dict.DictName + instance.CreateTime = int(time.Now().Unix()) + instance.Shard = i + instance.DeployPath = cubeInfoSlice[2] + transfer.Dict.Instances = append(transfer.Dict.Instances, instance) + } + } + logex.Noticef(">>> instance: %v", transfer.Dict.Instances) + + transfer.Start() + fmt.Print("m-cube-transfer over!") + +} + +func getLogLevel(log_level int) logex.Level { + switch log_level { + case 16: + return logex.DEBUG + case 8: + return logex.TRACE + case 4: + return logex.NOTICE + case 2: + return logex.WARNING + case 1: + return logex.FATAL + case 0: + return logex.NONE + } + return logex.DEBUG +} diff --git a/cube/cube-transfer/src/transfer/builder.go b/cube/cube-transfer/src/transfer/builder.go new file mode 100755 index 0000000000000000000000000000000000000000..cfc519ffeecff5c5e0ecebfb540cdb49570f037f --- /dev/null +++ b/cube/cube-transfer/src/transfer/builder.go @@ -0,0 +1,97 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "fmt" + "github.com/Badangel/logex" + "strconv" + "strings" + "time" + "transfer/dict" +) + +func BuilderStart(versionInfo dict.DictVersionInfo) error { + fmt.Printf("[entry]start build\n") + logex.Noticef("[entry]start build") + if strings.HasPrefix(Dict.DonefileAddress, dict.FTP_HEADER) || strings.HasPrefix(Dict.DonefileAddress, dict.HTTP_HEADER) { + if strings.HasPrefix(Dict.DonefileAddress, dict.FTP_HEADER){ + s1 := strings.Replace(Dict.DonefileAddress, dict.FTP_HEADER,"",1) + index:=strings.Index(s1,"/") + versionInfo.Input = dict.FTP_HEADER + s1[0 : index] + versionInfo.Input + } + if strings.HasPrefix(Dict.DonefileAddress, dict.HTTP_HEADER){ + s1 := strings.Replace(Dict.DonefileAddress, dict.HTTP_HEADER,"",1) + index:=strings.Index(s1,"/") + versionInfo.Input = dict.HTTP_HEADER + s1[0 : index] + versionInfo.Input + } + + localInputPath := Dict.TmpAddress + "/../input/" + Dict.DictName + "_" + strconv.Itoa(versionInfo.Version) + "_" + strconv.Itoa(versionInfo.Depend) + Wget(versionInfo.Input, localInputPath) + versionInfo.Input = localInputPath + } + Dict.WaitVersionInfo.Output = BuildIndex(versionInfo) + InitAllInstances() + + return nil +} + +func BuildIndex(versionInfo dict.DictVersionInfo) string { + versionStr := strconv.Itoa(versionInfo.Version) + dependStr := strconv.Itoa(versionInfo.Depend) + + var params []string + params = append(params, "-dict_name="+Dict.DictName) + params = append(params, "-job_mode="+Dict.WaitVersionInfo.Mode) + curlen := len(Dict.CurrentVersionInfo) + lastVersion := "0" + if curlen > 0 && Dict.WaitVersionInfo.Key == Dict.CurrentVersionInfo[curlen-1].Key { + lastVersion = strconv.Itoa(Dict.CurrentVersionInfo[curlen-1].Version) + } + + shardNum := strconv.Itoa(Dict.ShardNum) + params = append(params, "-last_version="+lastVersion) + params = append(params, "-cur_version="+versionStr) + params = append(params, "-depend_version="+dependStr) + params = append(params, "-input_path="+versionInfo.Input) + params = 
append(params, "-output_path="+Dict.OutputAddress) + params = append(params, "-shard_num="+shardNum) + params = append(params, "-master_address="+TransferAddr+":"+Port) + params = append(params, "-only_build=false") + err := ExeCommad(BuildToolLocal, params) + if err != nil { + fmt.Printf("build exe: %v\n", err) + logex.Noticef("[builder] exe cmd err: %v", err) + } + + outPut := Dict.OutputAddress + "/" + dependStr + "_" + versionStr + return outPut +} + +func InitAllInstances() { + for i, _ := range Dict.Instances { + Dict.Instances[i].Status = dict.Instance_Status_Init + Dict.Instances[i].BuildedTime = int(time.Now().Unix()) + Dict.Instances[i].DownloadStartTime = 0 + Dict.Instances[i].DownloadedTime = 0 + Dict.Instances[i].ReloadStartTime = 0 + Dict.Instances[i].ReloadedTime = 0 + Dict.Instances[i].EnablStartTime = 0 + Dict.Instances[i].EnabledTime = 0 + } + Dict.DownloadSuccInsts = 0 + Dict.ReloadSuccInsts = 0 + Dict.EnableSuccInsts = 0 +} diff --git a/cube/cube-transfer/src/transfer/config.go b/cube/cube-transfer/src/transfer/config.go new file mode 100755 index 0000000000000000000000000000000000000000..174dab383d189584c7b647bd40b28a3fe0148da8 --- /dev/null +++ b/cube/cube-transfer/src/transfer/config.go @@ -0,0 +1,112 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package transfer + +import ( + "bufio" + "io" + "os" + "strings" +) + +const middle = "->" + +type ConfigManager struct { + Mymap map[string]string + strcet string +} + +func (c *ConfigManager) Init(path string) { + c.Mymap = make(map[string]string) + + f, err := os.Open(path) + if err != nil { + panic(err) + } + defer f.Close() + + r := bufio.NewReader(f) + for { + b, _, err := r.ReadLine() + if err != nil { + if err == io.EOF { + break + } + panic(err) + } + + s := strings.TrimSpace(string(b)) + if strings.Index(s, "#") == 0 { + continue + } + + n1 := strings.Index(s, "[") + n2 := strings.LastIndex(s, "]") + if n1 > -1 && n2 > -1 && n2 > n1+1 { + c.strcet = strings.TrimSpace(s[n1+1 : n2]) + continue + } + + if len(c.strcet) == 0 { + continue + } + index := strings.Index(s, ":") + if index < 0 { + continue + } + + frist := strings.TrimSpace(s[:index]) + if len(frist) == 0 { + continue + } + second := strings.TrimSpace(s[index+1:]) + + pos := strings.Index(second, "\t#") + if pos > -1 { + second = second[0:pos] + } + + pos = strings.Index(second, " #") + if pos > -1 { + second = second[0:pos] + } + + pos = strings.Index(second, "\t//") + if pos > -1 { + second = second[0:pos] + } + + pos = strings.Index(second, " //") + if pos > -1 { + second = second[0:pos] + } + + if len(second) == 0 { + continue + } + + key := c.strcet + middle + frist + c.Mymap[key] = strings.TrimSpace(second) + } +} + +func (c ConfigManager) Read(node, key string) string { + key = node + middle + key + v, found := c.Mymap[key] + if !found { + return "" + } + return v +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/deployer.go b/cube/cube-transfer/src/transfer/deployer.go new file mode 100755 index 0000000000000000000000000000000000000000..6c4b1ca8b42b5e13556d8ad71c43b26bfdb0cf14 --- /dev/null +++ b/cube/cube-transfer/src/transfer/deployer.go @@ -0,0 +1,203 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
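A minimal usage sketch for the ConfigManager defined in config.go above: it parses "[section]" headers and "key: value" lines (split on the first colon) into a flat map keyed as "section->key". The temp file below mirrors the sample conf/transfer.conf; its path is illustrative, and the GOPATH-style import matches this patch's src layout.

package main

import (
	"fmt"
	"io/ioutil"
	"os"

	"transfer" // GOPATH-style import used by this patch's src tree
)

func main() {
	conf := "[default]\ndict_name: test_dict\nmode: base_only\n\n[cube_agent]\nagent0_0: 127.0.0.1:8001\n"

	f, err := ioutil.TempFile("", "transfer-*.conf")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.WriteString(conf)
	f.Close()

	var mgr transfer.ConfigManager
	mgr.Init(f.Name())
	fmt.Println(mgr.Read("default", "dict_name"))   // "test_dict"
	fmt.Println(mgr.Read("cube_agent", "agent0_0")) // "127.0.0.1:8001" (only the first ':' splits key and value)
	fmt.Println(mgr.Read("default", "missing"))     // "" when the key is absent
}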
+ +package transfer + +import ( + "fmt" + "github.com/Badangel/logex" + "strconv" + "time" + "transfer/dict" +) + +func DeployStart(versionInfo dict.DictVersionInfo) error { + fmt.Printf("[entry]start deploy\n") + logex.Noticef("[entry]start deploy") + var err error + for { + switch Dict.WaitVersionInfo.Status { + case dict.Dict_Status_Deploying, dict.Dict_Status_Downloading: + CmdInstsDownload() + case dict.Dict_Status_Download_Succ, dict.Dict_Status_Reloading: + CmdInstsReload() + case dict.Dict_Status_Reload_Succ, dict.Dict_Status_Enabling: + CmdInstsEnable() + default: + logex.Noticef("dict status %v", Dict.WaitVersionInfo.Status) + } + if Dict.WaitVersionInfo.Status == dict.Dict_Status_Finished { + Dict.WaitVersionInfo.FinishTime = int(time.Now().Unix()) + Dict.CurrentVersionInfo = append(Dict.CurrentVersionInfo, Dict.WaitVersionInfo) + fmt.Printf("[deploy finish]version:%v\n", Dict.WaitVersionInfo) + logex.Noticef("[deploy finish]version:%v", Dict.WaitVersionInfo) + var newVersionInfo dict.DictVersionInfo + Dict.WaitVersionInfo = newVersionInfo + WriteCurrentVersionInfoToFile() + WriteWaitVersionInfoToFile() + break + } else { + time.Sleep(time.Duration(10) * time.Second) + } + } + return err +} + +func CmdInstsDownload() { + for { + chs := make([]chan error, len(Dict.Instances)) + keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances)) + for i, inst := range Dict.Instances { + if inst.Status != dict.Instance_Status_Download_Succ { + chs[i] = make(chan error) + var json_params dict.CubeAgentRequest + json_params.Command = dict.DOWNLOAD + json_params.DictName = inst.DictName + json_params.DeployPath = inst.DeployPath + json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version) + json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend) + json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id) + json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key) + json_params.Mode = Dict.WaitVersionInfo.Mode + json_params.ShardSeq = inst.Shard + json_params.Port = strconv.Itoa(inst.Port) + json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar" + var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort) + logex.Noticef("[download cmd]%v:%v", address, json_params) + go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i]) + Dict.Instances[i].DownloadStartTime = int(time.Now().Unix()) + Dict.Instances[i].Mode = Dict.WaitVersionInfo.Mode + } + } + for i, inst := range Dict.Instances { + err := <-chs[i] + logex.Noticef("[instance resp]download:%v", Dict.Instances) + if err != nil || keyAndRespSlice[i].Success != "0" { + logex.Warningf("cmd cube online downlaod of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard) + continue + } + if inst.Status < dict.Instance_Status_Download_Succ { + Dict.Instances[i].Status = dict.Instance_Status_Download_Succ + Dict.Instances[i].DownloadedTime = int(time.Now().Unix()) + Dict.DownloadSuccInsts++ + } + } + if Dict.DownloadSuccInsts == Dict.InstancesNum { + Dict.WaitVersionInfo.Status = dict.Dict_Status_Download_Succ + fmt.Printf("[all download ok]inst :%v\n", Dict.Instances) + logex.Noticef("[all download ok]inst :%v", Dict.Instances) + break + } + time.Sleep(5 * time.Second) + } +} + +func CmdInstsReload() { + for { + chs := make([]chan error, len(Dict.Instances)) + keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances)) + for i, inst := 
range Dict.Instances { + if inst.Status != dict.Instance_Status_Reload_Succ { + chs[i] = make(chan error) + var json_params dict.CubeAgentRequest + json_params.Command = dict.RELOAD + json_params.DictName = inst.DictName + json_params.DeployPath = inst.DeployPath + json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version) + json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend) + json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id) + json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key) + json_params.Mode = Dict.WaitVersionInfo.Mode + json_params.ShardSeq = inst.Shard + json_params.Port = strconv.Itoa(inst.Port) + json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar" + + var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort) + logex.Noticef("[reload cmd]%v:%v", address, json_params) + go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i]) + Dict.Instances[i].ReloadStartTime = int(time.Now().Unix()) + } + } + for i, inst := range Dict.Instances { + err := <-chs[i] + logex.Noticef("[instance resp]reload:%v", Dict.Instances) + if err != nil || keyAndRespSlice[i].Success != "0" { + logex.Warningf("cmd cube online reload of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard) + continue + } + if inst.Status < dict.Instance_Status_Reload_Succ { + Dict.Instances[i].Status = dict.Instance_Status_Reload_Succ + Dict.Instances[i].ReloadedTime = int(time.Now().Unix()) + Dict.ReloadSuccInsts++ + } + } + if Dict.ReloadSuccInsts == Dict.InstancesNum { + Dict.WaitVersionInfo.Status = dict.Dict_Status_Reload_Succ + fmt.Printf("[all reload ok]inst:%v\n", Dict.Instances) + logex.Noticef("[all reload ok]inst :%v", Dict.Instances) + break + } + time.Sleep(5 * time.Second) + } +} + +func CmdInstsEnable() { + for { + chs := make([]chan error, len(Dict.Instances)) + keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances)) + for i, inst := range Dict.Instances { + if inst.Status != dict.Instance_Status_Enable_Succ { + chs[i] = make(chan error) + var json_params dict.CubeAgentRequest + json_params.Command = dict.ENABLE + json_params.DictName = inst.DictName + json_params.DeployPath = inst.DeployPath + json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version) + json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend) + json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id) + json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key) + json_params.Mode = Dict.WaitVersionInfo.Mode + json_params.ShardSeq = inst.Shard + json_params.Port = strconv.Itoa(inst.Port) + json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar" + + var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort) + logex.Noticef("[enable cmd]%v:%v", address, json_params) + go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i]) + Dict.Instances[i].EnablStartTime = int(time.Now().Unix()) + } + } + for i, inst := range Dict.Instances { + err := <-chs[i] + logex.Noticef("[instance resp]enable:%v", Dict.Instances) + if err != nil || keyAndRespSlice[i].Success != "0" { + logex.Warningf("cmd cube online enable of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard) + continue + } + if inst.Status < dict.Instance_Status_Enable_Succ { + 
Dict.Instances[i].Status = dict.Instance_Status_Enable_Succ + Dict.Instances[i].EnabledTime = int(time.Now().Unix()) + Dict.EnableSuccInsts++ + } + } + if Dict.EnableSuccInsts == Dict.InstancesNum { + Dict.WaitVersionInfo.Status = dict.Dict_Status_Finished + fmt.Printf("[all enable ok]inst :%v\n", Dict.Instances) + logex.Noticef("[all enable ok]inst :%v", Dict.Instances) + + break + } + time.Sleep(5 * time.Second) + } +} diff --git a/cube/cube-transfer/src/transfer/dict/cube_agent_server.go b/cube/cube-transfer/src/transfer/dict/cube_agent_server.go new file mode 100755 index 0000000000000000000000000000000000000000..2b81e069dcf1db400f4ef850128e07afcd2be351 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/cube_agent_server.go @@ -0,0 +1,41 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +type CubeAgentRequest struct { + Command string `json:"command"` + DictName string `json:"dict_name"` + DeployPath string `json:"deploy_path"` + Version string `json:"version"` + Depend string `json:"depend"` + Id string `json:"id"` + Key string `json:"key"` + Mode string `json:"mode"` + ShardSeq int `json:"shard_seq"` + Source string `json:"source"` + Service string `json:"service,omitempty"` + SlotIdList string `json:"slot_id_list,omitempty"` + ActiveVersionList string `json:"active_version_list,omitempty"` + Port string `json:"port,omitempty"` + VersionSign string `json:"version_sign,omitempty"` +} + + +type CubeAgentResponse struct { + Success string `json:"success"` + Message string `json:"message"` + Data string `json:"data"` +} + diff --git a/cube/cube-transfer/src/transfer/dict/define.go b/cube/cube-transfer/src/transfer/dict/define.go new file mode 100755 index 0000000000000000000000000000000000000000..fa7ecf7d26bd34c9de45ee01a47d04f9e4ec743b --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/define.go @@ -0,0 +1,273 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
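Illustrative only: the JSON body that deployer.go builds from CubeAgentRequest above and POSTs to http://<agent_ip>:<agent_port>/agent/cmd for a download command. The DOWNLOAD constant comes from define.go below; all field values here are invented, since the real ones are filled from WaitVersionInfo and the per-instance entries parsed out of conf/transfer.conf.

package main

import (
	"encoding/json"
	"fmt"

	"transfer/dict" // GOPATH-style import used by this patch's src tree
)

func main() {
	// Invented example values; omitempty fields that stay empty are dropped.
	req := dict.CubeAgentRequest{
		Command:    dict.DOWNLOAD,
		DictName:   "test_dict",
		DeployPath: "/home/work/test_dict",
		Version:    "1561621200",
		Depend:     "1561621200",
		Id:         "1",
		Key:        "1",
		Mode:       "base",
		ShardSeq:   0,
		Port:       "8000",
		Source:     "ftp://127.0.0.1/home/work/dangyifei/test-transfer/test_data/output/1561621200_1561621200/test_dict_part0.tar",
	}
	b, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}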
+ +package dict + +import "errors" + +var ( + // Dict Mode + BASE_ONLY = "base_only" + BASR_DELTA = "base_delta" + + // Deploy Mode + BASE = "base" + DELTA = "delta" + + // Succ or Failed Status + SUCC = "succ" + FAILED = "failed" + + //command + DOWNLOAD = "download" + RELOAD = "reload" + ENABLE = "enable" + + FTP_HEADER = "ftp://" + HTTP_HEADER = "http://" +) + +// Dict Status +type DictStatus int + +const ( + // Dict Status + //clear状态编码参考InstanceStatus + Dict_Status_Clearing DictStatus = 1 + Dict_Status_Cleared DictStatus = 2 + Dict_Status_Trigging DictStatus = 10 + Dict_Status_Building DictStatus = 20 + Dict_Status_Deploying DictStatus = 30 + Dict_Status_Downloading DictStatus = 40 + Dict_Status_Download_Succ DictStatus = 50 + Dict_Status_Reloading DictStatus = 60 + Dict_Status_Reload_Succ DictStatus = 70 + Dict_Status_Enabling DictStatus = 80 + Dict_Status_Finished DictStatus = 90 + Dict_Status_Restarting DictStatus = 100 +) + +func (this DictStatus) String() DictStatusStr { + switch this { + case Dict_Status_Trigging: + return Dict_Status_Trigging_Str + case Dict_Status_Building: + return Dict_Status_Building_Str + case Dict_Status_Deploying: + return Dict_Status_Deploying_Str + case Dict_Status_Downloading: + return Dict_Status_Downloading_Str + case Dict_Status_Download_Succ: + return Dict_Status_Download_Succ_Str + case Dict_Status_Reloading: + return Dict_Status_Reloading_Str + case Dict_Status_Reload_Succ: + return Dict_Status_Reload_Succ_Str + case Dict_Status_Enabling: + return Dict_Status_Enabling_Str + case Dict_Status_Finished: + return Dict_Status_Finished_Str + case Dict_Status_Restarting: + return Dict_Status_Restarting_Str + case Dict_Status_Clearing: + return Dict_Status_Clearing_Str + case Dict_Status_Cleared: + return Dict_Status_Cleared_Str + default: + return "" + } +} + +type DictStatusStr string + +const ( + // Dict Status + Dict_Status_Trigging_Str DictStatusStr = "Trigging" + Dict_Status_Building_Str DictStatusStr = "Building" + Dict_Status_Deploying_Str DictStatusStr = "deploying" + Dict_Status_Downloading_Str DictStatusStr = "downloading" + Dict_Status_Download_Succ_Str DictStatusStr = "download_succ" + Dict_Status_Reloading_Str DictStatusStr = "reloading" + Dict_Status_Reload_Succ_Str DictStatusStr = "reload_succ" + Dict_Status_Enabling_Str DictStatusStr = "enabling" + Dict_Status_Finished_Str DictStatusStr = "finished" + Dict_Status_Restarting_Str DictStatusStr = "restarting" + Dict_Status_Clearing_Str DictStatusStr = "clearing" + Dict_Status_Cleared_Str DictStatusStr = "cleared" +) + +func (this DictStatusStr) Int() (DictStatus, error) { + switch this { + case Dict_Status_Trigging_Str: + return Dict_Status_Trigging, nil + case Dict_Status_Building_Str: + return Dict_Status_Building, nil + case Dict_Status_Deploying_Str: + return Dict_Status_Deploying, nil + case Dict_Status_Downloading_Str: + return Dict_Status_Downloading, nil + case Dict_Status_Download_Succ_Str: + return Dict_Status_Download_Succ, nil + case Dict_Status_Reloading_Str: + return Dict_Status_Reloading, nil + case Dict_Status_Reload_Succ_Str: + return Dict_Status_Reload_Succ, nil + case Dict_Status_Enabling_Str: + return Dict_Status_Enabling, nil + case Dict_Status_Finished_Str: + return Dict_Status_Finished, nil + case Dict_Status_Restarting_Str: + return Dict_Status_Restarting, nil + case Dict_Status_Clearing_Str: + return Dict_Status_Clearing, nil + case Dict_Status_Cleared_Str: + return Dict_Status_Cleared, nil + default: + return 0, errors.New("invalid dict status") + } +} + +// 
Instance Status: +type InstanceStatus int + +const ( + //各种状态都有可能进入clear状态,因此clear相关的状态都小于init状态 + Instance_Status_Clear InstanceStatus = 1 + Instance_Status_Clearing InstanceStatus = 2 + Instance_Status_Clear_Failed InstanceStatus = 3 + Instance_Status_Clear_Succ InstanceStatus = 4 + Instance_Status_Init InstanceStatus = 10 + Instance_Status_Downloading InstanceStatus = 20 + Instance_Status_Download_Failed InstanceStatus = 30 + Instance_Status_Download_Succ InstanceStatus = 40 + Instance_Status_Reloading InstanceStatus = 50 + Instance_Status_Reload_Failed InstanceStatus = 60 + Instance_Status_Reload_Succ InstanceStatus = 70 + Instance_Status_Enabling InstanceStatus = 80 + Instance_Status_Enable_Failed InstanceStatus = 90 + Instance_Status_Enable_Succ InstanceStatus = 100 + Instance_Status_Poping InstanceStatus = 110 + Instance_Status_Pop_Failed InstanceStatus = 120 + Instance_Status_Pop_Succ InstanceStatus = 130 + Instance_Status_Dead InstanceStatus = 250 +) + +func (this InstanceStatus) String() InstanceStatusStr { + switch this { + case Instance_Status_Init: + return Instance_Status_Init_Str + case Instance_Status_Downloading: + return Instance_Status_Downloading_Str + case Instance_Status_Download_Failed: + return Instance_Status_Download_Failed_Str + case Instance_Status_Download_Succ: + return Instance_Status_Download_Succ_Str + case Instance_Status_Reloading: + return Instance_Status_Reloading_Str + case Instance_Status_Reload_Failed: + return Instance_Status_Reload_Failed_Str + case Instance_Status_Reload_Succ: + return Instance_Status_Reload_Succ_Str + case Instance_Status_Enabling: + return Instance_Status_Enabling_Str + case Instance_Status_Enable_Failed: + return Instance_Status_Enable_Failed_Str + case Instance_Status_Enable_Succ: + return Instance_Status_Enable_Succ_Str + case Instance_Status_Dead: + return Instance_Status_Dead_Str + case Instance_Status_Clear: + return Instance_Status_Clear_Str + case Instance_Status_Clearing: + return Instance_Status_Clearing_Str + case Instance_Status_Clear_Failed: + return Instance_Status_Clear_Failed_Str + case Instance_Status_Clear_Succ: + return Instance_Status_Clear_Succ_Str + case Instance_Status_Poping: + return Instance_Status_Poping_Str + case Instance_Status_Pop_Failed: + return Instance_Status_Pop_Failed_Str + case Instance_Status_Pop_Succ: + return Instance_Status_Pop_Succ_Str + default: + return "" + } +} + +type InstanceStatusStr string + +const ( + Instance_Status_Init_Str InstanceStatusStr = "init" + Instance_Status_Downloading_Str InstanceStatusStr = "downloading" + Instance_Status_Download_Failed_Str InstanceStatusStr = "download_failed" + Instance_Status_Download_Succ_Str InstanceStatusStr = "download_succ" + Instance_Status_Reloading_Str InstanceStatusStr = "reloading" + Instance_Status_Reload_Failed_Str InstanceStatusStr = "finish_reload_failed" + Instance_Status_Reload_Succ_Str InstanceStatusStr = "finish_reload_succ" + Instance_Status_Enabling_Str InstanceStatusStr = "enabling" + Instance_Status_Enable_Failed_Str InstanceStatusStr = "enable_failed" + Instance_Status_Enable_Succ_Str InstanceStatusStr = "enable_succ" + Instance_Status_Dead_Str InstanceStatusStr = "dead" + Instance_Status_Clear_Str InstanceStatusStr = "clear" + Instance_Status_Clearing_Str InstanceStatusStr = "clearing" + Instance_Status_Clear_Failed_Str InstanceStatusStr = "clear_failed" + Instance_Status_Clear_Succ_Str InstanceStatusStr = "clear_succ" + Instance_Status_Poping_Str InstanceStatusStr = "poping" + Instance_Status_Pop_Failed_Str 
InstanceStatusStr = "pop_failed" + Instance_Status_Pop_Succ_Str InstanceStatusStr = "pop_succ" +) + +func (this InstanceStatusStr) Int() (InstanceStatus, error) { + switch this { + case Instance_Status_Init_Str: + return Instance_Status_Init, nil + case Instance_Status_Downloading_Str: + return Instance_Status_Downloading, nil + case Instance_Status_Download_Failed_Str: + return Instance_Status_Download_Failed, nil + case Instance_Status_Download_Succ_Str: + return Instance_Status_Download_Succ, nil + case Instance_Status_Reloading_Str: + return Instance_Status_Reloading, nil + case Instance_Status_Reload_Failed_Str: + return Instance_Status_Reload_Failed, nil + case Instance_Status_Reload_Succ_Str: + return Instance_Status_Reload_Succ, nil + case Instance_Status_Enabling_Str: + return Instance_Status_Enabling, nil + case Instance_Status_Enable_Failed_Str: + return Instance_Status_Enable_Failed, nil + case Instance_Status_Enable_Succ_Str: + return Instance_Status_Enable_Succ, nil + case Instance_Status_Dead_Str: + return Instance_Status_Dead, nil + case Instance_Status_Clear_Str: + return Instance_Status_Clear, nil + case Instance_Status_Clearing_Str: + return Instance_Status_Clearing, nil + case Instance_Status_Clear_Failed_Str: + return Instance_Status_Clear_Failed, nil + case Instance_Status_Clear_Succ_Str: + return Instance_Status_Clear_Succ, nil + case Instance_Status_Poping_Str: + return Instance_Status_Poping, nil + case Instance_Status_Pop_Failed_Str: + return Instance_Status_Pop_Failed, nil + case Instance_Status_Pop_Succ_Str: + return Instance_Status_Pop_Succ, nil + default: + return 0, errors.New("invalid instance status") + } +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/dict/dict_info.go b/cube/cube-transfer/src/transfer/dict/dict_info.go new file mode 100755 index 0000000000000000000000000000000000000000..5e0bbbcf5a37600b5284a0bf5b95d50566615270 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_info.go @@ -0,0 +1,34 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dict + +type DictInfo struct { + DictName string `json:"dict_name"` + DictMode string `json:"dict_mode"` + ShardNum int `json:"shard_num"` + CopyNum int `json:"copy_num"` + InstancesNum int `json:"inst_num"` + DeployPath string `json:"deploy_path"` + DonefileAddress string `json:"donefile_addr"` + OutputAddress string `json:"output_addr"` + TmpAddress string `json:"tmp_addr"` + StoragePlace string `json:"storage_place"` + DownloadSuccInsts int `json:"download_inst"` + ReloadSuccInsts int `json:"reload_insts"` + EnableSuccInsts int `json:"enable_insts"` + Instances []DictInstance `json:"instances"` + WaitVersionInfo DictVersionInfo `json:"wait_version_info"` + CurrentVersionInfo []DictVersionInfo `json:"current_version_info"` +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/dict/dict_instance_status.go b/cube/cube-transfer/src/transfer/dict/dict_instance_status.go new file mode 100755 index 0000000000000000000000000000000000000000..8f7181d8a108677f43e8db0739a1f28e5aa90642 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_instance_status.go @@ -0,0 +1,42 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +type DictInstance struct { + DictName string `json:"dict_name"` + Mode string `json:"mode"` + Version int `json:"version"` + Depend int `json:"depend"` + Id int `json:"id"` + Key int `json:"key"` + Shard int `json:"shard"` + Source string `json:"source"` + DeployPath string `json:"deploy_path"` + IP string `json:"ip"` + Port int `json:"port"` + AgentIp string `json:"agent_ip"` + AgentPort int `json:"agent_port"` + Status InstanceStatus `json:"status_id"` + StatusStr InstanceStatusStr `json:"status"` + BuildedTime int `json:"builded_time"` + DownloadStartTime int `json:"download_start_time"` + DownloadedTime int `json:"downloaded_time"` + ReloadStartTime int `json:"reload_start_time"` + ReloadedTime int `json:"reloaded_time"` + EnablStartTime int `json:"enable_start_time"` + EnabledTime int `json:"enabled_time"` + CreateTime int `json:"create_time"` +} + diff --git a/cube/cube-transfer/src/transfer/dict/dict_shard_info.go b/cube/cube-transfer/src/transfer/dict/dict_shard_info.go new file mode 100755 index 0000000000000000000000000000000000000000..0c42a4bbbc6825a5330116db451b3a49d14c7ce3 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_shard_info.go @@ -0,0 +1,59 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +import ( + "strconv" +) + +type DictShardInfo struct { + Name string `json:"name"` + Version string `json:"version"` + Depend string `json:"depend"` + Id string `json:"id"` + Key string `json:"key"` + Shard int `json:"shard"` + Mode string `json:"mode"` + DictMode string `json:"dict_mode,omitempty"` + Source string `json:"data_source"` + Service string `json:"service,omitempty"` + DeltaInfo string `json:"delta_info,omitempty"` + BuildedTime int `json:"builded_time,omitempty"` + BuildedTimeStr string `json:"build_finish_time,omitempty"` + CreateTime int `json:"create_time,omitempty"` + IsActive bool `json:"is_active,omitempty"` +} + +func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace string, transferaddr string)(info DictShardInfo){ + info.Name = dictVersionInfo.DictName + info.Version = strconv.Itoa(dictVersionInfo.Version) + info.Depend = strconv.Itoa(dictVersionInfo.Depend) + info.Id = strconv.Itoa(dictVersionInfo.Id) + info.Key = strconv.Itoa(dictVersionInfo.Key) + info.Mode = dictVersionInfo.Mode + info.Shard = shard + info.Source = GetFileHead(storagePlace, transferaddr) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar" + return +} + + +func GetFileHead(storagePlace string, transferaddr string) string { + if storagePlace == "LOCAL"{ + return "ftp://" + transferaddr + } else { + return "" + } + +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/dict/dict_version_info.go b/cube/cube-transfer/src/transfer/dict/dict_version_info.go new file mode 100755 index 0000000000000000000000000000000000000000..f835b6bcfb11199fcb23573f0c569277bd82ca51 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_version_info.go @@ -0,0 +1,53 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
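Quick illustration of GetFileHead above: with storage_place set to LOCAL, the per-shard data source handed to agents is an ftp:// URL rooted at the transfer host (the deployer then appends the builder output path and <dict_name>_part<shard>.tar); any other value currently yields an empty prefix. The host address is illustrative.

package main

import (
	"fmt"

	"transfer/dict" // GOPATH-style import used by this patch's src tree
)

func main() {
	fmt.Println(dict.GetFileHead("LOCAL", "127.0.0.1")) // "ftp://127.0.0.1"
	fmt.Println(dict.GetFileHead("HDFS", "127.0.0.1"))  // "" (only LOCAL is wired up here)
}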
+ +package dict + +type ( + DictVersionInfo struct { + DictName string `json:"dict_name"` + Version int `json:"version"` + Depend int `json:"depend"` + Id int `json:"id"` + Key int `json:"key"` + Mode string `json:"mode"` + Input string `json:"input"` + Output string `json:"output"` + Status DictStatus `json:"status"` + StatusStr DictStatusStr `json:"status_str"` + FinishTime int `json:"finish_time"` + CreateTime int `json:"create_time"` + MetaInfos map[int]string `json:"meta_infos"` + } + DonefileInfo struct { + Id string `json:"id"` + Key string `json:"key"` + Input string `json:"input"` + } + DictShardMetaInfo struct { + Name string `json:"name"` + Version int `json:"version"` + Depend int `json:"depend"` + Shard int `json:"shard"` + Split int `json:"split"` + Meta string `json:"meta"` + } + + MetaInfo struct { + IndexTotalCount string `json:"index_total_count"` + IndexLenList []string `json:"index_len_list"` + DataLenList []string `json:"data_len_list"` + } +) + diff --git a/cube/cube-transfer/src/transfer/global.go b/cube/cube-transfer/src/transfer/global.go new file mode 100755 index 0000000000000000000000000000000000000000..8afe55db155ba316defa8ef239e1c1df74a815fe --- /dev/null +++ b/cube/cube-transfer/src/transfer/global.go @@ -0,0 +1,52 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import "transfer/dict" + +var ( + Dir string + Port string + Dict dict.DictInfo + BuildToolLocal string + TransferAddr string + //DictName string + //DictMode string + //DonefileAddress string + //OutputAddress string + //TmpAddress string + //ShardNum int + //CopyNum int + //DeployPath string + //Instances []dict.DictInstance + //StoragePlace string + //WaitVersionInfo dict.DictVersionInfo + //CurrentVersionInfo []dict.DictVersionInfo + //InstancesNum int + //DownloadSuccInsts int + //ReloadSuccInsts int + //EnableSuccInsts int +) + + +type LocalAddress string +type HDFSAddress string + +const ( + STATUS_OK int = 0 + STATUS_WRONG_API int = 2 + STATUS_RETRY_LATER int = 10 + STATUS_FAIL int = 255 +) \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/http.go b/cube/cube-transfer/src/transfer/http.go new file mode 100755 index 0000000000000000000000000000000000000000..75cf847c2c3ca6c1137f07f208b5a8e884afc04f --- /dev/null +++ b/cube/cube-transfer/src/transfer/http.go @@ -0,0 +1,270 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/Badangel/logex" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +type handlerFunc func(subpath string, m map[string]string) (string, string, int, error) + +var ( // key = subpath; eg: path="/checker/job", key="job" + getHandler map[string]handlerFunc + putHandler map[string]handlerFunc + deleteHandler map[string]handlerFunc + postHandler map[string]handlerFunc +) + +func startHttp(addr string) error { + + // init handlers: + initGetHandlers() + initPostHandlers() + + + http.HandleFunc("/dict/", handleRest) + http.HandleFunc("/instance/", handleRest) + logex.Notice("start http:", addr) + return http.ListenAndServe(addr, nil) +} + +func handleRest(w http.ResponseWriter, r *http.Request) { + var ( + req_log string + ) + time_begin := time.Now() + + cont_type := make([]string, 1, 1) + cont_type[0] = "application/json" + header := w.Header() + header["Content-Type"] = cont_type + w.Header().Add("Access-Control-Allow-Origin", "*") + + m := parseHttpKv(r) + + req_log = fmt.Sprintf("handle %v %v %v from %v", + r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr) + + api := r.URL.Path + + var showHandler map[string]handlerFunc + switch r.Method { + case "GET": + showHandler = getHandler + case "POST": // create + showHandler = postHandler + case "PUT": // update + showHandler = putHandler + case "DELETE": + showHandler = deleteHandler + default: + logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method) + } + + handler, ok := showHandler[api] + + if !ok { + key_list := make([]string, 0, len(showHandler)) + for key := range showHandler { + key_list = append(key_list, key) + } + fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`, + STATUS_WRONG_API, r.Method, api, key_list) + + logex.Noticef(`%v, time=%v, status=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, STATUS_WRONG_API) + return + } + + var s string + rst, handle_log, status, err := handler(api, m) + if status == STATUS_OK { + s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst) + } else { + s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`, + status, quote(err.Error()), rst) + } + + if isJsonDict(s) { + fmt.Fprintln(w, s) + } else { + logex.Fatalf("invalid json: %v", s) + } + + if err == nil { + logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(handle_log)) + } else { + logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(err.Error()), quote(handle_log)) + } +} + +func parseHttpKv(r *http.Request) map[string]string { + r.ParseForm() + m := make(map[string]string) + for k, v := range r.Form { + switch k { + case "user": // remove @baidu.com for user + m[k] = strings.Split(v[0], "@")[0] + default: + m[k] = v[0] + } + } + + // allow passing hostname for debug + //if _, ok := m["hostname"]; !ok { + // ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")] + // m["hostname"], _ = getHostname(ip) + //} + return m +} + +// restReq sends a restful request to requrl and returns response body. 
+func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) { + logex.Debug("####restReq####") + logex.Debug(*kv) + data := url.Values{} + if kv != nil { + for k, v := range *kv { + logex.Trace("req set:", k, v) + data.Set(k, v) + } + } + if method == "GET" || method == "DELETE" { + requrl = requrl + "?" + data.Encode() + data = url.Values{} + } + + logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode())) + if err != nil { + logex.Warning("NewRequest failed:", err) + return "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return string(body), err +} + +//func jsonRestReq(method, requrl string, timeout int, json_params *CheckerRequest) (string, error) { +func jsonRestReq(method, requrl string, timeout int, json_params interface{}) (int, string, error) { + logex.Debug("####jsonRestReq####") + if method == "POST2" { + method = "POST" + } + + //b, err := json.Marshal(*json_params) + b, err := json.Marshal(json_params) + if err != nil { + logex.Warning("json_params marshal failed:", err) + return 0, "", err + } + + logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(string(b))) + if err != nil { + logex.Warning("NewRequest failed:", err) + return 0, "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/json") + //req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(string(b)))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return 0, "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return 0, "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return resp.StatusCode, string(body), err +} + +// quote quotes string for json output. 
eg: s="123", quote(s)=`"123"` +func quote(s string) string { + return fmt.Sprintf("%q", s) +} + +func isJsonDict(s string) bool { + var js map[string]interface{} + return json.Unmarshal([]byte(s), &js) == nil +} + +func getHostname(ip string) (hostname string, err error) { + if hostnames, err := net.LookupAddr(ip); err != nil { + hostname = ip + //logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err) + } else { + if len(hostnames) > 0 { + hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")] + } else { + hostname = ip + } + } + + return hostname, err +} + +func nonBlockSendJsonReq(method, requrl string, timeout int, json_params interface{}, + out interface{}, ch chan error) { + _, body_str, err := jsonRestReq(method, requrl, timeout, json_params) + logex.Noticef("json request method:[%v], requrl:[%s], timeout:[%v], params[%v], resbody[%v], err[%v]", method, requrl, timeout, json_params, body_str, err) + if err != nil { + ch <- err + } + err = json.Unmarshal([]byte(body_str), out) + if err != nil { + logex.Warningf("json unmarshal failed error (%v) for : %v", err, body_str) + } + ch <- err +} diff --git a/cube/cube-transfer/src/transfer/http_get.go b/cube/cube-transfer/src/transfer/http_get.go new file mode 100755 index 0000000000000000000000000000000000000000..2bc36808c9fbe32e6ebd34316751674e19a9696c --- /dev/null +++ b/cube/cube-transfer/src/transfer/http_get.go @@ -0,0 +1,159 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package transfer + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + "transfer/dict" +) + +func initGetHandlers() { + getHandler = map[string]handlerFunc{ + "/dict/info": GetDictInfo, + "/instance/status": GetInstanceStatus, + "/dict/scaler": GetDictScaler, + "/dict/meta_info": GetDictShardMetaInfo, + "/dict/deploy/history": GetDictDeployHistory, + } +} +func GetDictInfo(subpath string, m map[string]string) (string, string, int, error) { + b, err := json.Marshal(Dict) + if err != nil { + return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetInstanceStatus(subpath string, m map[string]string) (string, string, int, error) { + b, err := json.Marshal(Dict.Instances) + if err != nil { + return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetDictScaler(subpath string, m map[string]string) (string, string, int, error) { + var ( + shard int + err error + infos []dict.DictShardInfo + ) + shardStr, ok := m["shard"] + if !ok { + shard = 0 + } else { + shard, err = strconv.Atoi(shardStr) + if err != nil { + return quote(""), "", STATUS_FAIL, errors.New("invalid arg: shard should be int") + } + } + + for _, version := range Dict.CurrentVersionInfo { + info := dict.GetDictShardScaler(shard, version, Dict.StoragePlace, TransferAddr) + infos = append(infos, info) + } + if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying { + info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.StoragePlace, TransferAddr) + infos = append(infos, info) + } + + b, err := json.Marshal(infos) + if err != nil { + return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return quote(string(b)), "", STATUS_OK, nil +} + +func GetDictShardMetaInfo(subpath string, m map[string]string) (string, string, int, error) { + name, ok := m["name"] + if !ok { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: name") + } + version, err := strconv.Atoi(m["version"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: version, version should be int") + } + depend, err := strconv.Atoi(m["depend"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: depend, depend should be int") + } + shard, err := strconv.Atoi(m["shard"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: shard, shard should be int") + } + split := 1 + split_str, ok := m["split"] + if ok { + split, err = strconv.Atoi(split_str) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("arg: split, should be int") + } + } + + meta_info, err := GetDictShardMetaInfos(name, version, depend, shard, split) + if err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("info error: %v", err) + } + + b, err := json.Marshal(meta_info) + if err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetDictDeployHistory(subpath string, m map[string]string) (string, string, int, error) { + var deployVerisonInfos []dict.DictVersionInfo + deployVerisonInfos = Dict.CurrentVersionInfo + if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying { + deployVerisonInfos = append(deployVerisonInfos, Dict.WaitVersionInfo) + } + + b, err := json.Marshal(deployVerisonInfos) + if err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("json 
marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetDictShardMetaInfos(dictName string, version int, depend int, + shard int, split int) (dict.DictShardMetaInfo, error) { + var info dict.DictShardMetaInfo + + for _, v := range Dict.CurrentVersionInfo { + if v.Version == version && v.Depend == depend { + if meta, ok := v.MetaInfos[shard]; ok { + info.Name = dictName; + info.Version = version + info.Depend = depend + info.Shard = shard + info.Split = split + info.Meta = meta + return info, nil + } else { + return info, fmt.Errorf("shard not right") + } + } + } + return info, fmt.Errorf("info not right") +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/http_post.go b/cube/cube-transfer/src/transfer/http_post.go new file mode 100644 index 0000000000000000000000000000000000000000..4d0d48842873ba21745608c9967467b3b5ad37c2 --- /dev/null +++ b/cube/cube-transfer/src/transfer/http_post.go @@ -0,0 +1,105 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strconv" + "transfer/dict" +) + +func initPostHandlers() { + postHandler = map[string]handlerFunc{ + "/dict/meta_info/register": PostDictShardMetaInfoRegister, + } +} + +func PostDictShardMetaInfoRegister(subpath string, m map[string]string) (string, string, int, error) { + var ( + shardMetaInfo dict.DictShardMetaInfo + ok bool + err error + metaInfo dict.MetaInfo + ) + + shardMetaInfo.Name, ok = m["name"] + if !ok { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: name") + } + + shardMetaInfo.Version, err = strconv.Atoi(m["version"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: version, should be int") + } + + shardMetaInfo.Depend, err = strconv.Atoi(m["depend"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: depend, should be int") + } + + shardMetaInfo.Shard, err = strconv.Atoi(m["shard"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: shard, should be int") + } + shardMetaInfo.Split = 1 + split_str, ok := m["split"] + if ok { + shardMetaInfo.Split, err = strconv.Atoi(split_str) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("arg: split, should be int") + } + } + + // check dict shard_num and status + + + if shardMetaInfo.Shard < 0 || shardMetaInfo.Shard >= Dict.ShardNum { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("shard value invalid, dict shard is :%v", Dict.ShardNum) + } + + shardMetaInfo.Meta, ok = m["meta"] + if !ok { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: meta") + } + if err = json.Unmarshal([]byte(shardMetaInfo.Meta), &metaInfo); err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("meta string bad formatted: %v", err) + } + if reflect.DeepEqual(metaInfo, (dict.MetaInfo{})) { + return quote("failed"), "", STATUS_FAIL, 
errors.New("meta string bad formatted") + } + if 0 == len(metaInfo.IndexLenList) { + return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted, index_len_list is null") + } + if 0 == len(metaInfo.DataLenList) { + return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted, data_len_list is null") + } + fmt.Printf("update meta shardMetaInfo:%v metaInfo:%v\n", shardMetaInfo,metaInfo) + + if err = UpdateDictShardMetaInfo(shardMetaInfo); err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("update dict_shard_meta_info failed, %v", err) + } + fmt.Printf("update meta2\n") + + return quote("ok"), "", STATUS_OK, nil +} + +func UpdateDictShardMetaInfo(shardMetaInfo dict.DictShardMetaInfo) error { + Dict.WaitVersionInfo.MetaInfos[shardMetaInfo.Shard] = shardMetaInfo.Meta + WriteWaitVersionInfoToFile() + return nil +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/transfer.go b/cube/cube-transfer/src/transfer/transfer.go new file mode 100755 index 0000000000000000000000000000000000000000..84ab7427333b3a639efd2e48df3dd248209924be --- /dev/null +++ b/cube/cube-transfer/src/transfer/transfer.go @@ -0,0 +1,84 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package transfer + +import ( + "fmt" + "github.com/Badangel/logex" + "os" + "time" + "transfer/dict" +) + +func Start() { + + go BackupTransfer() + logex.Notice(">>> starting server...") + addr := ":" + Port + err := startHttp(addr) + if err != nil { + logex.Fatalf("start http(addr=%v) failed: %v", addr, err) + os.Exit(255) + } + + logex.Notice(">>> start server succ") +} + +func BackupTransfer() { + for { + //trigger + version, err := TriggerStart(Dict.DonefileAddress) + if err != nil { + logex.Fatalf("[trigger err]trigger err:%v ", err) + fmt.Printf("[error]trigger err:%v \n", err) + break + } + logex.Noticef("[trigger] get version:%v \n", version) + if version.Id == 0 { + logex.Noticef("[sleep]no new version, sleep 5 min") + fmt.Printf("[sleep]no new version, wait 5 min\n") + time.Sleep(5 * time.Minute) + continue + } + Dict.WaitVersionInfo = version + logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) + WriteWaitVersionInfoToFile() + + //builder + Dict.WaitVersionInfo.Status = dict.Dict_Status_Building + Dict.WaitVersionInfo.MetaInfos = make(map[int]string) + WriteWaitVersionInfoToFile() + if err = BuilderStart(Dict.WaitVersionInfo); err != nil { + logex.Fatalf("builder err:%v \n", err) + } + + if Dict.WaitVersionInfo.Mode == dict.BASE { + var newCurrentVersion []dict.DictVersionInfo + Dict.CurrentVersionInfo = newCurrentVersion + WriteCurrentVersionInfoToFile() + } + logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) + + //deployer + Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying + WriteWaitVersionInfoToFile() + if err = DeployStart(Dict.WaitVersionInfo); err != nil { + logex.Fatalf("deploy err:%v \n", err) + } + logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo) + } + fmt.Print("transfer over!") + logex.Noticef("[transfer]status machine exit!") +} diff --git a/cube/cube-transfer/src/transfer/trigger.go b/cube/cube-transfer/src/transfer/trigger.go new file mode 100755 index 0000000000000000000000000000000000000000..c962726d20c1fee7bbbaa282000640df1c8036f6 --- /dev/null +++ b/cube/cube-transfer/src/transfer/trigger.go @@ -0,0 +1,109 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
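The trigger step of the loop above reads one JSON object per line from base.txt (or patch.txt), and each line must decode into dict.DonefileInfo. A small sketch of that line format, with made-up id/key/input values:

```go
package main

import (
	"encoding/json"
	"fmt"

	"transfer/dict" // GOPATH-style import matching cube-transfer/src in this patch
)

func main() {
	// One donefile line as consumed by GetDoneFileInfo; values are illustrative.
	line := `{"id":"1577836800","key":"1577836800","input":"/home/work/seqfile/base"}`

	var done dict.DonefileInfo
	if err := json.Unmarshal([]byte(line), &done); err != nil {
		panic(err)
	}
	fmt.Printf("id=%s key=%s input=%s\n", done.Id, done.Key, done.Input)
}
```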
+
+package transfer
+
+import (
+    "encoding/json"
+    "fmt"
+    "github.com/Badangel/logex"
+    "io/ioutil"
+    "strconv"
+    "strings"
+    "time"
+    "transfer/dict"
+)
+
+func TriggerStart(addr string) (version dict.DictVersionInfo, err error) {
+    return GetDoneFileInfo(addr)
+}
+
+func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) {
+    fmt.Printf("[entry]start trigger\n")
+    logex.Noticef("[entry]start trigger")
+    //if donefile is in ftp, first download to local
+    if strings.HasPrefix(addr, dict.FTP_HEADER) || strings.HasPrefix(addr, dict.HTTP_HEADER) {
+        donefileAddr := Dict.TmpAddress + "/../donefile"
+        Wget(addr, donefileAddr)
+        addr = donefileAddr
+    }
+
+    baseDonefile := addr + "/base.txt"
+    fmt.Printf("[trigger]donefile path:%v \n", baseDonefile)
+    logex.Noticef("[trigger]base donefile path:%v", baseDonefile)
+    contents, err := ioutil.ReadFile(baseDonefile)
+    VersionLen := len(Dict.CurrentVersionInfo)
+    version.DictName = Dict.DictName
+    if err != nil {
+        fmt.Printf("[trigger]read files err:%v \n", err)
+        logex.Fatalf("[trigger]read files err:%v ", err)
+        return
+    } else {
+        contentss := string(contents)
+        lines := strings.Split(contentss, "\n")
+        index := len(lines) - 2
+        var donefileInfo dict.DonefileInfo
+        fmt.Printf("line %v: %v\n", index, lines[index])
+        if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil {
+            return
+        }
+        logex.Noticef("[trigger]donefile info:%v", donefileInfo)
+        newId, _ := strconv.Atoi(donefileInfo.Id)
+        if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Key {
+            version.Id = newId
+            version.Key, _ = strconv.Atoi(donefileInfo.Key)
+            version.Input = donefileInfo.Input
+            deployVersion := int(time.Now().Unix())
+            version.CreateTime = deployVersion
+            version.Version = deployVersion
+            version.Depend = deployVersion
+            version.Mode = dict.BASE
+            return
+        }
+    }
+    if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 {
+        patchDonefile := addr + "/patch.txt"
+        fmt.Printf("[trigger]patchDonefile path:%v \n", patchDonefile)
+        logex.Noticef("[trigger]patch donefile path:%v", patchDonefile)
+        contents, err = ioutil.ReadFile(patchDonefile)
+        if err != nil {
+            fmt.Printf("read files err:%v \n", err)
+            return
+        } else {
+            contentss := string(contents)
+            lines := strings.Split(contentss, "\n")
+            for index := 0; index < len(lines)-1; index++ {
+                var donefileInfo dict.DonefileInfo
+                if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil {
+                    return
+                }
+                logex.Noticef("[trigger]donefile info:%v", donefileInfo)
+                newId, _ := strconv.Atoi(donefileInfo.Id)
+                newKey, _ := strconv.Atoi(donefileInfo.Key)
+                if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key {
+                    version.Id = newId
+                    version.Key, _ = strconv.Atoi(donefileInfo.Key)
+                    version.Input = donefileInfo.Input
+                    deployVersion := int(time.Now().Unix())
+                    version.CreateTime = deployVersion
+                    version.Version = deployVersion
+                    version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend
+                    version.Mode = dict.DELTA
+                    return
+                }
+            }
+        }
+    }
+    return
+}
diff --git a/cube/cube-transfer/src/transfer/util.go b/cube/cube-transfer/src/transfer/util.go
new file mode 100755
index 0000000000000000000000000000000000000000..f3c1834319ab2752a2338cda737855854cf73356
--- /dev/null
+++ b/cube/cube-transfer/src/transfer/util.go
@@ -0,0 +1,113 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transfer
+
+import (
+    "bufio"
+    "encoding/json"
+    "fmt"
+    "github.com/Badangel/logex"
+    "io/ioutil"
+    "os/exec"
+)
+
+func WriteWaitVersionInfoToFile() {
+    waitFilePath := Dict.TmpAddress + "/" + "WaitVersionInfo.json"
+    b, err := json.Marshal(Dict.WaitVersionInfo)
+    fmt.Printf("wait version %v\n", Dict.WaitVersionInfo)
+    if err != nil {
+        logex.Fatalf("WaitVersionInfo format error : %v", err)
+    }
+    err = ioutil.WriteFile(waitFilePath, b, 0644)
+    if err != nil {
+        logex.Fatalf("write wait version info error : %v", err)
+        return
+    }
+    return
+}
+
+func WriteCurrentVersionInfoToFile() {
+    currentFilePath := Dict.TmpAddress + "/" + "CurrentVersionInfo.json"
+    b, err := json.Marshal(Dict.CurrentVersionInfo)
+    if err != nil {
+        logex.Fatalf("CurrentVersionInfo format error : %v", err)
+    }
+    err = ioutil.WriteFile(currentFilePath, b, 0644)
+    if err != nil {
+        logex.Fatalf("write current version info error : %v", err)
+        return
+    }
+    return
+}
+
+func ExeCommad(files string, params []string) (err error) {
+    cmd := exec.Command(files, params...)
+    fmt.Println(cmd.Args)
+
+    stdout, err := cmd.StdoutPipe()
+    if err != nil {
+        fmt.Println(err)
+        return err
+    }
+    // only close the pipe once we know it was created successfully
+    defer stdout.Close()
+
+    if err = cmd.Start(); err != nil {
+        logex.Fatalf("%v", err)
+        return err
+    }
+
+    // stream the command's stdout line by line
+    reader := bufio.NewReader(stdout)
+    for {
+        line, err2 := reader.ReadString('\n')
+        if err2 != nil {
+            break
+        }
+        fmt.Printf("[cmd]>%s", line)
+    }
+
+    // propagate the exit status to the caller instead of always returning nil
+    err = cmd.Wait()
+    if err != nil {
+        fmt.Println(err)
+    }
+
+    return err
+}
+
+func Wget(ftpPath string, downPath string) {
+    var params []string
+    params = append(params, "-P")
+    params = append(params, downPath)
+    params = append(params, "-r")
+    params = append(params, "-N")
+    params = append(params, "-np")
+    params = append(params, "-nd")
+    params = append(params, "-R")
+    params = append(params, "index.html")
+    params = append(params, ftpPath)
+
+    err := ExeCommad("wget", params)
+    if err != nil {
+        fmt.Printf("wget exe: %v\n", err)
+    }
+}
\ No newline at end of file
diff --git a/doc/INSTALL.md b/doc/INSTALL.md
index 4229067f6351979bb8d23c0dcea27c9d32c4e868..d17df500e09d1a2a497953f59706e6bd28f72886 100644
--- a/doc/INSTALL.md
+++ b/doc/INSTALL.md
@@ -4,11 +4,16 @@
 OS: Linux
 
-CMake: 3.2
+CMake: (verified version: 3.2)
 
-python
+C++ compiler (verified versions: GCC 4.8.2/5.4.0)
+
+python (verified version: 2.7)
+
+Go compiler (verified version: 1.9.2)
 
 ## 编译
+
 ```shell
 $ git clone https://github.com/PaddlePaddle/serving.git
 $ cd serving
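Finally, a usage sketch for the Wget helper in util.go above, which shells out to wget (so wget must be on PATH); both paths and the GOPATH-style "transfer" import are assumptions for illustration, not values from this patch:

```go
package main

import "transfer" // GOPATH-style import matching cube-transfer/src in this patch

func main() {
	// Equivalent to:
	//   wget -P /tmp/cube-transfer/donefile -r -N -np -nd -R index.html ftp://127.0.0.1/home/work/donefile
	// This mirrors how GetDoneFileInfo pulls a remote donefile directory to a local path.
	transfer.Wget("ftp://127.0.0.1/home/work/donefile", "/tmp/cube-transfer/donefile")
}
```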