Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Serving
提交
1edd3817
S
Serving
项目概览
PaddlePaddle
/
Serving
1 年多 前同步成功
通知
186
Star
833
Fork
253
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
105
列表
看板
标记
里程碑
合并请求
10
Wiki
2
Wiki
分析
仓库
DevOps
项目成员
Pages
S
Serving
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
105
Issue
105
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
2
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1edd3817
编写于
8月 08, 2019
作者:
D
Dang Yifei
提交者:
GitHub
8月 08, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #41 from Badangel/cube-builder
Cube Transfer init
上级
45375bc7
75e43403
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
2270 addition
and
0 deletion
+2270
-0
cube/CMakeLists.txt
cube/CMakeLists.txt
+1
-0
cube/cube-transfer/CMakeLists.txt
cube/cube-transfer/CMakeLists.txt
+24
-0
cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake
cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake
+58
-0
cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in
cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in
+22
-0
cube/cube-transfer/cmake/CMakeGoInformation.cmake
cube/cube-transfer/cmake/CMakeGoInformation.cmake
+21
-0
cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake
cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake
+15
-0
cube/cube-transfer/cmake/golang.cmake
cube/cube-transfer/cmake/golang.cmake
+60
-0
cube/cube-transfer/conf/transfer.conf
cube/cube-transfer/conf/transfer.conf
+18
-0
cube/cube-transfer/src/CMakeLists.txt
cube/cube-transfer/src/CMakeLists.txt
+19
-0
cube/cube-transfer/src/cube-transfer.go
cube/cube-transfer/src/cube-transfer.go
+226
-0
cube/cube-transfer/src/transfer/builder.go
cube/cube-transfer/src/transfer/builder.go
+97
-0
cube/cube-transfer/src/transfer/config.go
cube/cube-transfer/src/transfer/config.go
+112
-0
cube/cube-transfer/src/transfer/deployer.go
cube/cube-transfer/src/transfer/deployer.go
+203
-0
cube/cube-transfer/src/transfer/dict/cube_agent_server.go
cube/cube-transfer/src/transfer/dict/cube_agent_server.go
+41
-0
cube/cube-transfer/src/transfer/dict/define.go
cube/cube-transfer/src/transfer/dict/define.go
+273
-0
cube/cube-transfer/src/transfer/dict/dict_info.go
cube/cube-transfer/src/transfer/dict/dict_info.go
+34
-0
cube/cube-transfer/src/transfer/dict/dict_instance_status.go
cube/cube-transfer/src/transfer/dict/dict_instance_status.go
+42
-0
cube/cube-transfer/src/transfer/dict/dict_shard_info.go
cube/cube-transfer/src/transfer/dict/dict_shard_info.go
+59
-0
cube/cube-transfer/src/transfer/dict/dict_version_info.go
cube/cube-transfer/src/transfer/dict/dict_version_info.go
+53
-0
cube/cube-transfer/src/transfer/global.go
cube/cube-transfer/src/transfer/global.go
+52
-0
cube/cube-transfer/src/transfer/http.go
cube/cube-transfer/src/transfer/http.go
+270
-0
cube/cube-transfer/src/transfer/http_get.go
cube/cube-transfer/src/transfer/http_get.go
+159
-0
cube/cube-transfer/src/transfer/http_post.go
cube/cube-transfer/src/transfer/http_post.go
+105
-0
cube/cube-transfer/src/transfer/transfer.go
cube/cube-transfer/src/transfer/transfer.go
+84
-0
cube/cube-transfer/src/transfer/trigger.go
cube/cube-transfer/src/transfer/trigger.go
+109
-0
cube/cube-transfer/src/transfer/util.go
cube/cube-transfer/src/transfer/util.go
+113
-0
未找到文件。
cube/CMakeLists.txt
浏览文件 @
1edd3817
...
...
@@ -15,3 +15,4 @@
add_subdirectory
(
cube-server
)
add_subdirectory
(
cube-api
)
add_subdirectory
(
cube-builder
)
add_subdirectory
(
cube-transfer
)
cube/cube-transfer/CMakeLists.txt
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set
(
CMAKE_MODULE_PATH
${
CMAKE_MODULE_PATH
}
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/cmake"
)
include
(
cmake/golang.cmake
)
ExternalGoProject_Add
(
docopt-go github.com/docopt/docopt-go
)
ExternalGoProject_Add
(
rfw github.com/mipearson/rfw
)
ExternalGoProject_Add
(
logex github.com/Badangel/logex
)
add_subdirectory
(
src
)
install
(
DIRECTORY
${
CMAKE_CURRENT_LIST_DIR
}
/conf DESTINATION
${
PADDLE_SERVING_INSTALL_DIR
}
)
cube/cube-transfer/cmake/CMakeDetermineGoCompiler.cmake
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
if
(
NOT CMAKE_Go_COMPILER
)
if
(
NOT $ENV{GO_COMPILER} STREQUAL
""
)
get_filename_component
(
CMAKE_Go_COMPILER_INIT $ENV{GO_COMPILER} PROGRAM PROGRAM_ARGS CMAKE_Go_FLAGS_ENV_INIT
)
if
(
CMAKE_Go_FLAGS_ENV_INIT
)
set
(
CMAKE_Go_COMPILER_ARG1
"
${
CMAKE_Go_FLAGS_ENV_INIT
}
"
CACHE STRING
"First argument to Go compiler"
)
endif
()
if
(
NOT EXISTS
${
CMAKE_Go_COMPILER_INIT
}
)
message
(
SEND_ERROR
"Could not find compiler set in environment variable GO_COMPILER:
\n
$ENV{GO_COMPILER}."
)
endif
()
endif
()
set
(
Go_BIN_PATH
$ENV{GOPATH}
$ENV{GOROOT}
$ENV{GOROOT}/../bin
$ENV{GO_COMPILER}
/usr/bin
/usr/local/bin
)
if
(
CMAKE_Go_COMPILER_INIT
)
set
(
CMAKE_Go_COMPILER
${
CMAKE_Go_COMPILER_INIT
}
CACHE PATH
"Go Compiler"
)
else
()
find_program
(
CMAKE_Go_COMPILER
NAMES go
PATHS
${
Go_BIN_PATH
}
)
EXEC_PROGRAM
(
${
CMAKE_Go_COMPILER
}
ARGS version OUTPUT_VARIABLE GOLANG_VERSION
)
STRING
(
REGEX MATCH
"go[0-9]+.[0-9]+.[0-9]+[ /A-Za-z0-9]*"
VERSION
"
${
GOLANG_VERSION
}
"
)
message
(
"-- The Golang compiler identification is
${
VERSION
}
"
)
message
(
"-- Check for working Golang compiler:
${
CMAKE_Go_COMPILER
}
"
)
endif
()
endif
()
mark_as_advanced
(
CMAKE_Go_COMPILER
)
configure_file
(
${
CMAKE_CURRENT_SOURCE_DIR
}
/cmake/CMakeGoCompiler.cmake.in
${
CMAKE_PLATFORM_INFO_DIR
}
/CMakeGoCompiler.cmake @ONLY
)
set
(
CMAKE_Go_COMPILER_ENV_VAR
"GO_COMPILER"
)
\ No newline at end of file
cube/cube-transfer/cmake/CMakeGoCompiler.cmake.in
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set(CMAKE_Go_COMPILER "@CMAKE_Go_COMPILER@")
set(CMAKE_Go_COMPILER_LOADED 1)
set(CMAKE_Go_SOURCE_FILE_EXTENSIONS go)
set(CMAKE_Go_LINKER_PREFERENCE 40)
set(CMAKE_Go_OUTPUT_EXTENSION .o)
set(CMAKE_Go_OUTPUT_EXTENSION_REPLACE 1)
set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER")
cube/cube-transfer/cmake/CMakeGoInformation.cmake
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
if
(
NOT CMAKE_Go_COMPILE_OBJECT
)
set
(
CMAKE_Go_COMPILE_OBJECT
"go tool compile -l -N -o <OBJECT> <SOURCE> "
)
endif
()
if
(
NOT CMAKE_Go_LINK_EXECUTABLE
)
set
(
CMAKE_Go_LINK_EXECUTABLE
"go tool link -o <TARGET> <OBJECTS> "
)
endif
()
\ No newline at end of file
cube/cube-transfer/cmake/CMakeTestGoCompiler.cmake
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set
(
CMAKE_Go_COMPILER_WORKS 1 CACHE INTERNAL
""
)
\ No newline at end of file
cube/cube-transfer/cmake/golang.cmake
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set
(
GOPATH
"
${
CMAKE_CURRENT_LIST_DIR
}
/../"
)
file
(
MAKE_DIRECTORY
${
GOPATH
}
)
function
(
ExternalGoProject_Add TARG
)
add_custom_target
(
${
TARG
}
env GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
get
${
ARGN
}
)
endfunction
(
ExternalGoProject_Add
)
function
(
add_go_executable NAME
)
file
(
GLOB GO_SOURCE RELATIVE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
"
"*.go"
)
add_custom_command
(
OUTPUT
${
OUTPUT_DIR
}
/.timestamp
COMMAND env GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
build
-o
"
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
NAME
}
"
${
CMAKE_GO_FLAGS
}
${
GO_SOURCE
}
WORKING_DIRECTORY
${
CMAKE_CURRENT_LIST_DIR
}
)
add_custom_target
(
${
NAME
}
ALL DEPENDS
${
OUTPUT_DIR
}
/.timestamp
${
ARGN
}
)
install
(
PROGRAMS
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
NAME
}
DESTINATION
${
PADDLE_SERVING_INSTALL_DIR
}
/bin
)
endfunction
(
add_go_executable
)
function
(
ADD_GO_LIBRARY NAME BUILD_TYPE
)
if
(
BUILD_TYPE STREQUAL
"STATIC"
)
set
(
BUILD_MODE -buildmode=c-archive
)
set
(
LIB_NAME
"lib
${
NAME
}
.a"
)
else
()
set
(
BUILD_MODE -buildmode=c-shared
)
if
(
APPLE
)
set
(
LIB_NAME
"lib
${
NAME
}
.dylib"
)
else
()
set
(
LIB_NAME
"lib
${
NAME
}
.so"
)
endif
()
endif
()
file
(
GLOB GO_SOURCE RELATIVE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
"
"*.go"
)
add_custom_command
(
OUTPUT
${
OUTPUT_DIR
}
/.timestamp
COMMAND env GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
build
${
BUILD_MODE
}
-o
"
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
LIB_NAME
}
"
${
CMAKE_GO_FLAGS
}
${
GO_SOURCE
}
WORKING_DIRECTORY
${
CMAKE_CURRENT_LIST_DIR
}
)
add_custom_target
(
${
NAME
}
ALL DEPENDS
${
OUTPUT_DIR
}
/.timestamp
${
ARGN
}
)
if
(
NOT BUILD_TYPE STREQUAL
"STATIC"
)
install
(
PROGRAMS
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
LIB_NAME
}
DESTINATION
${
PADDLE_SERVING_INSTALL_DIR
}
/bin
)
endif
()
endfunction
(
ADD_GO_LIBRARY
)
\ No newline at end of file
cube/cube-transfer/conf/transfer.conf
0 → 100755
浏览文件 @
1edd3817
[
default
]
dict_name
:
test_dict
mode
:
base_only
storage_place
:
LOCAL
buildtool_local
: /
home
/
work
/
Serving
/
build
/
output
/
bin
/
cube
-
builder
donefile_address
:
http
://
127
.
0
.
0
.
1
/
home
/
work
/
dangyifei
/
donefile
output_address
: /
home
/
work
/
dangyifei
/
test
-
transfer
/
test_data
/
output
tmp_address
: /
home
/
work
/
dangyifei
/
test
-
transfer
/
test_data
/
tmp
shard_num
:
1
copy_num
:
2
deploy_path
: /
home
/
work
/
test_dict
transfer_address
:
127
.
0
.
0
.
1
[
cube_agent
]
agent0_0
:
127
.
0
.
0
.
1
:
8001
cube0_0
:
127
.
0
.
0
.
1
:
8000
:/
ssd2
/
cube_open
agent0_1
:
127
.
0
.
0
.
1
:
8001
cube0_1
:
127
.
0
.
0
.
1
:
8000
:/
home
/
disk1
/
cube_open
cube/cube-transfer/src/CMakeLists.txt
0 → 100644
浏览文件 @
1edd3817
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
set
(
SOURCE_FILE cube-transfer.go
)
add_go_executable
(
cube-transfer
${
SOURCE_FILE
}
)
add_dependencies
(
cube-transfer docopt-go
)
add_dependencies
(
cube-transfer rfw
)
add_dependencies
(
cube-transfer logex
)
cube/cube-transfer/src/cube-transfer.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
main
import
(
"fmt"
"github.com/docopt/docopt-go"
"github.com/Badangel/logex"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"transfer"
"transfer/dict"
)
func
main
()
{
runtime
.
GOMAXPROCS
(
runtime
.
NumCPU
())
transfer
.
Dir
,
_
=
filepath
.
Abs
(
filepath
.
Dir
(
os
.
Args
[
0
]))
usage
:=
fmt
.
Sprintf
(
`Usage: ./m_master [options]
Options:
-p PORT set listen port. [default: 8099]
--config=conf/transfer.conf set conf file. [defalut: ./conf/transfer.conf]
Log options:
-l LOG_LEVEL set log level, values: 0,1,2,4,8,16. [default: 4]
--log_dir=DIR set log output dir. [default: ./log]
--log_name=NAME set log name. [default: transfer]`
,
transfer
.
Dir
)
opts
,
err
:=
docopt
.
Parse
(
usage
,
nil
,
true
,
"Cube Transfer"
,
false
)
if
err
!=
nil
{
fmt
.
Println
(
"ERROR:"
,
err
)
os
.
Exit
(
1
)
}
log_level
,
_
:=
strconv
.
Atoi
(
opts
[
"-l"
]
.
(
string
))
log_name
:=
opts
[
"--log_name"
]
.
(
string
)
log_dir
:=
opts
[
"--log_dir"
]
.
(
string
)
logex
.
SetLevel
(
getLogLevel
(
log_level
))
if
err
:=
logex
.
SetUpFileLogger
(
log_dir
,
log_name
,
nil
);
err
!=
nil
{
fmt
.
Println
(
"ERROR:"
,
err
)
}
fmt
.
Printf
(
"%v: Print stdout1 here...
\n
"
,
time
.
Now
())
os
.
Stderr
.
WriteString
(
fmt
.
Sprintf
(
"%v: Print stderr here...
\n
"
,
time
.
Now
()))
logex
.
Notice
(
"--- NEW SESSION -------------------------"
)
logex
.
Notice
(
">>> log_level:"
,
log_level
)
// settings:
if
opts
[
"-p"
]
==
nil
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: -p PORT must be set!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
transfer
.
Port
=
opts
[
"-p"
]
.
(
string
)
logex
.
Notice
(
">>> port:"
,
transfer
.
Port
)
if
opts
[
"--config"
]
==
nil
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: --config config_file must be set!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
//read conf
var
configMgr
transfer
.
ConfigManager
configMgr
.
Init
(
opts
[
"--config"
]
.
(
string
))
transfer
.
Dict
.
DictName
=
configMgr
.
Read
(
"default"
,
"dict_name"
)
if
transfer
.
Dict
.
DictName
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] DictName in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> DictName:"
,
transfer
.
Dict
.
DictName
)
transfer
.
Dict
.
DictMode
=
configMgr
.
Read
(
"default"
,
"mode"
)
if
transfer
.
Dict
.
DictMode
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] DictMode in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> Mode:"
,
transfer
.
Dict
.
DictMode
)
transfer
.
Dict
.
StoragePlace
=
configMgr
.
Read
(
"default"
,
"storage_place"
)
if
transfer
.
Dict
.
StoragePlace
==
""
||
transfer
.
Dict
.
StoragePlace
!=
"LOCAL"
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] StoragePlace in config_file! only support Local"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> StoragePlace:"
,
transfer
.
Dict
.
StoragePlace
)
transfer
.
BuildToolLocal
=
configMgr
.
Read
(
"default"
,
"buildtool_local"
)
if
transfer
.
BuildToolLocal
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] BuildToolLocal in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> BuildToolLocal:"
,
transfer
.
BuildToolLocal
)
transfer
.
Dict
.
DonefileAddress
=
configMgr
.
Read
(
"default"
,
"donefile_address"
)
if
transfer
.
Dict
.
DonefileAddress
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] DonefileAddress in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> DonefileAddress:"
,
transfer
.
Dict
.
DonefileAddress
)
transfer
.
Dict
.
OutputAddress
=
configMgr
.
Read
(
"default"
,
"output_address"
)
if
transfer
.
Dict
.
OutputAddress
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] OutputAddress in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> OutputAddress:"
,
transfer
.
Dict
.
OutputAddress
)
transfer
.
Dict
.
TmpAddress
=
configMgr
.
Read
(
"default"
,
"tmp_address"
)
if
transfer
.
Dict
.
TmpAddress
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] TmpAddress in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> TmpAddress:"
,
transfer
.
Dict
.
TmpAddress
)
ShardNumStr
:=
configMgr
.
Read
(
"default"
,
"shard_num"
)
if
ShardNumStr
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] ShardNum in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
transfer
.
Dict
.
ShardNum
,
err
=
strconv
.
Atoi
(
ShardNumStr
)
if
err
!=
nil
{
logex
.
Fatal
(
"ShardNum form is not right"
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> ShardNum:"
,
transfer
.
Dict
.
ShardNum
)
CopyNumStr
:=
configMgr
.
Read
(
"default"
,
"copy_num"
)
if
CopyNumStr
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] CopyNum in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
transfer
.
Dict
.
CopyNum
,
err
=
strconv
.
Atoi
(
CopyNumStr
)
if
err
!=
nil
{
logex
.
Fatal
(
"ShardNum form is not right"
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> CopyNum:"
,
transfer
.
Dict
.
CopyNum
)
transfer
.
Dict
.
InstancesNum
=
transfer
.
Dict
.
ShardNum
*
transfer
.
Dict
.
CopyNum
transfer
.
Dict
.
DeployPath
=
configMgr
.
Read
(
"default"
,
"deploy_path"
)
if
transfer
.
Dict
.
DeployPath
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] DeployPath in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> DeployPath:"
,
transfer
.
Dict
.
DeployPath
)
transfer
.
TransferAddr
=
configMgr
.
Read
(
"default"
,
"transfer_address"
)
if
transfer
.
TransferAddr
==
""
{
fmt
.
Fprintln
(
os
.
Stderr
,
"ERROR: nead [default] TransferAddr in config_file!"
)
fmt
.
Fprintln
(
os
.
Stderr
,
usage
)
os
.
Exit
(
1
)
}
logex
.
Notice
(
">>> TransferAddr:"
,
transfer
.
TransferAddr
)
for
i
:=
0
;
i
<
transfer
.
Dict
.
ShardNum
;
i
++
{
for
j
:=
0
;
j
<
transfer
.
Dict
.
CopyNum
;
j
++
{
var
instance
dict
.
DictInstance
agentName
:=
fmt
.
Sprintf
(
"agent%d_%d"
,
i
,
j
)
agentInfo
:=
configMgr
.
Read
(
"cube_agent"
,
agentName
)
agentInfoSlice
:=
strings
.
Split
(
agentInfo
,
":"
)
cubeName
:=
fmt
.
Sprintf
(
"cube%d_%d"
,
i
,
j
)
cubeInfo
:=
configMgr
.
Read
(
"cube_agent"
,
cubeName
)
cubeInfoSlice
:=
strings
.
Split
(
cubeInfo
,
":"
)
instance
.
DictName
=
transfer
.
Dict
.
DictName
instance
.
AgentIp
=
agentInfoSlice
[
0
]
instance
.
AgentPort
,
_
=
strconv
.
Atoi
(
agentInfoSlice
[
1
])
instance
.
IP
=
cubeInfoSlice
[
0
]
instance
.
Port
,
_
=
strconv
.
Atoi
(
cubeInfoSlice
[
1
])
instance
.
DictName
=
transfer
.
Dict
.
DictName
instance
.
CreateTime
=
int
(
time
.
Now
()
.
Unix
())
instance
.
Shard
=
i
instance
.
DeployPath
=
cubeInfoSlice
[
2
]
transfer
.
Dict
.
Instances
=
append
(
transfer
.
Dict
.
Instances
,
instance
)
}
}
logex
.
Noticef
(
">>> instance: %v"
,
transfer
.
Dict
.
Instances
)
transfer
.
Start
()
fmt
.
Print
(
"m-cube-transfer over!"
)
}
func
getLogLevel
(
log_level
int
)
logex
.
Level
{
switch
log_level
{
case
16
:
return
logex
.
DEBUG
case
8
:
return
logex
.
TRACE
case
4
:
return
logex
.
NOTICE
case
2
:
return
logex
.
WARNING
case
1
:
return
logex
.
FATAL
case
0
:
return
logex
.
NONE
}
return
logex
.
DEBUG
}
cube/cube-transfer/src/transfer/builder.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"fmt"
"github.com/Badangel/logex"
"strconv"
"strings"
"time"
"transfer/dict"
)
func
BuilderStart
(
versionInfo
dict
.
DictVersionInfo
)
error
{
fmt
.
Printf
(
"[entry]start build
\n
"
)
logex
.
Noticef
(
"[entry]start build"
)
if
strings
.
HasPrefix
(
Dict
.
DonefileAddress
,
dict
.
FTP_HEADER
)
||
strings
.
HasPrefix
(
Dict
.
DonefileAddress
,
dict
.
HTTP_HEADER
)
{
if
strings
.
HasPrefix
(
Dict
.
DonefileAddress
,
dict
.
FTP_HEADER
){
s1
:=
strings
.
Replace
(
Dict
.
DonefileAddress
,
dict
.
FTP_HEADER
,
""
,
1
)
index
:=
strings
.
Index
(
s1
,
"/"
)
versionInfo
.
Input
=
dict
.
FTP_HEADER
+
s1
[
0
:
index
]
+
versionInfo
.
Input
}
if
strings
.
HasPrefix
(
Dict
.
DonefileAddress
,
dict
.
HTTP_HEADER
){
s1
:=
strings
.
Replace
(
Dict
.
DonefileAddress
,
dict
.
HTTP_HEADER
,
""
,
1
)
index
:=
strings
.
Index
(
s1
,
"/"
)
versionInfo
.
Input
=
dict
.
HTTP_HEADER
+
s1
[
0
:
index
]
+
versionInfo
.
Input
}
localInputPath
:=
Dict
.
TmpAddress
+
"/../input/"
+
Dict
.
DictName
+
"_"
+
strconv
.
Itoa
(
versionInfo
.
Version
)
+
"_"
+
strconv
.
Itoa
(
versionInfo
.
Depend
)
Wget
(
versionInfo
.
Input
,
localInputPath
)
versionInfo
.
Input
=
localInputPath
}
Dict
.
WaitVersionInfo
.
Output
=
BuildIndex
(
versionInfo
)
InitAllInstances
()
return
nil
}
func
BuildIndex
(
versionInfo
dict
.
DictVersionInfo
)
string
{
versionStr
:=
strconv
.
Itoa
(
versionInfo
.
Version
)
dependStr
:=
strconv
.
Itoa
(
versionInfo
.
Depend
)
var
params
[]
string
params
=
append
(
params
,
"-dict_name="
+
Dict
.
DictName
)
params
=
append
(
params
,
"-job_mode="
+
Dict
.
WaitVersionInfo
.
Mode
)
curlen
:=
len
(
Dict
.
CurrentVersionInfo
)
lastVersion
:=
"0"
if
curlen
>
0
&&
Dict
.
WaitVersionInfo
.
Key
==
Dict
.
CurrentVersionInfo
[
curlen
-
1
]
.
Key
{
lastVersion
=
strconv
.
Itoa
(
Dict
.
CurrentVersionInfo
[
curlen
-
1
]
.
Version
)
}
shardNum
:=
strconv
.
Itoa
(
Dict
.
ShardNum
)
params
=
append
(
params
,
"-last_version="
+
lastVersion
)
params
=
append
(
params
,
"-cur_version="
+
versionStr
)
params
=
append
(
params
,
"-depend_version="
+
dependStr
)
params
=
append
(
params
,
"-input_path="
+
versionInfo
.
Input
)
params
=
append
(
params
,
"-output_path="
+
Dict
.
OutputAddress
)
params
=
append
(
params
,
"-shard_num="
+
shardNum
)
params
=
append
(
params
,
"-master_address="
+
TransferAddr
+
":"
+
Port
)
params
=
append
(
params
,
"-only_build=false"
)
err
:=
ExeCommad
(
BuildToolLocal
,
params
)
if
err
!=
nil
{
fmt
.
Printf
(
"build exe: %v
\n
"
,
err
)
logex
.
Noticef
(
"[builder] exe cmd err: %v"
,
err
)
}
outPut
:=
Dict
.
OutputAddress
+
"/"
+
dependStr
+
"_"
+
versionStr
return
outPut
}
func
InitAllInstances
()
{
for
i
,
_
:=
range
Dict
.
Instances
{
Dict
.
Instances
[
i
]
.
Status
=
dict
.
Instance_Status_Init
Dict
.
Instances
[
i
]
.
BuildedTime
=
int
(
time
.
Now
()
.
Unix
())
Dict
.
Instances
[
i
]
.
DownloadStartTime
=
0
Dict
.
Instances
[
i
]
.
DownloadedTime
=
0
Dict
.
Instances
[
i
]
.
ReloadStartTime
=
0
Dict
.
Instances
[
i
]
.
ReloadedTime
=
0
Dict
.
Instances
[
i
]
.
EnablStartTime
=
0
Dict
.
Instances
[
i
]
.
EnabledTime
=
0
}
Dict
.
DownloadSuccInsts
=
0
Dict
.
ReloadSuccInsts
=
0
Dict
.
EnableSuccInsts
=
0
}
cube/cube-transfer/src/transfer/config.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"bufio"
"io"
"os"
"strings"
)
const
middle
=
"->"
type
ConfigManager
struct
{
Mymap
map
[
string
]
string
strcet
string
}
func
(
c
*
ConfigManager
)
Init
(
path
string
)
{
c
.
Mymap
=
make
(
map
[
string
]
string
)
f
,
err
:=
os
.
Open
(
path
)
if
err
!=
nil
{
panic
(
err
)
}
defer
f
.
Close
()
r
:=
bufio
.
NewReader
(
f
)
for
{
b
,
_
,
err
:=
r
.
ReadLine
()
if
err
!=
nil
{
if
err
==
io
.
EOF
{
break
}
panic
(
err
)
}
s
:=
strings
.
TrimSpace
(
string
(
b
))
if
strings
.
Index
(
s
,
"#"
)
==
0
{
continue
}
n1
:=
strings
.
Index
(
s
,
"["
)
n2
:=
strings
.
LastIndex
(
s
,
"]"
)
if
n1
>
-
1
&&
n2
>
-
1
&&
n2
>
n1
+
1
{
c
.
strcet
=
strings
.
TrimSpace
(
s
[
n1
+
1
:
n2
])
continue
}
if
len
(
c
.
strcet
)
==
0
{
continue
}
index
:=
strings
.
Index
(
s
,
":"
)
if
index
<
0
{
continue
}
frist
:=
strings
.
TrimSpace
(
s
[
:
index
])
if
len
(
frist
)
==
0
{
continue
}
second
:=
strings
.
TrimSpace
(
s
[
index
+
1
:
])
pos
:=
strings
.
Index
(
second
,
"
\t
#"
)
if
pos
>
-
1
{
second
=
second
[
0
:
pos
]
}
pos
=
strings
.
Index
(
second
,
" #"
)
if
pos
>
-
1
{
second
=
second
[
0
:
pos
]
}
pos
=
strings
.
Index
(
second
,
"
\t
//"
)
if
pos
>
-
1
{
second
=
second
[
0
:
pos
]
}
pos
=
strings
.
Index
(
second
,
" //"
)
if
pos
>
-
1
{
second
=
second
[
0
:
pos
]
}
if
len
(
second
)
==
0
{
continue
}
key
:=
c
.
strcet
+
middle
+
frist
c
.
Mymap
[
key
]
=
strings
.
TrimSpace
(
second
)
}
}
func
(
c
ConfigManager
)
Read
(
node
,
key
string
)
string
{
key
=
node
+
middle
+
key
v
,
found
:=
c
.
Mymap
[
key
]
if
!
found
{
return
""
}
return
v
}
\ No newline at end of file
cube/cube-transfer/src/transfer/deployer.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"fmt"
"github.com/Badangel/logex"
"strconv"
"time"
"transfer/dict"
)
func
DeployStart
(
versionInfo
dict
.
DictVersionInfo
)
error
{
fmt
.
Printf
(
"[entry]start deploy
\n
"
)
logex
.
Noticef
(
"[entry]start deploy"
)
var
err
error
for
{
switch
Dict
.
WaitVersionInfo
.
Status
{
case
dict
.
Dict_Status_Deploying
,
dict
.
Dict_Status_Downloading
:
CmdInstsDownload
()
case
dict
.
Dict_Status_Download_Succ
,
dict
.
Dict_Status_Reloading
:
CmdInstsReload
()
case
dict
.
Dict_Status_Reload_Succ
,
dict
.
Dict_Status_Enabling
:
CmdInstsEnable
()
default
:
logex
.
Noticef
(
"dict status %v"
,
Dict
.
WaitVersionInfo
.
Status
)
}
if
Dict
.
WaitVersionInfo
.
Status
==
dict
.
Dict_Status_Finished
{
Dict
.
WaitVersionInfo
.
FinishTime
=
int
(
time
.
Now
()
.
Unix
())
Dict
.
CurrentVersionInfo
=
append
(
Dict
.
CurrentVersionInfo
,
Dict
.
WaitVersionInfo
)
fmt
.
Printf
(
"[deploy finish]version:%v
\n
"
,
Dict
.
WaitVersionInfo
)
logex
.
Noticef
(
"[deploy finish]version:%v"
,
Dict
.
WaitVersionInfo
)
var
newVersionInfo
dict
.
DictVersionInfo
Dict
.
WaitVersionInfo
=
newVersionInfo
WriteCurrentVersionInfoToFile
()
WriteWaitVersionInfoToFile
()
break
}
else
{
time
.
Sleep
(
time
.
Duration
(
10
)
*
time
.
Second
)
}
}
return
err
}
func
CmdInstsDownload
()
{
for
{
chs
:=
make
([]
chan
error
,
len
(
Dict
.
Instances
))
keyAndRespSlice
:=
make
([]
dict
.
CubeAgentResponse
,
len
(
Dict
.
Instances
))
for
i
,
inst
:=
range
Dict
.
Instances
{
if
inst
.
Status
!=
dict
.
Instance_Status_Download_Succ
{
chs
[
i
]
=
make
(
chan
error
)
var
json_params
dict
.
CubeAgentRequest
json_params
.
Command
=
dict
.
DOWNLOAD
json_params
.
DictName
=
inst
.
DictName
json_params
.
DeployPath
=
inst
.
DeployPath
json_params
.
Version
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Version
)
json_params
.
Depend
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Depend
)
json_params
.
Id
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Id
)
json_params
.
Key
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Key
)
json_params
.
Mode
=
Dict
.
WaitVersionInfo
.
Mode
json_params
.
ShardSeq
=
inst
.
Shard
json_params
.
Port
=
strconv
.
Itoa
(
inst
.
Port
)
json_params
.
Source
=
dict
.
GetFileHead
(
Dict
.
StoragePlace
,
TransferAddr
)
+
Dict
.
WaitVersionInfo
.
Output
+
"/"
+
json_params
.
DictName
+
"_part"
+
strconv
.
Itoa
(
inst
.
Shard
)
+
".tar"
var
address
=
fmt
.
Sprintf
(
"http://%v:%v/agent/cmd"
,
inst
.
AgentIp
,
inst
.
AgentPort
)
logex
.
Noticef
(
"[download cmd]%v:%v"
,
address
,
json_params
)
go
nonBlockSendJsonReq
(
"POST2"
,
address
,
120
,
&
json_params
,
&
keyAndRespSlice
[
i
],
chs
[
i
])
Dict
.
Instances
[
i
]
.
DownloadStartTime
=
int
(
time
.
Now
()
.
Unix
())
Dict
.
Instances
[
i
]
.
Mode
=
Dict
.
WaitVersionInfo
.
Mode
}
}
for
i
,
inst
:=
range
Dict
.
Instances
{
err
:=
<-
chs
[
i
]
logex
.
Noticef
(
"[instance resp]download:%v"
,
Dict
.
Instances
)
if
err
!=
nil
||
keyAndRespSlice
[
i
]
.
Success
!=
"0"
{
logex
.
Warningf
(
"cmd cube online downlaod of %v:%v, shard:%v failed"
,
inst
.
AgentIp
,
inst
.
AgentPort
,
inst
.
Shard
)
continue
}
if
inst
.
Status
<
dict
.
Instance_Status_Download_Succ
{
Dict
.
Instances
[
i
]
.
Status
=
dict
.
Instance_Status_Download_Succ
Dict
.
Instances
[
i
]
.
DownloadedTime
=
int
(
time
.
Now
()
.
Unix
())
Dict
.
DownloadSuccInsts
++
}
}
if
Dict
.
DownloadSuccInsts
==
Dict
.
InstancesNum
{
Dict
.
WaitVersionInfo
.
Status
=
dict
.
Dict_Status_Download_Succ
fmt
.
Printf
(
"[all download ok]inst :%v
\n
"
,
Dict
.
Instances
)
logex
.
Noticef
(
"[all download ok]inst :%v"
,
Dict
.
Instances
)
break
}
time
.
Sleep
(
5
*
time
.
Second
)
}
}
func
CmdInstsReload
()
{
for
{
chs
:=
make
([]
chan
error
,
len
(
Dict
.
Instances
))
keyAndRespSlice
:=
make
([]
dict
.
CubeAgentResponse
,
len
(
Dict
.
Instances
))
for
i
,
inst
:=
range
Dict
.
Instances
{
if
inst
.
Status
!=
dict
.
Instance_Status_Reload_Succ
{
chs
[
i
]
=
make
(
chan
error
)
var
json_params
dict
.
CubeAgentRequest
json_params
.
Command
=
dict
.
RELOAD
json_params
.
DictName
=
inst
.
DictName
json_params
.
DeployPath
=
inst
.
DeployPath
json_params
.
Version
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Version
)
json_params
.
Depend
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Depend
)
json_params
.
Id
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Id
)
json_params
.
Key
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Key
)
json_params
.
Mode
=
Dict
.
WaitVersionInfo
.
Mode
json_params
.
ShardSeq
=
inst
.
Shard
json_params
.
Port
=
strconv
.
Itoa
(
inst
.
Port
)
json_params
.
Source
=
dict
.
GetFileHead
(
Dict
.
StoragePlace
,
TransferAddr
)
+
Dict
.
WaitVersionInfo
.
Output
+
"/"
+
json_params
.
DictName
+
"_part"
+
strconv
.
Itoa
(
inst
.
Shard
)
+
".tar"
var
address
=
fmt
.
Sprintf
(
"http://%v:%v/agent/cmd"
,
inst
.
AgentIp
,
inst
.
AgentPort
)
logex
.
Noticef
(
"[reload cmd]%v:%v"
,
address
,
json_params
)
go
nonBlockSendJsonReq
(
"POST2"
,
address
,
120
,
&
json_params
,
&
keyAndRespSlice
[
i
],
chs
[
i
])
Dict
.
Instances
[
i
]
.
ReloadStartTime
=
int
(
time
.
Now
()
.
Unix
())
}
}
for
i
,
inst
:=
range
Dict
.
Instances
{
err
:=
<-
chs
[
i
]
logex
.
Noticef
(
"[instance resp]reload:%v"
,
Dict
.
Instances
)
if
err
!=
nil
||
keyAndRespSlice
[
i
]
.
Success
!=
"0"
{
logex
.
Warningf
(
"cmd cube online reload of %v:%v, shard:%v failed"
,
inst
.
AgentIp
,
inst
.
AgentPort
,
inst
.
Shard
)
continue
}
if
inst
.
Status
<
dict
.
Instance_Status_Reload_Succ
{
Dict
.
Instances
[
i
]
.
Status
=
dict
.
Instance_Status_Reload_Succ
Dict
.
Instances
[
i
]
.
ReloadedTime
=
int
(
time
.
Now
()
.
Unix
())
Dict
.
ReloadSuccInsts
++
}
}
if
Dict
.
ReloadSuccInsts
==
Dict
.
InstancesNum
{
Dict
.
WaitVersionInfo
.
Status
=
dict
.
Dict_Status_Reload_Succ
fmt
.
Printf
(
"[all reload ok]inst:%v
\n
"
,
Dict
.
Instances
)
logex
.
Noticef
(
"[all reload ok]inst :%v"
,
Dict
.
Instances
)
break
}
time
.
Sleep
(
5
*
time
.
Second
)
}
}
func
CmdInstsEnable
()
{
for
{
chs
:=
make
([]
chan
error
,
len
(
Dict
.
Instances
))
keyAndRespSlice
:=
make
([]
dict
.
CubeAgentResponse
,
len
(
Dict
.
Instances
))
for
i
,
inst
:=
range
Dict
.
Instances
{
if
inst
.
Status
!=
dict
.
Instance_Status_Enable_Succ
{
chs
[
i
]
=
make
(
chan
error
)
var
json_params
dict
.
CubeAgentRequest
json_params
.
Command
=
dict
.
ENABLE
json_params
.
DictName
=
inst
.
DictName
json_params
.
DeployPath
=
inst
.
DeployPath
json_params
.
Version
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Version
)
json_params
.
Depend
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Depend
)
json_params
.
Id
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Id
)
json_params
.
Key
=
strconv
.
Itoa
(
Dict
.
WaitVersionInfo
.
Key
)
json_params
.
Mode
=
Dict
.
WaitVersionInfo
.
Mode
json_params
.
ShardSeq
=
inst
.
Shard
json_params
.
Port
=
strconv
.
Itoa
(
inst
.
Port
)
json_params
.
Source
=
dict
.
GetFileHead
(
Dict
.
StoragePlace
,
TransferAddr
)
+
Dict
.
WaitVersionInfo
.
Output
+
"/"
+
json_params
.
DictName
+
"_part"
+
strconv
.
Itoa
(
inst
.
Shard
)
+
".tar"
var
address
=
fmt
.
Sprintf
(
"http://%v:%v/agent/cmd"
,
inst
.
AgentIp
,
inst
.
AgentPort
)
logex
.
Noticef
(
"[enable cmd]%v:%v"
,
address
,
json_params
)
go
nonBlockSendJsonReq
(
"POST2"
,
address
,
120
,
&
json_params
,
&
keyAndRespSlice
[
i
],
chs
[
i
])
Dict
.
Instances
[
i
]
.
EnablStartTime
=
int
(
time
.
Now
()
.
Unix
())
}
}
for
i
,
inst
:=
range
Dict
.
Instances
{
err
:=
<-
chs
[
i
]
logex
.
Noticef
(
"[instance resp]enable:%v"
,
Dict
.
Instances
)
if
err
!=
nil
||
keyAndRespSlice
[
i
]
.
Success
!=
"0"
{
logex
.
Warningf
(
"cmd cube online enable of %v:%v, shard:%v failed"
,
inst
.
AgentIp
,
inst
.
AgentPort
,
inst
.
Shard
)
continue
}
if
inst
.
Status
<
dict
.
Instance_Status_Enable_Succ
{
Dict
.
Instances
[
i
]
.
Status
=
dict
.
Instance_Status_Enable_Succ
Dict
.
Instances
[
i
]
.
EnabledTime
=
int
(
time
.
Now
()
.
Unix
())
Dict
.
EnableSuccInsts
++
}
}
if
Dict
.
EnableSuccInsts
==
Dict
.
InstancesNum
{
Dict
.
WaitVersionInfo
.
Status
=
dict
.
Dict_Status_Finished
fmt
.
Printf
(
"[all enable ok]inst :%v
\n
"
,
Dict
.
Instances
)
logex
.
Noticef
(
"[all enable ok]inst :%v"
,
Dict
.
Instances
)
break
}
time
.
Sleep
(
5
*
time
.
Second
)
}
}
cube/cube-transfer/src/transfer/dict/cube_agent_server.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
dict
type
CubeAgentRequest
struct
{
Command
string
`json:"command"`
DictName
string
`json:"dict_name"`
DeployPath
string
`json:"deploy_path"`
Version
string
`json:"version"`
Depend
string
`json:"depend"`
Id
string
`json:"id"`
Key
string
`json:"key"`
Mode
string
`json:"mode"`
ShardSeq
int
`json:"shard_seq"`
Source
string
`json:"source"`
Service
string
`json:"service,omitempty"`
SlotIdList
string
`json:"slot_id_list,omitempty"`
ActiveVersionList
string
`json:"active_version_list,omitempty"`
Port
string
`json:"port,omitempty"`
VersionSign
string
`json:"version_sign,omitempty"`
}
type
CubeAgentResponse
struct
{
Success
string
`json:"success"`
Message
string
`json:"message"`
Data
string
`json:"data"`
}
cube/cube-transfer/src/transfer/dict/define.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
dict
import
"errors"
var
(
// Dict Mode
BASE_ONLY
=
"base_only"
BASR_DELTA
=
"base_delta"
// Deploy Mode
BASE
=
"base"
DELTA
=
"delta"
// Succ or Failed Status
SUCC
=
"succ"
FAILED
=
"failed"
//command
DOWNLOAD
=
"download"
RELOAD
=
"reload"
ENABLE
=
"enable"
FTP_HEADER
=
"ftp://"
HTTP_HEADER
=
"http://"
)
// Dict Status
type
DictStatus
int
const
(
// Dict Status
//clear状态编码参考InstanceStatus
Dict_Status_Clearing
DictStatus
=
1
Dict_Status_Cleared
DictStatus
=
2
Dict_Status_Trigging
DictStatus
=
10
Dict_Status_Building
DictStatus
=
20
Dict_Status_Deploying
DictStatus
=
30
Dict_Status_Downloading
DictStatus
=
40
Dict_Status_Download_Succ
DictStatus
=
50
Dict_Status_Reloading
DictStatus
=
60
Dict_Status_Reload_Succ
DictStatus
=
70
Dict_Status_Enabling
DictStatus
=
80
Dict_Status_Finished
DictStatus
=
90
Dict_Status_Restarting
DictStatus
=
100
)
func
(
this
DictStatus
)
String
()
DictStatusStr
{
switch
this
{
case
Dict_Status_Trigging
:
return
Dict_Status_Trigging_Str
case
Dict_Status_Building
:
return
Dict_Status_Building_Str
case
Dict_Status_Deploying
:
return
Dict_Status_Deploying_Str
case
Dict_Status_Downloading
:
return
Dict_Status_Downloading_Str
case
Dict_Status_Download_Succ
:
return
Dict_Status_Download_Succ_Str
case
Dict_Status_Reloading
:
return
Dict_Status_Reloading_Str
case
Dict_Status_Reload_Succ
:
return
Dict_Status_Reload_Succ_Str
case
Dict_Status_Enabling
:
return
Dict_Status_Enabling_Str
case
Dict_Status_Finished
:
return
Dict_Status_Finished_Str
case
Dict_Status_Restarting
:
return
Dict_Status_Restarting_Str
case
Dict_Status_Clearing
:
return
Dict_Status_Clearing_Str
case
Dict_Status_Cleared
:
return
Dict_Status_Cleared_Str
default
:
return
""
}
}
type
DictStatusStr
string
const
(
// Dict Status
Dict_Status_Trigging_Str
DictStatusStr
=
"Trigging"
Dict_Status_Building_Str
DictStatusStr
=
"Building"
Dict_Status_Deploying_Str
DictStatusStr
=
"deploying"
Dict_Status_Downloading_Str
DictStatusStr
=
"downloading"
Dict_Status_Download_Succ_Str
DictStatusStr
=
"download_succ"
Dict_Status_Reloading_Str
DictStatusStr
=
"reloading"
Dict_Status_Reload_Succ_Str
DictStatusStr
=
"reload_succ"
Dict_Status_Enabling_Str
DictStatusStr
=
"enabling"
Dict_Status_Finished_Str
DictStatusStr
=
"finished"
Dict_Status_Restarting_Str
DictStatusStr
=
"restarting"
Dict_Status_Clearing_Str
DictStatusStr
=
"clearing"
Dict_Status_Cleared_Str
DictStatusStr
=
"cleared"
)
func
(
this
DictStatusStr
)
Int
()
(
DictStatus
,
error
)
{
switch
this
{
case
Dict_Status_Trigging_Str
:
return
Dict_Status_Trigging
,
nil
case
Dict_Status_Building_Str
:
return
Dict_Status_Building
,
nil
case
Dict_Status_Deploying_Str
:
return
Dict_Status_Deploying
,
nil
case
Dict_Status_Downloading_Str
:
return
Dict_Status_Downloading
,
nil
case
Dict_Status_Download_Succ_Str
:
return
Dict_Status_Download_Succ
,
nil
case
Dict_Status_Reloading_Str
:
return
Dict_Status_Reloading
,
nil
case
Dict_Status_Reload_Succ_Str
:
return
Dict_Status_Reload_Succ
,
nil
case
Dict_Status_Enabling_Str
:
return
Dict_Status_Enabling
,
nil
case
Dict_Status_Finished_Str
:
return
Dict_Status_Finished
,
nil
case
Dict_Status_Restarting_Str
:
return
Dict_Status_Restarting
,
nil
case
Dict_Status_Clearing_Str
:
return
Dict_Status_Clearing
,
nil
case
Dict_Status_Cleared_Str
:
return
Dict_Status_Cleared
,
nil
default
:
return
0
,
errors
.
New
(
"invalid dict status"
)
}
}
// Instance Status:
type
InstanceStatus
int
const
(
//各种状态都有可能进入clear状态,因此clear相关的状态都小于init状态
Instance_Status_Clear
InstanceStatus
=
1
Instance_Status_Clearing
InstanceStatus
=
2
Instance_Status_Clear_Failed
InstanceStatus
=
3
Instance_Status_Clear_Succ
InstanceStatus
=
4
Instance_Status_Init
InstanceStatus
=
10
Instance_Status_Downloading
InstanceStatus
=
20
Instance_Status_Download_Failed
InstanceStatus
=
30
Instance_Status_Download_Succ
InstanceStatus
=
40
Instance_Status_Reloading
InstanceStatus
=
50
Instance_Status_Reload_Failed
InstanceStatus
=
60
Instance_Status_Reload_Succ
InstanceStatus
=
70
Instance_Status_Enabling
InstanceStatus
=
80
Instance_Status_Enable_Failed
InstanceStatus
=
90
Instance_Status_Enable_Succ
InstanceStatus
=
100
Instance_Status_Poping
InstanceStatus
=
110
Instance_Status_Pop_Failed
InstanceStatus
=
120
Instance_Status_Pop_Succ
InstanceStatus
=
130
Instance_Status_Dead
InstanceStatus
=
250
)
func
(
this
InstanceStatus
)
String
()
InstanceStatusStr
{
switch
this
{
case
Instance_Status_Init
:
return
Instance_Status_Init_Str
case
Instance_Status_Downloading
:
return
Instance_Status_Downloading_Str
case
Instance_Status_Download_Failed
:
return
Instance_Status_Download_Failed_Str
case
Instance_Status_Download_Succ
:
return
Instance_Status_Download_Succ_Str
case
Instance_Status_Reloading
:
return
Instance_Status_Reloading_Str
case
Instance_Status_Reload_Failed
:
return
Instance_Status_Reload_Failed_Str
case
Instance_Status_Reload_Succ
:
return
Instance_Status_Reload_Succ_Str
case
Instance_Status_Enabling
:
return
Instance_Status_Enabling_Str
case
Instance_Status_Enable_Failed
:
return
Instance_Status_Enable_Failed_Str
case
Instance_Status_Enable_Succ
:
return
Instance_Status_Enable_Succ_Str
case
Instance_Status_Dead
:
return
Instance_Status_Dead_Str
case
Instance_Status_Clear
:
return
Instance_Status_Clear_Str
case
Instance_Status_Clearing
:
return
Instance_Status_Clearing_Str
case
Instance_Status_Clear_Failed
:
return
Instance_Status_Clear_Failed_Str
case
Instance_Status_Clear_Succ
:
return
Instance_Status_Clear_Succ_Str
case
Instance_Status_Poping
:
return
Instance_Status_Poping_Str
case
Instance_Status_Pop_Failed
:
return
Instance_Status_Pop_Failed_Str
case
Instance_Status_Pop_Succ
:
return
Instance_Status_Pop_Succ_Str
default
:
return
""
}
}
type
InstanceStatusStr
string
const
(
Instance_Status_Init_Str
InstanceStatusStr
=
"init"
Instance_Status_Downloading_Str
InstanceStatusStr
=
"downloading"
Instance_Status_Download_Failed_Str
InstanceStatusStr
=
"download_failed"
Instance_Status_Download_Succ_Str
InstanceStatusStr
=
"download_succ"
Instance_Status_Reloading_Str
InstanceStatusStr
=
"reloading"
Instance_Status_Reload_Failed_Str
InstanceStatusStr
=
"finish_reload_failed"
Instance_Status_Reload_Succ_Str
InstanceStatusStr
=
"finish_reload_succ"
Instance_Status_Enabling_Str
InstanceStatusStr
=
"enabling"
Instance_Status_Enable_Failed_Str
InstanceStatusStr
=
"enable_failed"
Instance_Status_Enable_Succ_Str
InstanceStatusStr
=
"enable_succ"
Instance_Status_Dead_Str
InstanceStatusStr
=
"dead"
Instance_Status_Clear_Str
InstanceStatusStr
=
"clear"
Instance_Status_Clearing_Str
InstanceStatusStr
=
"clearing"
Instance_Status_Clear_Failed_Str
InstanceStatusStr
=
"clear_failed"
Instance_Status_Clear_Succ_Str
InstanceStatusStr
=
"clear_succ"
Instance_Status_Poping_Str
InstanceStatusStr
=
"poping"
Instance_Status_Pop_Failed_Str
InstanceStatusStr
=
"pop_failed"
Instance_Status_Pop_Succ_Str
InstanceStatusStr
=
"pop_succ"
)
func
(
this
InstanceStatusStr
)
Int
()
(
InstanceStatus
,
error
)
{
switch
this
{
case
Instance_Status_Init_Str
:
return
Instance_Status_Init
,
nil
case
Instance_Status_Downloading_Str
:
return
Instance_Status_Downloading
,
nil
case
Instance_Status_Download_Failed_Str
:
return
Instance_Status_Download_Failed
,
nil
case
Instance_Status_Download_Succ_Str
:
return
Instance_Status_Download_Succ
,
nil
case
Instance_Status_Reloading_Str
:
return
Instance_Status_Reloading
,
nil
case
Instance_Status_Reload_Failed_Str
:
return
Instance_Status_Reload_Failed
,
nil
case
Instance_Status_Reload_Succ_Str
:
return
Instance_Status_Reload_Succ
,
nil
case
Instance_Status_Enabling_Str
:
return
Instance_Status_Enabling
,
nil
case
Instance_Status_Enable_Failed_Str
:
return
Instance_Status_Enable_Failed
,
nil
case
Instance_Status_Enable_Succ_Str
:
return
Instance_Status_Enable_Succ
,
nil
case
Instance_Status_Dead_Str
:
return
Instance_Status_Dead
,
nil
case
Instance_Status_Clear_Str
:
return
Instance_Status_Clear
,
nil
case
Instance_Status_Clearing_Str
:
return
Instance_Status_Clearing
,
nil
case
Instance_Status_Clear_Failed_Str
:
return
Instance_Status_Clear_Failed
,
nil
case
Instance_Status_Clear_Succ_Str
:
return
Instance_Status_Clear_Succ
,
nil
case
Instance_Status_Poping_Str
:
return
Instance_Status_Poping
,
nil
case
Instance_Status_Pop_Failed_Str
:
return
Instance_Status_Pop_Failed
,
nil
case
Instance_Status_Pop_Succ_Str
:
return
Instance_Status_Pop_Succ
,
nil
default
:
return
0
,
errors
.
New
(
"invalid instance status"
)
}
}
\ No newline at end of file
cube/cube-transfer/src/transfer/dict/dict_info.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
dict
type
DictInfo
struct
{
DictName
string
`json:"dict_name"`
DictMode
string
`json:"dict_mode"`
ShardNum
int
`json:"shard_num"`
CopyNum
int
`json:"copy_num"`
InstancesNum
int
`json:"inst_num"`
DeployPath
string
`json:"deploy_path"`
DonefileAddress
string
`json:"donefile_addr"`
OutputAddress
string
`json:"output_addr"`
TmpAddress
string
`json:"tmp_addr"`
StoragePlace
string
`json:"storage_place"`
DownloadSuccInsts
int
`json:"download_inst"`
ReloadSuccInsts
int
`json:"reload_insts"`
EnableSuccInsts
int
`json:"enable_insts"`
Instances
[]
DictInstance
`json:"instances"`
WaitVersionInfo
DictVersionInfo
`json:"wait_version_info"`
CurrentVersionInfo
[]
DictVersionInfo
`json:"current_version_info"`
}
\ No newline at end of file
cube/cube-transfer/src/transfer/dict/dict_instance_status.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
dict
type
DictInstance
struct
{
DictName
string
`json:"dict_name"`
Mode
string
`json:"mode"`
Version
int
`json:"version"`
Depend
int
`json:"depend"`
Id
int
`json:"id"`
Key
int
`json:"key"`
Shard
int
`json:"shard"`
Source
string
`json:"source"`
DeployPath
string
`json:"deploy_path"`
IP
string
`json:"ip"`
Port
int
`json:"port"`
AgentIp
string
`json:"agent_ip"`
AgentPort
int
`json:"agent_port"`
Status
InstanceStatus
`json:"status_id"`
StatusStr
InstanceStatusStr
`json:"status"`
BuildedTime
int
`json:"builded_time"`
DownloadStartTime
int
`json:"download_start_time"`
DownloadedTime
int
`json:"downloaded_time"`
ReloadStartTime
int
`json:"reload_start_time"`
ReloadedTime
int
`json:"reloaded_time"`
EnablStartTime
int
`json:"enable_start_time"`
EnabledTime
int
`json:"enabled_time"`
CreateTime
int
`json:"create_time"`
}
cube/cube-transfer/src/transfer/dict/dict_shard_info.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
dict
import
(
"strconv"
)
type
DictShardInfo
struct
{
Name
string
`json:"name"`
Version
string
`json:"version"`
Depend
string
`json:"depend"`
Id
string
`json:"id"`
Key
string
`json:"key"`
Shard
int
`json:"shard"`
Mode
string
`json:"mode"`
DictMode
string
`json:"dict_mode,omitempty"`
Source
string
`json:"data_source"`
Service
string
`json:"service,omitempty"`
DeltaInfo
string
`json:"delta_info,omitempty"`
BuildedTime
int
`json:"builded_time,omitempty"`
BuildedTimeStr
string
`json:"build_finish_time,omitempty"`
CreateTime
int
`json:"create_time,omitempty"`
IsActive
bool
`json:"is_active,omitempty"`
}
func
GetDictShardScaler
(
shard
int
,
dictVersionInfo
DictVersionInfo
,
storagePlace
string
,
transferaddr
string
)(
info
DictShardInfo
){
info
.
Name
=
dictVersionInfo
.
DictName
info
.
Version
=
strconv
.
Itoa
(
dictVersionInfo
.
Version
)
info
.
Depend
=
strconv
.
Itoa
(
dictVersionInfo
.
Depend
)
info
.
Id
=
strconv
.
Itoa
(
dictVersionInfo
.
Id
)
info
.
Key
=
strconv
.
Itoa
(
dictVersionInfo
.
Key
)
info
.
Mode
=
dictVersionInfo
.
Mode
info
.
Shard
=
shard
info
.
Source
=
GetFileHead
(
storagePlace
,
transferaddr
)
+
dictVersionInfo
.
Output
+
"/"
+
info
.
Version
+
"/"
+
info
.
Name
+
"_part"
+
strconv
.
Itoa
(
shard
)
+
".tar"
return
}
func
GetFileHead
(
storagePlace
string
,
transferaddr
string
)
string
{
if
storagePlace
==
"LOCAL"
{
return
"ftp://"
+
transferaddr
}
else
{
return
""
}
}
\ No newline at end of file
cube/cube-transfer/src/transfer/dict/dict_version_info.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
dict
type
(
DictVersionInfo
struct
{
DictName
string
`json:"dict_name"`
Version
int
`json:"version"`
Depend
int
`json:"depend"`
Id
int
`json:"id"`
Key
int
`json:"key"`
Mode
string
`json:"mode"`
Input
string
`json:"input"`
Output
string
`json:"output"`
Status
DictStatus
`json:"status"`
StatusStr
DictStatusStr
`json:"status_str"`
FinishTime
int
`json:"finish_time"`
CreateTime
int
`json:"create_time"`
MetaInfos
map
[
int
]
string
`json:"meta_infos"`
}
DonefileInfo
struct
{
Id
string
`json:"id"`
Key
string
`json:"key"`
Input
string
`json:"input"`
}
DictShardMetaInfo
struct
{
Name
string
`json:"name"`
Version
int
`json:"version"`
Depend
int
`json:"depend"`
Shard
int
`json:"shard"`
Split
int
`json:"split"`
Meta
string
`json:"meta"`
}
MetaInfo
struct
{
IndexTotalCount
string
`json:"index_total_count"`
IndexLenList
[]
string
`json:"index_len_list"`
DataLenList
[]
string
`json:"data_len_list"`
}
)
cube/cube-transfer/src/transfer/global.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
"transfer/dict"
var
(
Dir
string
Port
string
Dict
dict
.
DictInfo
BuildToolLocal
string
TransferAddr
string
//DictName string
//DictMode string
//DonefileAddress string
//OutputAddress string
//TmpAddress string
//ShardNum int
//CopyNum int
//DeployPath string
//Instances []dict.DictInstance
//StoragePlace string
//WaitVersionInfo dict.DictVersionInfo
//CurrentVersionInfo []dict.DictVersionInfo
//InstancesNum int
//DownloadSuccInsts int
//ReloadSuccInsts int
//EnableSuccInsts int
)
type
LocalAddress
string
type
HDFSAddress
string
const
(
STATUS_OK
int
=
0
STATUS_WRONG_API
int
=
2
STATUS_RETRY_LATER
int
=
10
STATUS_FAIL
int
=
255
)
\ No newline at end of file
cube/cube-transfer/src/transfer/http.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/Badangel/logex"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
type
handlerFunc
func
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
var
(
// key = subpath; eg: path="/checker/job", key="job"
getHandler
map
[
string
]
handlerFunc
putHandler
map
[
string
]
handlerFunc
deleteHandler
map
[
string
]
handlerFunc
postHandler
map
[
string
]
handlerFunc
)
func
startHttp
(
addr
string
)
error
{
// init handlers:
initGetHandlers
()
initPostHandlers
()
http
.
HandleFunc
(
"/dict/"
,
handleRest
)
http
.
HandleFunc
(
"/instance/"
,
handleRest
)
logex
.
Notice
(
"start http:"
,
addr
)
return
http
.
ListenAndServe
(
addr
,
nil
)
}
func
handleRest
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
var
(
req_log
string
)
time_begin
:=
time
.
Now
()
cont_type
:=
make
([]
string
,
1
,
1
)
cont_type
[
0
]
=
"application/json"
header
:=
w
.
Header
()
header
[
"Content-Type"
]
=
cont_type
w
.
Header
()
.
Add
(
"Access-Control-Allow-Origin"
,
"*"
)
m
:=
parseHttpKv
(
r
)
req_log
=
fmt
.
Sprintf
(
"handle %v %v %v from %v"
,
r
.
Method
,
r
.
URL
.
Path
,
r
.
URL
.
RawQuery
,
r
.
RemoteAddr
)
api
:=
r
.
URL
.
Path
var
showHandler
map
[
string
]
handlerFunc
switch
r
.
Method
{
case
"GET"
:
showHandler
=
getHandler
case
"POST"
:
// create
showHandler
=
postHandler
case
"PUT"
:
// update
showHandler
=
putHandler
case
"DELETE"
:
showHandler
=
deleteHandler
default
:
logex
.
Warningf
(
`{"error":1, "message":"unsupport method %v"}`
,
r
.
Method
)
}
handler
,
ok
:=
showHandler
[
api
]
if
!
ok
{
key_list
:=
make
([]
string
,
0
,
len
(
showHandler
))
for
key
:=
range
showHandler
{
key_list
=
append
(
key_list
,
key
)
}
fmt
.
Fprintf
(
w
,
`{"success":"%v", "message":"wrong api", "method":"%s", "api":"%s", "api_list":"%v"}`
,
STATUS_WRONG_API
,
r
.
Method
,
api
,
key_list
)
logex
.
Noticef
(
`%v, time=%v, status=%v`
,
req_log
,
time
.
Now
()
.
Sub
(
time_begin
)
.
Nanoseconds
()
/
1000000
,
STATUS_WRONG_API
)
return
}
var
s
string
rst
,
handle_log
,
status
,
err
:=
handler
(
api
,
m
)
if
status
==
STATUS_OK
{
s
=
fmt
.
Sprintf
(
`{"success":"%v", "message":"query ok", "data":%s}`
,
status
,
rst
)
}
else
{
s
=
fmt
.
Sprintf
(
`{"success":"%v", "message":%v, "data":%s}`
,
status
,
quote
(
err
.
Error
()),
rst
)
}
if
isJsonDict
(
s
)
{
fmt
.
Fprintln
(
w
,
s
)
}
else
{
logex
.
Fatalf
(
"invalid json: %v"
,
s
)
}
if
err
==
nil
{
logex
.
Noticef
(
`%v, time=%v, status=%v, handle_log=%v`
,
req_log
,
time
.
Now
()
.
Sub
(
time_begin
)
.
Nanoseconds
()
/
1000000
,
status
,
quote
(
handle_log
))
}
else
{
logex
.
Noticef
(
`%v, time=%v, status=%v, err=%v, handle_log=%v`
,
req_log
,
time
.
Now
()
.
Sub
(
time_begin
)
.
Nanoseconds
()
/
1000000
,
status
,
quote
(
err
.
Error
()),
quote
(
handle_log
))
}
}
func
parseHttpKv
(
r
*
http
.
Request
)
map
[
string
]
string
{
r
.
ParseForm
()
m
:=
make
(
map
[
string
]
string
)
for
k
,
v
:=
range
r
.
Form
{
switch
k
{
case
"user"
:
// remove @baidu.com for user
m
[
k
]
=
strings
.
Split
(
v
[
0
],
"@"
)[
0
]
default
:
m
[
k
]
=
v
[
0
]
}
}
// allow passing hostname for debug
//if _, ok := m["hostname"]; !ok {
// ip := r.RemoteAddr[:strings.Index(r.RemoteAddr, ":")]
// m["hostname"], _ = getHostname(ip)
//}
return
m
}
// restReq sends a restful request to requrl and returns response body.
func
restReq
(
method
,
requrl
string
,
timeout
int
,
kv
*
map
[
string
]
string
)
(
string
,
error
)
{
logex
.
Debug
(
"####restReq####"
)
logex
.
Debug
(
*
kv
)
data
:=
url
.
Values
{}
if
kv
!=
nil
{
for
k
,
v
:=
range
*
kv
{
logex
.
Trace
(
"req set:"
,
k
,
v
)
data
.
Set
(
k
,
v
)
}
}
if
method
==
"GET"
||
method
==
"DELETE"
{
requrl
=
requrl
+
"?"
+
data
.
Encode
()
data
=
url
.
Values
{}
}
logex
.
Notice
(
method
,
requrl
)
req
,
err
:=
http
.
NewRequest
(
method
,
requrl
,
bytes
.
NewBufferString
(
data
.
Encode
()))
if
err
!=
nil
{
logex
.
Warning
(
"NewRequest failed:"
,
err
)
return
""
,
err
}
if
method
==
"POST"
||
method
==
"PUT"
{
req
.
Header
.
Add
(
"Content-Type"
,
"application/x-www-form-urlencoded"
)
req
.
Header
.
Add
(
"Content-Length"
,
strconv
.
Itoa
(
len
(
data
.
Encode
())))
}
client
:=
&
http
.
Client
{}
client
.
Timeout
=
time
.
Duration
(
timeout
)
*
time
.
Second
resp
,
err
:=
client
.
Do
(
req
)
if
err
!=
nil
{
logex
.
Warning
(
"Do failed:"
,
err
)
return
""
,
err
}
if
resp
.
StatusCode
<
200
||
resp
.
StatusCode
>
299
{
logex
.
Warning
(
"resp status: "
+
resp
.
Status
)
return
""
,
errors
.
New
(
"resp status: "
+
resp
.
Status
)
}
body
,
err
:=
ioutil
.
ReadAll
(
resp
.
Body
)
return
string
(
body
),
err
}
//func jsonRestReq(method, requrl string, timeout int, json_params *CheckerRequest) (string, error) {
func
jsonRestReq
(
method
,
requrl
string
,
timeout
int
,
json_params
interface
{})
(
int
,
string
,
error
)
{
logex
.
Debug
(
"####jsonRestReq####"
)
if
method
==
"POST2"
{
method
=
"POST"
}
//b, err := json.Marshal(*json_params)
b
,
err
:=
json
.
Marshal
(
json_params
)
if
err
!=
nil
{
logex
.
Warning
(
"json_params marshal failed:"
,
err
)
return
0
,
""
,
err
}
logex
.
Notice
(
method
,
requrl
)
req
,
err
:=
http
.
NewRequest
(
method
,
requrl
,
bytes
.
NewBufferString
(
string
(
b
)))
if
err
!=
nil
{
logex
.
Warning
(
"NewRequest failed:"
,
err
)
return
0
,
""
,
err
}
if
method
==
"POST"
||
method
==
"PUT"
{
req
.
Header
.
Add
(
"Content-Type"
,
"application/json"
)
//req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req
.
Header
.
Add
(
"Content-Length"
,
strconv
.
Itoa
(
len
(
string
(
b
))))
}
client
:=
&
http
.
Client
{}
client
.
Timeout
=
time
.
Duration
(
timeout
)
*
time
.
Second
resp
,
err
:=
client
.
Do
(
req
)
if
err
!=
nil
{
logex
.
Warning
(
"Do failed:"
,
err
)
return
0
,
""
,
err
}
if
resp
.
StatusCode
<
200
||
resp
.
StatusCode
>
299
{
logex
.
Warning
(
"resp status: "
+
resp
.
Status
)
return
0
,
""
,
errors
.
New
(
"resp status: "
+
resp
.
Status
)
}
body
,
err
:=
ioutil
.
ReadAll
(
resp
.
Body
)
return
resp
.
StatusCode
,
string
(
body
),
err
}
// quote quotes string for json output. eg: s="123", quote(s)=`"123"`
func
quote
(
s
string
)
string
{
return
fmt
.
Sprintf
(
"%q"
,
s
)
}
func
isJsonDict
(
s
string
)
bool
{
var
js
map
[
string
]
interface
{}
return
json
.
Unmarshal
([]
byte
(
s
),
&
js
)
==
nil
}
func
getHostname
(
ip
string
)
(
hostname
string
,
err
error
)
{
if
hostnames
,
err
:=
net
.
LookupAddr
(
ip
);
err
!=
nil
{
hostname
=
ip
//logex.Warningf("cannot find the hostname of ip (%s), error (%v)", ip, err)
}
else
{
if
len
(
hostnames
)
>
0
{
hostname
=
hostnames
[
0
][
:
strings
.
LastIndex
(
hostnames
[
0
],
".baidu.com."
)]
}
else
{
hostname
=
ip
}
}
return
hostname
,
err
}
func
nonBlockSendJsonReq
(
method
,
requrl
string
,
timeout
int
,
json_params
interface
{},
out
interface
{},
ch
chan
error
)
{
_
,
body_str
,
err
:=
jsonRestReq
(
method
,
requrl
,
timeout
,
json_params
)
logex
.
Noticef
(
"json request method:[%v], requrl:[%s], timeout:[%v], params[%v], resbody[%v], err[%v]"
,
method
,
requrl
,
timeout
,
json_params
,
body_str
,
err
)
if
err
!=
nil
{
ch
<-
err
}
err
=
json
.
Unmarshal
([]
byte
(
body_str
),
out
)
if
err
!=
nil
{
logex
.
Warningf
(
"json unmarshal failed error (%v) for : %v"
,
err
,
body_str
)
}
ch
<-
err
}
cube/cube-transfer/src/transfer/http_get.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"encoding/json"
"errors"
"fmt"
"strconv"
"transfer/dict"
)
func
initGetHandlers
()
{
getHandler
=
map
[
string
]
handlerFunc
{
"/dict/info"
:
GetDictInfo
,
"/instance/status"
:
GetInstanceStatus
,
"/dict/scaler"
:
GetDictScaler
,
"/dict/meta_info"
:
GetDictShardMetaInfo
,
"/dict/deploy/history"
:
GetDictDeployHistory
,
}
}
func
GetDictInfo
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
{
b
,
err
:=
json
.
Marshal
(
Dict
)
if
err
!=
nil
{
return
quote
(
""
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"json marshal failed, %v"
,
err
)
}
return
string
(
b
),
""
,
STATUS_OK
,
nil
}
func
GetInstanceStatus
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
{
b
,
err
:=
json
.
Marshal
(
Dict
.
Instances
)
if
err
!=
nil
{
return
quote
(
""
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"json marshal failed, %v"
,
err
)
}
return
string
(
b
),
""
,
STATUS_OK
,
nil
}
func
GetDictScaler
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
{
var
(
shard
int
err
error
infos
[]
dict
.
DictShardInfo
)
shardStr
,
ok
:=
m
[
"shard"
]
if
!
ok
{
shard
=
0
}
else
{
shard
,
err
=
strconv
.
Atoi
(
shardStr
)
if
err
!=
nil
{
return
quote
(
""
),
""
,
STATUS_FAIL
,
errors
.
New
(
"invalid arg: shard should be int"
)
}
}
for
_
,
version
:=
range
Dict
.
CurrentVersionInfo
{
info
:=
dict
.
GetDictShardScaler
(
shard
,
version
,
Dict
.
StoragePlace
,
TransferAddr
)
infos
=
append
(
infos
,
info
)
}
if
Dict
.
WaitVersionInfo
.
Status
>
dict
.
Dict_Status_Deploying
{
info
:=
dict
.
GetDictShardScaler
(
shard
,
Dict
.
WaitVersionInfo
,
Dict
.
StoragePlace
,
TransferAddr
)
infos
=
append
(
infos
,
info
)
}
b
,
err
:=
json
.
Marshal
(
infos
)
if
err
!=
nil
{
return
quote
(
""
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"json marshal failed, %v"
,
err
)
}
return
quote
(
string
(
b
)),
""
,
STATUS_OK
,
nil
}
func
GetDictShardMetaInfo
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
{
name
,
ok
:=
m
[
"name"
]
if
!
ok
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: name"
)
}
version
,
err
:=
strconv
.
Atoi
(
m
[
"version"
])
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: version, version should be int"
)
}
depend
,
err
:=
strconv
.
Atoi
(
m
[
"depend"
])
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: depend, depend should be int"
)
}
shard
,
err
:=
strconv
.
Atoi
(
m
[
"shard"
])
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: shard, shard should be int"
)
}
split
:=
1
split_str
,
ok
:=
m
[
"split"
]
if
ok
{
split
,
err
=
strconv
.
Atoi
(
split_str
)
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"arg: split, should be int"
)
}
}
meta_info
,
err
:=
GetDictShardMetaInfos
(
name
,
version
,
depend
,
shard
,
split
)
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"info error: %v"
,
err
)
}
b
,
err
:=
json
.
Marshal
(
meta_info
)
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"json marshal failed, %v"
,
err
)
}
return
string
(
b
),
""
,
STATUS_OK
,
nil
}
func
GetDictDeployHistory
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
{
var
deployVerisonInfos
[]
dict
.
DictVersionInfo
deployVerisonInfos
=
Dict
.
CurrentVersionInfo
if
Dict
.
WaitVersionInfo
.
Status
>
dict
.
Dict_Status_Deploying
{
deployVerisonInfos
=
append
(
deployVerisonInfos
,
Dict
.
WaitVersionInfo
)
}
b
,
err
:=
json
.
Marshal
(
deployVerisonInfos
)
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"json marshal failed, %v"
,
err
)
}
return
string
(
b
),
""
,
STATUS_OK
,
nil
}
func
GetDictShardMetaInfos
(
dictName
string
,
version
int
,
depend
int
,
shard
int
,
split
int
)
(
dict
.
DictShardMetaInfo
,
error
)
{
var
info
dict
.
DictShardMetaInfo
for
_
,
v
:=
range
Dict
.
CurrentVersionInfo
{
if
v
.
Version
==
version
&&
v
.
Depend
==
depend
{
if
meta
,
ok
:=
v
.
MetaInfos
[
shard
];
ok
{
info
.
Name
=
dictName
;
info
.
Version
=
version
info
.
Depend
=
depend
info
.
Shard
=
shard
info
.
Split
=
split
info
.
Meta
=
meta
return
info
,
nil
}
else
{
return
info
,
fmt
.
Errorf
(
"shard not right"
)
}
}
}
return
info
,
fmt
.
Errorf
(
"info not right"
)
}
\ No newline at end of file
cube/cube-transfer/src/transfer/http_post.go
0 → 100644
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"transfer/dict"
)
func
initPostHandlers
()
{
postHandler
=
map
[
string
]
handlerFunc
{
"/dict/meta_info/register"
:
PostDictShardMetaInfoRegister
,
}
}
func
PostDictShardMetaInfoRegister
(
subpath
string
,
m
map
[
string
]
string
)
(
string
,
string
,
int
,
error
)
{
var
(
shardMetaInfo
dict
.
DictShardMetaInfo
ok
bool
err
error
metaInfo
dict
.
MetaInfo
)
shardMetaInfo
.
Name
,
ok
=
m
[
"name"
]
if
!
ok
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: name"
)
}
shardMetaInfo
.
Version
,
err
=
strconv
.
Atoi
(
m
[
"version"
])
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: version, should be int"
)
}
shardMetaInfo
.
Depend
,
err
=
strconv
.
Atoi
(
m
[
"depend"
])
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: depend, should be int"
)
}
shardMetaInfo
.
Shard
,
err
=
strconv
.
Atoi
(
m
[
"shard"
])
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: shard, should be int"
)
}
shardMetaInfo
.
Split
=
1
split_str
,
ok
:=
m
[
"split"
]
if
ok
{
shardMetaInfo
.
Split
,
err
=
strconv
.
Atoi
(
split_str
)
if
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"arg: split, should be int"
)
}
}
// check dict shard_num and status
if
shardMetaInfo
.
Shard
<
0
||
shardMetaInfo
.
Shard
>=
Dict
.
ShardNum
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"shard value invalid, dict shard is :%v"
,
Dict
.
ShardNum
)
}
shardMetaInfo
.
Meta
,
ok
=
m
[
"meta"
]
if
!
ok
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"need arg: meta"
)
}
if
err
=
json
.
Unmarshal
([]
byte
(
shardMetaInfo
.
Meta
),
&
metaInfo
);
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"meta string bad formatted: %v"
,
err
)
}
if
reflect
.
DeepEqual
(
metaInfo
,
(
dict
.
MetaInfo
{}))
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"meta string bad formatted"
)
}
if
0
==
len
(
metaInfo
.
IndexLenList
)
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"meta string bad formatted, index_len_list is null"
)
}
if
0
==
len
(
metaInfo
.
DataLenList
)
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
errors
.
New
(
"meta string bad formatted, data_len_list is null"
)
}
fmt
.
Printf
(
"update meta shardMetaInfo:%v metaInfo:%v
\n
"
,
shardMetaInfo
,
metaInfo
)
if
err
=
UpdateDictShardMetaInfo
(
shardMetaInfo
);
err
!=
nil
{
return
quote
(
"failed"
),
""
,
STATUS_FAIL
,
fmt
.
Errorf
(
"update dict_shard_meta_info failed, %v"
,
err
)
}
fmt
.
Printf
(
"update meta2
\n
"
)
return
quote
(
"ok"
),
""
,
STATUS_OK
,
nil
}
func
UpdateDictShardMetaInfo
(
shardMetaInfo
dict
.
DictShardMetaInfo
)
error
{
Dict
.
WaitVersionInfo
.
MetaInfos
[
shardMetaInfo
.
Shard
]
=
shardMetaInfo
.
Meta
WriteWaitVersionInfoToFile
()
return
nil
}
\ No newline at end of file
cube/cube-transfer/src/transfer/transfer.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"fmt"
"github.com/Badangel/logex"
"os"
"time"
"transfer/dict"
)
func
Start
()
{
go
BackupTransfer
()
logex
.
Notice
(
">>> starting server..."
)
addr
:=
":"
+
Port
err
:=
startHttp
(
addr
)
if
err
!=
nil
{
logex
.
Fatalf
(
"start http(addr=%v) failed: %v"
,
addr
,
err
)
os
.
Exit
(
255
)
}
logex
.
Notice
(
">>> start server succ"
)
}
func
BackupTransfer
()
{
for
{
//trigger
version
,
err
:=
TriggerStart
(
Dict
.
DonefileAddress
)
if
err
!=
nil
{
logex
.
Fatalf
(
"[trigger err]trigger err:%v "
,
err
)
fmt
.
Printf
(
"[error]trigger err:%v
\n
"
,
err
)
break
}
logex
.
Noticef
(
"[trigger] get version:%v
\n
"
,
version
)
if
version
.
Id
==
0
{
logex
.
Noticef
(
"[sleep]no new version, sleep 5 min"
)
fmt
.
Printf
(
"[sleep]no new version, wait 5 min
\n
"
)
time
.
Sleep
(
5
*
time
.
Minute
)
continue
}
Dict
.
WaitVersionInfo
=
version
logex
.
Noticef
(
"[trigger finish] WaitVersionInfo version:%v
\n
"
,
Dict
.
WaitVersionInfo
)
WriteWaitVersionInfoToFile
()
//builder
Dict
.
WaitVersionInfo
.
Status
=
dict
.
Dict_Status_Building
Dict
.
WaitVersionInfo
.
MetaInfos
=
make
(
map
[
int
]
string
)
WriteWaitVersionInfoToFile
()
if
err
=
BuilderStart
(
Dict
.
WaitVersionInfo
);
err
!=
nil
{
logex
.
Fatalf
(
"builder err:%v
\n
"
,
err
)
}
if
Dict
.
WaitVersionInfo
.
Mode
==
dict
.
BASE
{
var
newCurrentVersion
[]
dict
.
DictVersionInfo
Dict
.
CurrentVersionInfo
=
newCurrentVersion
WriteCurrentVersionInfoToFile
()
}
logex
.
Noticef
(
"[builder finish] WaitVersionInfo version:%v
\n
"
,
Dict
.
WaitVersionInfo
)
//deployer
Dict
.
WaitVersionInfo
.
Status
=
dict
.
Dict_Status_Deploying
WriteWaitVersionInfoToFile
()
if
err
=
DeployStart
(
Dict
.
WaitVersionInfo
);
err
!=
nil
{
logex
.
Fatalf
(
"deploy err:%v
\n
"
,
err
)
}
logex
.
Noticef
(
"[deploy finish]current version: %v
\n
"
,
Dict
.
CurrentVersionInfo
)
}
fmt
.
Print
(
"transfer over!"
)
logex
.
Noticef
(
"[transfer]status machine exit!"
)
}
cube/cube-transfer/src/transfer/trigger.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"encoding/json"
"fmt"
"github.com/Badangel/logex"
"io/ioutil"
"strconv"
"strings"
"time"
"transfer/dict"
)
func
TriggerStart
(
addr
string
)
(
version
dict
.
DictVersionInfo
,
err
error
)
{
return
GetDoneFileInfo
(
addr
)
}
func
GetDoneFileInfo
(
addr
string
)
(
version
dict
.
DictVersionInfo
,
err
error
)
{
fmt
.
Printf
(
"[entry]start trigger
\n
"
)
logex
.
Noticef
(
"[entry]start trigger"
)
//if donefile is in ftp, first download to local
if
strings
.
HasPrefix
(
addr
,
dict
.
FTP_HEADER
)
||
strings
.
HasPrefix
(
addr
,
dict
.
HTTP_HEADER
)
{
donefileAddr
:=
Dict
.
TmpAddress
+
"/../donefile"
Wget
(
addr
,
donefileAddr
)
addr
=
donefileAddr
}
baseDonefile
:=
addr
+
"/base.txt"
fmt
.
Printf
(
"[trigrer]donefile path:%v
\n
"
,
baseDonefile
)
logex
.
Noticef
(
"[trigrer]base donefile path:%v"
,
baseDonefile
)
contents
,
err
:=
ioutil
.
ReadFile
(
baseDonefile
)
VersionLen
:=
len
(
Dict
.
CurrentVersionInfo
)
version
.
DictName
=
Dict
.
DictName
if
err
!=
nil
{
fmt
.
Printf
(
"[trigrer]read files err:%v
\n
"
,
err
)
logex
.
Fatalf
(
"[trigrer]read files err:%v "
,
err
)
return
}
else
{
contentss
:=
string
(
contents
)
lines
:=
strings
.
Split
(
contentss
,
"
\n
"
)
index
:=
len
(
lines
)
-
2
var
donefileInfo
dict
.
DonefileInfo
fmt
.
Printf
(
"line %v: %v
\n
"
,
index
,
lines
[
index
])
if
err
=
json
.
Unmarshal
([]
byte
(
lines
[
index
]),
&
donefileInfo
);
err
!=
nil
{
return
}
logex
.
Noticef
(
"[trigrer]donfile info:%v"
,
donefileInfo
)
newId
,
_
:=
strconv
.
Atoi
(
donefileInfo
.
Id
)
if
VersionLen
==
0
||
newId
>
Dict
.
CurrentVersionInfo
[
VersionLen
-
1
]
.
Key
{
version
.
Id
=
newId
version
.
Key
,
_
=
strconv
.
Atoi
(
donefileInfo
.
Key
)
version
.
Input
=
donefileInfo
.
Input
deployVersion
:=
int
(
time
.
Now
()
.
Unix
())
version
.
CreateTime
=
deployVersion
version
.
Version
=
deployVersion
version
.
Depend
=
deployVersion
version
.
Mode
=
dict
.
BASE
return
}
}
if
Dict
.
DictMode
==
dict
.
BASR_DELTA
&&
VersionLen
>
0
{
patchDonefile
:=
addr
+
"/patch.txt"
fmt
.
Printf
(
"[trigrer]patchDonefile path:%v
\n
"
,
patchDonefile
)
logex
.
Noticef
(
"[trigrer]patch donefile path:%v"
,
patchDonefile
)
contents
,
err
=
ioutil
.
ReadFile
(
patchDonefile
)
if
err
!=
nil
{
fmt
.
Printf
(
"read files err:%v
\n
"
,
err
)
return
}
else
{
contentss
:=
string
(
contents
)
lines
:=
strings
.
Split
(
contentss
,
"
\n
"
)
for
index
:=
0
;
index
<
len
(
lines
)
-
1
;
index
++
{
var
donefileInfo
dict
.
DonefileInfo
if
err
=
json
.
Unmarshal
([]
byte
(
lines
[
index
]),
&
donefileInfo
);
err
!=
nil
{
return
}
logex
.
Noticef
(
"[trigrer]donfile info:%v"
,
donefileInfo
)
newId
,
_
:=
strconv
.
Atoi
(
donefileInfo
.
Id
)
newKey
,
_
:=
strconv
.
Atoi
(
donefileInfo
.
Key
)
if
newId
>
Dict
.
CurrentVersionInfo
[
VersionLen
-
1
]
.
Id
&&
newKey
==
Dict
.
CurrentVersionInfo
[
VersionLen
-
1
]
.
Key
{
version
.
Id
=
newId
version
.
Key
,
_
=
strconv
.
Atoi
(
donefileInfo
.
Key
)
version
.
Input
=
donefileInfo
.
Input
deployVersion
:=
int
(
time
.
Now
()
.
Unix
())
version
.
CreateTime
=
deployVersion
version
.
Version
=
deployVersion
version
.
Depend
=
Dict
.
CurrentVersionInfo
[
VersionLen
-
1
]
.
Depend
version
.
Mode
=
dict
.
DELTA
return
}
}
}
}
return
}
cube/cube-transfer/src/transfer/util.go
0 → 100755
浏览文件 @
1edd3817
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
transfer
import
(
"bufio"
"encoding/json"
"fmt"
"github.com/Badangel/logex"
"io"
"io/ioutil"
"os/exec"
)
func
WriteWaitVersionInfoToFile
(){
waitFilePath
:=
Dict
.
TmpAddress
+
"/"
+
"WaitVersionInfo.json"
b
,
err
:=
json
.
Marshal
(
Dict
.
WaitVersionInfo
)
fmt
.
Printf
(
"wait version %v
\n
"
,
Dict
.
WaitVersionInfo
)
if
err
!=
nil
{
logex
.
Fatalf
(
"WaitVersionInfo format error : %v"
,
err
)
}
err
=
ioutil
.
WriteFile
(
waitFilePath
,
b
,
0644
)
if
err
!=
nil
{
logex
.
Fatalf
(
"write wait verison info error : %v"
,
err
)
return
}
return
}
func
WriteCurrentVersionInfoToFile
(){
currentFilePath
:=
Dict
.
TmpAddress
+
"/"
+
"CurrentVersionInfo.json"
b
,
err
:=
json
.
Marshal
(
Dict
.
CurrentVersionInfo
)
if
err
!=
nil
{
logex
.
Fatalf
(
" CurrentVersionInfo format error : %v"
,
err
)
}
err
=
ioutil
.
WriteFile
(
currentFilePath
,
b
,
0644
)
if
err
!=
nil
{
logex
.
Fatalf
(
"write current verison info error : %v"
,
err
)
return
}
return
}
func
ExeCommad
(
files
string
,
params
[]
string
)
(
err
error
)
{
cmd
:=
exec
.
Command
(
files
,
params
...
)
fmt
.
Println
(
cmd
.
Args
)
/*if stdout, err := cmd.StdoutPipe(); err != nil {
logex.Fatalf("%v", err)
return
}*/
stdout
,
err
:=
cmd
.
StdoutPipe
()
defer
stdout
.
Close
()
if
err
!=
nil
{
fmt
.
Println
(
err
)
return
}
if
err
:=
cmd
.
Start
();
err
!=
nil
{
logex
.
Fatalf
(
"%v"
,
err
)
}
reader
:=
bufio
.
NewReader
(
stdout
)
/*buf, err := cmd.Output()
fmt.Printf("output: %s\n",buf)
fmt.Printf("err: %v\n",err)*/
for
{
line
,
err2
:=
reader
.
ReadString
(
'\n'
)
if
err2
!=
nil
||
io
.
EOF
==
err2
{
break
}
fmt
.
Printf
(
"[cmd]>%s"
,
line
)
}
err
=
cmd
.
Wait
()
if
nil
!=
err
{
fmt
.
Println
(
err
)
}
return
nil
}
func
Wget
(
ftpPath
string
,
downPath
string
)
{
var
params
[]
string
params
=
append
(
params
,
"-P"
)
params
=
append
(
params
,
downPath
)
params
=
append
(
params
,
"-r"
)
params
=
append
(
params
,
"-N"
)
params
=
append
(
params
,
"-np"
)
params
=
append
(
params
,
"-nd"
)
params
=
append
(
params
,
"-R"
)
params
=
append
(
params
,
"index.html"
)
params
=
append
(
params
,
ftpPath
)
err
:=
ExeCommad
(
"wget"
,
params
)
if
err
!=
nil
{
fmt
.
Printf
(
"wget exe: %v
\n
"
,
err
)
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录