diff --git a/cube/CMakeLists.txt b/cube/CMakeLists.txt index 55fd966cf553e013ecbdf50eddc9766471a33075..8273ca482e70b4479a9493dacba69357b5fdcf87 100644 --- a/cube/CMakeLists.txt +++ b/cube/CMakeLists.txt @@ -15,3 +15,4 @@ add_subdirectory(cube-server) add_subdirectory(cube-api) add_subdirectory(cube-builder) +add_subdirectory(cube-transfer) diff --git a/cube/cube-transfer/CMakeLists.txt b/cube/cube-transfer/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..9cee602608f3533cfcbcf93048f101f9ca5b797f --- /dev/null +++ b/cube/cube-transfer/CMakeLists.txt @@ -0,0 +1,12 @@ +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +project(cube-transfer Go) + +include(cmake/golang.cmake) + +ExternalGoProject_Add(docopt-go github.com/docopt/docopt-go) +ExternalGoProject_Add(rfw github.com/mipearson/rfw) +ExternalGoProject_Add(logex github.com/Badangel/logex) + +add_subdirectory(src) +install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION ${PADDLE_SERVING_INSTALL_DIR}) diff --git a/cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake b/cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..ff11feb3fd9ffb71a9af65b4ecb2a4086732ec14 --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake @@ -0,0 +1,44 @@ +if(NOT CMAKE_Go_COMPILER) + if(NOT $ENV{GO_COMPILER} STREQUAL "") + get_filename_component(CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT) + + if(CMAKE_Go_FLAGS_ENV_INIT) + set(CMAKE_Go_COMPILER_ARG1 "${CMAKE_Go_FLAGS_ENV_INIT}" CACHE STRING "First argument to Go compiler") + endif() + + if(NOT EXISTS ${CMAKE_Go_COMPILER_INIT}) + message(SEND_ERROR "Could not find compiler set in environment variable GO_COMPILER:\n$ENV{GO_COMPILER}.") + endif() + + endif() + + set(Go_BIN_PATH + $ENV{GOPATH} + $ENV{GOROOT} + $ENV{GOROOT}/../bin + $ENV{GO_COMPILER} + /usr/bin + /usr/local/bin + ) + + if(CMAKE_Go_COMPILER_INIT) + set(CMAKE_Go_COMPILER ${CMAKE_Go_COMPILER_INIT} CACHE PATH "Go Compiler") + else() + find_program(CMAKE_Go_COMPILER + NAMES go + PATHS ${Go_BIN_PATH} + ) + EXEC_PROGRAM(${CMAKE_Go_COMPILER} ARGS version OUTPUT_VARIABLE GOLANG_VERSION) + STRING(REGEX MATCH "go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*" VERSION "${GOLANG_VERSION}") + message("-- The Golang compiler identification is ${VERSION}") + message("-- Check for working Golang compiler: ${CMAKE_Go_COMPILER}") + endif() + +endif() + +mark_as_advanced(CMAKE_Go_COMPILER) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in + ${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY) + +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") \ No newline at end of file diff --git a/cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in b/cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in new file mode 100644 index 0000000000000000000000000000000000000000..a71f08e064656fbaad8cfa77aea6f216515712ef --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in @@ -0,0 +1,8 @@ +set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@") +set(CMAKE_Go_COMPILER_LOADED 1) + +set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go) +set(CMAKE_Go_LINKER_PREFERENCE 40) +set(CMAKE_Go_OUTPUT_EXTENSION .o) +set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1) +set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") diff --git a/cube/cube-transfer/cmake/CMakeGoInformation.cmake b/cube/cube-transfer/cmake/CMakeGoInformation.cmake new file mode 100644 index 0000000000000000000000000000000000000000..128651513e4b79dd4d5d3e3c881baf2929b08465 --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeGoInformation.cmake @@ -0,0 +1,7 @@ +if(NOT CMAKE_Go_COMPILE_OBJECT) + set(CMAKE_Go_COMPILE_OBJECT "go tool compile -l -N -o ") +endif() + +if(NOT CMAKE_Go_LINK_EXECUTABLE) + set(CMAKE_Go_LINK_EXECUTABLE "go tool link -o ") +endif() \ No newline at end of file diff --git a/cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake b/cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake new file mode 100644 index 0000000000000000000000000000000000000000..c1544ea9bc77498346fdf607d2a95873f8a55809 --- /dev/null +++ b/cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake @@ -0,0 +1 @@ +set(CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL "") \ No newline at end of file diff --git a/cube/cube-transfer/cmake/golang.cmake b/cube/cube-transfer/cmake/golang.cmake new file mode 100644 index 0000000000000000000000000000000000000000..2b70b29051c5fb5c3c22762e58fdf796fe617918 --- /dev/null +++ b/cube/cube-transfer/cmake/golang.cmake @@ -0,0 +1,46 @@ +set(GOPATH "${CMAKE_CURRENT_LIST_DIR}/../") +file(MAKE_DIRECTORY ${GOPATH}) + +function(ExternalGoProject_Add TARG) + add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN}) +endfunction(ExternalGoProject_Add) + +function(add_go_executable NAME) + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build + -o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) +endfunction(add_go_executable) + + +function(ADD_GO_LIBRARY NAME BUILD_TYPE) + if(BUILD_TYPE STREQUAL "STATIC") + set(BUILD_MODE -buildmode=c-archive) + set(LIB_NAME "lib${NAME}.a") + else() + set(BUILD_MODE -buildmode=c-shared) + if(APPLE) + set(LIB_NAME "lib${NAME}.dylib") + else() + set(LIB_NAME "lib${NAME}.so") + endif() + endif() + + file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp + COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} + -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" + ${CMAKE_GO_FLAGS} ${GO_SOURCE} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + + add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + + if(NOT BUILD_TYPE STREQUAL "STATIC") + install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) + endif() +endfunction(ADD_GO_LIBRARY) \ No newline at end of file diff --git a/cube/cube-transfer/conf/transfer.conf b/cube/cube-transfer/conf/transfer.conf new file mode 100755 index 0000000000000000000000000000000000000000..c3184231f95d259de1adcfa8a20e136dc756a3b9 --- /dev/null +++ b/cube/cube-transfer/conf/transfer.conf @@ -0,0 +1,18 @@ +[default] +dict_name: test_dict +mode: base_only +storage_place: LOCAL +buildtool_local: /home/work/Serving/build/output/bin/cube-builder +donefile_address: http://127.0.0.1/home/work/dangyifei/donefile +output_address: /home/work/dangyifei/test-transfer/test_data/output +tmp_address: /home/work/dangyifei/test-transfer/test_data/tmp +shard_num: 1 +copy_num: 2 +deploy_path: /home/work/test_dict +transfer_address: 127.0.0.1 + +[cube_agent] +agent0_0: 127.0.0.1:8001 +cube0_0: 127.0.0.1:8000:/ssd2/cube_open +agent0_1: 127.0.0.1:8001 +cube0_1: 127.0.0.1:8000:/home/disk1/cube_open diff --git a/cube/cube-transfer/src/CMakeLists.txt b/cube/cube-transfer/src/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..db1865127d1f8058fad8e69522722a294df2e0dd --- /dev/null +++ b/cube/cube-transfer/src/CMakeLists.txt @@ -0,0 +1,6 @@ + +set(SOURCE_FILE cube-transfer.go) +add_go_executable(cube-transfer ${SOURCE_FILE}) +add_dependencies(cube-transfer docopt-go) +add_dependencies(cube-transfer rfw) +add_dependencies(cube-transfer logex) diff --git a/cube/cube-transfer/src/cube-transfer.go b/cube/cube-transfer/src/cube-transfer.go new file mode 100755 index 0000000000000000000000000000000000000000..3e63d8bbcce6d41524eb53788ae069b9418d698b --- /dev/null +++ b/cube/cube-transfer/src/cube-transfer.go @@ -0,0 +1,226 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "github.com/docopt/docopt-go" + "github.com/Badangel/logex" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "time" + "transfer" + "transfer/dict" +) + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + transfer.Dir, _ = filepath.Abs(filepath.Dir(os.Args[0])) + + usage := fmt.Sprintf(`Usage: ./m_master [options] + +Options: + -p PORT set listen port. [default: 8099] + --config=conf/transfer.conf set conf file. [defalut: ./conf/transfer.conf] + +Log options: + -l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 4] + --log_dir=DIR set log output dir. [default: ./log] + --log_name=NAME set log name. [default: transfer]`, + transfer.Dir) + opts, err := docopt.Parse(usage, nil, true, "Cube Transfer", false) + if err != nil { + fmt.Println("ERROR:", err) + os.Exit(1) + } + log_level, _ := strconv.Atoi(opts["-l"].(string)) + log_name := opts["--log_name"].(string) + log_dir := opts["--log_dir"].(string) + logex.SetLevel(getLogLevel(log_level)) + if err := logex.SetUpFileLogger(log_dir, log_name, nil); err != nil { + fmt.Println("ERROR:", err) + } + + fmt.Printf("%v: Print stdout1 here...\n", time.Now()) + os.Stderr.WriteString(fmt.Sprintf("%v: Print stderr here...\n", time.Now())) + + logex.Notice("--- NEW SESSION -------------------------") + logex.Notice(">>> log_level:", log_level) + + // settings: + if opts["-p"] == nil { + fmt.Fprintln(os.Stderr, "ERROR: -p PORT must be set!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + transfer.Port = opts["-p"].(string) + logex.Notice(">>> port:", transfer.Port) + + if opts["--config"] == nil { + fmt.Fprintln(os.Stderr, "ERROR: --config config_file must be set!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + //read conf + var configMgr transfer.ConfigManager + configMgr.Init(opts["--config"].(string)) + transfer.Dict.DictName = configMgr.Read("default", "dict_name") + if transfer.Dict.DictName == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] DictName in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> DictName:", transfer.Dict.DictName) + + transfer.Dict.DictMode = configMgr.Read("default", "mode") + if transfer.Dict.DictMode == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] DictMode in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> Mode:", transfer.Dict.DictMode) + + transfer.Dict.StoragePlace = configMgr.Read("default", "storage_place") + if transfer.Dict.StoragePlace == "" || transfer.Dict.StoragePlace != "LOCAL" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] StoragePlace in config_file! only support Local") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> StoragePlace:", transfer.Dict.StoragePlace) + + transfer.BuildToolLocal = configMgr.Read("default", "buildtool_local") + if transfer.BuildToolLocal == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] BuildToolLocal in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> BuildToolLocal:", transfer.BuildToolLocal) + + transfer.Dict.DonefileAddress = configMgr.Read("default", "donefile_address") + if transfer.Dict.DonefileAddress == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] DonefileAddress in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> DonefileAddress:", transfer.Dict.DonefileAddress) + + transfer.Dict.OutputAddress = configMgr.Read("default", "output_address") + if transfer.Dict.OutputAddress == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] OutputAddress in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> OutputAddress:", transfer.Dict.OutputAddress) + + transfer.Dict.TmpAddress = configMgr.Read("default", "tmp_address") + if transfer.Dict.TmpAddress == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] TmpAddress in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> TmpAddress:", transfer.Dict.TmpAddress) + + ShardNumStr := configMgr.Read("default", "shard_num") + if ShardNumStr == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] ShardNum in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + transfer.Dict.ShardNum, err = strconv.Atoi(ShardNumStr) + if err != nil { + logex.Fatal("ShardNum form is not right") + os.Exit(1) + } + logex.Notice(">>> ShardNum:", transfer.Dict.ShardNum) + + CopyNumStr := configMgr.Read("default", "copy_num") + if CopyNumStr == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] CopyNum in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + transfer.Dict.CopyNum, err = strconv.Atoi(CopyNumStr) + if err != nil { + logex.Fatal("ShardNum form is not right") + os.Exit(1) + } + logex.Notice(">>> CopyNum:", transfer.Dict.CopyNum) + + transfer.Dict.InstancesNum = transfer.Dict.ShardNum * transfer.Dict.CopyNum + + transfer.Dict.DeployPath = configMgr.Read("default", "deploy_path") + if transfer.Dict.DeployPath == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] DeployPath in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> DeployPath:", transfer.Dict.DeployPath) + + transfer.TransferAddr = configMgr.Read("default", "transfer_address") + if transfer.TransferAddr == "" { + fmt.Fprintln(os.Stderr, "ERROR: nead [default] TransferAddr in config_file!") + fmt.Fprintln(os.Stderr, usage) + os.Exit(1) + } + logex.Notice(">>> TransferAddr:", transfer.TransferAddr) + + for i := 0; i < transfer.Dict.ShardNum; i++ { + for j := 0; j < transfer.Dict.CopyNum; j++ { + var instance dict.DictInstance + agentName := fmt.Sprintf("agent%d_%d", i, j) + agentInfo := configMgr.Read("cube_agent", agentName) + agentInfoSlice := strings.Split(agentInfo, ":") + cubeName := fmt.Sprintf("cube%d_%d", i, j) + cubeInfo := configMgr.Read("cube_agent", cubeName) + cubeInfoSlice := strings.Split(cubeInfo, ":") + instance.DictName = transfer.Dict.DictName + instance.AgentIp = agentInfoSlice[0] + instance.AgentPort, _ = strconv.Atoi(agentInfoSlice[1]) + instance.IP = cubeInfoSlice[0] + instance.Port, _ = strconv.Atoi(cubeInfoSlice[1]) + instance.DictName = transfer.Dict.DictName + instance.CreateTime = int(time.Now().Unix()) + instance.Shard = i + instance.DeployPath = cubeInfoSlice[2] + transfer.Dict.Instances = append(transfer.Dict.Instances, instance) + } + } + logex.Noticef(">>> instance: %v", transfer.Dict.Instances) + + transfer.Start() + fmt.Print("m-cube-transfer over!") + +} + +func getLogLevel(log_level int) logex.Level { + switch log_level { + case 16: + return logex.DEBUG + case 8: + return logex.TRACE + case 4: + return logex.NOTICE + case 2: + return logex.WARNING + case 1: + return logex.FATAL + case 0: + return logex.NONE + } + return logex.DEBUG +} diff --git a/cube/cube-transfer/src/transfer/builder.go b/cube/cube-transfer/src/transfer/builder.go new file mode 100755 index 0000000000000000000000000000000000000000..cfc519ffeecff5c5e0ecebfb540cdb49570f037f --- /dev/null +++ b/cube/cube-transfer/src/transfer/builder.go @@ -0,0 +1,97 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "fmt" + "github.com/Badangel/logex" + "strconv" + "strings" + "time" + "transfer/dict" +) + +func BuilderStart(versionInfo dict.DictVersionInfo) error { + fmt.Printf("[entry]start build\n") + logex.Noticef("[entry]start build") + if strings.HasPrefix(Dict.DonefileAddress, dict.FTP_HEADER) || strings.HasPrefix(Dict.DonefileAddress, dict.HTTP_HEADER) { + if strings.HasPrefix(Dict.DonefileAddress, dict.FTP_HEADER){ + s1 := strings.Replace(Dict.DonefileAddress, dict.FTP_HEADER,"",1) + index:=strings.Index(s1,"/") + versionInfo.Input = dict.FTP_HEADER + s1[0 : index] + versionInfo.Input + } + if strings.HasPrefix(Dict.DonefileAddress, dict.HTTP_HEADER){ + s1 := strings.Replace(Dict.DonefileAddress, dict.HTTP_HEADER,"",1) + index:=strings.Index(s1,"/") + versionInfo.Input = dict.HTTP_HEADER + s1[0 : index] + versionInfo.Input + } + + localInputPath := Dict.TmpAddress + "/../input/" + Dict.DictName + "_" + strconv.Itoa(versionInfo.Version) + "_" + strconv.Itoa(versionInfo.Depend) + Wget(versionInfo.Input, localInputPath) + versionInfo.Input = localInputPath + } + Dict.WaitVersionInfo.Output = BuildIndex(versionInfo) + InitAllInstances() + + return nil +} + +func BuildIndex(versionInfo dict.DictVersionInfo) string { + versionStr := strconv.Itoa(versionInfo.Version) + dependStr := strconv.Itoa(versionInfo.Depend) + + var params []string + params = append(params, "-dict_name="+Dict.DictName) + params = append(params, "-job_mode="+Dict.WaitVersionInfo.Mode) + curlen := len(Dict.CurrentVersionInfo) + lastVersion := "0" + if curlen > 0 && Dict.WaitVersionInfo.Key == Dict.CurrentVersionInfo[curlen-1].Key { + lastVersion = strconv.Itoa(Dict.CurrentVersionInfo[curlen-1].Version) + } + + shardNum := strconv.Itoa(Dict.ShardNum) + params = append(params, "-last_version="+lastVersion) + params = append(params, "-cur_version="+versionStr) + params = append(params, "-depend_version="+dependStr) + params = append(params, "-input_path="+versionInfo.Input) + params = append(params, "-output_path="+Dict.OutputAddress) + params = append(params, "-shard_num="+shardNum) + params = append(params, "-master_address="+TransferAddr+":"+Port) + params = append(params, "-only_build=false") + err := ExeCommad(BuildToolLocal, params) + if err != nil { + fmt.Printf("build exe: %v\n", err) + logex.Noticef("[builder] exe cmd err: %v", err) + } + + outPut := Dict.OutputAddress + "/" + dependStr + "_" + versionStr + return outPut +} + +func InitAllInstances() { + for i, _ := range Dict.Instances { + Dict.Instances[i].Status = dict.Instance_Status_Init + Dict.Instances[i].BuildedTime = int(time.Now().Unix()) + Dict.Instances[i].DownloadStartTime = 0 + Dict.Instances[i].DownloadedTime = 0 + Dict.Instances[i].ReloadStartTime = 0 + Dict.Instances[i].ReloadedTime = 0 + Dict.Instances[i].EnablStartTime = 0 + Dict.Instances[i].EnabledTime = 0 + } + Dict.DownloadSuccInsts = 0 + Dict.ReloadSuccInsts = 0 + Dict.EnableSuccInsts = 0 +} diff --git a/cube/cube-transfer/src/transfer/config.go b/cube/cube-transfer/src/transfer/config.go new file mode 100755 index 0000000000000000000000000000000000000000..174dab383d189584c7b647bd40b28a3fe0148da8 --- /dev/null +++ b/cube/cube-transfer/src/transfer/config.go @@ -0,0 +1,112 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "bufio" + "io" + "os" + "strings" +) + +const middle = "->" + +type ConfigManager struct { + Mymap map[string]string + strcet string +} + +func (c *ConfigManager) Init(path string) { + c.Mymap = make(map[string]string) + + f, err := os.Open(path) + if err != nil { + panic(err) + } + defer f.Close() + + r := bufio.NewReader(f) + for { + b, _, err := r.ReadLine() + if err != nil { + if err == io.EOF { + break + } + panic(err) + } + + s := strings.TrimSpace(string(b)) + if strings.Index(s, "#") == 0 { + continue + } + + n1 := strings.Index(s, "[") + n2 := strings.LastIndex(s, "]") + if n1 > -1 && n2 > -1 && n2 > n1+1 { + c.strcet = strings.TrimSpace(s[n1+1 : n2]) + continue + } + + if len(c.strcet) == 0 { + continue + } + index := strings.Index(s, ":") + if index < 0 { + continue + } + + frist := strings.TrimSpace(s[:index]) + if len(frist) == 0 { + continue + } + second := strings.TrimSpace(s[index+1:]) + + pos := strings.Index(second, "\t#") + if pos > -1 { + second = second[0:pos] + } + + pos = strings.Index(second, " #") + if pos > -1 { + second = second[0:pos] + } + + pos = strings.Index(second, "\t//") + if pos > -1 { + second = second[0:pos] + } + + pos = strings.Index(second, " //") + if pos > -1 { + second = second[0:pos] + } + + if len(second) == 0 { + continue + } + + key := c.strcet + middle + frist + c.Mymap[key] = strings.TrimSpace(second) + } +} + +func (c ConfigManager) Read(node, key string) string { + key = node + middle + key + v, found := c.Mymap[key] + if !found { + return "" + } + return v +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/deployer.go b/cube/cube-transfer/src/transfer/deployer.go new file mode 100755 index 0000000000000000000000000000000000000000..6c4b1ca8b42b5e13556d8ad71c43b26bfdb0cf14 --- /dev/null +++ b/cube/cube-transfer/src/transfer/deployer.go @@ -0,0 +1,203 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "fmt" + "github.com/Badangel/logex" + "strconv" + "time" + "transfer/dict" +) + +func DeployStart(versionInfo dict.DictVersionInfo) error { + fmt.Printf("[entry]start deploy\n") + logex.Noticef("[entry]start deploy") + var err error + for { + switch Dict.WaitVersionInfo.Status { + case dict.Dict_Status_Deploying, dict.Dict_Status_Downloading: + CmdInstsDownload() + case dict.Dict_Status_Download_Succ, dict.Dict_Status_Reloading: + CmdInstsReload() + case dict.Dict_Status_Reload_Succ, dict.Dict_Status_Enabling: + CmdInstsEnable() + default: + logex.Noticef("dict status %v", Dict.WaitVersionInfo.Status) + } + if Dict.WaitVersionInfo.Status == dict.Dict_Status_Finished { + Dict.WaitVersionInfo.FinishTime = int(time.Now().Unix()) + Dict.CurrentVersionInfo = append(Dict.CurrentVersionInfo, Dict.WaitVersionInfo) + fmt.Printf("[deploy finish]version:%v\n", Dict.WaitVersionInfo) + logex.Noticef("[deploy finish]version:%v", Dict.WaitVersionInfo) + var newVersionInfo dict.DictVersionInfo + Dict.WaitVersionInfo = newVersionInfo + WriteCurrentVersionInfoToFile() + WriteWaitVersionInfoToFile() + break + } else { + time.Sleep(time.Duration(10) * time.Second) + } + } + return err +} + +func CmdInstsDownload() { + for { + chs := make([]chan error, len(Dict.Instances)) + keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances)) + for i, inst := range Dict.Instances { + if inst.Status != dict.Instance_Status_Download_Succ { + chs[i] = make(chan error) + var json_params dict.CubeAgentRequest + json_params.Command = dict.DOWNLOAD + json_params.DictName = inst.DictName + json_params.DeployPath = inst.DeployPath + json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version) + json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend) + json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id) + json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key) + json_params.Mode = Dict.WaitVersionInfo.Mode + json_params.ShardSeq = inst.Shard + json_params.Port = strconv.Itoa(inst.Port) + json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar" + var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort) + logex.Noticef("[download cmd]%v:%v", address, json_params) + go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i]) + Dict.Instances[i].DownloadStartTime = int(time.Now().Unix()) + Dict.Instances[i].Mode = Dict.WaitVersionInfo.Mode + } + } + for i, inst := range Dict.Instances { + err := <-chs[i] + logex.Noticef("[instance resp]download:%v", Dict.Instances) + if err != nil || keyAndRespSlice[i].Success != "0" { + logex.Warningf("cmd cube online downlaod of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard) + continue + } + if inst.Status < dict.Instance_Status_Download_Succ { + Dict.Instances[i].Status = dict.Instance_Status_Download_Succ + Dict.Instances[i].DownloadedTime = int(time.Now().Unix()) + Dict.DownloadSuccInsts++ + } + } + if Dict.DownloadSuccInsts == Dict.InstancesNum { + Dict.WaitVersionInfo.Status = dict.Dict_Status_Download_Succ + fmt.Printf("[all download ok]inst :%v\n", Dict.Instances) + logex.Noticef("[all download ok]inst :%v", Dict.Instances) + break + } + time.Sleep(5 * time.Second) + } +} + +func CmdInstsReload() { + for { + chs := make([]chan error, len(Dict.Instances)) + keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances)) + for i, inst := range Dict.Instances { + if inst.Status != dict.Instance_Status_Reload_Succ { + chs[i] = make(chan error) + var json_params dict.CubeAgentRequest + json_params.Command = dict.RELOAD + json_params.DictName = inst.DictName + json_params.DeployPath = inst.DeployPath + json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version) + json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend) + json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id) + json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key) + json_params.Mode = Dict.WaitVersionInfo.Mode + json_params.ShardSeq = inst.Shard + json_params.Port = strconv.Itoa(inst.Port) + json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar" + + var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort) + logex.Noticef("[reload cmd]%v:%v", address, json_params) + go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i]) + Dict.Instances[i].ReloadStartTime = int(time.Now().Unix()) + } + } + for i, inst := range Dict.Instances { + err := <-chs[i] + logex.Noticef("[instance resp]reload:%v", Dict.Instances) + if err != nil || keyAndRespSlice[i].Success != "0" { + logex.Warningf("cmd cube online reload of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard) + continue + } + if inst.Status < dict.Instance_Status_Reload_Succ { + Dict.Instances[i].Status = dict.Instance_Status_Reload_Succ + Dict.Instances[i].ReloadedTime = int(time.Now().Unix()) + Dict.ReloadSuccInsts++ + } + } + if Dict.ReloadSuccInsts == Dict.InstancesNum { + Dict.WaitVersionInfo.Status = dict.Dict_Status_Reload_Succ + fmt.Printf("[all reload ok]inst:%v\n", Dict.Instances) + logex.Noticef("[all reload ok]inst :%v", Dict.Instances) + break + } + time.Sleep(5 * time.Second) + } +} + +func CmdInstsEnable() { + for { + chs := make([]chan error, len(Dict.Instances)) + keyAndRespSlice := make([]dict.CubeAgentResponse, len(Dict.Instances)) + for i, inst := range Dict.Instances { + if inst.Status != dict.Instance_Status_Enable_Succ { + chs[i] = make(chan error) + var json_params dict.CubeAgentRequest + json_params.Command = dict.ENABLE + json_params.DictName = inst.DictName + json_params.DeployPath = inst.DeployPath + json_params.Version = strconv.Itoa(Dict.WaitVersionInfo.Version) + json_params.Depend = strconv.Itoa(Dict.WaitVersionInfo.Depend) + json_params.Id = strconv.Itoa(Dict.WaitVersionInfo.Id) + json_params.Key = strconv.Itoa(Dict.WaitVersionInfo.Key) + json_params.Mode = Dict.WaitVersionInfo.Mode + json_params.ShardSeq = inst.Shard + json_params.Port = strconv.Itoa(inst.Port) + json_params.Source = dict.GetFileHead(Dict.StoragePlace, TransferAddr) + Dict.WaitVersionInfo.Output + "/" + json_params.DictName + "_part" + strconv.Itoa(inst.Shard) + ".tar" + + var address = fmt.Sprintf("http://%v:%v/agent/cmd", inst.AgentIp, inst.AgentPort) + logex.Noticef("[enable cmd]%v:%v", address, json_params) + go nonBlockSendJsonReq("POST2", address, 120, &json_params, &keyAndRespSlice[i], chs[i]) + Dict.Instances[i].EnablStartTime = int(time.Now().Unix()) + } + } + for i, inst := range Dict.Instances { + err := <-chs[i] + logex.Noticef("[instance resp]enable:%v", Dict.Instances) + if err != nil || keyAndRespSlice[i].Success != "0" { + logex.Warningf("cmd cube online enable of %v:%v, shard:%v failed", inst.AgentIp, inst.AgentPort, inst.Shard) + continue + } + if inst.Status < dict.Instance_Status_Enable_Succ { + Dict.Instances[i].Status = dict.Instance_Status_Enable_Succ + Dict.Instances[i].EnabledTime = int(time.Now().Unix()) + Dict.EnableSuccInsts++ + } + } + if Dict.EnableSuccInsts == Dict.InstancesNum { + Dict.WaitVersionInfo.Status = dict.Dict_Status_Finished + fmt.Printf("[all enable ok]inst :%v\n", Dict.Instances) + logex.Noticef("[all enable ok]inst :%v", Dict.Instances) + + break + } + time.Sleep(5 * time.Second) + } +} diff --git a/cube/cube-transfer/src/transfer/dict/cube_agent_server.go b/cube/cube-transfer/src/transfer/dict/cube_agent_server.go new file mode 100755 index 0000000000000000000000000000000000000000..2b81e069dcf1db400f4ef850128e07afcd2be351 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/cube_agent_server.go @@ -0,0 +1,41 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +type CubeAgentRequest struct { + Command string `json:"command"` + DictName string `json:"dict_name"` + DeployPath string `json:"deploy_path"` + Version string `json:"version"` + Depend string `json:"depend"` + Id string `json:"id"` + Key string `json:"key"` + Mode string `json:"mode"` + ShardSeq int `json:"shard_seq"` + Source string `json:"source"` + Service string `json:"service,omitempty"` + SlotIdList string `json:"slot_id_list,omitempty"` + ActiveVersionList string `json:"active_version_list,omitempty"` + Port string `json:"port,omitempty"` + VersionSign string `json:"version_sign,omitempty"` +} + + +type CubeAgentResponse struct { + Success string `json:"success"` + Message string `json:"message"` + Data string `json:"data"` +} + diff --git a/cube/cube-transfer/src/transfer/dict/define.go b/cube/cube-transfer/src/transfer/dict/define.go new file mode 100755 index 0000000000000000000000000000000000000000..fa7ecf7d26bd34c9de45ee01a47d04f9e4ec743b --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/define.go @@ -0,0 +1,273 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +import "errors" + +var ( + // Dict Mode + BASE_ONLY = "base_only" + BASR_DELTA = "base_delta" + + // Deploy Mode + BASE = "base" + DELTA = "delta" + + // Succ or Failed Status + SUCC = "succ" + FAILED = "failed" + + //command + DOWNLOAD = "download" + RELOAD = "reload" + ENABLE = "enable" + + FTP_HEADER = "ftp://" + HTTP_HEADER = "http://" +) + +// Dict Status +type DictStatus int + +const ( + // Dict Status + //clear状态编码参考InstanceStatus + Dict_Status_Clearing DictStatus = 1 + Dict_Status_Cleared DictStatus = 2 + Dict_Status_Trigging DictStatus = 10 + Dict_Status_Building DictStatus = 20 + Dict_Status_Deploying DictStatus = 30 + Dict_Status_Downloading DictStatus = 40 + Dict_Status_Download_Succ DictStatus = 50 + Dict_Status_Reloading DictStatus = 60 + Dict_Status_Reload_Succ DictStatus = 70 + Dict_Status_Enabling DictStatus = 80 + Dict_Status_Finished DictStatus = 90 + Dict_Status_Restarting DictStatus = 100 +) + +func (this DictStatus) String() DictStatusStr { + switch this { + case Dict_Status_Trigging: + return Dict_Status_Trigging_Str + case Dict_Status_Building: + return Dict_Status_Building_Str + case Dict_Status_Deploying: + return Dict_Status_Deploying_Str + case Dict_Status_Downloading: + return Dict_Status_Downloading_Str + case Dict_Status_Download_Succ: + return Dict_Status_Download_Succ_Str + case Dict_Status_Reloading: + return Dict_Status_Reloading_Str + case Dict_Status_Reload_Succ: + return Dict_Status_Reload_Succ_Str + case Dict_Status_Enabling: + return Dict_Status_Enabling_Str + case Dict_Status_Finished: + return Dict_Status_Finished_Str + case Dict_Status_Restarting: + return Dict_Status_Restarting_Str + case Dict_Status_Clearing: + return Dict_Status_Clearing_Str + case Dict_Status_Cleared: + return Dict_Status_Cleared_Str + default: + return "" + } +} + +type DictStatusStr string + +const ( + // Dict Status + Dict_Status_Trigging_Str DictStatusStr = "Trigging" + Dict_Status_Building_Str DictStatusStr = "Building" + Dict_Status_Deploying_Str DictStatusStr = "deploying" + Dict_Status_Downloading_Str DictStatusStr = "downloading" + Dict_Status_Download_Succ_Str DictStatusStr = "download_succ" + Dict_Status_Reloading_Str DictStatusStr = "reloading" + Dict_Status_Reload_Succ_Str DictStatusStr = "reload_succ" + Dict_Status_Enabling_Str DictStatusStr = "enabling" + Dict_Status_Finished_Str DictStatusStr = "finished" + Dict_Status_Restarting_Str DictStatusStr = "restarting" + Dict_Status_Clearing_Str DictStatusStr = "clearing" + Dict_Status_Cleared_Str DictStatusStr = "cleared" +) + +func (this DictStatusStr) Int() (DictStatus, error) { + switch this { + case Dict_Status_Trigging_Str: + return Dict_Status_Trigging, nil + case Dict_Status_Building_Str: + return Dict_Status_Building, nil + case Dict_Status_Deploying_Str: + return Dict_Status_Deploying, nil + case Dict_Status_Downloading_Str: + return Dict_Status_Downloading, nil + case Dict_Status_Download_Succ_Str: + return Dict_Status_Download_Succ, nil + case Dict_Status_Reloading_Str: + return Dict_Status_Reloading, nil + case Dict_Status_Reload_Succ_Str: + return Dict_Status_Reload_Succ, nil + case Dict_Status_Enabling_Str: + return Dict_Status_Enabling, nil + case Dict_Status_Finished_Str: + return Dict_Status_Finished, nil + case Dict_Status_Restarting_Str: + return Dict_Status_Restarting, nil + case Dict_Status_Clearing_Str: + return Dict_Status_Clearing, nil + case Dict_Status_Cleared_Str: + return Dict_Status_Cleared, nil + default: + return 0, errors.New("invalid dict status") + } +} + +// Instance Status: +type InstanceStatus int + +const ( + //各种状态都有可能进入clear状态,因此clear相关的状态都小于init状态 + Instance_Status_Clear InstanceStatus = 1 + Instance_Status_Clearing InstanceStatus = 2 + Instance_Status_Clear_Failed InstanceStatus = 3 + Instance_Status_Clear_Succ InstanceStatus = 4 + Instance_Status_Init InstanceStatus = 10 + Instance_Status_Downloading InstanceStatus = 20 + Instance_Status_Download_Failed InstanceStatus = 30 + Instance_Status_Download_Succ InstanceStatus = 40 + Instance_Status_Reloading InstanceStatus = 50 + Instance_Status_Reload_Failed InstanceStatus = 60 + Instance_Status_Reload_Succ InstanceStatus = 70 + Instance_Status_Enabling InstanceStatus = 80 + Instance_Status_Enable_Failed InstanceStatus = 90 + Instance_Status_Enable_Succ InstanceStatus = 100 + Instance_Status_Poping InstanceStatus = 110 + Instance_Status_Pop_Failed InstanceStatus = 120 + Instance_Status_Pop_Succ InstanceStatus = 130 + Instance_Status_Dead InstanceStatus = 250 +) + +func (this InstanceStatus) String() InstanceStatusStr { + switch this { + case Instance_Status_Init: + return Instance_Status_Init_Str + case Instance_Status_Downloading: + return Instance_Status_Downloading_Str + case Instance_Status_Download_Failed: + return Instance_Status_Download_Failed_Str + case Instance_Status_Download_Succ: + return Instance_Status_Download_Succ_Str + case Instance_Status_Reloading: + return Instance_Status_Reloading_Str + case Instance_Status_Reload_Failed: + return Instance_Status_Reload_Failed_Str + case Instance_Status_Reload_Succ: + return Instance_Status_Reload_Succ_Str + case Instance_Status_Enabling: + return Instance_Status_Enabling_Str + case Instance_Status_Enable_Failed: + return Instance_Status_Enable_Failed_Str + case Instance_Status_Enable_Succ: + return Instance_Status_Enable_Succ_Str + case Instance_Status_Dead: + return Instance_Status_Dead_Str + case Instance_Status_Clear: + return Instance_Status_Clear_Str + case Instance_Status_Clearing: + return Instance_Status_Clearing_Str + case Instance_Status_Clear_Failed: + return Instance_Status_Clear_Failed_Str + case Instance_Status_Clear_Succ: + return Instance_Status_Clear_Succ_Str + case Instance_Status_Poping: + return Instance_Status_Poping_Str + case Instance_Status_Pop_Failed: + return Instance_Status_Pop_Failed_Str + case Instance_Status_Pop_Succ: + return Instance_Status_Pop_Succ_Str + default: + return "" + } +} + +type InstanceStatusStr string + +const ( + Instance_Status_Init_Str InstanceStatusStr = "init" + Instance_Status_Downloading_Str InstanceStatusStr = "downloading" + Instance_Status_Download_Failed_Str InstanceStatusStr = "download_failed" + Instance_Status_Download_Succ_Str InstanceStatusStr = "download_succ" + Instance_Status_Reloading_Str InstanceStatusStr = "reloading" + Instance_Status_Reload_Failed_Str InstanceStatusStr = "finish_reload_failed" + Instance_Status_Reload_Succ_Str InstanceStatusStr = "finish_reload_succ" + Instance_Status_Enabling_Str InstanceStatusStr = "enabling" + Instance_Status_Enable_Failed_Str InstanceStatusStr = "enable_failed" + Instance_Status_Enable_Succ_Str InstanceStatusStr = "enable_succ" + Instance_Status_Dead_Str InstanceStatusStr = "dead" + Instance_Status_Clear_Str InstanceStatusStr = "clear" + Instance_Status_Clearing_Str InstanceStatusStr = "clearing" + Instance_Status_Clear_Failed_Str InstanceStatusStr = "clear_failed" + Instance_Status_Clear_Succ_Str InstanceStatusStr = "clear_succ" + Instance_Status_Poping_Str InstanceStatusStr = "poping" + Instance_Status_Pop_Failed_Str InstanceStatusStr = "pop_failed" + Instance_Status_Pop_Succ_Str InstanceStatusStr = "pop_succ" +) + +func (this InstanceStatusStr) Int() (InstanceStatus, error) { + switch this { + case Instance_Status_Init_Str: + return Instance_Status_Init, nil + case Instance_Status_Downloading_Str: + return Instance_Status_Downloading, nil + case Instance_Status_Download_Failed_Str: + return Instance_Status_Download_Failed, nil + case Instance_Status_Download_Succ_Str: + return Instance_Status_Download_Succ, nil + case Instance_Status_Reloading_Str: + return Instance_Status_Reloading, nil + case Instance_Status_Reload_Failed_Str: + return Instance_Status_Reload_Failed, nil + case Instance_Status_Reload_Succ_Str: + return Instance_Status_Reload_Succ, nil + case Instance_Status_Enabling_Str: + return Instance_Status_Enabling, nil + case Instance_Status_Enable_Failed_Str: + return Instance_Status_Enable_Failed, nil + case Instance_Status_Enable_Succ_Str: + return Instance_Status_Enable_Succ, nil + case Instance_Status_Dead_Str: + return Instance_Status_Dead, nil + case Instance_Status_Clear_Str: + return Instance_Status_Clear, nil + case Instance_Status_Clearing_Str: + return Instance_Status_Clearing, nil + case Instance_Status_Clear_Failed_Str: + return Instance_Status_Clear_Failed, nil + case Instance_Status_Clear_Succ_Str: + return Instance_Status_Clear_Succ, nil + case Instance_Status_Poping_Str: + return Instance_Status_Poping, nil + case Instance_Status_Pop_Failed_Str: + return Instance_Status_Pop_Failed, nil + case Instance_Status_Pop_Succ_Str: + return Instance_Status_Pop_Succ, nil + default: + return 0, errors.New("invalid instance status") + } +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/dict/dict_info.go b/cube/cube-transfer/src/transfer/dict/dict_info.go new file mode 100755 index 0000000000000000000000000000000000000000..5e0bbbcf5a37600b5284a0bf5b95d50566615270 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_info.go @@ -0,0 +1,34 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +type DictInfo struct { + DictName string `json:"dict_name"` + DictMode string `json:"dict_mode"` + ShardNum int `json:"shard_num"` + CopyNum int `json:"copy_num"` + InstancesNum int `json:"inst_num"` + DeployPath string `json:"deploy_path"` + DonefileAddress string `json:"donefile_addr"` + OutputAddress string `json:"output_addr"` + TmpAddress string `json:"tmp_addr"` + StoragePlace string `json:"storage_place"` + DownloadSuccInsts int `json:"download_inst"` + ReloadSuccInsts int `json:"reload_insts"` + EnableSuccInsts int `json:"enable_insts"` + Instances []DictInstance `json:"instances"` + WaitVersionInfo DictVersionInfo `json:"wait_version_info"` + CurrentVersionInfo []DictVersionInfo `json:"current_version_info"` +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/dict/dict_instance_status.go b/cube/cube-transfer/src/transfer/dict/dict_instance_status.go new file mode 100755 index 0000000000000000000000000000000000000000..8f7181d8a108677f43e8db0739a1f28e5aa90642 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_instance_status.go @@ -0,0 +1,42 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +type DictInstance struct { + DictName string `json:"dict_name"` + Mode string `json:"mode"` + Version int `json:"version"` + Depend int `json:"depend"` + Id int `json:"id"` + Key int `json:"key"` + Shard int `json:"shard"` + Source string `json:"source"` + DeployPath string `json:"deploy_path"` + IP string `json:"ip"` + Port int `json:"port"` + AgentIp string `json:"agent_ip"` + AgentPort int `json:"agent_port"` + Status InstanceStatus `json:"status_id"` + StatusStr InstanceStatusStr `json:"status"` + BuildedTime int `json:"builded_time"` + DownloadStartTime int `json:"download_start_time"` + DownloadedTime int `json:"downloaded_time"` + ReloadStartTime int `json:"reload_start_time"` + ReloadedTime int `json:"reloaded_time"` + EnablStartTime int `json:"enable_start_time"` + EnabledTime int `json:"enabled_time"` + CreateTime int `json:"create_time"` +} + diff --git a/cube/cube-transfer/src/transfer/dict/dict_shard_info.go b/cube/cube-transfer/src/transfer/dict/dict_shard_info.go new file mode 100755 index 0000000000000000000000000000000000000000..0c42a4bbbc6825a5330116db451b3a49d14c7ce3 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_shard_info.go @@ -0,0 +1,59 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +import ( + "strconv" +) + +type DictShardInfo struct { + Name string `json:"name"` + Version string `json:"version"` + Depend string `json:"depend"` + Id string `json:"id"` + Key string `json:"key"` + Shard int `json:"shard"` + Mode string `json:"mode"` + DictMode string `json:"dict_mode,omitempty"` + Source string `json:"data_source"` + Service string `json:"service,omitempty"` + DeltaInfo string `json:"delta_info,omitempty"` + BuildedTime int `json:"builded_time,omitempty"` + BuildedTimeStr string `json:"build_finish_time,omitempty"` + CreateTime int `json:"create_time,omitempty"` + IsActive bool `json:"is_active,omitempty"` +} + +func GetDictShardScaler(shard int, dictVersionInfo DictVersionInfo, storagePlace string, transferaddr string)(info DictShardInfo){ + info.Name = dictVersionInfo.DictName + info.Version = strconv.Itoa(dictVersionInfo.Version) + info.Depend = strconv.Itoa(dictVersionInfo.Depend) + info.Id = strconv.Itoa(dictVersionInfo.Id) + info.Key = strconv.Itoa(dictVersionInfo.Key) + info.Mode = dictVersionInfo.Mode + info.Shard = shard + info.Source = GetFileHead(storagePlace, transferaddr) + dictVersionInfo.Output+ "/" + info.Version + "/" + info.Name + "_part" + strconv.Itoa(shard) + ".tar" + return +} + + +func GetFileHead(storagePlace string, transferaddr string) string { + if storagePlace == "LOCAL"{ + return "ftp://" + transferaddr + } else { + return "" + } + +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/dict/dict_version_info.go b/cube/cube-transfer/src/transfer/dict/dict_version_info.go new file mode 100755 index 0000000000000000000000000000000000000000..f835b6bcfb11199fcb23573f0c569277bd82ca51 --- /dev/null +++ b/cube/cube-transfer/src/transfer/dict/dict_version_info.go @@ -0,0 +1,53 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dict + +type ( + DictVersionInfo struct { + DictName string `json:"dict_name"` + Version int `json:"version"` + Depend int `json:"depend"` + Id int `json:"id"` + Key int `json:"key"` + Mode string `json:"mode"` + Input string `json:"input"` + Output string `json:"output"` + Status DictStatus `json:"status"` + StatusStr DictStatusStr `json:"status_str"` + FinishTime int `json:"finish_time"` + CreateTime int `json:"create_time"` + MetaInfos map[int]string `json:"meta_infos"` + } + DonefileInfo struct { + Id string `json:"id"` + Key string `json:"key"` + Input string `json:"input"` + } + DictShardMetaInfo struct { + Name string `json:"name"` + Version int `json:"version"` + Depend int `json:"depend"` + Shard int `json:"shard"` + Split int `json:"split"` + Meta string `json:"meta"` + } + + MetaInfo struct { + IndexTotalCount string `json:"index_total_count"` + IndexLenList []string `json:"index_len_list"` + DataLenList []string `json:"data_len_list"` + } +) + diff --git a/cube/cube-transfer/src/transfer/global.go b/cube/cube-transfer/src/transfer/global.go new file mode 100755 index 0000000000000000000000000000000000000000..8afe55db155ba316defa8ef239e1c1df74a815fe --- /dev/null +++ b/cube/cube-transfer/src/transfer/global.go @@ -0,0 +1,52 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import "transfer/dict" + +var ( + Dir string + Port string + Dict dict.DictInfo + BuildToolLocal string + TransferAddr string + //DictName string + //DictMode string + //DonefileAddress string + //OutputAddress string + //TmpAddress string + //ShardNum int + //CopyNum int + //DeployPath string + //Instances []dict.DictInstance + //StoragePlace string + //WaitVersionInfo dict.DictVersionInfo + //CurrentVersionInfo []dict.DictVersionInfo + //InstancesNum int + //DownloadSuccInsts int + //ReloadSuccInsts int + //EnableSuccInsts int +) + + +type LocalAddress string +type HDFSAddress string + +const ( + STATUS_OK int = 0 + STATUS_WRONG_API int = 2 + STATUS_RETRY_LATER int = 10 + STATUS_FAIL int = 255 +) \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/http.go b/cube/cube-transfer/src/transfer/http.go new file mode 100755 index 0000000000000000000000000000000000000000..75cf847c2c3ca6c1137f07f208b5a8e884afc04f --- /dev/null +++ b/cube/cube-transfer/src/transfer/http.go @@ -0,0 +1,270 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/Badangel/logex" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +type handlerFunc func(subpath string, m map[string]string) (string, string, int, error) + +var ( // key = subpath; eg: path="/checker/job", key="job" + getHandler map[string]handlerFunc + putHandler map[string]handlerFunc + deleteHandler map[string]handlerFunc + postHandler map[string]handlerFunc +) + +func startHttp(addr string) error { + + // init handlers: + initGetHandlers() + initPostHandlers() + + + http.HandleFunc("/dict/", handleRest) + http.HandleFunc("/instance/", handleRest) + logex.Notice("start http:", addr) + return http.ListenAndServe(addr, nil) +} + +func handleRest(w http.ResponseWriter, r *http.Request) { + var ( + req_log string + ) + time_begin := time.Now() + + cont_type := make([]string, 1, 1) + cont_type[0] = "application/json" + header := w.Header() + header["Content-Type"] = cont_type + w.Header().Add("Access-Control-Allow-Origin", "*") + + m := parseHttpKv(r) + + req_log = fmt.Sprintf("handle %v %v %v from %v", + r.Method, r.URL.Path, r.URL.RawQuery, r.RemoteAddr) + + api := r.URL.Path + + var showHandler map[string]handlerFunc + switch r.Method { + case "GET": + showHandler = getHandler + case "POST": // create + showHandler = postHandler + case "PUT": // update + showHandler = putHandler + case "DELETE": + showHandler = deleteHandler + default: + logex.Warningf(`{"error":1, "message":"unsupport method %v"}`, r.Method) + } + + handler, ok := showHandler[api] + + if !ok { + key_list := make([]string, 0, len(showHandler)) + for key := range showHandler { + key_list = append(key_list, key) + } + fmt.Fprintf(w, `{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`, + STATUS_WRONG_API, r.Method, api, key_list) + + logex.Noticef(`%v, time=%v, status=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, STATUS_WRONG_API) + return + } + + var s string + rst, handle_log, status, err := handler(api, m) + if status == STATUS_OK { + s = fmt.Sprintf(`{"success":"%v", "message":"query ok", "data":%s}`, status, rst) + } else { + s = fmt.Sprintf(`{"success":"%v", "message":%v, "data":%s}`, + status, quote(err.Error()), rst) + } + + if isJsonDict(s) { + fmt.Fprintln(w, s) + } else { + logex.Fatalf("invalid json: %v", s) + } + + if err == nil { + logex.Noticef(`%v, time=%v, status=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(handle_log)) + } else { + logex.Noticef(`%v, time=%v, status=%v, err=%v, handle_log=%v`, + req_log, time.Now().Sub(time_begin).Nanoseconds()/1000000, + status, quote(err.Error()), quote(handle_log)) + } +} + +func parseHttpKv(r *http.Request) map[string]string { + r.ParseForm() + m := make(map[string]string) + for k, v := range r.Form { + switch k { + case "user": // remove @baidu.com for user + m[k] = strings.Split(v[0], "@")[0] + default: + m[k] = v[0] + } + } + + // allow passing hostname for debug + //if _, ok := m["hostname"]; !ok { + // ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")] + // m["hostname"], _ = getHostname(ip) + //} + return m +} + +// restReq sends a restful request to requrl and returns response body. +func restReq(method, requrl string, timeout int, kv *map[string]string) (string, error) { + logex.Debug("####restReq####") + logex.Debug(*kv) + data := url.Values{} + if kv != nil { + for k, v := range *kv { + logex.Trace("req set:", k, v) + data.Set(k, v) + } + } + if method == "GET" || method == "DELETE" { + requrl = requrl + "?" + data.Encode() + data = url.Values{} + } + + logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(data.Encode())) + if err != nil { + logex.Warning("NewRequest failed:", err) + return "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return string(body), err +} + +//func jsonRestReq(method, requrl string, timeout int, json_params *CheckerRequest) (string, error) { +func jsonRestReq(method, requrl string, timeout int, json_params interface{}) (int, string, error) { + logex.Debug("####jsonRestReq####") + if method == "POST2" { + method = "POST" + } + + //b, err := json.Marshal(*json_params) + b, err := json.Marshal(json_params) + if err != nil { + logex.Warning("json_params marshal failed:", err) + return 0, "", err + } + + logex.Notice(method, requrl) + req, err := http.NewRequest(method, requrl, bytes.NewBufferString(string(b))) + if err != nil { + logex.Warning("NewRequest failed:", err) + return 0, "", err + } + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", "application/json") + //req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(string(b)))) + } + + client := &http.Client{} + client.Timeout = time.Duration(timeout) * time.Second + resp, err := client.Do(req) + if err != nil { + logex.Warning("Do failed:", err) + return 0, "", err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logex.Warning("resp status: " + resp.Status) + return 0, "", errors.New("resp status: " + resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + return resp.StatusCode, string(body), err +} + +// quote quotes string for json output. eg: s="123", quote(s)=`"123"` +func quote(s string) string { + return fmt.Sprintf("%q", s) +} + +func isJsonDict(s string) bool { + var js map[string]interface{} + return json.Unmarshal([]byte(s), &js) == nil +} + +func getHostname(ip string) (hostname string, err error) { + if hostnames, err := net.LookupAddr(ip); err != nil { + hostname = ip + //logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err) + } else { + if len(hostnames) > 0 { + hostname = hostnames[0][:strings.LastIndex(hostnames[0], ".baidu.com.")] + } else { + hostname = ip + } + } + + return hostname, err +} + +func nonBlockSendJsonReq(method, requrl string, timeout int, json_params interface{}, + out interface{}, ch chan error) { + _, body_str, err := jsonRestReq(method, requrl, timeout, json_params) + logex.Noticef("json request method:[%v], requrl:[%s], timeout:[%v], params[%v], resbody[%v], err[%v]", method, requrl, timeout, json_params, body_str, err) + if err != nil { + ch <- err + } + err = json.Unmarshal([]byte(body_str), out) + if err != nil { + logex.Warningf("json unmarshal failed error (%v) for : %v", err, body_str) + } + ch <- err +} diff --git a/cube/cube-transfer/src/transfer/http_get.go b/cube/cube-transfer/src/transfer/http_get.go new file mode 100755 index 0000000000000000000000000000000000000000..2bc36808c9fbe32e6ebd34316751674e19a9696c --- /dev/null +++ b/cube/cube-transfer/src/transfer/http_get.go @@ -0,0 +1,159 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + "transfer/dict" +) + +func initGetHandlers() { + getHandler = map[string]handlerFunc{ + "/dict/info": GetDictInfo, + "/instance/status": GetInstanceStatus, + "/dict/scaler": GetDictScaler, + "/dict/meta_info": GetDictShardMetaInfo, + "/dict/deploy/history": GetDictDeployHistory, + } +} +func GetDictInfo(subpath string, m map[string]string) (string, string, int, error) { + b, err := json.Marshal(Dict) + if err != nil { + return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetInstanceStatus(subpath string, m map[string]string) (string, string, int, error) { + b, err := json.Marshal(Dict.Instances) + if err != nil { + return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetDictScaler(subpath string, m map[string]string) (string, string, int, error) { + var ( + shard int + err error + infos []dict.DictShardInfo + ) + shardStr, ok := m["shard"] + if !ok { + shard = 0 + } else { + shard, err = strconv.Atoi(shardStr) + if err != nil { + return quote(""), "", STATUS_FAIL, errors.New("invalid arg: shard should be int") + } + } + + for _, version := range Dict.CurrentVersionInfo { + info := dict.GetDictShardScaler(shard, version, Dict.StoragePlace, TransferAddr) + infos = append(infos, info) + } + if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying { + info := dict.GetDictShardScaler(shard, Dict.WaitVersionInfo, Dict.StoragePlace, TransferAddr) + infos = append(infos, info) + } + + b, err := json.Marshal(infos) + if err != nil { + return quote(""), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return quote(string(b)), "", STATUS_OK, nil +} + +func GetDictShardMetaInfo(subpath string, m map[string]string) (string, string, int, error) { + name, ok := m["name"] + if !ok { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: name") + } + version, err := strconv.Atoi(m["version"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: version, version should be int") + } + depend, err := strconv.Atoi(m["depend"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: depend, depend should be int") + } + shard, err := strconv.Atoi(m["shard"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: shard, shard should be int") + } + split := 1 + split_str, ok := m["split"] + if ok { + split, err = strconv.Atoi(split_str) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("arg: split, should be int") + } + } + + meta_info, err := GetDictShardMetaInfos(name, version, depend, shard, split) + if err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("info error: %v", err) + } + + b, err := json.Marshal(meta_info) + if err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetDictDeployHistory(subpath string, m map[string]string) (string, string, int, error) { + var deployVerisonInfos []dict.DictVersionInfo + deployVerisonInfos = Dict.CurrentVersionInfo + if Dict.WaitVersionInfo.Status > dict.Dict_Status_Deploying { + deployVerisonInfos = append(deployVerisonInfos, Dict.WaitVersionInfo) + } + + b, err := json.Marshal(deployVerisonInfos) + if err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("json marshal failed, %v", err) + } + + return string(b), "", STATUS_OK, nil +} + +func GetDictShardMetaInfos(dictName string, version int, depend int, + shard int, split int) (dict.DictShardMetaInfo, error) { + var info dict.DictShardMetaInfo + + for _, v := range Dict.CurrentVersionInfo { + if v.Version == version && v.Depend == depend { + if meta, ok := v.MetaInfos[shard]; ok { + info.Name = dictName; + info.Version = version + info.Depend = depend + info.Shard = shard + info.Split = split + info.Meta = meta + return info, nil + } else { + return info, fmt.Errorf("shard not right") + } + } + } + return info, fmt.Errorf("info not right") +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/http_post.go b/cube/cube-transfer/src/transfer/http_post.go new file mode 100644 index 0000000000000000000000000000000000000000..4d0d48842873ba21745608c9967467b3b5ad37c2 --- /dev/null +++ b/cube/cube-transfer/src/transfer/http_post.go @@ -0,0 +1,105 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strconv" + "transfer/dict" +) + +func initPostHandlers() { + postHandler = map[string]handlerFunc{ + "/dict/meta_info/register": PostDictShardMetaInfoRegister, + } +} + +func PostDictShardMetaInfoRegister(subpath string, m map[string]string) (string, string, int, error) { + var ( + shardMetaInfo dict.DictShardMetaInfo + ok bool + err error + metaInfo dict.MetaInfo + ) + + shardMetaInfo.Name, ok = m["name"] + if !ok { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: name") + } + + shardMetaInfo.Version, err = strconv.Atoi(m["version"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: version, should be int") + } + + shardMetaInfo.Depend, err = strconv.Atoi(m["depend"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: depend, should be int") + } + + shardMetaInfo.Shard, err = strconv.Atoi(m["shard"]) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: shard, should be int") + } + shardMetaInfo.Split = 1 + split_str, ok := m["split"] + if ok { + shardMetaInfo.Split, err = strconv.Atoi(split_str) + if err != nil { + return quote("failed"), "", STATUS_FAIL, errors.New("arg: split, should be int") + } + } + + // check dict shard_num and status + + + if shardMetaInfo.Shard < 0 || shardMetaInfo.Shard >= Dict.ShardNum { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("shard value invalid, dict shard is :%v", Dict.ShardNum) + } + + shardMetaInfo.Meta, ok = m["meta"] + if !ok { + return quote("failed"), "", STATUS_FAIL, errors.New("need arg: meta") + } + if err = json.Unmarshal([]byte(shardMetaInfo.Meta), &metaInfo); err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("meta string bad formatted: %v", err) + } + if reflect.DeepEqual(metaInfo, (dict.MetaInfo{})) { + return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted") + } + if 0 == len(metaInfo.IndexLenList) { + return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted, index_len_list is null") + } + if 0 == len(metaInfo.DataLenList) { + return quote("failed"), "", STATUS_FAIL, errors.New("meta string bad formatted, data_len_list is null") + } + fmt.Printf("update meta shardMetaInfo:%v metaInfo:%v\n", shardMetaInfo,metaInfo) + + if err = UpdateDictShardMetaInfo(shardMetaInfo); err != nil { + return quote("failed"), "", STATUS_FAIL, fmt.Errorf("update dict_shard_meta_info failed, %v", err) + } + fmt.Printf("update meta2\n") + + return quote("ok"), "", STATUS_OK, nil +} + +func UpdateDictShardMetaInfo(shardMetaInfo dict.DictShardMetaInfo) error { + Dict.WaitVersionInfo.MetaInfos[shardMetaInfo.Shard] = shardMetaInfo.Meta + WriteWaitVersionInfoToFile() + return nil +} \ No newline at end of file diff --git a/cube/cube-transfer/src/transfer/transfer.go b/cube/cube-transfer/src/transfer/transfer.go new file mode 100755 index 0000000000000000000000000000000000000000..84ab7427333b3a639efd2e48df3dd248209924be --- /dev/null +++ b/cube/cube-transfer/src/transfer/transfer.go @@ -0,0 +1,84 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "fmt" + "github.com/Badangel/logex" + "os" + "time" + "transfer/dict" +) + +func Start() { + + go BackupTransfer() + logex.Notice(">>> starting server...") + addr := ":" + Port + err := startHttp(addr) + if err != nil { + logex.Fatalf("start http(addr=%v) failed: %v", addr, err) + os.Exit(255) + } + + logex.Notice(">>> start server succ") +} + +func BackupTransfer() { + for { + //trigger + version, err := TriggerStart(Dict.DonefileAddress) + if err != nil { + logex.Fatalf("[trigger err]trigger err:%v ", err) + fmt.Printf("[error]trigger err:%v \n", err) + break + } + logex.Noticef("[trigger] get version:%v \n", version) + if version.Id == 0 { + logex.Noticef("[sleep]no new version, sleep 5 min") + fmt.Printf("[sleep]no new version, wait 5 min\n") + time.Sleep(5 * time.Minute) + continue + } + Dict.WaitVersionInfo = version + logex.Noticef("[trigger finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) + WriteWaitVersionInfoToFile() + + //builder + Dict.WaitVersionInfo.Status = dict.Dict_Status_Building + Dict.WaitVersionInfo.MetaInfos = make(map[int]string) + WriteWaitVersionInfoToFile() + if err = BuilderStart(Dict.WaitVersionInfo); err != nil { + logex.Fatalf("builder err:%v \n", err) + } + + if Dict.WaitVersionInfo.Mode == dict.BASE { + var newCurrentVersion []dict.DictVersionInfo + Dict.CurrentVersionInfo = newCurrentVersion + WriteCurrentVersionInfoToFile() + } + logex.Noticef("[builder finish] WaitVersionInfo version:%v \n", Dict.WaitVersionInfo) + + //deployer + Dict.WaitVersionInfo.Status = dict.Dict_Status_Deploying + WriteWaitVersionInfoToFile() + if err = DeployStart(Dict.WaitVersionInfo); err != nil { + logex.Fatalf("deploy err:%v \n", err) + } + logex.Noticef("[deploy finish]current version: %v\n",Dict.CurrentVersionInfo) + } + fmt.Print("transfer over!") + logex.Noticef("[transfer]status machine exit!") +} diff --git a/cube/cube-transfer/src/transfer/trigger.go b/cube/cube-transfer/src/transfer/trigger.go new file mode 100755 index 0000000000000000000000000000000000000000..c962726d20c1fee7bbbaa282000640df1c8036f6 --- /dev/null +++ b/cube/cube-transfer/src/transfer/trigger.go @@ -0,0 +1,109 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "encoding/json" + "fmt" + "github.com/Badangel/logex" + "io/ioutil" + "strconv" + "strings" + "time" + "transfer/dict" +) + +func TriggerStart(addr string) (version dict.DictVersionInfo, err error) { + return GetDoneFileInfo(addr) +} + +func GetDoneFileInfo(addr string) (version dict.DictVersionInfo, err error) { + fmt.Printf("[entry]start trigger\n") + logex.Noticef("[entry]start trigger") + //if donefile is in ftp, first download to local + if strings.HasPrefix(addr, dict.FTP_HEADER) || strings.HasPrefix(addr, dict.HTTP_HEADER) { + donefileAddr := Dict.TmpAddress + "/../donefile" + Wget(addr, donefileAddr) + addr = donefileAddr + } + + baseDonefile := addr + "/base.txt" + fmt.Printf("[trigrer]donefile path:%v \n", baseDonefile) + logex.Noticef("[trigrer]base donefile path:%v", baseDonefile) + contents, err := ioutil.ReadFile(baseDonefile) + VersionLen := len(Dict.CurrentVersionInfo) + version.DictName = Dict.DictName + if err != nil { + fmt.Printf("[trigrer]read files err:%v \n", err) + logex.Fatalf("[trigrer]read files err:%v ", err) + return + } else { + contentss := string(contents) + lines := strings.Split(contentss, "\n") + index := len(lines) - 2 + var donefileInfo dict.DonefileInfo + fmt.Printf("line %v: %v\n", index, lines[index]) + if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil { + return + } + logex.Noticef("[trigrer]donfile info:%v", donefileInfo) + newId, _ := strconv.Atoi(donefileInfo.Id) + if VersionLen == 0 || newId > Dict.CurrentVersionInfo[VersionLen-1].Key { + version.Id = newId + version.Key, _ = strconv.Atoi(donefileInfo.Key) + version.Input = donefileInfo.Input + deployVersion := int(time.Now().Unix()) + version.CreateTime = deployVersion + version.Version = deployVersion + version.Depend = deployVersion + version.Mode = dict.BASE + return + } + } + if Dict.DictMode == dict.BASR_DELTA && VersionLen > 0 { + patchDonefile := addr + "/patch.txt" + fmt.Printf("[trigrer]patchDonefile path:%v \n", patchDonefile) + logex.Noticef("[trigrer]patch donefile path:%v", patchDonefile) + contents, err = ioutil.ReadFile(patchDonefile) + if err != nil { + fmt.Printf("read files err:%v \n", err) + return + } else { + contentss := string(contents) + lines := strings.Split(contentss, "\n") + for index := 0; index < len(lines)-1; index++ { + var donefileInfo dict.DonefileInfo + if err = json.Unmarshal([]byte(lines[index]), &donefileInfo); err != nil { + return + } + logex.Noticef("[trigrer]donfile info:%v", donefileInfo) + newId, _ := strconv.Atoi(donefileInfo.Id) + newKey, _ := strconv.Atoi(donefileInfo.Key) + if newId > Dict.CurrentVersionInfo[VersionLen-1].Id && newKey == Dict.CurrentVersionInfo[VersionLen-1].Key { + version.Id = newId + version.Key, _ = strconv.Atoi(donefileInfo.Key) + version.Input = donefileInfo.Input + deployVersion := int(time.Now().Unix()) + version.CreateTime = deployVersion + version.Version = deployVersion + version.Depend = Dict.CurrentVersionInfo[VersionLen-1].Depend + version.Mode = dict.DELTA + return + } + } + } + } + return +} diff --git a/cube/cube-transfer/src/transfer/util.go b/cube/cube-transfer/src/transfer/util.go new file mode 100755 index 0000000000000000000000000000000000000000..f3c1834319ab2752a2338cda737855854cf73356 --- /dev/null +++ b/cube/cube-transfer/src/transfer/util.go @@ -0,0 +1,113 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/Badangel/logex" + "io" + "io/ioutil" + "os/exec" +) + +func WriteWaitVersionInfoToFile(){ + waitFilePath := Dict.TmpAddress + "/" + "WaitVersionInfo.json" + b, err := json.Marshal(Dict.WaitVersionInfo) + fmt.Printf("wait version %v\n", Dict.WaitVersionInfo) + if err != nil { + logex.Fatalf("WaitVersionInfo format error : %v", err) + } + err = ioutil.WriteFile(waitFilePath, b, 0644) + if err != nil { + logex.Fatalf("write wait verison info error : %v", err) + return + } + return +} + +func WriteCurrentVersionInfoToFile(){ + currentFilePath := Dict.TmpAddress + "/" + "CurrentVersionInfo.json" + b, err := json.Marshal(Dict.CurrentVersionInfo) + if err != nil { + logex.Fatalf(" CurrentVersionInfo format error : %v", err) + } + err = ioutil.WriteFile(currentFilePath, b, 0644) + if err != nil { + logex.Fatalf("write current verison info error : %v", err) + return + } + return +} + +func ExeCommad(files string, params []string) (err error) { + cmd := exec.Command(files, params...) + fmt.Println(cmd.Args) + /*if stdout, err := cmd.StdoutPipe(); err != nil { + logex.Fatalf("%v", err) + return + }*/ + + stdout, err := cmd.StdoutPipe() + defer stdout.Close() + + if err != nil { + fmt.Println(err) + return + } + + + if err := cmd.Start(); err != nil { + logex.Fatalf("%v",err) + } + reader := bufio.NewReader(stdout) + + /*buf, err := cmd.Output() + fmt.Printf("output: %s\n",buf) + fmt.Printf("err: %v\n",err)*/ + for { + line, err2 := reader.ReadString('\n') + if err2 != nil || io.EOF == err2 { + break + } + fmt.Printf("[cmd]>%s",line) + } + + err = cmd.Wait() + if nil != err { + fmt.Println(err) + } + + return nil +} + +func Wget(ftpPath string, downPath string) { + var params []string + params = append(params, "-P") + params = append(params, downPath) + params = append(params, "-r") + params = append(params, "-N") + params = append(params, "-np") + params = append(params, "-nd") + params = append(params, "-R") + params = append(params, "index.html") + params = append(params, ftpPath) + + err := ExeCommad("wget", params) + if err != nil { + fmt.Printf("wget exe: %v\n", err) + } +} \ No newline at end of file