diff --git a/cube/CMakeLists.txt b/cube/CMakeLists.txt index 8273ca482e70b4479a9493dacba69357b5fdcf87..07cf04977b618a515a2459f646c2dba298a5d58b 100644 --- a/cube/CMakeLists.txt +++ b/cube/CMakeLists.txt @@ -16,3 +16,4 @@ add_subdirectory(cube-server) add_subdirectory(cube-api) add_subdirectory(cube-builder) add_subdirectory(cube-transfer) +add_subdirectory(cube-agent) diff --git a/cube/cube-agent/CMakeLists.txt b/cube/cube-agent/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..30158aa506e53ec8a37d10aef4f29bfcd5a60d06 --- /dev/null +++ b/cube/cube-agent/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +project(cube-agent Go) + +include(cmake/golang.cmake) + +ExternalGoProject_Add(agent-docopt-go github.com/docopt/docopt-go) +ExternalGoProject_Add(agent-logex github.com/Badangel/logex) +ExternalGoProject_Add(agent-pipeline github.com/Badangel/pipeline) + +add_subdirectory(src) diff --git a/cube/cube-agent/cmake/CMakeDetermineGoCompiler.cmake b/cube/cube-agent/cmake/CMakeDetermineGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..d17449f39f904c56c3d6969db75f8a44207fc9f4 --- /dev/null +++ b/cube/cube-agent/cmake/CMakeDetermineGoCompiler.cmake @@ -0,0 +1,58 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +if(NOT CMAKE_Go_COMPILER) + if(NOT $ENV{GO_COMPILER} STREQUAL "") + get_filename_component(CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT) + + if(CMAKE_Go_FLAGS_ENV_INIT) + set(CMAKE_Go_COMPILER_ARG1 "${CMAKE_Go_FLAGS_ENV_INIT}" CACHE STRING "First argument to Go compiler") + endif() + + if(NOT EXISTS ${CMAKE_Go_COMPILER_INIT}) + message(SEND_ERROR "Could not find compiler set in environment variable GO_COMPILER:\n$ENV{GO_COMPILER}.") + endif() + + endif() + + set(Go_BIN_PATH + $ENV{GOPATH} + $ENV{GOROOT} + $ENV{GOROOT}/../bin + $ENV{GO_COMPILER} + /usr/bin + /usr/local/bin + ) + + if(CMAKE_Go_COMPILER_INIT) + set(CMAKE_Go_COMPILER ${CMAKE_Go_COMPILER_INIT} CACHE PATH "Go Compiler") + else() + find_program(CMAKE_Go_COMPILER + NAMES go + PATHS ${Go_BIN_PATH} + ) + EXEC_PROGRAM(${CMAKE_Go_COMPILER} ARGS version OUTPUT_VARIABLE GOLANG_VERSION) + STRING(REGEX MATCH "go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*" VERSION "${GOLANG_VERSION}") + message("-- The Golang compiler identification is ${VERSION}") + message("-- Check for working Golang compiler: ${CMAKE_Go_COMPILER}") + endif() + +endif() + +mark_as_advanced(CMAKE_Go_COMPILER) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in + ${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY) + +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") \ No newline at end of file diff --git a/cube/cube-agent/cmake/CMakeGoCompiler.cmake.in b/cube/cube-agent/cmake/CMakeGoCompiler.cmake.in new file mode 100644 index 0000000000000000000000000000000000000000..4d54319b0cdd2cc0373858f1981cec0f7933a9cc --- /dev/null +++ b/cube/cube-agent/cmake/CMakeGoCompiler.cmake.in @@ -0,0 +1,22 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@") +set(CMAKE_Go_COMPILER_LOADED 1) + +set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go) +set(CMAKE_Go_LINKER_PREFERENCE 40) +set(CMAKE_Go_OUTPUT_EXTENSION .o) +set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1) +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") diff --git a/cube/cube-agent/cmake/CMakeGoInformation.cmake b/cube/cube-agent/cmake/CMakeGoInformation.cmake new file mode 100644 index 0000000000000000000000000000000000000000..4171072919e94c4547203a8e8bb2b6758436f001 --- /dev/null +++ b/cube/cube-agent/cmake/CMakeGoInformation.cmake @@ -0,0 +1,21 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +if(NOT CMAKE_Go_COMPILE_OBJECT) + set(CMAKE_Go_COMPILE_OBJECT "go tool compile -l -N -o ") +endif() + +if(NOT CMAKE_Go_LINK_EXECUTABLE) + set(CMAKE_Go_LINK_EXECUTABLE "go tool link -o ") +endif() \ No newline at end of file diff --git a/cube/cube-agent/cmake/CMakeTestGoCompiler.cmake b/cube/cube-agent/cmake/CMakeTestGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..e69831165e62e2e8091fc88ade83904426a4ac5f --- /dev/null +++ b/cube/cube-agent/cmake/CMakeTestGoCompiler.cmake @@ -0,0 +1,15 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL "") \ No newline at end of file diff --git a/cube/cube-agent/cmake/golang.cmake b/cube/cube-agent/cmake/golang.cmake new file mode 100644 index 0000000000000000000000000000000000000000..817d029d946bad8da4f4cf2785e68d062fc4cada --- /dev/null +++ b/cube/cube-agent/cmake/golang.cmake @@ -0,0 +1,60 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(GOPATH "${CMAKE_CURRENT_LIST_DIR}/../") +file(MAKE_DIRECTORY ${GOPATH}) + +function(ExternalGoProject_Add TARG) + add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN}) +endfunction(ExternalGoProject_Add) + +function(add_go_executable NAME) + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build + -o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) +endfunction(add_go_executable) + + +function(ADD_GO_LIBRARY NAME BUILD_TYPE) + if(BUILD_TYPE STREQUAL "STATIC") + set(BUILD_MODE -buildmode=c-archive) + set(LIB_NAME "lib${NAME}.a") + else() + set(BUILD_MODE -buildmode=c-shared) + if(APPLE) + set(LIB_NAME "lib${NAME}.dylib") + else() + set(LIB_NAME "lib${NAME}.so") + endif() + endif() + + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} + -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + + if(NOT BUILD_TYPE STREQUAL "STATIC") + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) + endif() +endfunction(ADD_GO_LIBRARY) \ No newline at end of file diff --git a/cube/cube-agent/src/CMakeLists.txt b/cube/cube-agent/src/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..eb192f0fd14969e9f25a71a0ba968ea244bca830 --- /dev/null +++ b/cube/cube-agent/src/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +set(SOURCE_FILE cube-agent.go) +add_go_executable(cube-agent ${SOURCE_FILE}) +add_dependencies(cube-agent agent-docopt-go) +add_dependencies(cube-agent agent-logex) +add_dependencies(cube-agent agent-pipeline) diff --git a/cube/cube-agent/src/agent/define.go b/cube/cube-agent/src/agent/define.go new file mode 100644 index 0000000000000000000000000000000000000000..e741f016604ab75dd65e2ec35402e1bfdb497acd --- /dev/null +++ b/cube/cube-agent/src/agent/define.go @@ -0,0 +1,100 @@ +package agent + +import ( + "errors" + _ "github.com/Badangel/logex" + "github.com/Badangel/pipeline" + "os/exec" + "strconv" + "strings" + "sync" +) + +var ( + Dir string + WorkerNum int + QueueCapacity int32 + MasterHost []string + MasterPort []string + + TestHostname string + TestIdc string + ShardLock sync.RWMutex + + CmdWorkPool *WorkPool + CmdWorkFilter sync.Map +) + +type ( + Status struct { + Status string `json:"status"` + Version string `json:"version"` + } + + MasterResp struct { + Success string `json:"success"` + Message string `json:"message"` + Data string `json:"data"` + } + + ShardInfo struct { + DictName string + ShardSeq int + SlotIdList string + DataDir string + Service string `json:"service,omitempty"` + Libcube string `json:"libcube,omitempty"` + } + + CubeResp struct { + Status int `json:"status"` + CurVersion string `json:"cur_version"` + BgVersion string `json:"bg_version"` + } +) + +var BUILTIN_STATUS = Status{"RUNNING", "3.0.0.1"} + +var ShardInfoMap map[string]map[string]*ShardInfo +var disks []string + +func GetMaster(master string) (host, port string, err error) { + if len(ShardInfoMap) < 1 { + return "", "", errors.New("empty master list.") + } + if master == "" { + return MasterHost[0], MasterPort[0], nil + } + if _, ok := ShardInfoMap[master]; ok { + m := strings.Split(master, ":") + if len(m) != 2 { + return MasterHost[0], MasterPort[0], nil + } + return m[0], m[1], nil + } else { + return MasterHost[0], MasterPort[0], nil + } +} + +func init() { + dfCmd := "df -h | grep -E '/home|/ssd'" + stdout, _, err := pipeline.Run(exec.Command("/bin/bash", "-c", dfCmd)) + + if err == nil && stdout.String() != "" { + t := strings.TrimSpace(stdout.String()) + diskLi := strings.Split(t, "\n") + for _, diskStr := range diskLi { + disk := strings.Fields(diskStr) + usedPercent, _ := strconv.Atoi(strings.TrimRight(disk[4], "%")) + if usedPercent <= 40 { + disks = append(disks, disk[5]) + } + } + } + + if len(disks) == 0 { + disks = append(disks, "/home") + } + + //logex.Debugf("available disks found: (%+v)", disks) +} diff --git a/cube/cube-agent/src/agent/http.go b/cube/cube-agent/src/agent/http.go new file mode 100755 index 0000000000000000000000000000000000000000..f0e4002b3b79197ec4df99dd1c53986af9057a27 --- /dev/null +++ b/cube/cube-agent/src/agent/http.go @@ -0,0 +1,179 @@ +//-*- coding:utf-8; indent-tabs-mode:nil; -*- + +package agent + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Badangel/logex" +) + +type handlerFunc func(subpath string, m map[string]string, b []byte) (string, string, error) + +var ( // key = subpath; eg: path="/checker/job", key="job" + getHandler map[string]handlerFunc + putHandler map[string]handlerFunc + deleteHandler map[string]handlerFunc + postHandler map[string]handlerFunc +) + +func StartHttp(addr string) error { + + // init handlers: + initGetHandlers() + initPostHandlers() + + http.HandleFunc("/agent/", handleRest) + logex.Notice("start http ", addr) + return http.ListenAndServe(addr, nil) +} + +func handleRest(w http.ResponseWriter, r *http.Request) { + var ( + req_log string + status int32 + ) + time_begin := time.Now() + + cont_type := make([]string, 1, 1) + cont_type[0] = "application/json" + header := w.Header() + header["Content-Type"] = cont_type + w.Header().Add("Access-Control-Allow-Origin", "*") + + m := parseHttpKv(r) + b, _ := ioutil.ReadAll(r.Body) + + req_log = fmt.Sprintf("handle %v %v %v from %v, len(m)=%v, m=%+v", + r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr, len(m), m) + + api := r.URL.Path + + var showHandler map[string]handlerFunc + switch r.Method { + case "GET": + showHandler = getHandler + case "POST": // create + showHandler = postHandler + case "PUT": // update + showHandler = putHandler + case "DELETE": + showHandler = deleteHandler + default: + logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method) + } + + handler, ok := showHandler[api] + + if !ok { + key_list := make([]string, 0, len(showHandler)) + for key := range showHandler { + key_list = append(key_list, key) + } + status = 2 + fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`, + status, r.Method, api, key_list) + + logex.Noticef(`%v, time=%v, status=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, status) + return + } + + var s string + rst, handle_log, err := handler(api, m, b) + if err == nil { + status = 0 + s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst) + } else { + status = 255 + s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`, + status, quote(err.Error()), rst) + } + + if isJsonDict(s) { + fmt.Fprintln(w, s) + }else { + logex.Fatalf("invalid json: %v", s) + } + + if err == nil { + logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(handle_log)) + } else { + logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(err.Error()), quote(handle_log)) + } +} + +func parseHttpKv(r *http.Request) map[string]string { + r.ParseForm() + m := make(map[string]string) + for k, v := range r.Form { + switch k { + case "user": // remove @baidu.com for user + m[k] = strings.Split(v[0], "@")[0] + default: + m[k] = v[0] + } + } + + // allow passing hostname for debug + if _, ok := m["hostname"]; !ok{ + ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")] + m["hostname"], _ = getHostname(ip) + } + return m +} + +// restReq sends a restful request to requrl and returns response body. +func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) { + logex.Debug("####restReq####") + logex.Debug(*kv) + data := url.Values{} + if kv != nil { + for k, v := range *kv { + logex.Trace("req set:", k, v) + data.Set(k, v) + } + } + if method == "GET" || method == "DELETE" { + requrl = requrl + "?" + data.Encode() + data = url.Values{} + } + + logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode())) + if err != nil { + logex.Warning("NewRequest failed:", err) + return "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return string(body), err +} diff --git a/cube/cube-agent/src/agent/http_get.go b/cube/cube-agent/src/agent/http_get.go new file mode 100755 index 0000000000000000000000000000000000000000..967e6426ef895edfc8e9d3669d76a80ac8218030 --- /dev/null +++ b/cube/cube-agent/src/agent/http_get.go @@ -0,0 +1,22 @@ +//-*- coding:utf-8; indent-tabs-mode:nil; -*- +package agent + +import ( + "fmt" + "encoding/json" +) + +func initGetHandlers() { + getHandler = map[string]handlerFunc{ + "/agent/status": GetStatus, + } +} + +func GetStatus(subpath string, m map[string]string, b []byte) (string, string, error) { + b, err := json.Marshal(BUILTIN_STATUS); + if err != nil { + return quote(""), "", fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", err +} \ No newline at end of file diff --git a/cube/cube-agent/src/agent/http_post.go b/cube/cube-agent/src/agent/http_post.go new file mode 100755 index 0000000000000000000000000000000000000000..024ede44133b84abc1685557818f63c4af4c0f12 --- /dev/null +++ b/cube/cube-agent/src/agent/http_post.go @@ -0,0 +1,37 @@ +//-*- coding:utf-8; indent-tabs-mode:nil; -*- +package agent + +import ( + "fmt" + "encoding/json" + "github.com/Badangel/logex" +) + +func initPostHandlers() { + postHandler = map[string]handlerFunc{ + "/agent/cmd": PostCmd, + } +} + +func PostCmd(subpath string, m map[string]string, b []byte) (string, string, error) { + var work Work + err := json.Unmarshal(b, &work) + if err != nil { + logex.Warningf("Unmarshal from %s error (+%v)", string(b), err) + return quote(""), "", fmt.Errorf("Work json unmarshal work failed, %v", err) + } + + if _, ok := CmdWorkFilter.Load(work.Token()); ok { + logex.Warningf("Another work with same token is doing. Token(%s)", work.Token()) + return quote(""), "", fmt.Errorf("Another work with same key is doing.", err) + } + + CmdWorkFilter.Store(work.Token(), true) + err = work.DoWork() + CmdWorkFilter.Delete(work.Token()) + if err != nil { + return quote(""), "", fmt.Errorf("Do work failed.", err) + } + + return quote(""), "", err +} \ No newline at end of file diff --git a/cube/cube-agent/src/agent/util.go b/cube/cube-agent/src/agent/util.go new file mode 100644 index 0000000000000000000000000000000000000000..22d36ff54a2d8670955f8fa66f358fa723ac7ccf --- /dev/null +++ b/cube/cube-agent/src/agent/util.go @@ -0,0 +1,174 @@ +package agent + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Badangel/logex" +) + +// restReq sends a restful request to requrl and returns response body. +func RestReq(method, requrl string, timeout int, kv *map[string]string) (string, error) { + data := url.Values{} + if kv != nil { + for k, v := range *kv { + //logex.Trace("req set:", k, v) + data.Set(k, v) + } + } + if method == "GET" || method == "DELETE" { + requrl = requrl + "?" + data.Encode() + data = url.Values{} + } + + //logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode())) + if err != nil { + logex.Warning("NewRequest failed:", err) + return "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return string(body), err +} + +// restReq sends a restful request to requrl and returns response body as json. +func JsonReq(method, requrl string, timeout int, kv *map[string]string, + out interface{}) error { + s, err := RestReq(method, requrl, timeout, kv) + logex.Debugf("json request method:[%v], requrl:[%s], timeout:[%v], map[%v], out_str:[%s]", method, requrl, timeout, kv, s) + if err != nil { + return err + } + return json.Unmarshal([]byte(s), out) +} + +func GetHdfsMeta(src string) (master, ugi, path string, err error) { + //src = "hdfs://root:rootpasst@st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar" + //src = "hdfs://st1-inf-platform0.st01.baidu.com:54310/user/mis_user/news_dnn_ctr_cube_1/1501836820/news_dnn_ctr_cube_1_part54.tar" + + ugiBegin := strings.Index(src, "//") + ugiPos := strings.LastIndex(src, "@") + if ugiPos != -1 && ugiBegin != -1 { + ugi = src[ugiBegin+2 : ugiPos] + } + src1 := strings.Replace(strings.Replace(src, "hdfs://", "", 1), ugi, "", 1) + if ugi != "" { + src1 = src1[1:] + } + pos := strings.Index(src1, "/") + if pos != -1 { + master = src1[0:pos] + path = src1[pos:] + } else { + logex.Warningf("failed to get the master or path for (%s)", src) + err = errors.New("invalid master or path found") + } + logex.Debugf("parse the (%s) succ, master is %s, ugi is (%s), path is %s", src, master, ugi, path) + return +} + +func getHostIp() (string, error) { + if addrs, err := net.InterfaceAddrs(); err == nil { + for _, addr := range addrs { + ips := addr.String() + logex.Debugf("get host ip: %v", ips) + if strings.HasPrefix(ips, "127") { + continue + } else { + list := strings.Split(ips, "/") + if len(list) != 2 { + continue + } + return list[0], nil + } + } + } + return "unkown ip", errors.New("get host ip failed") +} + +func getHostname(ip string) (hostname string, err error) { + if hostnames, err := net.LookupAddr(ip); err != nil { + hostname = ip + //logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err) + } else { + if len(hostnames) > 0 { + hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")] + } else { + hostname = ip + } + } + + return hostname, err +} + +func GetLocalHostname() (hostname string, err error) { + if ip, err := getHostIp(); err == nil { + return getHostname(ip) + } else { + return "unkown ip", err + } +} + +func GetLocalHostnameCmd() (hostname string, err error) { + cmd := "hostname" + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if stdout != "" && err == nil { + hostname := strings.TrimSpace(stdout) + index := strings.LastIndex(hostname, ".baidu.com") + if index > 0 { + return hostname[:strings.LastIndex(hostname, ".baidu.com")], nil + } else { + return hostname, nil + } + } else { + logex.Debugf("using hostname cmd failed. err:%v", err) + return GetLocalHostname() + } +} + +// quote quotes string for json output. eg: s="123", quote(s)=`"123"` +func quote(s string) string { + return fmt.Sprintf("%q", s) +} + +// quoteb quotes byte array for json output. +func quoteb(b []byte) string { + return quote(string(b)) +} + +// quotea quotes string array for json output +func quotea(a []string) string { + b, _ := json.Marshal(a) + return string(b) +} + +func isJsonDict(s string) bool { + var js map[string]interface{} + return json.Unmarshal([]byte(s), &js) == nil +} diff --git a/cube/cube-agent/src/agent/work.go b/cube/cube-agent/src/agent/work.go new file mode 100644 index 0000000000000000000000000000000000000000..b9be00609a4212a1e6e0c4bf12cce7b4c44e787f --- /dev/null +++ b/cube/cube-agent/src/agent/work.go @@ -0,0 +1,861 @@ +package agent + +import ( + "crypto/md5" + "encoding/json" + "errors" + "fmt" + "github.com/Badangel/logex" + "github.com/Badangel/pipeline" + "os" + "os/exec" + "path" + "path/filepath" + "strconv" + "strings" + "sync" + "time" +) + +const ( + COMMAND_DOWNLOAD = "download" + COMMAND_RELOAD = "reload" + COMMAND_SWITCH = "enable" + COMMAND_ROLLBACK = "rollback" + COMMAND_CHECK = "check" + COMMAND_CLEAR = "clear" + COMMAND_POP = "pop" + + RETRY_TIMES = 3 + REQUEST_MASTER_TIMEOUT_SECOND = 60 + MAX_DOWN_CO = 7 + + RELOAD_RETRY_TIMES = 3 + RELOAD_RETRY_INTERVAL_SECOND = 10 + + DOWNLOAD_DONE_MARK_FILE = ".download_done" +) + +type VersionInfo struct { + Version string + Depend string + Source string +} + +type Work struct { + DictName string `json:"dict_name"` + ShardSeq int `json:"shard_seq"` + DeployPath string `json:"deploy_path"` + Command string `json:"command"` + Version string `json:"version"` + Depend string `json:"depend"` + Source string `json:"source"` + Mode string `json:"mode"` + DictMode string `json:"dict_mode"` + Port string `json:"port"` + bRollback bool `json:"b_rollback"` + RollbackInfo []VersionInfo `json:"rollback_info"` + Status string `json:"status"` + FinishStatus string `json:"finish_status"` + Service string `json:"service,omitempty"` + VersionSign string `json:"version_sign,omitempty"` + MasterAddress string `json:"master_address,omitempty"` + ActiveVersionList string `json:"active_version_list,omitempty"` +} + +func (work *Work) Token() string { + return work.DictName + strconv.Itoa(work.ShardSeq) + work.Service +} + +func (work *Work) Valid() bool { + if work.Command == "" || + work.Version == "" || + work.Depend == "" { + return false + } + return true +} + +func (work *Work) DoWork() error { + var err error + if !work.Valid() { + err = errors.New("Work is invalid") + return err + } + switch work.Command { + case COMMAND_DOWNLOAD: + err = work.Download() + case COMMAND_RELOAD: + err = work.Reload() + case COMMAND_SWITCH: + err = work.Enable() + case COMMAND_CHECK: + err = work.Check() + case COMMAND_CLEAR: + err = work.Clear() + case COMMAND_POP: + err = work.Pop() + default: + logex.Debugf("Invalid command %s received", work.Command) + err = errors.New("Invalid command.") + } + return err +} + +func GetDownloadDirs(dictName, service, version, depend, deployPath string, shardSeq, + split int) ([]string, error) { + dirs := make([]string, 0, split) + if deployPath == "" { + return dirs, errors.New("Invalid Deploy path") + } + parentDir := getParentDir(version, depend) + if split < 2 { + disk := path.Join(deployPath, "cube_data") + if service == "" { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), parentDir)) + } else { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), parentDir+"-"+dictName)) + } + } else { + for i := 0; i < split; i++ { + disk := path.Join(deployPath, "cube_data") + if service == "" { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), strconv.Itoa(i), parentDir)) + } else { + dirs = append(dirs, path.Join(disk, strconv.Itoa(shardSeq), + parentDir+"-"+dictName)) + } + } + } + + return dirs, nil +} + +func GetDataLinkDirs(dictName, service, version, depend, deployPath string, shardSeq, + split int) []string { + dirs := make([]string, 0, split) + parentDir := getParentDir(version, depend) + if split < 2 { + disk := path.Join(deployPath, "data") + if service == "" { + dirs = append(dirs, path.Join(disk, parentDir)) + } + } else { + for i := 0; i < split; i++ { + disk := path.Join(deployPath, "data") + if service == "" { + dirs = append(dirs, path.Join(disk, strconv.Itoa(i), parentDir)) + } + } + } + return dirs +} + +func (work *Work) Download() (err error) { + err = DoDownload(work.DictName, work.Service, work.Version, work.Depend, work.Mode, work.Source, + work.DeployPath, work.ShardSeq) + + if err != nil { + logex.Warningf("download error, failed to download %s, dir is %s, error is (+%v)", work.Source, work.DeployPath, err) + return + } + + if work.Service == "" { + err = UnTar(work.DictName, work.Service, work.Version, work.Depend, work.Source, + work.DeployPath, work.ShardSeq) + + if err == nil { + dataPath := path.Join(work.DeployPath, "data") + + // remove all old links + if work.Mode == "base" || len(work.RollbackInfo) != 0 { + cmd := fmt.Sprintf("ls -l %s | grep -E 'data.|index.' | awk '{print $9}'", dataPath) + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err == nil && stdout != "" { + fileNameLi := strings.Split(strings.TrimSpace(stdout), "\n") + for _, file := range fileNameLi { + err = os.Remove(path.Join(dataPath, file)) + logex.Debugf("os.Remove(%s) error (%+v) ", path.Join(dataPath, file), err) + } + } + } + + // create symbolic link to the version rollbacked + err = CreateSymlink(work.DictName, work.Service, work.Version, work.Depend, dataPath, + work.DeployPath, work.ShardSeq, len(strings.Split(work.Source, ";"))) + } else { + logex.Warningf("download error, failed to untar for %s, dir is %s, error is (+%v)", work.Source, work.DeployPath, err) + } + } + + if err == nil { + // clear history data + work.clearData() + work.clearLink() + } else { + logex.Warningf("create symlink failed, error is (+%v)", err) + } + + return +} + +func (work *Work) clearData() (err error) { + split := len(strings.Split(work.Source, ";")) + downloadDirs, err := GetDownloadDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + if err != nil { + logex.Warningf("clearData failed, error is (+%v)", err) + return + } + for _, downloadDir := range downloadDirs { + parentDir, _ := filepath.Split(downloadDir) + + cmd := fmt.Sprintf("ls -l %s | grep -v %s | awk '{print $9}'", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err != nil || stdout == "" || work.Service != "" { + cmd = fmt.Sprintf("find %s -type d -ctime +1 -print | xargs -i rm -rf {}", parentDir) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } else { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func (work *Work) clearPatchData() (err error) { + if work.Service != "" { + return + } + split := len(strings.Split(work.Source, ";")) + downloadDirs, err := GetDownloadDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + if err != nil { + logex.Warningf("clearPatchData failed, error is (+%v)", err) + return + } + for _, downloadDir := range downloadDirs { + parentDir, _ := filepath.Split(downloadDir) + cmd := fmt.Sprintf("ls -l %s | grep %s_ | awk '{print $9}'", parentDir, work.Depend) + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err == nil && stdout != "" { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func (work *Work) clearLink() (err error) { + if work.Service != "" { + return + } + split := len(strings.Split(work.Source, ";")) + dataLinkDirs := GetDataLinkDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + for _, linkDir := range dataLinkDirs { + parentDir, _ := filepath.Split(linkDir) + cmd := fmt.Sprintf("ls -l %s | grep -v %s | awk '{print $9}'", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err != nil || stdout == "" { + cmd = fmt.Sprintf("find %s -type d -ctime +1 -print | xargs -i rm -rf {}", parentDir) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } else { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func (work *Work) clearPatchLink() (err error) { + if work.Service != "" { + return + } + split := len(strings.Split(work.Source, ";")) + dataLinkDirs := GetDataLinkDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + for _, linkDir := range dataLinkDirs { + parentDir, _ := filepath.Split(linkDir) + cmd := fmt.Sprintf("ls -l %s | grep %s_ | awk '{print $9}'", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err == nil && stdout != "" { + rmList := strings.Split(strings.TrimSpace(stdout), "\n") + for i := 0; i < len(rmList); i++ { + if rmList[i] == "" { + continue + } + cmd = fmt.Sprintf("rm -rf %s/%s*", parentDir, rmList[i]) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + } + } + + return +} + +func UnTar(dictName, service, version, depend, source, deployPath string, shardSeq int) (err error) { + sources := strings.Split(source, ";") + downloadDirs, err := GetDownloadDirs(dictName, service, version, depend, deployPath, shardSeq, + len(sources)) + if err != nil { + logex.Warningf("UnTar failed, error is (+%v)", err) + return + } + for i := 0; i < len(sources); i++ { + fileName := GetFileName(sources[i]) + untarCmd := fmt.Sprintf("tar xvf %s -C %s", path.Join(downloadDirs[i], fileName), downloadDirs[i]) + _, _, err = RetryCmd(untarCmd, RETRY_TIMES) + } + + return +} + +func CreateSymlink(dictName, service, version, depend, dataPath, deployPath string, shardSeq, + split int) (err error) { + downloadDirs, err := GetDownloadDirs(dictName, service, version, depend, deployPath, shardSeq, split) + if err != nil { + logex.Warningf("CreateSymlink failed, error is (+%v)", err) + } + for i, downloadDir := range downloadDirs { + cmd := fmt.Sprintf("ls -l %s | grep -E 'data.|index.' | awk '{print $NF}'", downloadDir) + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + + if err == nil && stdout != "" { + fileNameLi := strings.Split(strings.TrimSpace(stdout), "\n") + versionDir := getParentDir(version, depend) + versionFile := path.Join(dataPath, "VERSION") + dataSubPath := "" + if split > 1 { + dataSubPath = path.Join(dataPath, strconv.Itoa(i), versionDir) + } else { + dataSubPath = path.Join(dataPath, versionDir) + } + if err = os.MkdirAll(dataSubPath, 0755); err != nil { + // return err + logex.Warningf("os.Mkdir %s failed, err:[%v]", dataSubPath, err) + } + if dataSubPath != "" { + cmd = fmt.Sprintf("find %s/.. -type d -ctime +5 -print | xargs -i rm -rf {}", dataSubPath) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + for _, file := range fileNameLi { + dataLink := "" + tempDataPath := "" + if split > 1 { + dataLink = path.Join(dataPath, strconv.Itoa(i), file) + tempDataPath = path.Join(dataPath, strconv.Itoa(i)) + } else { + dataLink = path.Join(dataPath, file) + tempDataPath = dataPath + } + cmd = fmt.Sprintf("rm -rf %s", dataLink) + _, stderr, _ := RetryCmd(cmd, RETRY_TIMES) + logex.Noticef("rm -rf %s, err:[%s]", dataLink, stderr) + + // create new symlink + err = os.Symlink(path.Join(downloadDir, file), dataLink) + logex.Noticef("os.Symlink %s %s return (%+v)", path.Join(downloadDir, file), dataLink, err) + fmt.Println("os.Symlink: ", path.Join(downloadDir, file), dataLink, err) + cmd = fmt.Sprintf("cp -d %s/index.* %s/", tempDataPath, dataSubPath) + _, stderr, _ = RetryCmd(cmd, RETRY_TIMES) + logex.Noticef("cp -d index Symlink to version dir %s, err:[%s]", dataSubPath, stderr) + cmd = fmt.Sprintf("cp -d %s/data.* %s/", tempDataPath, dataSubPath) + _, stderr, _ = RetryCmd(cmd, RETRY_TIMES) + logex.Noticef("cp -d data Symlink to version dir %s, err:[%s]", dataSubPath, stderr) + } + cmd = fmt.Sprintf("echo %s > %s", versionDir, versionFile) + if _, _, err = RetryCmd(cmd, RETRY_TIMES); err != nil { + return err + } + } + } + + return +} + +func (work *Work) CheckToReload() bool { + statusCmd := fmt.Sprintf("curl -s -d '{\"cmd\":\"status\"}' http://127.0.0.1:%s/ControlService/cmd", work.Port) + stdout, _, _ := RetryCmd(statusCmd, RETRY_TIMES) + var resp CubeResp + json.Unmarshal([]byte(stdout), &resp) + version := getParentDir(work.Version, work.Depend) + + if resp.CurVersion == "" && resp.BgVersion == "" { + logex.Noticef("cube version empty") + return true + } + if resp.CurVersion == version || resp.BgVersion == version { + logex.Noticef("cube version has matched. version: %s", version) + return false + } + return true +} + +func (work *Work) Reload() (err error) { + if (work.Port == "") { + err = errors.New("Reload with invalid port.") + return + } + if !work.CheckToReload() { + work.writeStatus("finish_reload", "succ") + return + } + work.writeStatus("prepare_reload", "") + + var stdout string + versionPath := getParentDir(work.Version, work.Depend) + bgLoadCmd := "bg_load_base" + if work.Mode == "delta" { + bgLoadCmd = "bg_load_patch" + } + if work.ActiveVersionList == "" { + work.ActiveVersionList = "[]" + } + for i := 0; i < RELOAD_RETRY_TIMES; i++ { + reloadCmd := fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"%s\",\"version_path\":\"/%s\"}' http://127.0.0.1:%s/ControlService/cmd", bgLoadCmd, versionPath, work.Port) + fmt.Println("reload: ", reloadCmd) + stdout, _, _ = RetryCmd(reloadCmd, 1) + fmt.Println("reload stdout: ", stdout) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("bg_load_base return succ") + break + } else { + logex.Warning("bg_load_base return failed") + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + work.writeStatus("finish_reload", "succ") + } else { + work.writeStatus("finish_reload", "failed") + err = errors.New("reload failed.") + } + + return +} + +func (work *Work) Clear() (err error) { + work.Service = "" + + var stdout string + var clearCmd string + for i := 0; i < RETRY_TIMES; i++ { + clearCmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"clear\",\"table_name\":\"%s\"}' http://127.0.0.1:%s/NodeControlService/cmd", work.DictName, work.Port) + fmt.Println("clear: ", clearCmd) + stdout, _, _ = RetryCmd(clearCmd, 1) + fmt.Println("clear stdout: ", stdout) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("clear return succ") + break + } else { + logex.Warning("clear return failed") + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + err = work.writeStatus("succ", "") + } else { + err = work.writeStatus("failed", "") + } + + return +} + +func (work *Work) Check() (err error) { + if work.Service != "" || work.VersionSign == "" { + return + } + var dataLinkDirs []string + split := len(strings.Split(work.Source, ";")) + dataLinkDirs = GetDataLinkDirs(work.DictName, work.Service, work.Version, work.Depend, + work.DeployPath, work.ShardSeq, split) + + if _, t_err := os.Stat(work.DeployPath); os.IsNotExist(t_err) { + logex.Noticef("check DeployPath[%s] not exists.", work.DeployPath) + return + } + + check_succ := true + for _, linkDir := range dataLinkDirs { + parentDir, _ := filepath.Split(linkDir) + + cmd := fmt.Sprintf("ls -l %s | grep %s | awk '{print $9}' | grep -v data | grep -v index", parentDir, work.Depend) + + stdout, _, err := RetryCmd(cmd, RETRY_TIMES) + if err != nil || stdout == "" { + check_succ = false + break + } else { + versionList := strings.Split(strings.TrimSpace(stdout), "\n") + logex.Noticef("calc ver_sign for [%v]", versionList) + + var version_sign string + var version string + for i := 0; i < len(versionList); i++ { + split_index := strings.Index(versionList[i], "_") + if split_index > 0 && split_index < len(versionList[i]) { + version = versionList[i][split_index+1:] + } else { + version = versionList[i] + } + if version_sign == "" { + version_sign = fmt.Sprintf("%x", md5.Sum([]byte(version))) + } else { + version_sign = fmt.Sprintf("%x", md5.Sum([]byte(version_sign))) + } + } + + if version_sign != work.VersionSign { + logex.Warningf("version_sign check failed. real[%v] expect[%v]", version_sign, work.VersionSign) + check_succ = false + break + } + } + } + + if !check_succ { + work.clearPatchData() + work.clearPatchLink() + master_host, master_port, _ := GetMaster(work.MasterAddress) + cmd := fmt.Sprintf("cd %s && export STRATEGY_DIR=%s && ./downloader -h %s -p %s -d %s -s %d", + work.DeployPath, work.DeployPath, master_host, master_port, work.DictName, work.ShardSeq) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + } + + return +} + +func (work *Work) Enable() (err error) { + if work.Port == "" { + err = errors.New("Enable with invalid port") + return + } + var stdout string + var cmd string + versionPath := getParentDir(work.Version, work.Depend) + for i := 0; i < RELOAD_RETRY_TIMES; i++ { + if work.Service != "" { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"reload_model\",\"version\":\"%s-%s\",\"dict_name\":\"%s\"}' http://127.0.0.1:%s/ControlService/cmd", + versionPath, work.DictName, work.DictName, work.Port) + } else { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"enable\",\"version\":\"%s\"}' http://127.0.0.1:%s/ControlService/cmd", versionPath, work.Port) + } + stdout, _, _ = RetryCmd(cmd, 1) + + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("enable return succ for %s, work dir is %s", work.Source, work.DeployPath) + break + } else { + logex.Warningf("enable return failed for %s, work dir is %s, error is (%+v)", work.Source, work.DeployPath, err) + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + err = work.writeStatus("succ", "") + } else { + err = work.writeStatus("failed", "") + } + + if work.Service == "" { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"bg_unload\"}' http://127.0.0.1:%s/ControlService/cmd", work.Port) + stdout, _, _ = RetryCmd(cmd, RETRY_TIMES) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("unload return succ") + } else { + logex.Warning("unload return failed") + } + } + + RemoveStateFile(work.DictName, work.ShardSeq, work.Service) + + return +} + +func (work *Work) Pop() (err error) { + var stdout string + var cmd string + if work.ActiveVersionList == "" { + work.ActiveVersionList = "[]" + } + for i := 0; i < RELOAD_RETRY_TIMES; i++ { + cmd = fmt.Sprintf("curl -o /dev/null -s -w %%{http_code} -d '{\"cmd\":\"pop\",\"table_name\":\"%s\",\"active_versions\":%v}' http://127.0.0.1:%s/NodeControlService/cmd", work.DictName, work.ActiveVersionList, work.Port) + fmt.Println("pop: ", cmd) + stdout, _, _ = RetryCmd(cmd, 1) + fmt.Println("pop stdout: ", stdout) + if strings.TrimSpace(stdout) == "200" { + logex.Debugf("pop return succ") + break + } else { + logex.Warning("pop return failed") + time.Sleep(RELOAD_RETRY_INTERVAL_SECOND * time.Second) + } + } + + if strings.TrimSpace(stdout) == "200" { + err = work.writeStatus("succ", "") + } else { + err = work.writeStatus("failed", "") + } + + RemoveStateFile(work.DictName, work.ShardSeq, work.Service) + return +} + +func writeStateFile(dictName string, shardSeq int, service, state string) { + stateFile := fmt.Sprintf(".state_%s_%d", dictName, shardSeq) + if service != "" { + stateFile = stateFile + "_" + service; + } + + cmd := fmt.Sprintf("echo '%s' > %s/state/%s", state, Dir, stateFile) + if _, _, err := RetryCmd(cmd, RETRY_TIMES); err != nil { + logex.Warningf("%s error (%+v)", cmd, err) + } +} + +func RemoveStateFile(dictName string, shardSeq int, service string) { + stateFile := fmt.Sprintf(".state_%s_%d", dictName, shardSeq) + if service != "" { + stateFile = stateFile + "_" + service; + } + + cmd := fmt.Sprintf("rm -f %s/state/%s", Dir, stateFile) + if _, _, err := RetryCmd(cmd, RETRY_TIMES); err != nil { + logex.Warningf("%s error (%+v)", cmd, err) + } +} + +func (work *Work) writeStatus(status string, finishStatus string) (err error) { + work.Status = status + work.FinishStatus = finishStatus + state, _ := json.Marshal(work) + writeStateFile(work.DictName, work.ShardSeq, work.Service, string(state)) + return +} + +func DoDownloadIndividual(source, downloadDir string, isService bool, timeOut int, ch chan error, wg *sync.WaitGroup) { + err := errors.New("DoDownloadIndividual start") + for i := 0; i < RETRY_TIMES; i++ { + err = FtpDownload(source, downloadDir, timeOut) + if err == nil { + logex.Debugf("download %s to %s succ", source, downloadDir) + if !isService { + err = FtpDownload(source+".md5", downloadDir, timeOut) + } + } else { + logex.Warningf("download error , source %s, downloadDir %s, err (%+v)", source, downloadDir, err) + continue + } + + if err == nil && isService { + // touch download_succ file + cmd := fmt.Sprintf("touch %s", path.Join(downloadDir, DOWNLOAD_DONE_MARK_FILE)) + RetryCmd(cmd, RETRY_TIMES) + break + } + + // download md5 file succ, md5check + if err == nil { + // md5sum -c + fileName := GetFileName(source) + err = checkMd5(path.Join(downloadDir, fileName), path.Join(downloadDir, fileName+".md5")) + logex.Warningf("md5sum check %s %s return (%+v)", path.Join(downloadDir, fileName), path.Join(downloadDir, fileName+".md5"), err) + if err == nil { + // touch download_succ file + cmd := fmt.Sprintf("touch %s", path.Join(downloadDir, DOWNLOAD_DONE_MARK_FILE)) + RetryCmd(cmd, RETRY_TIMES) + logex.Debugf("md5sum ok, source is %s, dir is %s", source, downloadDir) + break + } else { + logex.Warningf("md5sum error, source is %s, dir is %s", source, downloadDir) + continue + } + } else { + logex.Warningf("download %s return (%+v)", source+".md5", err) + continue + } + } + + ch <- err + wg.Done() +} + +func checkSources(source string) ([]string, error) { + sources := strings.Split(source, ";") + for i := 0; i < len(sources); i++ { + if sources[i] == "" || !strings.HasPrefix(sources[i], "ftp://") { + return sources, errors.New("Invalid sources") + } + } + return sources, nil +} + +func DoDownload(dictName, service, version, depend, mode, source, deployPath string, + shardSeq int) (err error) { + sources, err := checkSources(source) + if err != nil { + logex.Warningf("checkSources %s return (%+v)", source, err) + return + } + downloadDirs, err := GetDownloadDirs(dictName, service, version, depend, deployPath, shardSeq, + len(sources)) + if err != nil { + logex.Warningf("GetDownloadDirs %s return (%+v)", source, err) + return + } + version_suffix := "" + if service != "" { + version_suffix = version_suffix + "-" + dictName + } + if !checkToDownload(downloadDirs) { + cmd := fmt.Sprintf("cd %s/cube_data && echo %s > VERSION && cp VERSION VERSION-%s", + deployPath, getParentDir(version, depend)+version_suffix, dictName) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + logex.Debugf("echo VERSION cmd:[%s] err:[%v]", cmd, err) + return + } + + ch := make(chan error, len(sources)) + wg := sync.WaitGroup{} + j := 0 + numCo := 0 + for ; j < len(sources); j++ { + if numCo >= MAX_DOWN_CO { + wg.Wait() + logex.Noticef("DoDownload co down.") + numCo = 0 + } + numCo += 1 + wg.Add(1) + time.Sleep(2000 * time.Millisecond) + timeOut := 900 + if mode == "base" { + timeOut = 3600 + } + go DoDownloadIndividual(sources[j], downloadDirs[j], (service != ""), timeOut, ch, &wg) + } + wg.Wait() + close(ch) + for err = range ch { + if err != nil { + return + } + } + cmd := fmt.Sprintf("cd %s/cube_data && echo %s > VERSION && cp VERSION VERSION-%s", + deployPath, getParentDir(version, depend)+version_suffix, dictName) + _, _, err = RetryCmd(cmd, RETRY_TIMES) + logex.Debugf("echo VERSION cmd:[%s] err:[%v]", cmd, err) + return +} + +func FtpDownload(source string, dest string, timeOut int) (err error) { + dlCmd := fmt.Sprintf("wget --quiet --level=100 -P %s %s --limit-rate=10240k", dest, source) + fmt.Println(dlCmd) + + _, _, err = RetryCmdWithSleep(dlCmd, RETRY_TIMES) + return +} + +func checkToDownload(downloadDirs []string) bool { + for _, v := range downloadDirs { + if _, err := os.Stat(path.Join(v, DOWNLOAD_DONE_MARK_FILE)); err != nil { + logex.Noticef("check [%v] not exists.", v) + return true + } + } + + return false +} + +// simple hash +func getDownloadDisk(dictName string, shardSeq int) string { + index := len(dictName) * shardSeq % len(disks) + + return disks[index] +} + +func getParentDir(version string, depend string) (dir string) { + if version == depend { + dir = depend + } else { + dir = depend + "_" + version + } + + return +} + +func GetFileName(source string) (fileName string) { + s := strings.Split(source, "/") + fileName = s[len(s)-1] + + return +} + +func checkMd5(file string, fileMd5 string) (err error) { + md5Cmd := fmt.Sprintf("./scripts/md5_checker %s %s", file, fileMd5) + + _, _, err = pipeline.Run(exec.Command("/bin/sh", "-c", md5Cmd)) + + return +} + +func RetryCmd(cmd string, retryTimes int) (stdoutStr string, stderrStr string, err error) { + for i := 0; i < retryTimes; i++ { + stdout, stderr, e := pipeline.Run(exec.Command("/bin/sh", "-c", cmd)) + stdoutStr = stdout.String() + stderrStr = stderr.String() + err = e + + logex.Debugf("cmd %s, stdout: %s, stderr: %s, err: (%+v)", cmd, stdoutStr, stderrStr, err) + if err == nil { + break + } + } + + return +} + +func RetryCmdWithSleep(cmd string, retryTimes int) (stdoutStr string, stderrStr string, err error) { + for i := 0; i < retryTimes; i++ { + stdout, stderr, e := pipeline.Run(exec.Command("/bin/sh", "-c", cmd)) + stdoutStr = stdout.String() + stderrStr = stderr.String() + err = e + + logex.Debugf("cmd %s, stdout: %s, stderr: %s, err: (%+v)", cmd, stdoutStr, stderrStr, err) + if err == nil { + break + } + time.Sleep(10000 * time.Millisecond) + } + + return +} diff --git a/cube/cube-agent/src/agent/work_pool.go b/cube/cube-agent/src/agent/work_pool.go new file mode 100644 index 0000000000000000000000000000000000000000..5fd3dc6154ed64caa2e4ecd8b6eeb8320e76f112 --- /dev/null +++ b/cube/cube-agent/src/agent/work_pool.go @@ -0,0 +1,107 @@ +package agent + +import ( + "errors" + "fmt" + "sync/atomic" + "sync" +) + +type ( + workType struct { + poolWorker PoolWorker + resultChannel chan error + } + + WorkPool struct { + queueChannel chan workType + workChannel chan PoolWorker + queuedWorkNum int32 + activeWorkerNum int32 + queueCapacity int32 + workFilter sync.Map + } +) + +type PoolWorker interface { + Token() string + DoWork() +} + +func NewWorkPool(workerNum int, queueCapacity int32) *WorkPool { + workPool := WorkPool{ + queueChannel: make(chan workType), + workChannel: make(chan PoolWorker, queueCapacity), + queuedWorkNum: 0, + activeWorkerNum: 0, + queueCapacity: queueCapacity, + } + + for i := 0; i < workerNum; i++ { + go workPool.startWorkRoutine() + } + + go workPool.startQueueRoutine() + + return &workPool +} + +func (workPool *WorkPool) startWorkRoutine() { + for { + select { + case work := <-workPool.workChannel: + workPool.doWork(work) + break + } + } +} + +func (workPool *WorkPool) startQueueRoutine() { + for { + select { + case queueItem := <-workPool.queueChannel: + if atomic.AddInt32(&workPool.queuedWorkNum, 0) == workPool.queueCapacity { + queueItem.resultChannel <- fmt.Errorf("work pool fulled with %v pending works", QueueCapacity) + continue + } + + atomic.AddInt32(&workPool.queuedWorkNum, 1) + + workPool.workChannel <- queueItem.poolWorker + + queueItem.resultChannel <- nil + + break + } + } +} + +func (workPool *WorkPool) doWork(poolWorker PoolWorker) { + defer atomic.AddInt32(&workPool.activeWorkerNum, -1) + defer workPool.workFilter.Delete(poolWorker.Token()) + + atomic.AddInt32(&workPool.queuedWorkNum, -1) + atomic.AddInt32(&workPool.activeWorkerNum, 1) + + poolWorker.DoWork() +} + +func (workPool *WorkPool) PostWorkWithToken(poolWorker PoolWorker) (err error) { + if _, ok := workPool.workFilter.Load(poolWorker.Token()); ok { + return errors.New("another work with same key is doing.") + } + workPool.workFilter.Store(poolWorker.Token(), true) + return workPool.PostWork(poolWorker) +} + +func (workPool *WorkPool) PostWork(poolWorker PoolWorker) (err error) { + work := workType{poolWorker, make(chan error)} + + defer close(work.resultChannel) + + workPool.queueChannel <- work + + err = <-work.resultChannel + + return err +} diff --git a/cube/cube-agent/src/cube-agent.go b/cube/cube-agent/src/cube-agent.go new file mode 100644 index 0000000000000000000000000000000000000000..ee8bc918c14b2c10aeaf32c8e2ef48bcbbfcb518 --- /dev/null +++ b/cube/cube-agent/src/cube-agent.go @@ -0,0 +1,96 @@ +package main + +import ( + "agent" + "fmt" + "github.com/docopt/docopt-go" + "github.com/Badangel/logex" + "os" + "path/filepath" + "runtime" + "strconv" +) + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + + agent.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0])) + usage := fmt.Sprintf(`Usage: ./m_master [options] + +Options: + -n WORKERNUM set worker num. + -q QUEUENUM set queue num. + -P LISTEN_PORT agent listen port + +Log options: + -l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 16] + --log_dir=DIR set log output dir. [default: ./log] + --log_name=NAME set log name. [default: m_agent]`, agent.Dir) + + opts, err := docopt.Parse(usage, nil, true, "Cube Agent Checker 1.0.0", false) + if err != nil { + fmt.Println("ERROR:", err) + os.Exit(1) + } + + log_level, _ := strconv.Atoi(opts["-l"].(string)) + log_name := opts["--log_name"].(string) + log_dir := opts["--log_dir"].(string) + logex.SetLevel(getLogLevel(log_level)) + if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil { + fmt.Println("ERROR:", err) + } + + logex.Notice("--- NEW SESSION -------------------------") + logex.Notice(">>> log_level:", log_level) + + agent.WorkerNum = 10 + if opts["-n"] != nil { + n, err := strconv.Atoi(opts["-n"].(string)) + if err == nil { + agent.WorkerNum = n + } + } + + agent.QueueCapacity = 20 + if opts["-q"] != nil { + q, err := strconv.Atoi(opts["-q"].(string)) + if err == nil { + agent.QueueCapacity = int32(q) + } + } + + agent.CmdWorkPool = agent.NewWorkPool(agent.WorkerNum, agent.QueueCapacity) + + if opts["-P"] == nil { + logex.Fatalf("ERROR: -P LISTEN PORT must be set!") + os.Exit(255) + } + + agentPort := opts["-P"].(string) + logex.Notice(">>> starting server...") + addr := ":" + agentPort + + if agent.StartHttp(addr) != nil { + logex.Noticef("cant start http(addr=%v). quit.", addr) + os.Exit(0) + } +} + +func getLogLevel(log_level int) logex.Level { + switch log_level { + case 16: + return logex.DEBUG + case 8: + return logex.TRACE + case 4: + return logex.NOTICE + case 2: + return logex.WARNING + case 1: + return logex.FATAL + case 0: + return logex.NONE + } + return logex.DEBUG +}