Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
ce0d6704
P
Paddle
项目概览
PaddlePaddle
/
Paddle
1 年多 前同步成功
通知
2302
Star
20931
Fork
5422
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ce0d6704
编写于
7月 07, 2017
作者:
L
liaogang
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' of
https://github.com/PaddlePaddle/Paddle
into fix_glog
上级
c78f41a3
8e8f3601
变更
54
隐藏空白更改
内联
并排
Showing
54 changed file
with
474 addition
and
1319 deletion
+474
-1319
.pre-commit-config.yaml
.pre-commit-config.yaml
+7
-0
.travis.yml
.travis.yml
+3
-2
cmake/external/protobuf.cmake
cmake/external/protobuf.cmake
+59
-0
cmake/generic.cmake
cmake/generic.cmake
+50
-19
go/cmd/master/CMakeLists.txt
go/cmd/master/CMakeLists.txt
+1
-1
go/cmd/pserver/CMakeLists.txt
go/cmd/pserver/CMakeLists.txt
+1
-1
go/master/c/client.go
go/master/c/client.go
+12
-1
go/master/client.go
go/master/client.go
+12
-5
go/master/client_test.go
go/master/client_test.go
+8
-3
go/pserver/client/c/CMakeLists.txt
go/pserver/client/c/CMakeLists.txt
+4
-1
go/pserver/client/c/test/CMakeLists.txt
go/pserver/client/c/test/CMakeLists.txt
+1
-1
go/pserver/optimizer.go
go/pserver/optimizer.go
+3
-3
go/pserver/service.go
go/pserver/service.go
+4
-2
paddle/api/CMakeLists.txt
paddle/api/CMakeLists.txt
+1
-0
paddle/framework/CMakeLists.txt
paddle/framework/CMakeLists.txt
+4
-1
paddle/gserver/layers/AverageLayer.h
paddle/gserver/layers/AverageLayer.h
+4
-0
paddle/gserver/layers/CrossChannelNormLayer.cpp
paddle/gserver/layers/CrossChannelNormLayer.cpp
+26
-11
paddle/gserver/layers/MaxLayer.h
paddle/gserver/layers/MaxLayer.h
+4
-0
paddle/gserver/layers/NormLayer.cpp
paddle/gserver/layers/NormLayer.cpp
+0
-10
paddle/gserver/layers/SequenceLastInstanceLayer.cpp
paddle/gserver/layers/SequenceLastInstanceLayer.cpp
+4
-6
paddle/gserver/layers/SequencePoolLayer.cpp
paddle/gserver/layers/SequencePoolLayer.cpp
+2
-3
paddle/gserver/layers/SequencePoolLayer.h
paddle/gserver/layers/SequencePoolLayer.h
+3
-4
paddle/gserver/tests/LayerGradUtil.cpp
paddle/gserver/tests/LayerGradUtil.cpp
+4
-2
paddle/gserver/tests/LayerGradUtil.h
paddle/gserver/tests/LayerGradUtil.h
+4
-0
paddle/gserver/tests/test_LayerGrad.cpp
paddle/gserver/tests/test_LayerGrad.cpp
+13
-3
paddle/parameter/Argument.cpp
paddle/parameter/Argument.cpp
+3
-3
paddle/parameter/Argument.h
paddle/parameter/Argument.h
+1
-1
paddle/parameter/tests/test_argument.cpp
paddle/parameter/tests/test_argument.cpp
+2
-2
paddle/parameter/tests/test_common.cpp
paddle/parameter/tests/test_common.cpp
+0
-50
paddle/pserver/LightNetwork.cpp
paddle/pserver/LightNetwork.cpp
+14
-14
paddle/pserver/ParameterServer2.cpp
paddle/pserver/ParameterServer2.cpp
+0
-215
paddle/pserver/ParameterServer2.h
paddle/pserver/ParameterServer2.h
+0
-49
paddle/pserver/SocketChannel.cpp
paddle/pserver/SocketChannel.cpp
+11
-11
paddle/pserver/test/SocketTest.cpp
paddle/pserver/test/SocketTest.cpp
+14
-14
paddle/scripts/docker/build.sh
paddle/scripts/docker/build.sh
+1
-1
paddle/scripts/travis/build_doc.sh
paddle/scripts/travis/build_doc.sh
+6
-5
paddle/trainer/Tester.cpp
paddle/trainer/Tester.cpp
+1
-1
paddle/utils/BarrierStat.cpp
paddle/utils/BarrierStat.cpp
+0
-340
paddle/utils/BarrierStat.h
paddle/utils/BarrierStat.h
+0
-425
paddle/utils/Stat.cpp
paddle/utils/Stat.cpp
+0
-61
paddle/utils/Stat.h
paddle/utils/Stat.h
+0
-17
paddle/utils/ThreadLocal.h
paddle/utils/ThreadLocal.h
+6
-6
python/CMakeLists.txt
python/CMakeLists.txt
+2
-1
python/paddle/trainer/config_parser.py
python/paddle/trainer/config_parser.py
+14
-11
python/paddle/trainer_config_helpers/layers.py
python/paddle/trainer_config_helpers/layers.py
+17
-2
python/paddle/trainer_config_helpers/tests/configs/protostr/test_sequence_pooling.protostr
...ers/tests/configs/protostr/test_sequence_pooling.protostr
+51
-0
python/paddle/trainer_config_helpers/tests/configs/test_sequence_pooling.py
...ner_config_helpers/tests/configs/test_sequence_pooling.py
+8
-0
python/paddle/v2/framework/__init__.py
python/paddle/v2/framework/__init__.py
+1
-0
python/paddle/v2/framework/tests/CMakeLists.txt
python/paddle/v2/framework/tests/CMakeLists.txt
+1
-0
python/paddle/v2/framework/tests/test_protobuf.py
python/paddle/v2/framework/tests/test_protobuf.py
+26
-0
python/paddle/v2/master/client.py
python/paddle/v2/master/client.py
+10
-2
python/paddle/v2/reader/creator.py
python/paddle/v2/reader/creator.py
+42
-5
python/paddle/v2/reader/tests/creator_test.py
python/paddle/v2/reader/tests/creator_test.py
+1
-1
python/setup.py.in
python/setup.py.in
+8
-3
未找到文件。
.pre-commit-config.yaml
浏览文件 @
ce0d6704
...
...
@@ -21,3 +21,10 @@
sha
:
28c0ea8a67a3e2dbbf4822ef44e85b63a0080a29
hooks
:
-
id
:
clang-formater
-
repo
:
https://github.com/dnephin/pre-commit-golang
sha
:
e4693a4c282b4fc878eda172a929f7a6508e7d16
hooks
:
-
id
:
go-fmt
files
:
(.*\.go)
-
id
:
go-lint
files
:
(.*\.go)
.travis.yml
浏览文件 @
ce0d6704
...
...
@@ -33,16 +33,17 @@ addons:
-
ccache
before_install
:
-
if [[ "$JOB" == "check_style" ]]; then sudo ln -s /usr/bin/clang-format-3.8 /usr/bin/clang-format; fi
# Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python
# Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python
# protobuf version.
-
pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker
-
pip install rarfile
-
curl https://glide.sh/get | bash
-
eval "$(GIMME_GO_VERSION=1.8.3 gimme)"
-
|
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
script
:
-
|
export WITH_GOLANG=ON &&
timeout 2580 paddle/scripts/travis/${JOB}.sh # 43min timeout
timeout 2580 paddle/scripts/travis/${JOB}.sh # 43min timeout
RESULT=$?; if [ $RESULT -eq 0 ] || [ $RESULT -eq 142 ]; then true; else false; fi;
notifications
:
email
:
...
...
cmake/external/protobuf.cmake
浏览文件 @
ce0d6704
...
...
@@ -17,6 +17,65 @@ INCLUDE(ExternalProject)
FIND_PACKAGE
(
Protobuf QUIET
)
SET
(
PROTOBUF_FOUND
"OFF"
)
if
(
NOT COMMAND protobuf_generate_python
)
# before cmake 3.4, protobuf_genrerate_python is not defined.
function
(
protobuf_generate_python SRCS
)
# shameless copy from https://github.com/Kitware/CMake/blob/master/Modules/FindProtobuf.cmake
if
(
NOT ARGN
)
message
(
SEND_ERROR
"Error: PROTOBUF_GENERATE_PYTHON() called without any proto files"
)
return
()
endif
()
if
(
PROTOBUF_GENERATE_CPP_APPEND_PATH
)
# Create an include path for each file specified
foreach
(
FIL
${
ARGN
}
)
get_filename_component
(
ABS_FIL
${
FIL
}
ABSOLUTE
)
get_filename_component
(
ABS_PATH
${
ABS_FIL
}
PATH
)
list
(
FIND _protobuf_include_path
${
ABS_PATH
}
_contains_already
)
if
(
${
_contains_already
}
EQUAL -1
)
list
(
APPEND _protobuf_include_path -I
${
ABS_PATH
}
)
endif
()
endforeach
()
else
()
set
(
_protobuf_include_path -I
${
CMAKE_CURRENT_SOURCE_DIR
}
)
endif
()
if
(
DEFINED PROTOBUF_IMPORT_DIRS AND NOT DEFINED Protobuf_IMPORT_DIRS
)
set
(
Protobuf_IMPORT_DIRS
"
${
PROTOBUF_IMPORT_DIRS
}
"
)
endif
()
if
(
DEFINED Protobuf_IMPORT_DIRS
)
foreach
(
DIR
${
Protobuf_IMPORT_DIRS
}
)
get_filename_component
(
ABS_PATH
${
DIR
}
ABSOLUTE
)
list
(
FIND _protobuf_include_path
${
ABS_PATH
}
_contains_already
)
if
(
${
_contains_already
}
EQUAL -1
)
list
(
APPEND _protobuf_include_path -I
${
ABS_PATH
}
)
endif
()
endforeach
()
endif
()
set
(
${
SRCS
}
)
foreach
(
FIL
${
ARGN
}
)
get_filename_component
(
ABS_FIL
${
FIL
}
ABSOLUTE
)
get_filename_component
(
FIL_WE
${
FIL
}
NAME_WE
)
if
(
NOT PROTOBUF_GENERATE_CPP_APPEND_PATH
)
get_filename_component
(
FIL_DIR
${
FIL
}
DIRECTORY
)
if
(
FIL_DIR
)
set
(
FIL_WE
"
${
FIL_DIR
}
/
${
FIL_WE
}
"
)
endif
()
endif
()
list
(
APPEND
${
SRCS
}
"
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
FIL_WE
}
_pb2.py"
)
add_custom_command
(
OUTPUT
"
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
FIL_WE
}
_pb2.py"
COMMAND
${
Protobuf_PROTOC_EXECUTABLE
}
--python_out
${
CMAKE_CURRENT_BINARY_DIR
}
${
_protobuf_include_path
}
${
ABS_FIL
}
DEPENDS
${
ABS_FIL
}
${
Protobuf_PROTOC_EXECUTABLE
}
COMMENT
"Running Python protocol buffer compiler on
${
FIL
}
"
VERBATIM
)
endforeach
()
set
(
${
SRCS
}
${${
SRCS
}}
PARENT_SCOPE
)
endfunction
()
endif
()
# Print and set the protobuf library information,
# finish this cmake process and exit from this file.
...
...
cmake/generic.cmake
浏览文件 @
ce0d6704
...
...
@@ -99,25 +99,44 @@ function(merge_static_libs TARGET_NAME)
set
(
libs
${
ARGN
}
)
list
(
REMOVE_DUPLICATES libs
)
#
First get the file names of the libraries to be merged
#
Get all propagation dependencies from the merged libraries
foreach
(
lib
${
libs
}
)
set
(
libfiles
${
libfiles
}
$<TARGET_FILE:
${
lib
}
>
)
list
(
APPEND libs_deps
${${
lib
}
_LIB_DEPENDS
}
)
endforeach
()
if
(
APPLE
)
# Use OSX's libtool to merge archives
# To produce a library we need at least one source file.
# It is created by add_custom_command below and will helps
# also help to track dependencies.
set
(
dummyfile
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
TARGET_NAME
}
_dummy.c
)
# Make the generated dummy source file depended on all static input
# libs. If input lib changes,the source file is touched
# which causes the desired effect (relink).
add_custom_command
(
OUTPUT
${
dummyfile
}
COMMAND
${
CMAKE_COMMAND
}
-E touch
${
dummyfile
}
DEPENDS
${
libs
}
)
# Generate dummy staic lib
file
(
WRITE
${
dummyfile
}
"const char * dummy =
\"
${
dummyfile
}
\"
;"
)
add_library
(
${
TARGET_NAME
}
STATIC
${
dummyfile
}
)
target_link_libraries
(
${
TARGET_NAME
}
${
libs_deps
}
)
foreach
(
lib
${
libs
}
)
# Get the file names of the libraries to be merged
set
(
libfiles
${
libfiles
}
$<TARGET_FILE:
${
lib
}
>
)
endforeach
()
add_custom_command
(
TARGET
${
TARGET_NAME
}
POST_BUILD
COMMAND rm
"
${
CMAKE_CURRENT_BINARY_DIR
}
/lib
${
TARGET_NAME
}
.a"
COMMAND /usr/bin/libtool -static -o
"
${
CMAKE_CURRENT_BINARY_DIR
}
/lib
${
TARGET_NAME
}
.a"
${
libfiles
}
)
else
()
# general UNIX: use "ar" to extract objects and re-add to a common lib
else
()
# general UNIX: use "ar" to extract objects and re-add to a common lib
foreach
(
lib
${
libs
}
)
set
(
objlistfile
${
lib
}
.objlist
)
# list of objects in the input library
set
(
objdir
${
lib
}
.objdir
)
add_custom_command
(
OUTPUT
${
objdir
}
COMMAND
${
CMAKE_COMMAND
}
-E make_directory
${
objdir
}
)
COMMAND
${
CMAKE_COMMAND
}
-E make_directory
${
objdir
}
DEPENDS
${
lib
}
)
add_custom_command
(
OUTPUT
${
objlistfile
}
COMMAND
${
CMAKE_AR
}
-x
"$<TARGET_FILE:
${
lib
}
>"
...
...
@@ -125,27 +144,27 @@ function(merge_static_libs TARGET_NAME)
DEPENDS
${
lib
}
${
objdir
}
WORKING_DIRECTORY
${
objdir
}
)
# Empty dummy source file that goes into merged library
set
(
mergebase
${
lib
}
.mergebase.c
)
add_custom_command
(
OUTPUT
${
mergebase
}
COMMAND
${
CMAKE_COMMAND
}
-E touch
${
mergebase
}
DEPENDS
${
objlistfile
}
)
# Empty dummy source file that goes into merged library
set
(
mergebase
${
lib
}
.mergebase.c
)
add_custom_command
(
OUTPUT
${
mergebase
}
COMMAND
${
CMAKE_COMMAND
}
-E touch
${
mergebase
}
DEPENDS
${
objlistfile
}
)
list
(
APPEND mergebases
"
${
mergebase
}
"
)
endforeach
()
# We need a target for the output merged library
add_library
(
${
TARGET_NAME
}
STATIC
${
mergebases
}
)
target_link_libraries
(
${
TARGET_NAME
}
${
libs_deps
}
)
# Get the file name of the generated library
set
(
outlibfile
"$<TARGET_FILE:
${
TARGET_NAME
}
>"
)
foreach
(
lib
${
libs
}
)
add_custom_command
(
TARGET
${
TARGET_NAME
}
POST_BUILD
COMMAND
${
CMAKE_AR
}
ru
${
outlibfile
}
@
"../
${
lib
}
.objlist"
WORKING_DIRECTORY
${
lib
}
.objdir
)
COMMAND
${
CMAKE_AR
}
cr
${
outlibfile
}
*.o
COMMAND
${
CMAKE_RANLIB
}
${
outlibfile
}
WORKING_DIRECTORY
${
lib
}
.objdir
)
endforeach
()
add_custom_command
(
TARGET
${
TARGET_NAME
}
POST_BUILD
COMMAND
${
CMAKE_RANLIB
}
${
outlibfile
}
)
endif
()
endfunction
(
merge_static_libs
)
...
...
@@ -194,7 +213,7 @@ function(cc_test TARGET_NAME)
add_executable
(
${
TARGET_NAME
}
${
cc_test_SRCS
}
)
target_link_libraries
(
${
TARGET_NAME
}
${
cc_test_DEPS
}
gtest gtest_main
)
add_dependencies
(
${
TARGET_NAME
}
${
cc_test_DEPS
}
gtest gtest_main
)
add_test
(
${
TARGET_NAME
}
${
TARGET_NAME
}
)
add_test
(
NAME
${
TARGET_NAME
}
COMMAND
${
TARGET_NAME
}
WORKING_DIRECTORY
${
CMAKE_CURRENT_SOURCE_DIR
}
)
endif
()
endfunction
(
cc_test
)
...
...
@@ -281,10 +300,11 @@ function(go_library TARGET_NAME)
file
(
GLOB GO_SOURCE RELATIVE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
"
"*.go"
)
string
(
REPLACE
"
${
PADDLE_GO_PATH
}
/"
""
CMAKE_CURRENT_SOURCE_REL_DIR
${
CMAKE_CURRENT_SOURCE_DIR
}
)
# FIXME: link path
add_custom_command
(
TARGET
${
TARGET_NAME
}
POST_BUILD
COMMAND rm
"
${${
TARGET_NAME
}
_LIB_PATH
}
"
# Golang build source code
COMMAND
env
GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
build
${
BUILD_MODE
}
COMMAND GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
build
${
BUILD_MODE
}
-o
"
${${
TARGET_NAME
}
_LIB_PATH
}
"
"./
${
CMAKE_CURRENT_SOURCE_REL_DIR
}
/
${
GO_SOURCE
}
"
# must run under GOPATH
...
...
@@ -299,11 +319,13 @@ function(go_binary TARGET_NAME)
cmake_parse_arguments
(
go_binary
"
${
options
}
"
"
${
oneValueArgs
}
"
"
${
multiValueArgs
}
"
${
ARGN
}
)
string
(
REPLACE
"
${
PADDLE_GO_PATH
}
/"
""
CMAKE_CURRENT_SOURCE_REL_DIR
${
CMAKE_CURRENT_SOURCE_DIR
}
)
# FIXME: link path
add_custom_command
(
OUTPUT
${
TARGET_NAME
}
_timestamp
COMMAND env GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
build
COMMAND env LIBRARY_PATH=
${
CMAKE_BINARY_DIR
}
/go/pserver/client/c/:$ENV{LIBRARY_PATH}
GOPATH=
${
GOPATH
}
${
CMAKE_Go_COMPILER
}
build
-o
"
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
TARGET_NAME
}
"
"./
${
CMAKE_CURRENT_SOURCE_REL_DIR
}
/
${
go_binary_SRCS
}
"
WORKING_DIRECTORY
"
${
PADDLE_IN_GOPATH
}
/go"
)
WORKING_DIRECTORY
"
${
PADDLE_IN_GOPATH
}
/go"
)
# TODO: don't know what ${TARGET_NAME}_link does
add_custom_target
(
${
TARGET_NAME
}
ALL DEPENDS go_vendor
${
TARGET_NAME
}
_timestamp
${
go_binary_DEPS
}
)
install
(
PROGRAMS
${
CMAKE_CURRENT_BINARY_DIR
}
/
${
TARGET_NAME
}
DESTINATION bin
)
...
...
@@ -332,3 +354,12 @@ function(proto_library TARGET_NAME)
protobuf_generate_cpp
(
proto_srcs proto_hdrs
${
proto_library_SRCS
}
)
cc_library
(
${
TARGET_NAME
}
SRCS
${
proto_srcs
}
DEPS
${
proto_library_DEPS
}
protobuf
)
endfunction
()
function
(
py_proto_compile TARGET_NAME
)
set
(
oneValueArgs
""
)
set
(
multiValueArgs SRCS
)
cmake_parse_arguments
(
py_proto_compile
"
${
options
}
"
"
${
oneValueArgs
}
"
"
${
multiValueArgs
}
"
${
ARGN
}
)
set
(
py_srcs
)
protobuf_generate_python
(
py_srcs
${
py_proto_compile_SRCS
}
)
add_custom_target
(
${
TARGET_NAME
}
ALL DEPENDS
${
py_srcs
}
)
endfunction
()
\ No newline at end of file
go/cmd/master/CMakeLists.txt
浏览文件 @
ce0d6704
...
...
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
go_binary
(
master SRC master.go
)
go_binary
(
master SRC master.go
DEPS paddle_go_optimizer
)
go/cmd/pserver/CMakeLists.txt
浏览文件 @
ce0d6704
...
...
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
go_binary
(
pserver SRCS pserver.go
)
go_binary
(
pserver SRCS pserver.go
DEPS paddle_go_optimizer
)
go/master/c/client.go
浏览文件 @
ce0d6704
...
...
@@ -104,11 +104,22 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int
return
C
.
PADDLE_MASTER_OK
}
// return value:
// 0:ok
// -1:error
//export paddle_next_record
func
paddle_next_record
(
client
C
.
paddle_master_client
,
record
**
C
.
uchar
)
C
.
int
{
c
:=
get
(
client
)
r
:=
c
.
NextRecord
()
r
,
err
:=
c
.
NextRecord
()
if
err
!=
nil
{
// Error
// TODO: return the type of error?
*
record
=
(
*
C
.
uchar
)(
nullPtr
)
return
-
1
}
if
len
(
r
)
==
0
{
// Empty record
*
record
=
(
*
C
.
uchar
)(
nullPtr
)
return
0
}
...
...
go/master/client.go
浏览文件 @
ce0d6704
...
...
@@ -11,7 +11,12 @@ import (
// Client is the client of the master server.
type
Client
struct
{
conn
*
connection
.
Conn
ch
chan
[]
byte
ch
chan
record
}
type
record
struct
{
r
[]
byte
err
error
}
// NewClient creates a new Client.
...
...
@@ -21,7 +26,7 @@ type Client struct {
func
NewClient
(
addrCh
<-
chan
string
,
bufSize
int
)
*
Client
{
c
:=
&
Client
{}
c
.
conn
=
connection
.
New
()
c
.
ch
=
make
(
chan
[]
byte
,
bufSize
)
c
.
ch
=
make
(
chan
record
,
bufSize
)
go
c
.
monitorMaster
(
addrCh
)
go
c
.
getRecords
()
return
c
...
...
@@ -46,10 +51,11 @@ func (c *Client) getRecords() {
s
:=
recordio
.
NewRangeScanner
(
f
,
&
chunk
.
Index
,
-
1
,
-
1
)
for
s
.
Scan
()
{
c
.
ch
<-
s
.
Record
()
c
.
ch
<-
record
{
s
.
Record
(),
nil
}
}
if
s
.
Err
()
!=
nil
{
c
.
ch
<-
record
{
nil
,
s
.
Err
()}
log
.
Errorln
(
err
,
chunk
.
Path
)
}
...
...
@@ -116,6 +122,7 @@ func (c *Client) taskFinished(taskID int) error {
//
// NextRecord will block until the next record is available. It is
// thread-safe.
func
(
c
*
Client
)
NextRecord
()
[]
byte
{
return
<-
c
.
ch
func
(
c
*
Client
)
NextRecord
()
([]
byte
,
error
)
{
r
:=
<-
c
.
ch
return
r
.
r
,
r
.
err
}
go/master/client_test.go
浏览文件 @
ce0d6704
...
...
@@ -68,12 +68,17 @@ func TestNextRecord(t *testing.T) {
for
pass
:=
0
;
pass
<
50
;
pass
++
{
received
:=
make
(
map
[
byte
]
bool
)
for
i
:=
0
;
i
<
total
;
i
++
{
r
:=
c
.
NextRecord
()
r
,
err
:=
c
.
NextRecord
()
if
err
!=
nil
{
t
.
Fatal
(
pass
,
i
,
"Read error:"
,
err
)
}
if
len
(
r
)
!=
1
{
t
.
Fatal
(
"Length should be 1."
,
r
)
t
.
Fatal
(
pass
,
i
,
"Length should be 1."
,
r
)
}
if
received
[
r
[
0
]]
{
t
.
Fatal
(
"Received duplicate."
,
received
,
r
)
t
.
Fatal
(
pass
,
i
,
"Received duplicate."
,
received
,
r
)
}
received
[
r
[
0
]]
=
true
}
...
...
go/pserver/client/c/CMakeLists.txt
浏览文件 @
ce0d6704
cc_library
(
paddle_go_optimizer DEPS paddle_optimizer paddle_proto glog gflags protobuf
)
target_link_libraries
(
paddle_go_optimizer stdc++ m
)
go_library
(
paddle_pserver_cclient STATIC DEPS paddle_go_optimizer
)
if
(
WITH_TESTING
)
add_subdirectory
(
test
)
# FIXME: this test requires pserver which is not managed by the test
# we need some kind of e2e testing machanism.
# add_subdirectory(test)
endif
()
go/pserver/client/c/test/CMakeLists.txt
浏览文件 @
ce0d6704
cc_test
(
test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient
)
cc_test
(
test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient
paddle_go_optimizer
)
add_style_check_target
(
test_cclient test_cclient.c
)
go/pserver/optimizer.go
浏览文件 @
ce0d6704
...
...
@@ -2,7 +2,7 @@ package pserver
// #cgo CFLAGS: -I ../../
// //FIXME: ldflags contain "build" path
// #cgo LDFLAGS: ../../build/go/pserver/client/c/libpaddle_go_optimizer.a -lstdc++ -lm
// #cgo LDFLAGS:
${SRCDIR}/
../../build/go/pserver/client/c/libpaddle_go_optimizer.a -lstdc++ -lm
// #include "paddle/optimizer/optimizer.h"
// #include <stdlib.h>
// #include <string.h>
...
...
@@ -56,8 +56,8 @@ func newOptimizer(paramWithConfigs ParameterWithConfig) *optimizer {
func
(
o
*
optimizer
)
GetWeights
()
[]
byte
{
var
buffer
unsafe
.
Pointer
buffer
_l
en
:=
C
.
paddle_optimizer_get_weights
(
o
.
opt
,
&
buffer
)
return
cArrayToSlice
(
buffer
,
int
(
buffer
_l
en
)
*
C
.
sizeof_float
)
buffer
L
en
:=
C
.
paddle_optimizer_get_weights
(
o
.
opt
,
&
buffer
)
return
cArrayToSlice
(
buffer
,
int
(
buffer
L
en
)
*
C
.
sizeof_float
)
}
func
(
o
*
optimizer
)
UpdateParameter
(
g
Gradient
)
error
{
...
...
go/pserver/service.go
浏览文件 @
ce0d6704
...
...
@@ -10,8 +10,10 @@ import (
type
ElementType
int
const
(
// AlreadyInitialized is true if pserver is initialized
AlreadyInitialized
=
"pserver already initialized"
Uninitialized
=
"pserver not fully initialized"
// Uninitialized is true if pserver not fully initialized
Uninitialized
=
"pserver not fully initialized"
)
// Supported element types
...
...
@@ -55,7 +57,7 @@ func NewService(idx int) (*Service, error) {
s
:=
&
Service
{
idx
:
idx
,
}
s
.
optMap
=
make
(
map
[
string
]
*
optimizer
)
s
.
optMap
=
make
(
map
[
string
]
*
optimizer
)
s
.
initialized
=
make
(
chan
struct
{})
return
s
,
nil
}
...
...
paddle/api/CMakeLists.txt
浏览文件 @
ce0d6704
...
...
@@ -66,6 +66,7 @@ SWIG_LINK_LIBRARIES(swig_paddle
paddle_trainer_lib
paddle_network
paddle_parameter
paddle_optimizer
paddle_math
paddle_utils
paddle_proto
...
...
paddle/framework/CMakeLists.txt
浏览文件 @
ce0d6704
...
...
@@ -9,6 +9,9 @@ cc_test(enforce_test SRCS enforce_test.cc)
proto_library
(
attr_type SRCS attr_type.proto
)
proto_library
(
op_proto SRCS op_proto.proto DEPS attr_type
)
cc_test
(
op_proto_test SRCS op_proto_test.cc DEPS op_proto protobuf
)
proto_library
(
op_desc SRCS op_desc.proto DEPS attr_type
)
cc_test
(
op_desc_test SRCS op_desc_test.cc DEPS op_desc protobuf
)
py_proto_compile
(
framework_py_proto SRCS attr_type.proto op_proto.proto op_desc.proto
)
# Generate an empty __init__.py to make framework_py_proto as a valid python module.
add_custom_target
(
framework_py_proto_init ALL COMMAND
${
CMAKE_COMMAND
}
-E touch __init__.py
)
add_dependencies
(
framework_py_proto framework_py_proto_init
)
paddle/gserver/layers/AverageLayer.h
浏览文件 @
ce0d6704
...
...
@@ -25,6 +25,10 @@ namespace paddle {
* If SequenceLevel = kNonSeq:
* Output: output size is the number of input sequences (NOT input instances)
* output[i] = average_{for each instance in this sequence}{input[i]}
* If stride_ > 0:
* Output: a shorten sequence. Stride is the step size by which we slide a
* window upon the input sequence, and the average pooling
* operation is then applied to each interval independently.
* If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence
* Output: output size is the number of input sub-sequences
...
...
paddle/gserver/layers/CrossChannelNormLayer.cpp
浏览文件 @
ce0d6704
...
...
@@ -36,6 +36,16 @@ MatrixPtr CrossChannelNormLayer::createSpatialMatrix(MatrixPtr data,
data
->
getData
()
+
iter
*
spatialDim
,
1
,
spatialDim
,
false
,
useGpu_
);
}
bool
CrossChannelNormLayer
::
init
(
const
LayerMap
&
layerMap
,
const
ParameterMap
&
parameterMap
)
{
Layer
::
init
(
layerMap
,
parameterMap
);
CHECK
(
parameters_
[
0
]);
const
NormConfig
&
conf
=
config_
.
inputs
(
0
).
norm_conf
();
channels_
=
conf
.
channels
();
scale_
.
reset
(
new
Weight
(
channels_
,
1
,
parameters_
[
0
]));
return
true
;
}
void
CrossChannelNormLayer
::
forward
(
PassType
passType
)
{
Layer
::
forward
(
passType
);
MatrixPtr
inV
=
getInputValue
(
0
);
...
...
@@ -51,9 +61,7 @@ void CrossChannelNormLayer::forward(PassType passType) {
Matrix
::
resizeOrCreate
(
dataBuffer_
,
batchSize
,
dataDim
,
false
,
useGpu_
);
Matrix
::
resizeOrCreate
(
spatialBuffer_
,
1
,
spatialDim
,
false
,
useGpu_
);
Matrix
::
resizeOrCreate
(
normBuffer_
,
batchSize
,
spatialDim
,
false
,
useGpu_
);
normBuffer_
->
zeroMem
();
// add eps to avoid overflow
normBuffer_
->
addScalar
(
*
normBuffer_
,
1e-6
);
inV
->
square2
(
*
dataBuffer_
);
for
(
size_t
i
=
0
;
i
<
batchSize
;
i
++
)
{
const
MatrixPtr
inVTmp
=
createSampleMatrix
(
inV
,
i
,
spatialDim
);
...
...
@@ -63,6 +71,8 @@ void CrossChannelNormLayer::forward(PassType passType) {
// compute norm.
spatialBuffer_
->
sumCols
(
*
dataTmp
,
1
,
0
);
// add eps to avoid overflow
spatialBuffer_
->
add
(
1e-6
);
spatialBuffer_
->
sqrt2
(
*
spatialBuffer_
);
normTmp
->
copyFrom
(
*
spatialBuffer_
);
outVTmp
->
copyFrom
(
*
inVTmp
);
...
...
@@ -82,6 +92,9 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) {
size_t
dataDim
=
inG
->
getWidth
();
size_t
spatialDim
=
dataDim
/
channels_
;
MatrixPtr
inGBuffer
;
Matrix
::
resizeOrCreate
(
inGBuffer
,
channels_
,
spatialDim
,
false
,
useGpu_
);
dataBuffer_
->
dotMul
(
*
outG
,
*
outV
);
Matrix
::
resizeOrCreate
(
scaleDiff_
,
channels_
,
1
,
false
,
useGpu_
);
Matrix
::
resizeOrCreate
(
channelBuffer_
,
channels_
,
1
,
false
,
useGpu_
);
...
...
@@ -100,22 +113,24 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) {
scaleDiff_
->
add
(
*
channelBuffer_
,
1.
);
sampleBuffer_
->
dotMul
(
*
inVTmp
,
*
outGTmp
);
spatialBuffer_
->
sumCols
(
*
sampleBuffer_
,
1.
,
1
.
);
spatialBuffer_
->
sumCols
(
*
sampleBuffer_
,
1.
,
0
.
);
// scale the grad
inG
Tmp
->
copyFrom
(
*
inVTmp
);
inG
Tmp
->
mulRowVector
(
*
spatialBuffer_
);
inG
Buffer
->
copyFrom
(
*
inVTmp
);
inG
Buffer
->
mulRowVector
(
*
spatialBuffer_
);
// divide by square of norm
spatialBuffer_
->
dotMul
(
*
normTmp
,
*
normTmp
);
inG
Tmp
->
divRowVector
(
*
spatialBuffer_
);
inG
Buffer
->
divRowVector
(
*
spatialBuffer_
);
// subtract
inG
Tmp
->
add
(
*
outGTmp
,
-
1
,
1
);
inG
Buffer
->
add
(
*
outGTmp
,
-
1
,
1
);
// divide by norm
inG
Tmp
->
divRowVector
(
*
normTmp
);
inG
Buffer
->
divRowVector
(
*
normTmp
);
// scale the diff
inGTmp
->
mulColVector
(
*
scale_
->
getW
());
inGBuffer
->
mulColVector
(
*
scale_
->
getW
());
inGTmp
->
add
(
*
inGBuffer
);
}
// updata scale
if
(
scale_
->
getWGrad
())
scale_
->
getWGrad
()
->
copyFrom
(
*
scaleDiff_
);
if
(
scale_
->
getWGrad
())
scale_
->
getWGrad
()
->
add
(
*
scaleDiff_
);
scale_
->
getParameterPtr
()
->
incUpdate
(
callback
);
}
...
...
paddle/gserver/layers/MaxLayer.h
浏览文件 @
ce0d6704
...
...
@@ -26,6 +26,10 @@ namespace paddle {
* If SequenceLevel = kNonSeq:
* Output: output size is the number of input sequences (NOT input instances)
* output[i] = max_{for each instance in this sequence}{input[i]}
* If stride_ > 0:
* Output: a shorten sequence. Stride is the step size by which we slide a
* window upon the input sequence, and the max pooling operation is
* then applied to each interval independently.
* If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence
* Output: output size is the number of input sub-sequences
...
...
paddle/gserver/layers/NormLayer.cpp
浏览文件 @
ce0d6704
...
...
@@ -56,14 +56,4 @@ bool ResponseNormLayer::init(const LayerMap& layerMap,
return
true
;
}
bool
CrossChannelNormLayer
::
init
(
const
LayerMap
&
layerMap
,
const
ParameterMap
&
parameterMap
)
{
Layer
::
init
(
layerMap
,
parameterMap
);
CHECK
(
parameters_
[
0
]);
const
NormConfig
&
conf
=
config_
.
inputs
(
0
).
norm_conf
();
channels_
=
conf
.
channels
();
scale_
.
reset
(
new
Weight
(
channels_
,
1
,
parameters_
[
0
]));
return
true
;
}
}
// namespace paddle
paddle/gserver/layers/SequenceLastInstanceLayer.cpp
浏览文件 @
ce0d6704
...
...
@@ -26,10 +26,9 @@ namespace paddle {
* If SequenceLevel = kNonseq:
* Output: a sequence containing only the last instance of the input sequence
* If stride_ > 0:
* Output: a shorten sequence. The operation of getting last instance of a
* sequence is independently performed on every slice of the input
* sequence, which is obtained by sliding a window with the window
* size set to stride_.
* Output: a shorten sequence. Stride is the step size by which we slide a
* window upon the input sequence, and getting last instance
* operation is then applied to each interval independently.
* If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence
* Output: a sequence containing only the last instance of each sub-sequence
...
...
@@ -73,8 +72,7 @@ bool SequenceLastInstanceLayer::init(const LayerMap& layerMap,
void
SequenceLastInstanceLayer
::
forward
(
PassType
passType
)
{
SequencePoolLayer
::
forward
(
passType
);
auto
starts
=
(
stride_
>
0
)
?
stridePositions_
->
getData
()
:
startPositions_
->
getData
(
false
);
auto
starts
=
startPositions_
->
getData
(
false
);
MatrixPtr
inputValue
=
getInputValue
(
0
);
MatrixPtr
outputValue
=
getOutputValue
();
...
...
paddle/gserver/layers/SequencePoolLayer.cpp
浏览文件 @
ce0d6704
...
...
@@ -72,9 +72,8 @@ void SequencePoolLayer::forward(PassType passType) {
if
(
stride_
>
0
)
{
CHECK_EQ
(
input
.
hasSubseq
(),
0UL
)
<<
"sequence stride pooling is invalid for hasSubseq now"
;
output_
.
poolSequenceWithStride
(
input
,
stride_
,
&
stridePositions_
,
reversed_
);
newBatchSize_
=
stridePositions_
->
getSize
()
-
1
;
output_
.
poolSequenceWithStride
(
input
,
stride_
,
&
startPositions_
,
reversed_
);
newBatchSize_
=
startPositions_
->
getSize
()
-
1
;
}
resetOutput
(
newBatchSize_
,
dim
);
...
...
paddle/gserver/layers/SequencePoolLayer.h
浏览文件 @
ce0d6704
...
...
@@ -28,8 +28,9 @@ namespace paddle {
* sequence}{input[i]}
* If stride_ > 0:
* Check input sequence must not have sub-sequence
* Output: a shorten sequence, pooling is performed upon a small local
* area
* Output: a shorten sequence. Stride is the step size by which we slide
* a window upon the input sequence, and the pooling operation
* is then applied to each interval independently.
* If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence
* Output: output size is the number of input sub-sequences
...
...
@@ -47,8 +48,6 @@ protected:
size_t
newBatchSize_
;
ICpuGpuVectorPtr
startPositions_
;
int
stride_
;
// Store the start position of each window.
IVectorPtr
stridePositions_
;
// Whether the input sequence is reversed or not.
bool
reversed_
=
false
;
...
...
paddle/gserver/tests/LayerGradUtil.cpp
浏览文件 @
ce0d6704
...
...
@@ -241,7 +241,7 @@ void testBatchState(LayerPtr testLayer,
std
::
vector
<
Argument
>
args
;
args
.
push_back
(
out
);
EXPECT_EQ
(
0
,
Argument
::
sum
(
args
)
)
<<
"testBatchState failed"
;
ASSERT_NEAR
(
0
,
Argument
::
sum
(
args
),
1e-5
)
<<
"testBatchState failed"
;
for
(
size_t
seqId
=
0
;
seqId
<
numSequences
;
++
seqId
)
{
start
[
seqId
]
+=
seqLens
[
seqId
];
}
...
...
@@ -465,7 +465,6 @@ void initTestLayer(TestConfig testConf,
ParameterConfig
paraConfig
)
{
paraConfig
.
set_name
(
paraName
);
paraConfig
.
set_size
(
paraSize
);
paraConfig
.
set_initial_std
(
1
);
paraConfig
.
set_is_static
(
isStatic
);
auto
para
=
std
::
make_shared
<
Parameter
>
(
paraConfig
,
FLAGS_use_gpu
,
initialize
);
...
...
@@ -499,6 +498,9 @@ void initTestLayer(TestConfig testConf,
paraConfig
.
add_dims
((
*
layerMap
)[
input
.
input_layer_name
()]
->
getSize
());
paraConfig
.
add_dims
(
testConf
.
layerConfig
.
size
());
}
CHECK_GE
(
testConf
.
paramInitialStd
,
0
);
paraConfig
.
set_initial_mean
(
testConf
.
paramInitialMean
);
paraConfig
.
set_initial_std
(
testConf
.
paramInitialStd
);
initParameter
(
paraName
,
paraSize
,
inputDef
.
isStatic
,
false
,
paraConfig
);
}
}
...
...
paddle/gserver/tests/LayerGradUtil.h
浏览文件 @
ce0d6704
...
...
@@ -125,12 +125,16 @@ struct TestConfig {
LayerConfig
layerConfig
;
std
::
vector
<
InputDef
>
inputDefs
;
size_t
biasSize
;
real
paramInitialMean
;
real
paramInitialStd
;
bool
testAccumulate
;
bool
testState
;
bool
staticBias
;
bool
testBatchState
;
TestConfig
()
:
biasSize
(
0
),
paramInitialMean
(
0.0
),
paramInitialStd
(
1.0
),
testAccumulate
(
true
),
testState
(
false
),
staticBias
(
false
),
...
...
paddle/gserver/tests/test_LayerGrad.cpp
浏览文件 @
ce0d6704
...
...
@@ -845,8 +845,12 @@ void testDegradeLayer(bool hasSubseq,
TEST
(
Layer
,
MaxLayer
)
{
testDegradeLayer
(
false
,
"max"
,
"non-seq"
,
-
1
);
// seq max to non-seq
testDegradeLayer
(
true
,
"max"
,
"non-seq"
,
-
1
);
// hasSubseq max to non-seq
testDegradeLayer
(
true
,
"max"
,
"seq"
,
-
1
);
// hasSubseq max to seq
testDegradeLayer
(
false
,
"max"
,
"non-seq"
,
5
);
// seq max to a shorten seq, stride window = 5
testDegradeLayer
(
true
,
"max"
,
"non-seq"
,
-
1
);
// hasSubseq max to non-seq
testDegradeLayer
(
true
,
"max"
,
"seq"
,
-
1
);
// hasSubseq max to seq
}
TEST
(
Layer
,
SequenceLastInstanceLayer
)
{
...
...
@@ -868,6 +872,10 @@ TEST(Layer, SequenceLastInstanceLayer) {
TEST
(
Layer
,
AverageLayer
)
{
testDegradeLayer
(
false
,
"average"
,
"non-seq"
,
-
1
);
// seq average to non-seq
testDegradeLayer
(
false
,
"average"
,
"non-seq"
,
5
);
// seq average to a shorten seq, stride window = 5
testDegradeLayer
(
true
,
"average"
,
"non-seq"
,
-
1
);
// hasSubseq average to non-seq
testDegradeLayer
(
true
,
"average"
,
"seq"
,
-
1
);
// hasSubseq average to seq
...
...
@@ -1661,6 +1669,8 @@ TEST(Layer, PadLayer) {
TEST
(
Layer
,
CrossChannelNormLayer
)
{
TestConfig
config
;
config
.
paramInitialMean
=
1.
;
config
.
paramInitialStd
=
0.
;
config
.
layerConfig
.
set_type
(
"norm"
);
config
.
layerConfig
.
set_size
(
100
);
LayerInputConfig
*
input
=
config
.
layerConfig
.
add_inputs
();
...
...
@@ -1674,7 +1684,7 @@ TEST(Layer, CrossChannelNormLayer) {
config
.
inputDefs
.
push_back
({
INPUT_DATA
,
"layer_0"
,
100
,
10
});
for
(
auto
useGpu
:
{
false
,
true
})
{
testLayerGrad
(
config
,
"cross-channel-norm"
,
10
,
false
,
useGpu
,
false
,
5
);
testLayerGrad
(
config
,
"cross-channel-norm"
,
10
,
false
,
useGpu
,
false
);
}
}
...
...
paddle/parameter/Argument.cpp
浏览文件 @
ce0d6704
...
...
@@ -561,7 +561,7 @@ void Argument::degradeSequence(const Argument& input) {
void
Argument
::
poolSequenceWithStride
(
const
Argument
&
input
,
size_t
stride
,
IVectorPtr
*
stridePostions
,
I
CpuGpu
VectorPtr
*
stridePostions
,
bool
reversed
)
{
// If input.sequenceStartPositions = [0, 9, 14, 17, 30] and stride = 5,
// then sequenceStartPositions = [0, 2, 3, 4, 7].
...
...
@@ -598,8 +598,8 @@ void Argument::poolSequenceWithStride(const Argument& input,
stridePos
.
emplace_back
(
starts
[
numSequences
]);
int
size
=
stridePos
.
size
();
CHECK_EQ
(
size
-
1
,
tgtBuf
[
numSequences
]);
IVector
::
resizeOrCreate
(
*
stridePostions
,
size
,
false
);
(
*
stridePostions
)
->
copyFrom
(
stridePos
.
data
(),
size
);
I
CpuGpu
Vector
::
resizeOrCreate
(
*
stridePostions
,
size
,
false
);
(
*
stridePostions
)
->
getMutableVector
(
false
)
->
copyFrom
(
stridePos
.
data
(),
size
);
}
void
Argument
::
getValueString
(
...
...
paddle/parameter/Argument.h
浏览文件 @
ce0d6704
...
...
@@ -299,7 +299,7 @@ struct Argument {
*/
void
poolSequenceWithStride
(
const
Argument
&
input
,
size_t
stride
,
IVectorPtr
*
stridePositions
,
I
CpuGpu
VectorPtr
*
stridePositions
,
bool
reversed
=
false
);
/**
* @brief getValueString will return the argument's output in string. There
...
...
paddle/parameter/tests/test_argument.cpp
浏览文件 @
ce0d6704
...
...
@@ -31,7 +31,7 @@ TEST(Argument, poolSequenceWithStride) {
int
strideResultReversed
[]
=
{
0
,
4
,
9
,
14
,
17
,
20
,
25
,
30
};
for
(
auto
reversed
:
{
false
,
true
})
{
IVectorPtr
stridePositions
;
I
CpuGpu
VectorPtr
stridePositions
;
output
.
poolSequenceWithStride
(
input
,
5
/* stride */
,
&
stridePositions
,
reversed
);
...
...
@@ -45,7 +45,7 @@ TEST(Argument, poolSequenceWithStride) {
CHECK_EQ
(
stridePositions
->
getSize
(),
8UL
);
auto
result
=
reversed
?
strideResultReversed
:
strideResult
;
for
(
int
i
=
0
;
i
<
8
;
i
++
)
{
CHECK_EQ
(
stridePositions
->
getData
()[
i
],
result
[
i
]);
CHECK_EQ
(
stridePositions
->
getData
(
false
)[
i
],
result
[
i
]);
}
}
}
...
...
paddle/parameter/tests/test_common.cpp
浏览文件 @
ce0d6704
...
...
@@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) {
EXPECT_EQ
((
int
)
0
,
nums
[
i
]);
}
}
TEST_F
(
CommonTest
,
barrierStat
)
{
const
int
threadNum
=
10
;
SyncThreadPool
pool
(
threadNum
);
#define TEST_BARRIER_RANDOM(statName, numConnThreads, ...) \
pool.exec([&](int tid, size_t numThreads) { \
struct timeval time; \
gettimeofday(&time, nullptr); \
uint64_t usec = timeToMicroSecond(time); \
std::srand(usec); \
auto value = std::rand() % 100000; \
usleep(value); \
REGISTER_SLOW_NODES_PROBE( \
globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
});
for
(
auto
i
=
0
;
i
<
10
;
i
++
)
{
TEST_BARRIER_RANDOM
(
"synThreadBarrier1"
,
threadNum
);
TEST_BARRIER_RANDOM
(
"synThreadBarrier2"
,
threadNum
);
}
globalStat
.
printAllStatus
();
globalStat
.
reset
();
for
(
auto
i
=
0
;
i
<
10
;
i
++
)
{
TEST_BARRIER_RANDOM
(
"synThreadBarrier3"
,
threadNum
,
"tag0"
);
TEST_BARRIER_RANDOM
(
"synThreadBarrier4"
,
threadNum
,
"tag1"
);
}
globalStat
.
printAllStatus
();
globalStat
.
reset
();
// use it to test accurate barrier gap
#define TEST_BARRIER(statName, numConnThreads, ...) \
pool.exec([&](int tid, size_t numThreads) { \
usleep(tid * 10000); \
REGISTER_SLOW_NODES_PROBE( \
globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
});
for
(
auto
i
=
0
;
i
<
10
;
i
++
)
{
TEST_BARRIER
(
"synThreadBarrier3"
,
threadNum
,
"tag0"
);
TEST_BARRIER
(
"synThreadBarrier4"
,
threadNum
,
"tag1"
);
}
globalStat
.
printAllStatus
();
globalStat
.
reset
();
}
paddle/pserver/LightNetwork.cpp
浏览文件 @
ce0d6704
...
...
@@ -142,7 +142,7 @@ SocketServer::SocketServer(const std::string &addr, int port, int rdmaCpu)
}
/// trigger to initialize RDMA lib
P
CHECK
(
RdmaClientDaemons
::
get
())
<<
"initilizate RDMA failed
\n
"
;
CHECK
(
RdmaClientDaemons
::
get
())
<<
"initilizate RDMA failed
\n
"
;
}
SocketServer
::~
SocketServer
()
{
...
...
@@ -168,7 +168,7 @@ void SocketServer::tcpServer() {
/// First call to socket() function
socket_
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
);
P
CHECK
(
socket_
>=
0
)
<<
"ERROR opening socket"
;
CHECK
(
socket_
>=
0
)
<<
"ERROR opening socket"
;
/// Initialize socket structure
bzero
((
char
*
)
&
serv_addr
,
sizeof
(
serv_addr
));
...
...
@@ -176,7 +176,7 @@ void SocketServer::tcpServer() {
serv_addr
.
sin_port
=
htons
(
port_
);
if
(
!
addr_
.
empty
())
{
server
=
gethostbyname
(
addr_
.
c_str
());
P
CHECK
(
server
)
<<
"ERROR, no such host: "
<<
addr_
;
CHECK
(
server
)
<<
"ERROR, no such host: "
<<
addr_
;
bcopy
((
char
*
)
server
->
h_addr
,
(
char
*
)
&
serv_addr
.
sin_addr
.
s_addr
,
server
->
h_length
);
...
...
@@ -187,7 +187,7 @@ void SocketServer::tcpServer() {
setOption
(
socket_
);
/// Now bind the host address using bind() call.
P
CHECK
(
bind
(
socket_
,
(
struct
sockaddr
*
)
&
serv_addr
,
sizeof
(
serv_addr
))
>=
0
)
CHECK
(
bind
(
socket_
,
(
struct
sockaddr
*
)
&
serv_addr
,
sizeof
(
serv_addr
))
>=
0
)
<<
"ERROR on binding "
<<
addr_
;
/// Now start listening for the clients, here process will
...
...
@@ -201,7 +201,7 @@ void SocketServer::tcpServer() {
if
(
stopping_
)
{
break
;
}
P
CHECK
(
newsockfd
>=
0
)
<<
"ERROR on accept"
;
CHECK
(
newsockfd
>=
0
)
<<
"ERROR on accept"
;
constexpr
int
kPeerNameLen
=
128
;
char
peerName
[
kPeerNameLen
];
CHECK
(
inet_ntop
(
AF_INET
,
&
cli_addr
.
sin_addr
,
peerName
,
kPeerNameLen
));
...
...
@@ -227,14 +227,14 @@ void SocketServer::rdmaServer() {
/// First call to socket() function
rdmaSocket_
=
rdma
::
ssocket
(
rdmaCpu_
);
P
CHECK
(
rdmaSocket_
)
<<
"ERROR opening RDMA socket"
;
CHECK
(
rdmaSocket_
)
<<
"ERROR opening RDMA socket"
;
P
CHECK
(
rdma
::
bind
(
rdmaSocket_
,
rdmaUri_
.
c_str
())
==
0
)
CHECK
(
rdma
::
bind
(
rdmaSocket_
,
rdmaUri_
.
c_str
())
==
0
)
<<
"ERROR bind RDMA socket"
;
/// Now start listening for the clients, here process will
/// go in sleep mode and will wait for the incoming connection
P
CHECK
(
rdma
::
listen
(
rdmaSocket_
)
==
0
)
<<
"ERROR listen RDMA socket"
;
CHECK
(
rdma
::
listen
(
rdmaSocket_
)
==
0
)
<<
"ERROR listen RDMA socket"
;
while
(
true
)
{
/// Accept actual connection from the client
...
...
@@ -242,7 +242,7 @@ void SocketServer::rdmaServer() {
if
(
stopping_
)
{
break
;
}
P
CHECK
(
newsock
)
<<
"ERROR on accept"
;
CHECK
(
newsock
)
<<
"ERROR on accept"
;
constexpr
int
kPeerNameLen
=
128
;
char
peerName
[
kPeerNameLen
];
...
...
@@ -290,7 +290,7 @@ RdmaClientDaemons::RdmaClientDaemons() {
onlineCpus_
=
rdma
::
numCpus
();
for
(
auto
i
=
0
;
i
<
onlineCpus_
;
i
++
)
{
socket
=
rdma
::
csocket
(
i
);
P
CHECK
(
socket
)
<<
"ERROR open client socket daemon"
;
CHECK
(
socket
)
<<
"ERROR open client socket daemon"
;
rdmaClientSocket_
.
push_back
(
socket
);
}
...
...
@@ -355,7 +355,7 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) {
/// Create a socket point
int
sockfd
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
);
P
CHECK
(
sockfd
>=
0
)
<<
"ERROR opening socket"
;
CHECK
(
sockfd
>=
0
)
<<
"ERROR opening socket"
;
#if defined(__OSX__) || defined(__APPLE__)
server
=
getipnodebyname
(
serverAddr
.
c_str
(),
AF_INET
,
AI_DEFAULT
,
&
errRet
);
...
...
@@ -396,8 +396,8 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) {
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
1
));
}
else
{
P
CHECK
(
errno
!=
0
)
<<
"ERROR connecting to "
<<
serverAddr
<<
":"
<<
serverPort
<<
"errorno: "
<<
errno
;
CHECK
(
errno
!=
0
)
<<
"ERROR connecting to "
<<
serverAddr
<<
":"
<<
serverPort
<<
"errorno: "
<<
errno
;
}
}
while
(
errno
==
ECONNREFUSED
);
...
...
@@ -426,7 +426,7 @@ void SocketClient::RdmaClient(const std::string &serverAddr, int serverPort) {
/// connect to server with socket daemon
sock
=
rdma
::
connect
(
socketDaemon_
,
rdmaUri
.
c_str
());
P
CHECK
(
sock
)
<<
"ERROR connect to server"
<<
rdmaUri
;
CHECK
(
sock
)
<<
"ERROR connect to server"
<<
rdmaUri
;
std
::
vector
<
std
::
string
>
seg
;
str
::
split
(
rdmaUri
,
'/'
,
&
seg
);
...
...
paddle/pserver/ParameterServer2.cpp
浏览文件 @
ce0d6704
...
...
@@ -217,10 +217,6 @@ void ParameterServer2::setConfig(const SetConfigRequest& request,
SetConfigResponse
response
;
callback
(
response
);
/// always defined, barrier slowest node function need it.
statSet_
.
reset
(
new
StatSet
(
"ParameterServer"
+
str
::
to_string
(
static_cast
<
int
>
(
serverId_
))));
}
real
bufferSum
(
const
std
::
vector
<
ParameterServer2
::
Buffer
>&
buffers
)
{
...
...
@@ -369,50 +365,7 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
std
::
vector
<
Buffer
>*
outputBuffers
)
{
VLOG
(
1
)
<<
"pserver: addGradient"
;
// forwardbackward delta from all trainers
// indicate the fluctuation caused by forwardbackward.
if
(
!
numPassFinishClients_
)
{
REGISTER_BARRIER_DELTA_SERVER_SET
(
*
statSet_
,
"forwardbackwardDelta"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
request
.
forwardbackward_time
(),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
{
/// approximately pure network overhead
REGISTER_TIMER_DYNAMIC_SET
(
"pushRecv"
,
timeToMicroSecond
(
*
handleRequestBegin_
),
-
1
,
*
statSet_
);
}
#ifndef PADDLE_DISABLE_TIMER
gettimeofday
(
&
(
*
addGradBegin_
),
nullptr
);
#endif
/// barrier fluctuation caused by network and previous forwardbackward
if
(
!
numPassFinishClients_
)
{
REGISTER_BARRIER_TIMER_SERVER_SET
(
*
statSet_
,
"handleReqBegin"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
(
*
handleRequestBegin_
),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
if
(
!
numPassFinishClients_
)
{
REGISTER_BARRIER_TIMER_SERVER
(
*
statSet_
,
"addGradBegin"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
{
REGISTER_TIMER_DYNAMIC
(
"addGradCore"
,
-
1
,
*
statSet_
);
ReadLockGuard
guard
(
parameterMutex_
);
int
bufferIndex
=
0
;
for
(
const
auto
&
block
:
request
.
blocks
())
{
...
...
@@ -444,15 +397,6 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
std
::
lock_guard
<
std
::
mutex
>
guard
(
*
info
.
lock
);
simd
::
addTo
(
gradientSumBuffer
,
gradientBuffer
,
size
);
}
if
(
!
numPassFinishClients_
)
{
REGISTER_BARRIER_TIMER_SERVER
(
*
statSet_
,
"addGradCoreFinish"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
}
if
(
request
.
batch_status
()
==
BATCH_FINISH
||
request
.
batch_status
()
==
BATCH_START_AND_FINISH
)
{
...
...
@@ -461,47 +405,12 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
VLOG
(
1
)
<<
"num samples: "
<<
numSamplesProcessed_
<<
", new cost:"
<<
cost_
;
/// numPassFinishClients_ means some trainer has entered finishPass
if
(
!
numPassFinishClients_
)
{
REGISTER_SLOW_NODES_PROBE
(
*
statSet_
,
"SLOW_NODES"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
/// notify doOperation gradient ready
gradientReadyBarrier_
.
wait
();
/// if wait pass finish does not start, do check
if
(
!
numPassFinishClients_
)
{
CHECK_BARRIER_TIMER
(
*
statSet_
,
"SLOW_NODES"
,
FLAGS_num_gradient_servers
,
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
/// barrier performance while all parameter add is finished
/// can indicate the fluctation caused by computation at pserver.
if
(
!
numPassFinishClients_
)
{
REGISTER_BARRIER_TIMER_SERVER
(
*
statSet_
,
"paraReady"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
/// wait doOperation finish
parameterReadyBarrier_
.
wait
();
VLOG
(
1
)
<<
"start send back"
;
{
/// total time except overhead of network.
REGISTER_TIMER_DYNAMIC_SET
(
"sendParaNoRecvNoSend"
,
timeToMicroSecond
(
*
addGradBegin_
),
-
1
,
*
statSet_
);
}
}
}
...
...
@@ -543,57 +452,6 @@ bool ParameterServer2::asyncGrdientCommitCheckAndStat(
return
commitGradient
;
}
void
ParameterServer2
::
printAsyncGradientCommitStatAndReset
()
{
std
::
stringstream
statFormat
;
if
(
asyncUpdateSteps_
)
{
statFormat
<<
"async discard gradients stat: "
<<
std
::
endl
;
statFormat
<<
"serverId: "
<<
serverId_
<<
" serverType: "
<<
isSparseServer_
<<
" total updates: "
<<
asyncUpdateSteps_
<<
" discard updates: "
<<
asyncLaggedGradientsNum_
<<
" discard ratio: "
<<
(
real
)
asyncLaggedGradientsNum_
/
(
real
)
asyncUpdateSteps_
;
statFormat
<<
std
::
endl
;
statFormat
<<
std
::
endl
;
statFormat
<<
"Async Gradient Update Steps distribution: "
<<
std
::
endl
<<
"Sample: 1:1912(0.00284449) means "
<<
"the updates step=1 count 1912 times "
<<
"and account for 0.284449% of total updates"
<<
std
::
endl
;
size_t
index
=
0
;
for
(
const
auto
&
stat
:
asyncUpdateStat_
)
{
statFormat
<<
index
<<
":"
<<
stat
<<
"("
<<
(
real
)
stat
/
(
real
)
asyncUpdateSteps_
<<
") "
;
index
++
;
}
statFormat
<<
std
::
endl
;
statFormat
<<
std
::
endl
;
statFormat
<<
"Async Gradient Discard based on trainer_id: "
<<
std
::
endl
<<
"Sample: 2:22(0.0016363) means "
<<
"total discarded updates from trainer_id=2 count 22 "
<<
"and account for 0.16363% of all updates from trainer_id=2"
<<
std
::
endl
;
for
(
auto
i
=
0
;
i
<
FLAGS_num_gradient_servers
;
i
++
)
{
real
ratio
=
(
real
)
asyncTrainerDiscardStat_
[
i
]
/
(
real
)(
asyncTrainerCommitStat_
[
i
]
+
asyncTrainerDiscardStat_
[
i
]);
statFormat
<<
i
<<
":"
<<
asyncTrainerDiscardStat_
[
i
]
<<
"("
<<
ratio
<<
")"
<<
" "
;
}
LOG
(
INFO
)
<<
statFormat
.
str
();
/// reset stat
asyncUpdateSteps_
=
0
;
asyncTrainerSteps_
.
assign
(
asyncTrainerSteps_
.
size
(),
0
);
asyncLaggedGradientsNum_
=
0
;
asyncUpdateStat_
.
assign
(
asyncUpdateStat_
.
size
(),
0
);
asyncTrainerDiscardStat_
.
assign
(
asyncTrainerDiscardStat_
.
size
(),
0
);
asyncTrainerCommitStat_
.
assign
(
asyncTrainerCommitStat_
.
size
(),
0
);
}
}
static
ThreadLocal
<
std
::
vector
<
bool
>>
localBlockBitset_
;
void
ParameterServer2
::
asyncSGD
(
const
SendParameterRequest
&
request
,
...
...
@@ -695,7 +553,6 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request,
if
(
request
.
trainer_id
()
==
0
)
{
/// batchId_ is approximately equal to "real batchId_"
batchId_
++
;
tuningAsyncsgdMidOutput
();
}
}
...
...
@@ -881,34 +738,6 @@ void ParameterServer2::sendParameter(const SendParameterRequest& request,
}
(
*
requestVec_
).
clear
();
(
*
callbackVec_
).
clear
();
/// barrier perfromance while all data are send finished.
/// indicates network flucatuation for big message.
if
(
!
numPassFinishClients_
)
{
REGISTER_BARRIER_TIMER_SERVER
(
*
statSet_
,
"sendParamFinish"
,
FLAGS_num_gradient_servers
,
request
.
trainer_id
(),
isSparseServer_
?
"_sparseUpdater"
:
"_denseUpdater"
);
}
/// all time exhausted in parameterServer for big message.
/// it contains network and computation at pserver.
{
/// total time including overhead of network.
REGISTER_TIMER_DYNAMIC_SET
(
"sendParaTotal"
,
timeToMicroSecond
(
*
handleRequestBegin_
),
-
1
,
*
statSet_
);
}
/// all time exhausted in pserverServer except recieve network.
{
/// total time except overhead of network receive
REGISTER_TIMER_DYNAMIC_SET
(
"sendParaNoRecv"
,
timeToMicroSecond
(
*
addGradBegin_
),
-
1
,
*
statSet_
);
}
}
break
;
case
PSERVER_UPDATE_MODE_SET_PARAM
:
...
...
@@ -1088,8 +917,6 @@ void ParameterServer2::op_SGD(const Operation& operation,
}
{
REGISTER_TIMER_DYNAMIC
(
"op_SGD"
,
-
1
,
*
statSet_
);
parallelExecForEachBlock
([
&
](
int64_t
blockId
,
const
VectorPtr
vecs
[])
{
BlockInfo
&
info
=
blockInfos_
[
blockId
];
const
ParameterConfig
&
config
=
getParameterConfig
(
blockId
);
...
...
@@ -1113,7 +940,6 @@ void ParameterServer2::op_SGD(const Operation& operation,
}
batchId_
++
;
tuningSgdMidOutput
();
}
void
ParameterServer2
::
op_start_pass
(
const
Operation
&
operation
,
...
...
@@ -1146,8 +972,6 @@ void ParameterServer2::op_finish_pass(const Operation& operation,
/// finish pass
info
.
optimizer
->
finishPass
();
});
tuningSgdFinished
();
batchId_
=
0
;
}
...
...
@@ -1515,7 +1339,6 @@ void ParameterServer2::asyncFinishPass(const SynchronizeRequest& request,
callback
(
SynchronizeResponse
());
if
(
request
.
trainer_id
()
==
0
)
{
tuningAsyncsgdFinished
();
batchId_
=
0
;
}
}
...
...
@@ -1574,42 +1397,4 @@ void ParameterServer2::releaseMatrix(const ReleaseMatrixRequest& request,
callback
(
response
);
}
void
ParameterServer2
::
tuningSgdMidOutput
()
{
if
(
batchId_
&&
batchId_
%
FLAGS_log_period_server
==
0
)
{
LOG
(
INFO
)
<<
"======== Batch="
<<
batchId_
<<
"======="
;
statSet_
->
setThreadInfo
(
true
);
statSet_
->
printAllStatus
();
/// not reset raw data for reducing the overhead of performance tuning
statSet_
->
reset
(
false
);
}
}
void
ParameterServer2
::
tuningSgdFinished
()
{
LOG
(
INFO
)
<<
"======== Batch="
<<
batchId_
<<
" pass END"
<<
"======="
;
statSet_
->
setThreadInfo
(
true
);
statSet_
->
printAllStatus
();
/**
* reset raw data at end of pass since some raw data could be not
* complete. Otherwise the raw data will pollute next pass performance
* tuning
*/
statSet_
->
reset
();
}
void
ParameterServer2
::
tuningAsyncsgdMidOutput
()
{
#ifndef PADDLE_DISABLE_TIMER
if
(
batchId_
&&
batchId_
%
FLAGS_log_period_server
==
0
)
{
LOG
(
INFO
)
<<
"======== [not accurate] Batch="
<<
batchId_
<<
"======="
;
printAsyncGradientCommitStatAndReset
();
}
#endif
}
void
ParameterServer2
::
tuningAsyncsgdFinished
()
{
LOG
(
INFO
)
<<
"======== [not accurate] Batch="
<<
batchId_
<<
" pass END"
<<
"======="
;
printAsyncGradientCommitStatAndReset
();
}
}
// namespace paddle
paddle/pserver/ParameterServer2.h
浏览文件 @
ce0d6704
...
...
@@ -298,24 +298,6 @@ protected:
/// barrier performance tuning sync-sgd required
std
::
atomic
<
int64_t
>
batchId_
;
/// the beginning of addGradient without network overhead
ThreadLocal
<
struct
timeval
>
addGradBegin_
;
/**
* tuning barrier performance
* to better control log for sparse and dense parameter,
* we use different log entities for different parameterServer
* objects.
* it will output lots of performance stats to perceive the
* overhead of network, fluctuation of computation from
* forwardbackward and network, computation from optimization
* at pserver end, barrier overhead, etc. to understand tuning
* data, focus on the synchronization between addGradient and
* doOperation which indirectly call op_SGD operation controlled
* by remote updater controller
*/
std
::
unique_ptr
<
StatSet
>
statSet_
;
public:
struct
Buffer
{
real
*
base
;
...
...
@@ -325,7 +307,6 @@ public:
protected:
/// async gradient commit control
bool
asyncGrdientCommitCheckAndStat
(
const
SendParameterRequest
&
request
);
void
printAsyncGradientCommitStatAndReset
();
public:
/// disable default parameter for overloading
...
...
@@ -710,36 +691,6 @@ public:
void
op_load
(
const
Operation
&
operation
,
OperationResult
*
result
);
void
op_save
(
const
Operation
&
operation
,
OperationResult
*
result
);
/**
* @brief output log in at the middle stage of training
*
* @note flush log histroy and state at the end for sgd
*/
void
tuningSgdMidOutput
();
/**
* @brief output log in at the end stage of training
*
* @note flush log histroy and state at the end for sgd. it will also
* flush some stateful stat for next pass.
*/
void
tuningSgdFinished
();
/**
* @brief output log in at the middle stage of training
*
* @note flush log histroy and state at the end for async-sgd.
* it will log some performance log if some lagged node are found
*/
void
tuningAsyncsgdMidOutput
();
/**
* @brief output log in at the end stage of training
*
* @note flush log histroy and state at the end for async-sgd.
*/
void
tuningAsyncsgdFinished
();
};
}
// namespace paddle
paddle/pserver/SocketChannel.cpp
浏览文件 @
ce0d6704
...
...
@@ -51,7 +51,7 @@ size_t SocketChannel::read(void* buf, size_t size) {
else
len
=
rdma
::
read
(
rdmaSocket_
,
(
char
*
)
buf
+
total
,
size
-
total
);
P
CHECK
(
len
>=
0
)
<<
" peer="
<<
peerName_
;
CHECK
(
len
>=
0
)
<<
" peer="
<<
peerName_
;
if
(
len
<=
0
)
{
return
total
;
}
...
...
@@ -69,7 +69,7 @@ size_t SocketChannel::write(const void* buf, size_t size) {
else
len
=
rdma
::
write
(
rdmaSocket_
,
(
char
*
)
buf
+
total
,
size
-
total
);
P
CHECK
(
len
>=
0
)
<<
" peer="
<<
peerName_
;
CHECK
(
len
>=
0
)
<<
" peer="
<<
peerName_
;
if
(
len
<=
0
)
{
return
total
;
}
...
...
@@ -98,10 +98,10 @@ static size_t readwritev(IOFunc iofunc,
while
(
size
<
total
)
{
ssize_t
len
=
iofunc
(
socket
,
&
iovs
[
curIov
],
std
::
min
(
iovcnt
-
curIov
,
maxiovs
));
P
CHECK
(
len
>
0
)
<<
" peer="
<<
peerName
<<
" curIov="
<<
curIov
<<
" iovCnt="
<<
iovcnt
<<
" iovs[curIov].base="
<<
iovs
[
curIov
].
iov_base
<<
" iovs[curIov].iov_len="
<<
iovs
[
curIov
].
iov_len
;
CHECK
(
len
>
0
)
<<
" peer="
<<
peerName
<<
" curIov="
<<
curIov
<<
" iovCnt="
<<
iovcnt
<<
" iovs[curIov].base="
<<
iovs
[
curIov
].
iov_base
<<
" iovs[curIov].iov_len="
<<
iovs
[
curIov
].
iov_len
;
size
+=
len
;
/// restore iovs[curIov] to the original value
...
...
@@ -183,7 +183,7 @@ void SocketChannel::writeMessage(const std::vector<struct iovec>& userIovs) {
header
.
totalLength
+=
iov
.
iov_len
;
}
P
CHECK
(
writev
(
iovs
)
==
(
size_t
)
header
.
totalLength
);
CHECK
(
writev
(
iovs
)
==
(
size_t
)
header
.
totalLength
);
}
std
::
unique_ptr
<
MsgReader
>
SocketChannel
::
readMessage
()
{
...
...
@@ -194,7 +194,7 @@ std::unique_ptr<MsgReader> SocketChannel::readMessage() {
return
nullptr
;
}
P
CHECK
(
len
==
sizeof
(
header
));
CHECK
(
len
==
sizeof
(
header
));
std
::
unique_ptr
<
MsgReader
>
msgReader
(
new
MsgReader
(
this
,
header
.
numIovs
));
...
...
@@ -209,7 +209,7 @@ std::unique_ptr<MsgReader> SocketChannel::readMessage() {
MsgReader
::
MsgReader
(
SocketChannel
*
channel
,
size_t
numBlocks
)
:
channel_
(
channel
),
blockLengths_
(
numBlocks
),
currentBlockIndex_
(
0
)
{
size_t
size
=
numBlocks
*
sizeof
(
blockLengths_
[
0
]);
P
CHECK
(
channel_
->
read
(
&
blockLengths_
[
0
],
size
)
==
size
);
CHECK
(
channel_
->
read
(
&
blockLengths_
[
0
],
size
)
==
size
);
}
void
MsgReader
::
readBlocks
(
const
std
::
vector
<
void
*>&
bufs
)
{
...
...
@@ -223,12 +223,12 @@ void MsgReader::readBlocks(const std::vector<void*>& bufs) {
++
currentBlockIndex_
;
}
P
CHECK
(
channel_
->
readv
(
&
iovs
)
==
totalLength
);
CHECK
(
channel_
->
readv
(
&
iovs
)
==
totalLength
);
}
void
MsgReader
::
readNextBlock
(
void
*
buf
)
{
CHECK_LT
(
currentBlockIndex_
,
blockLengths_
.
size
());
P
CHECK
(
channel_
->
read
(
buf
,
getNextBlockLength
())
==
getNextBlockLength
());
CHECK
(
channel_
->
read
(
buf
,
getNextBlockLength
())
==
getNextBlockLength
());
++
currentBlockIndex_
;
}
...
...
paddle/pserver/test/SocketTest.cpp
浏览文件 @
ce0d6704
...
...
@@ -113,7 +113,7 @@ void SocketServer::run() {
/* First call to socket() function */
socket_
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
);
P
CHECK
(
socket_
>=
0
)
<<
"ERROR opening socket"
;
CHECK
(
socket_
>=
0
)
<<
"ERROR opening socket"
;
/* Initialize socket structure */
bzero
((
char
*
)
&
serv_addr
,
sizeof
(
serv_addr
));
...
...
@@ -122,7 +122,7 @@ void SocketServer::run() {
serv_addr
.
sin_port
=
htons
(
port_
);
/* Now bind the host address using bind() call.*/
P
CHECK
(
bind
(
socket_
,
(
struct
sockaddr
*
)
&
serv_addr
,
sizeof
(
serv_addr
))
>=
0
)
CHECK
(
bind
(
socket_
,
(
struct
sockaddr
*
)
&
serv_addr
,
sizeof
(
serv_addr
))
>=
0
)
<<
"ERROR on binding"
;
/* Now start listening for the clients, here process will
...
...
@@ -134,7 +134,7 @@ void SocketServer::run() {
while
(
true
)
{
/* Accept actual connection from the client */
newsockfd
=
accept
(
socket_
,
(
struct
sockaddr
*
)
&
cli_addr
,
&
clilen
);
P
CHECK
(
newsockfd
>=
0
)
<<
"ERROR on accept"
;
CHECK
(
newsockfd
>=
0
)
<<
"ERROR on accept"
;
SocketWorker
*
worker
=
new
SocketWorker
(
newsockfd
);
worker
->
start
();
...
...
@@ -146,17 +146,17 @@ void SocketWorker::run() {
while
(
true
)
{
int64_t
n
=
channel_
.
readAll
(
&
header
,
sizeof
(
header
));
P
CHECK
(
n
==
sizeof
(
header
))
<<
"ERROR reading from socket"
;
CHECK
(
n
==
sizeof
(
header
))
<<
"ERROR reading from socket"
;
buffer_
.
resize
(
header
.
dataLength
);
n
=
channel_
.
readAll
(
&
buffer_
[
0
],
header
.
dataLength
);
P
CHECK
(
n
==
header
.
dataLength
)
<<
"ERROR reading from socket"
;
CHECK
(
n
==
header
.
dataLength
)
<<
"ERROR reading from socket"
;
/* Write a response to the client */
n
=
channel_
.
writeAll
(
&
header
,
sizeof
(
header
));
P
CHECK
(
n
==
sizeof
(
header
))
<<
"ERROR reading from socket"
;
CHECK
(
n
==
sizeof
(
header
))
<<
"ERROR reading from socket"
;
n
=
channel_
.
writeAll
(
buffer_
.
data
(),
buffer_
.
size
());
P
CHECK
(
n
==
header
.
dataLength
)
<<
"ERROR writing to socket"
;
CHECK
(
n
==
header
.
dataLength
)
<<
"ERROR writing to socket"
;
}
}
...
...
@@ -177,9 +177,9 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) {
/* Create a socket point */
int
sockfd
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
);
P
CHECK
(
sockfd
>=
0
)
<<
"ERROR opening socket"
;
CHECK
(
sockfd
>=
0
)
<<
"ERROR opening socket"
;
server
=
gethostbyname
(
serverAddr
.
c_str
());
P
CHECK
(
server
)
<<
"ERROR, no such host: "
<<
serverAddr
;
CHECK
(
server
)
<<
"ERROR, no such host: "
<<
serverAddr
;
bzero
((
char
*
)
&
serv_addr
,
sizeof
(
serv_addr
));
serv_addr
.
sin_family
=
AF_INET
;
...
...
@@ -189,7 +189,7 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) {
serv_addr
.
sin_port
=
htons
(
serverPort
);
/* Now connect to the server */
P
CHECK
(
connect
(
sockfd
,
(
sockaddr
*
)
&
serv_addr
,
sizeof
(
serv_addr
))
>=
0
)
CHECK
(
connect
(
sockfd
,
(
sockaddr
*
)
&
serv_addr
,
sizeof
(
serv_addr
))
>=
0
)
<<
"ERROR connecting"
;
channel_
.
reset
(
new
SocketChannel
(
sockfd
));
...
...
@@ -234,18 +234,18 @@ int main(int argc, char** argv) {
cpuGrad
.
copyFrom
(
gpuGrad
);
header
.
dataLength
=
dataSize
;
P
CHECK
(
channel
->
writeAll
(
&
header
,
sizeof
(
header
))
==
sizeof
(
header
))
CHECK
(
channel
->
writeAll
(
&
header
,
sizeof
(
header
))
==
sizeof
(
header
))
<<
"Client write header error"
;
P
CHECK
(
channel
->
writeAll
(
cpuGrad
.
getData
(),
dataSize
)
==
dataSize
)
CHECK
(
channel
->
writeAll
(
cpuGrad
.
getData
(),
dataSize
)
==
dataSize
)
<<
"Client write data error"
;
/* Now read server response */
P
CHECK
(
channel
->
readAll
(
&
header
,
sizeof
(
header
))
==
sizeof
(
header
))
CHECK
(
channel
->
readAll
(
&
header
,
sizeof
(
header
))
==
sizeof
(
header
))
<<
"Client read header error"
;
CHECK_EQ
((
uint64_t
)
header
.
dataLength
,
dataSize
);
P
CHECK
(
channel
->
readAll
(
cpuParam
.
getData
(),
dataSize
)
==
dataSize
)
CHECK
(
channel
->
readAll
(
cpuParam
.
getData
(),
dataSize
)
==
dataSize
)
<<
"Client read data error"
;
gpuParam
.
copyFrom
(
cpuParam
);
...
...
paddle/scripts/docker/build.sh
浏览文件 @
ce0d6704
...
...
@@ -78,7 +78,7 @@ paddle version
# PaddlePaddle. This awkwardness is due to
# https://github.com/PaddlePaddle/Paddle/issues/1854. It also
# describes a solution.
if
[
${
WITH_DOC
}
==
"ON"
]
;
then
if
[
[
${
WITH_DOC
}
==
"ON"
]
]
;
then
cat
<<
EOF
========================================
Building documentation ...
...
...
paddle/scripts/travis/build_doc.sh
浏览文件 @
ce0d6704
...
...
@@ -5,13 +5,14 @@ set -e
mkdir
-p
$TRAVIS_BUILD_DIR
/build
cd
$TRAVIS_BUILD_DIR
/build
# Compile
Documentation only.
cmake ..
-DCMAKE_BUILD_TYPE
=
Debug
-DWITH_GPU
=
OFF
-DWITH_DOC
=
OFF
-DWITH_STYLE_CHECK
=
OFF
# Compile
paddle binaries first
cmake ..
-DCMAKE_BUILD_TYPE
=
Debug
-DWITH_GPU
=
OFF
-DWITH_DOC
=
OFF
-DWITH_
GOLANG
=
ON
-DWITH_
STYLE_CHECK
=
OFF
mkdir
output
make
-j
`
nproc
`
find ..
-name
'*whl'
| xargs pip
install
# install all wheels.
rm
-rf
*
# Compile Documentation only.
cmake ..
-DCMAKE_BUILD_TYPE
=
Debug
-DWITH_GPU
=
OFF
-DWITH_DOC
=
ON
make
-j
`
nproc
`
paddle_docs paddle_docs_cn
...
...
@@ -25,7 +26,7 @@ SSH_REPO=${REPO/https:\/\/github.com\//git@github.com:}
SHA
=
`
git rev-parse
--verify
HEAD
`
# Documentation branch name
# gh-pages branch is used for PaddlePaddle.org. The English version of
# gh-pages branch is used for PaddlePaddle.org. The English version of
# documentation in `doc` directory, and the chinese version in `doc_cn`
# directory.
TARGET_BRANCH
=
"gh-pages"
...
...
@@ -51,7 +52,7 @@ function deploy_docs() {
# checkout github page branch
git checkout
$TARGET_BRANCH
||
git checkout
--orphan
$TARGET_BRANCH
mkdir
-p
${
DIR
}
# remove old docs. mv new docs.
set
+e
...
...
@@ -62,7 +63,7 @@ function deploy_docs() {
git add
.
}
deploy_docs
"master"
"."
deploy_docs
"master"
"."
deploy_docs
"develop"
"./develop/"
# Check is there anything changed.
...
...
paddle/trainer/Tester.cpp
浏览文件 @
ce0d6704
...
...
@@ -175,7 +175,7 @@ real Tester::forwardOneBatch(const DataBatch& dataBatch,
}
hl_stream_synchronize
(
HPPL_STREAM_DEFAULT
);
FILE
*
fp
=
fopen
(
featFile
.
c_str
(),
"ab+"
);
P
CHECK
(
!
ferror
(
fp
))
<<
"Fail to open "
<<
featFile
;
CHECK
(
!
ferror
(
fp
))
<<
"Fail to open "
<<
featFile
;
size_t
sampleNum
=
featMatrices
[
0
]
->
getHeight
();
for
(
size_t
i
=
0
;
i
<
sampleNum
;
++
i
)
{
...
...
paddle/utils/BarrierStat.cpp
已删除
100644 → 0
浏览文件 @
c78f41a3
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/utils/BarrierStat.h"
#include <string.h>
#include <sys/types.h>
#include <algorithm>
#include <iomanip>
#include "paddle/utils/Flags.h"
#include "paddle/utils/Stat.h"
DEFINE_bool
(
log_barrier_abstract
,
true
,
"if true, show abstract of barrier performance"
);
DEFINE_int32
(
log_barrier_lowest_nodes
,
5
,
"how many lowest node will be logged"
);
DEFINE_bool
(
log_barrier_show_log
,
false
,
// for performance tuning insight
"if true, always show barrier abstract even with little gap"
);
namespace
paddle
{
std
::
ostream
&
operator
<<
(
std
::
ostream
&
output
,
const
BarrierStatBase
&
stat
)
{
if
(
FLAGS_log_barrier_abstract
)
{
std
::
lock_guard
<
std
::
mutex
>
guard
(
stat
.
lock_
);
stat
.
showAbstract
(
output
);
}
return
output
;
}
BarrierStatBase
::
BarrierStatBase
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
)
:
totSamples_
(
0
),
numConnThreads_
(
numConnThreads
),
name_
(
name
)
{
abstract_
.
resize
(
numConnThreads_
);
if
(
FLAGS_log_barrier_show_log
)
{
rateThreshold_
=
0.0
;
}
else
{
/* probablity of abnormal node
* p = 1/n + (n/8)/(n+1), n = nodes, n > 1
* if the freq of lowest trainerId larger than p,
* output FLAGS_log_barrier_lowest_nodes lastTrainerId.
* numConnThreads_ indicates nodes
*/
float
n
=
(
float
)
numConnThreads
;
rateThreshold_
=
1.0
/
n
+
(
n
/
8.0
)
/
(
n
+
1.0
);
}
}
BarrierEndStat
::
BarrierEndStat
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
)
:
BarrierStatBase
(
numConnThreads
,
name
)
{
timeVector_
.
reset
(
new
TimeVectorEnd
(
numConnThreads_
));
reset
(
true
);
LOG
(
INFO
)
<<
" create barrierEndStat: "
<<
name
<<
" endBarrier warning rate: "
<<
rateThreshold_
;
}
/*
* Note:
* the design different pserver entity owns different statSet to obey
* the background that different pserver runs separately.
*/
void
BarrierEndStat
::
updateStat
(
struct
timeval
&
cur
,
int32_t
trainerId
)
{
CHECK_LT
(
trainerId
,
numConnThreads_
)
<<
"trainerId is invalid in barrier"
;
std
::
lock_guard
<
std
::
mutex
>
guard
(
lock_
);
timeVector_
->
addTimeval
(
cur
,
trainerId
);
if
(
timeVector_
->
full
())
{
std
::
lock_guard
<
std
::
mutex
>
abstractGuard
(
abstractLock_
);
auto
id
=
timeVector_
->
getLastTrainerId
();
auto
delta
=
timeToMicroSecond
(
timeVector_
->
getDelta
());
auto
secondDelta
=
timeToMicroSecond
(
timeVector_
->
get1NDelta
());
auto
lastTwoDelta
=
timeToMicroSecond
(
timeVector_
->
getMinus1NDelta
());
auto
midDelta
=
timeToMicroSecond
(
timeVector_
->
getMidNDelta
());
// discard first sample, since first sample probably is abnormal.
if
(
totSamples_
)
{
abstract_
[
id
].
freq
++
;
if
(
delta
<
abstract_
[
id
].
minDelta
)
{
abstract_
[
id
].
minDelta
=
delta
;
}
if
(
delta
>
abstract_
[
id
].
maxDelta
)
{
abstract_
[
id
].
maxDelta
=
delta
;
}
abstract_
[
id
].
totDelta
+=
delta
;
abstract_
[
id
].
totSecondDelta
+=
secondDelta
;
abstract_
[
id
].
totLastTwoDelta
+=
lastTwoDelta
;
abstract_
[
id
].
totMidDelta
+=
midDelta
;
// update totAbstract_
totAbstract_
.
freq
++
;
if
(
delta
<
totAbstract_
.
minDelta
)
{
totAbstract_
.
minDelta
=
delta
;
}
if
(
delta
>
totAbstract_
.
maxDelta
)
{
totAbstract_
.
maxDelta
=
delta
;
}
totAbstract_
.
totDelta
+=
delta
;
totAbstract_
.
totSecondDelta
+=
secondDelta
;
totAbstract_
.
totLastTwoDelta
+=
lastTwoDelta
;
totAbstract_
.
totMidDelta
+=
midDelta
;
}
totSamples_
++
;
timeVector_
->
reset
();
}
}
void
BarrierEndStat
::
reset
(
bool
clearRawData
)
{
int32_t
i
=
0
;
totSamples_
=
0
;
std
::
lock_guard
<
std
::
mutex
>
guard
(
abstractLock_
);
if
(
clearRawData
)
{
timeVector_
->
reset
();
}
for
(
auto
&
abstract
:
abstract_
)
{
memset
((
void
*
)
&
abstract
,
0
,
sizeof
(
abstract
));
abstract
.
minDelta
=
UINT64_MAX
;
abstract
.
trainerId
=
i
++
;
}
memset
((
void
*
)
&
totAbstract_
,
0
,
sizeof
(
Abstract
));
totAbstract_
.
minDelta
=
UINT64_MAX
;
}
void
BarrierEndStat
::
showAbstract
(
std
::
ostream
&
output
)
const
{
// do not support the case "<=2 pserver"
if
(
numConnThreads_
<=
2
||
!
totSamples_
)
{
return
;
}
// duplicate freq info
std
::
vector
<
struct
Abstract
>
outputAbstract
=
abstract_
;
std
::
sort
(
outputAbstract
.
begin
(),
outputAbstract
.
end
(),
[](
const
struct
Abstract
&
a
,
const
struct
Abstract
&
b
)
{
return
a
.
freq
>
b
.
freq
;
});
auto
rate
=
(
float
)
outputAbstract
[
0
].
freq
/
(
float
)
totSamples_
;
if
(
rate
<
rateThreshold_
)
{
return
;
}
output
<<
std
::
setw
(
20
)
<<
name_
<<
std
::
endl
;
/*
* Note:
* avgGap: the average delta between 1 -- n arriving trainers
* avgSecondGap: the average delta between 2 -- n arriving trainers
* avgLastTwoGap: the average delta between n-1 -- n arriving trainers
* avgMidGap: the average delta between n/2 -- n arriving trainers
* rato: samples / totSamples
*
* the stat is based on per trainer if trainer_id is set, totAbstract is
* stat based on all trainers scope.
*/
output
<<
std
::
setw
(
42
)
<<
" "
<<
std
::
setw
(
15
)
<<
"trainerId"
<<
std
::
setw
(
15
)
<<
"avgGap"
<<
std
::
setw
(
15
)
<<
"avgSecondGap"
<<
std
::
setw
(
15
)
<<
"avgLastTwoGap"
<<
std
::
setw
(
15
)
<<
"avgMidGap"
<<
std
::
setw
(
10
)
<<
"rate"
<<
std
::
setw
(
10
)
<<
"samples"
<<
std
::
setw
(
10
)
<<
"totSamples"
<<
std
::
endl
;
// show totAbstract, it's valuable when lastTrainerId is even-distributed'
if
(
!
totAbstract_
.
freq
)
return
;
output
<<
std
::
setw
(
42
)
<<
" "
<<
std
::
setw
(
15
)
<<
"totAbstract"
<<
std
::
setw
(
15
)
<<
(
totAbstract_
.
totDelta
/
totAbstract_
.
freq
)
*
0.001
<<
std
::
setw
(
15
)
<<
(
totAbstract_
.
totSecondDelta
/
totAbstract_
.
freq
)
*
0.001
<<
std
::
setw
(
15
)
<<
(
totAbstract_
.
totLastTwoDelta
/
totAbstract_
.
freq
)
*
0.001
<<
std
::
setw
(
15
)
<<
(
totAbstract_
.
totMidDelta
/
totAbstract_
.
freq
)
*
0.001
<<
std
::
setw
(
10
)
<<
(
float
)
totAbstract_
.
freq
/
(
float
)
totSamples_
<<
std
::
setw
(
10
)
<<
(
float
)
totAbstract_
.
freq
<<
std
::
setw
(
10
)
<<
(
float
)
totSamples_
<<
std
::
endl
;
// show lastTrainerId abstract
int
count
=
0
;
for
(
auto
&
abstract
:
outputAbstract
)
{
if
(
!
abstract
.
freq
||
count
++
>=
FLAGS_log_barrier_lowest_nodes
)
{
break
;
}
// output format control
output
<<
std
::
setw
(
42
)
<<
" "
<<
std
::
setw
(
15
)
<<
abstract
.
trainerId
<<
std
::
setw
(
15
)
<<
(
abstract
.
totDelta
/
abstract
.
freq
)
*
0.001
<<
std
::
setw
(
15
)
<<
(
abstract
.
totSecondDelta
/
abstract
.
freq
)
*
0.001
<<
std
::
setw
(
15
)
<<
(
abstract
.
totLastTwoDelta
/
abstract
.
freq
)
*
0.001
<<
std
::
setw
(
15
)
<<
(
abstract
.
totMidDelta
/
abstract
.
freq
)
*
0.001
<<
std
::
setw
(
10
)
<<
(
float
)
abstract
.
freq
/
(
float
)
totSamples_
<<
std
::
setw
(
10
)
<<
(
float
)
abstract
.
freq
<<
std
::
setw
(
10
)
<<
(
float
)
totSamples_
<<
std
::
endl
;
}
}
BarrierDeltaStat
::
BarrierDeltaStat
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
)
:
BarrierStatBase
(
numConnThreads
,
name
)
{
timeVector_
.
reset
(
new
TimeVectorDelta
(
numConnThreads_
));
reset
(
true
);
LOG
(
INFO
)
<<
" create barrierDeltaStat: "
<<
name
<<
" barrierDelta warning rate: "
<<
rateThreshold_
;
}
void
BarrierDeltaStat
::
updateStat
(
uint64_t
delta
,
int32_t
trainerId
)
{
CHECK_LT
(
trainerId
,
numConnThreads_
)
<<
"trainerId is invalid in barrier"
;
std
::
lock_guard
<
std
::
mutex
>
guard
(
lock_
);
timeVector_
->
addTimeval
(
delta
,
trainerId
);
if
(
timeVector_
->
full
())
{
std
::
lock_guard
<
std
::
mutex
>
abstractGuard
(
abstractLock_
);
auto
id
=
timeVector_
->
getMaxTrainerId
();
auto
delta
=
timeVector_
->
getDelta
();
// discard first sample, since first sample probably is abnormal.
if
(
totSamples_
)
{
abstract_
[
id
].
freq
++
;
if
(
delta
<
abstract_
[
id
].
minDelta
)
{
abstract_
[
id
].
minDelta
=
delta
;
}
if
(
delta
>
abstract_
[
id
].
maxDelta
)
{
abstract_
[
id
].
maxDelta
=
delta
;
}
abstract_
[
id
].
totDelta
+=
delta
;
// update totAbstract_
totAbstract_
.
freq
++
;
if
(
delta
<
totAbstract_
.
minDelta
)
{
totAbstract_
.
minDelta
=
delta
;
}
if
(
delta
>
totAbstract_
.
maxDelta
)
{
totAbstract_
.
maxDelta
=
delta
;
}
totAbstract_
.
totDelta
+=
delta
;
}
totSamples_
++
;
timeVector_
->
reset
();
}
}
void
BarrierDeltaStat
::
reset
(
bool
clearRawData
)
{
int32_t
i
=
0
;
totSamples_
=
0
;
std
::
lock_guard
<
std
::
mutex
>
guard
(
abstractLock_
);
if
(
clearRawData
)
{
timeVector_
->
reset
();
}
for
(
auto
&
abstract
:
abstract_
)
{
memset
((
void
*
)
&
abstract
,
0
,
sizeof
(
abstract
));
abstract
.
minDelta
=
UINT64_MAX
;
abstract
.
trainerId
=
i
++
;
}
memset
((
void
*
)
&
totAbstract_
,
0
,
sizeof
(
Abstract
));
totAbstract_
.
minDelta
=
UINT64_MAX
;
}
void
BarrierDeltaStat
::
showAbstract
(
std
::
ostream
&
output
)
const
{
// do not support the case "<=2 pserver"
if
(
numConnThreads_
<=
2
||
!
totSamples_
)
{
return
;
}
// duplicate freq info
std
::
vector
<
struct
Abstract
>
outputAbstract
=
abstract_
;
std
::
sort
(
outputAbstract
.
begin
(),
outputAbstract
.
end
(),
[](
const
struct
Abstract
&
a
,
const
struct
Abstract
&
b
)
{
return
a
.
freq
>
b
.
freq
;
});
auto
rate
=
(
float
)
outputAbstract
[
0
].
freq
/
(
float
)
totSamples_
;
if
(
rate
<
rateThreshold_
)
{
return
;
}
output
<<
std
::
setw
(
20
)
<<
name_
<<
std
::
endl
;
/* Note:
* Gap means the delta from all trainers' forwardbackward
* avgGap: average Gap in log_period batches
* minGap: min Gap in log_period batches
* maxGap: max Gap in log_period batches
* trainerId: the slowest trainer_id
*
* the stat is based on per trainer if trainer_id is set, totAbstract is
* stat based on all trainers scope.
*/
output
<<
std
::
setw
(
42
)
<<
" "
<<
std
::
setw
(
15
)
<<
"trainerId"
<<
std
::
setw
(
15
)
<<
"avgGap"
<<
std
::
setw
(
10
)
<<
"minGap"
<<
std
::
setw
(
10
)
<<
"maxGap"
<<
std
::
setw
(
10
)
<<
"rate"
<<
std
::
setw
(
10
)
<<
"samples"
<<
std
::
setw
(
10
)
<<
"totSamples"
<<
std
::
endl
;
// show totAbstract, it's valuable when lastTrainerId is even-distributed'
if
(
!
totAbstract_
.
freq
)
return
;
output
<<
std
::
setw
(
42
)
<<
" "
<<
std
::
setw
(
15
)
<<
"totAbstract"
<<
std
::
setw
(
15
)
<<
(
totAbstract_
.
totDelta
/
totAbstract_
.
freq
)
*
0.001
<<
std
::
setw
(
10
)
<<
totAbstract_
.
minDelta
*
0.001
<<
std
::
setw
(
10
)
<<
totAbstract_
.
maxDelta
*
0.001
<<
std
::
setw
(
10
)
<<
(
float
)
totAbstract_
.
freq
/
(
float
)
totSamples_
<<
std
::
setw
(
10
)
<<
(
float
)
totAbstract_
.
freq
<<
std
::
setw
(
10
)
<<
(
float
)
totSamples_
<<
std
::
endl
;
// show lastTrainerId abstract
int
count
=
0
;
for
(
auto
&
abstract
:
outputAbstract
)
{
if
(
!
abstract
.
freq
||
count
++
>=
FLAGS_log_barrier_lowest_nodes
)
{
break
;
}
// output format control
output
<<
std
::
setw
(
42
)
<<
" "
<<
std
::
setw
(
15
)
<<
abstract
.
trainerId
<<
std
::
setw
(
15
)
<<
(
abstract
.
totDelta
/
abstract
.
freq
)
*
0.001
<<
std
::
setw
(
10
)
<<
abstract
.
minDelta
*
0.001
<<
std
::
setw
(
10
)
<<
abstract
.
maxDelta
*
0.001
<<
std
::
setw
(
10
)
<<
(
float
)
abstract
.
freq
/
(
float
)
totSamples_
<<
std
::
setw
(
10
)
<<
(
float
)
abstract
.
freq
<<
std
::
setw
(
10
)
<<
(
float
)
totSamples_
<<
std
::
endl
;
}
}
}
// namespace paddle
paddle/utils/BarrierStat.h
已删除
100644 → 0
浏览文件 @
c78f41a3
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <stdint.h>
#include <sys/time.h>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "Locks.h"
#include "Logging.h"
#include "ThreadLocal.h"
namespace
paddle
{
inline
uint64_t
timeToMicroSecond
(
struct
timeval
time
)
{
return
time
.
tv_sec
*
1000000LU
+
time
.
tv_usec
;
}
class
TimeVectorEnd
{
/*
* help class for gathering all barrier performance data
* which shows time point property.
* freqently used in barrier performance tuning API, such
* as tuning which is slowest node in sync-sgd mode training.
*/
public:
explicit
TimeVectorEnd
(
uint16_t
size
)
:
size_
(
size
)
{
index_
=
0
;
timeArray_
.
resize
(
size
);
trainerIds_
.
resize
(
size
);
}
~
TimeVectorEnd
()
{}
uint16_t
size
()
{
return
size_
;
}
bool
full
()
{
return
index_
==
size_
;
}
bool
empty
()
{
return
index_
==
0
;
}
void
reset
()
{
index_
=
0
;
}
void
addTimeval
(
struct
timeval
time
,
int32_t
trainerId
)
{
timeArray_
[
index_
]
=
time
;
trainerIds_
[
index_
]
=
trainerId
;
index_
++
;
}
struct
timeval
getDelta
()
const
{
struct
timeval
delta
;
CHECK_GT
(
size_
,
1
)
<<
"not support with 1 pserver"
;
timersub
(
&
timeArray_
[
size_
-
1
],
&
timeArray_
[
0
],
&
delta
);
return
delta
;
}
/* 2, n delta */
struct
timeval
get1NDelta
()
const
{
CHECK_GT
(
size_
,
2
)
<<
"not support with less than 2 pservers"
;
struct
timeval
delta
;
timersub
(
&
timeArray_
[
size_
-
1
],
&
timeArray_
[
1
],
&
delta
);
return
delta
;
}
/* n-1, n delta */
struct
timeval
getMinus1NDelta
()
const
{
CHECK_GT
(
size_
,
2
)
<<
"not support with less than 2 pservers"
;
struct
timeval
delta
;
timersub
(
&
timeArray_
[
size_
-
1
],
&
timeArray_
[
size_
-
2
],
&
delta
);
return
delta
;
}
/* n/2, n delta */
struct
timeval
getMidNDelta
()
const
{
CHECK_GT
(
size_
,
2
)
<<
"not support with less than 2 pservers"
;
struct
timeval
delta
;
timersub
(
&
timeArray_
[
size_
-
1
],
&
timeArray_
[
size_
/
2
],
&
delta
);
return
delta
;
}
int32_t
getLastTrainerId
()
const
{
return
trainerIds_
[
index_
-
1
];
}
private:
uint16_t
size_
;
uint16_t
index_
;
std
::
vector
<
struct
timeval
>
timeArray_
;
std
::
vector
<
int32_t
>
trainerIds_
;
};
class
TimeVectorDelta
{
/*
* help class for gathering performance data which shows time
* delta property, such as tuning the time distribution of
* forwardBackward time from all cluster nodes.
*/
public:
explicit
TimeVectorDelta
(
uint16_t
size
)
:
size_
(
size
),
min_
(
UINT64_MAX
),
max_
(
0
)
{
index_
=
0
;
timeArray_
.
resize
(
size
);
}
~
TimeVectorDelta
()
{}
uint16_t
size
()
{
return
size_
;
}
bool
full
()
{
return
index_
==
size_
;
}
bool
empty
()
{
return
index_
==
0
;
}
void
reset
()
{
index_
=
0
;
min_
=
UINT64_MAX
;
max_
=
0
;
}
void
addTimeval
(
uint64_t
delta
,
int32_t
trainerId
)
{
timeArray_
[
index_
]
=
delta
;
index_
++
;
if
(
delta
<
min_
)
{
min_
=
delta
;
}
if
(
delta
>
max_
)
{
max_
=
delta
;
maxTrainerId_
=
trainerId
;
}
}
uint64_t
getDelta
()
const
{
CHECK_GT
(
size_
,
1
)
<<
"not support with 1 pserver"
;
return
max_
-
min_
;
}
/* 2, n delta */
uint64_t
get1NDelta
()
const
{
CHECK_GT
(
size_
,
2
)
<<
"not support with less than 2 pservers"
;
LOG
(
FATAL
)
<<
"Not implemented"
;
}
/* n-1, n delta */
uint64_t
getMinus1NDelta
()
const
{
CHECK_GT
(
size_
,
2
)
<<
"not support with less than 2 pservers"
;
LOG
(
FATAL
)
<<
"Not implemented"
;
}
/* n/2, n delta */
uint64_t
getMidNDelta
()
const
{
CHECK_GT
(
size_
,
2
)
<<
"not support with less than 2 pservers"
;
LOG
(
FATAL
)
<<
"Not implemented"
;
}
int32_t
getMaxTrainerId
()
const
{
return
maxTrainerId_
;
}
private:
uint16_t
size_
;
uint16_t
index_
;
std
::
vector
<
uint64_t
>
timeArray_
;
private:
uint64_t
min_
;
uint64_t
max_
;
int32_t
maxTrainerId_
;
};
// total samples stats, us
struct
Abstract
{
// last trainerId for barrier end, maxDelta trainerId for barrier delta
int32_t
trainerId
;
uint64_t
minDelta
;
uint64_t
maxDelta
;
uint64_t
totDelta
;
// first one is probably itself, so discard it.
uint64_t
totSecondDelta
;
// to confirm if last node destroy barrier performance.
uint64_t
totLastTwoDelta
;
// n/2-n delta
uint64_t
totMidDelta
;
uint64_t
freq
;
};
// barrier performance tunning stats
class
BarrierStatBase
{
public:
BarrierStatBase
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
);
virtual
~
BarrierStatBase
()
{}
// if called at pserver end, then trainId means trainer's id.
// by default trainer does not use trainerId, so set it to -1
virtual
void
updateStat
(
struct
timeval
&
cur
,
int32_t
trainerId
=
-
1
)
=
0
;
virtual
void
updateStat
(
uint64_t
delta
,
int32_t
trainerId
=
-
1
)
=
0
;
const
std
::
string
&
getName
()
{
return
name_
;
}
virtual
void
reset
(
bool
clearRawData
=
true
)
{}
// since the timeVector_ is not stateful, so it's not clear whether the
// the barrier delta is correct. if one timestamp was lost, the all data
// from barrier stat becomes rubbish. -_-
virtual
bool
checkPassBarrier
()
{
LOG
(
INFO
)
<<
"bug implementation found"
;
return
false
;
}
protected:
virtual
void
showAbstract
(
std
::
ostream
&
output
)
const
{}
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
output
,
const
BarrierStatBase
&
stat
);
protected:
mutable
std
::
mutex
lock_
;
std
::
mutex
abstractLock_
;
// see note on updaterStat
// each freqency for each barrier trainer
std
::
vector
<
struct
Abstract
>
abstract_
;
// it is valuable when do perf-tuining, if lastTrainerId acts uniform
// distribution
struct
Abstract
totAbstract_
;
uint64_t
totSamples_
;
protected:
uint16_t
numConnThreads_
;
// total updates needed
float
rateThreshold_
;
std
::
string
name_
;
};
// the end-time of arriving real/forged barrier position
class
BarrierEndStat
:
public
BarrierStatBase
{
public:
BarrierEndStat
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
);
~
BarrierEndStat
()
{}
virtual
void
updateStat
(
struct
timeval
&
cur
,
int32_t
trainerId
=
-
1
);
virtual
void
updateStat
(
uint64_t
delta
,
int32_t
trainerId
=
-
1
)
{
LOG
(
INFO
)
<<
"have no delta updateStat in BarrierEndStat"
;
}
virtual
void
reset
(
bool
clearRawData
=
true
);
virtual
bool
checkPassBarrier
()
{
return
timeVector_
->
empty
();
}
protected:
/*
* LOG:
* readAllBlocks_denseUpdater
* trainerId avgGap avgSecondGap avgLastTwoGap avgMidGap rate
* 44 86.702 81.022 9.984 50.472 0.144737
* 46 87.723 82.939 8.737 50.019 0.118421
* 35 100.923 96.752 14.305 61.979
* 0.0657895
* log_barrier_abstract, log_barrier_lowest_nodes, log_barrier_threshold
* control details.
*/
virtual
void
showAbstract
(
std
::
ostream
&
output
)
const
;
private:
std
::
unique_ptr
<
TimeVectorEnd
>
timeVector_
;
};
// the delta-time from different trainers,
// eg, find the degree of imbalance of BP time at pserver end
// the entry value in timerVector_ is BP delta, do evaluation to BP delta.
class
BarrierDeltaStat
:
public
BarrierStatBase
{
public:
BarrierDeltaStat
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
);
~
BarrierDeltaStat
()
{}
virtual
void
updateStat
(
uint64_t
delta
,
int32_t
trainerId
=
-
1
);
virtual
void
updateStat
(
struct
timeval
&
cur
,
int32_t
trainerId
=
-
1
)
{
LOG
(
INFO
)
<<
"have no timeval updateStat in BarrierDeltaStat"
;
}
virtual
void
reset
(
bool
clearRawData
=
true
);
virtual
bool
checkPassBarrier
()
{
return
timeVector_
->
empty
();
}
protected:
virtual
void
showAbstract
(
std
::
ostream
&
outPut
)
const
;
private:
// store delta time in uint64_t, eg BP time of all trainers
std
::
unique_ptr
<
TimeVectorDelta
>
timeVector_
;
};
// to distinguish different contexts for same parallel threads, and different
// threads with same code-sgement, just use tagName to tag the run-time
// position.
// in Sparse, sendParallel threads can not only run in the stage of push&pull
// with same thread group, but also run in the stage of pull&push with different
// thread group, tag will be used to distinguish different run-time barrier
// position.
// trainerId in REGISTER_BARRIER_TIMER_SERVER is used to retreive lowest trainer
// nodes.
// end barrier
#define __REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
struct timeval cur; \
gettimeofday(&cur, nullptr); \
__stat->updateStat(cur, trainerId); \
} \
} while (0);
// end barrier with user-defined timer
#define __REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
__stat->updateStat(cur, trainerId); \
} \
} while (0);
// delta barrier
#define __REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, delta, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_DELTA); \
__stat->updateStat(delta, trainerId); \
} \
} while (0);
// check end barrier
#define __CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \
do { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
PCHECK(__stat->checkPassBarrier()) << internalName \
<< ": invalid barrier data"; \
} while (0);
/*
* Note:
* with sync-sgd algriothm in cluster mode, lots of synchronize action exsit at
* pserve end. these synchronizaton actions have impact on the efficiency of
* parameter exchange. the synchronizaton(barrier) GAP is composed of lots of
* factors, such as the forwardBackward variance, network fluncation. we try
* to have a quantitative analysis on these factor, so we design lots of barrier
* time to capture these performance. these barrier also can be placed at
* implict barrier position.
*
* example:
* in sync-sgd algorithm, each parameter server waits for all gradients from
* all trainers, thus, an explict barrier point exsit before doing optimization.
* the barrier timer located before the point can sense the barrier condition.
*
*/
// try to capture which trainer is slowest node in sync-sgd at pserver.
#define REGISTER_SLOW_NODES_PROBE( \
set, statName, numConnThreads, trainerId, ...) \
__REGISTER_BARRIER_TIMER_SERVER( \
(set), statName, numConnThreads, trainerId, __VA_ARGS__)
// try to check if all threads or trainers have passed barriers for data
// accuracy.
#define CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \
__CHECK_BARRIER_TIMER((set), statName, numConnThreads, __VA_ARGS__)
#ifdef PADDLE_DISABLE_TIMER
#define REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...)
#define REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...)
#define REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...)
#else
/*
* sensing barrier time distribution for all parallelization threads.
* it provides low API for slow node check(REGISTER_SLOW_NODES_PROBE)
*/
#define REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...) \
__REGISTER_BARRIER_TIMER_SERVER( \
(set), statName, numConnThreads, trainerId, __VA_ARGS__)
/*
* sensing barrier time distribution for all parallelization threads.
* but time point for barrier performance is set by user.
* eg, with this api, you can get implict barrier point such as the beginning
* time distribution
* for receiving data.
*/
#define REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...) \
__REGISTER_BARRIER_TIMER_SERVER_SET( \
(set), statName, numConnThreads, trainerId, cur, __VA_ARGS__)
// try to capture time delta from all trainers, such as forwardBackward time
// which implies
// computation fluctuation
#define REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, delta, ...) \
__REGISTER_BARRIER_DELTA_SERVER_SET( \
(set), statName, numConnThreads, trainerId, delta, __VA_ARGS__)
#endif // DISABLE_TIMER
}
// namespace paddle
paddle/utils/Stat.cpp
浏览文件 @
ce0d6704
...
...
@@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) {
return
outPut
;
}
BarrierStatPtr
StatSet
::
getStat
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
,
BarrierStatType
bType
)
{
{
ReadLockGuard
guard
(
lock_
);
auto
it
=
barrierStatSet_
.
find
(
name
);
if
(
it
!=
barrierStatSet_
.
end
())
{
return
it
->
second
;
}
}
std
::
lock_guard
<
RWLock
>
guard
(
lock_
);
// test again with lock_guard
auto
it
=
barrierStatSet_
.
find
(
name
);
if
(
it
!=
barrierStatSet_
.
end
())
{
return
it
->
second
;
}
BarrierStatPtr
stat
;
if
(
bType
==
BARRIER_END
)
{
stat
=
std
::
make_shared
<
BarrierEndStat
>
(
numConnThreads
,
name
);
}
else
if
(
bType
==
BARRIER_DELTA
)
{
stat
=
std
::
make_shared
<
BarrierDeltaStat
>
(
numConnThreads
,
name
);
}
auto
ret
=
barrierStatSet_
.
insert
(
std
::
make_pair
(
name
,
stat
));
return
ret
.
first
->
second
;
}
void
StatSet
::
printSegTimerStatus
()
{
ReadLockGuard
guard
(
lock_
);
LOG
(
INFO
)
<<
std
::
setiosflags
(
std
::
ios
::
left
)
<<
std
::
setfill
(
' '
)
...
...
@@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() {
}
}
void
StatSet
::
printBarrierTimerStatus
()
{
ReadLockGuard
guard
(
lock_
);
if
(
barrierStatSet_
.
empty
())
{
return
;
}
// control barrierAbstact in runtime, so enable compliation
LOG
(
INFO
)
<<
std
::
setiosflags
(
std
::
ios
::
left
)
<<
std
::
setfill
(
' '
)
<<
"======= BarrierStatSet status ======"
<<
std
::
endl
;
for
(
auto
&
stat
:
barrierStatSet_
)
{
LOG
(
INFO
)
<<
std
::
setiosflags
(
std
::
ios
::
left
)
<<
std
::
setfill
(
' '
)
<<
*
(
stat
.
second
);
}
}
void
StatSet
::
printAllStatus
()
{
#ifndef PADDLE_DISABLE_TIMER
printSegTimerStatus
();
#endif
printBarrierTimerStatus
();
LOG
(
INFO
)
<<
std
::
setiosflags
(
std
::
ios
::
left
)
<<
"--------------------------------------------------"
<<
std
::
endl
;
}
void
StatSet
::
printStatus
(
const
std
::
string
&
name
)
{
ReadLockGuard
guard
(
lock_
);
auto
iter
=
statSet_
.
find
(
name
);
CHECK
(
iter
!=
statSet_
.
end
())
<<
name
<<
" is not registed in "
<<
name_
;
LOG
(
INFO
)
<<
*
(
iter
->
second
);
}
void
StatSet
::
reset
(
bool
clearRawData
)
{
ReadLockGuard
guard
(
lock_
);
for
(
auto
&
stat
:
statSet_
)
{
stat
.
second
->
reset
();
}
// reset barrierStat
for
(
auto
&
stat
:
barrierStatSet_
)
{
stat
.
second
->
reset
(
clearRawData
);
}
}
void
StatSet
::
setThreadInfo
(
const
std
::
string
&
name
,
bool
flag
)
{
...
...
@@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) {
iter
->
second
->
setThreadInfo
(
flag
);
}
void
StatSet
::
deleteStat
(
const
std
::
string
&
name
)
{
std
::
lock_guard
<
RWLock
>
guard
(
lock_
);
auto
iter
=
statSet_
.
find
(
name
);
CHECK
(
iter
!=
statSet_
.
end
())
<<
name
<<
" is not registed in "
<<
name_
;
statSet_
.
erase
(
iter
);
}
StatInfo
::~
StatInfo
()
{
if
(
stat_
)
{
std
::
lock_guard
<
std
::
mutex
>
guard
(
stat_
->
lock_
);
...
...
paddle/utils/Stat.h
浏览文件 @
ce0d6704
...
...
@@ -23,7 +23,6 @@ limitations under the License. */
#include <string>
#include <unordered_map>
#include "BarrierStat.h"
#include "Locks.h"
#include "Logging.h"
#include "ThreadLocal.h"
...
...
@@ -60,12 +59,6 @@ public:
class
Stat
;
typedef
std
::
shared_ptr
<
Stat
>
StatPtr
;
typedef
std
::
shared_ptr
<
BarrierStatBase
>
BarrierStatPtr
;
enum
BarrierStatType
{
BARRIER_END
=
0
,
BARRIER_DELTA
=
1
,
};
class
StatSet
{
public:
...
...
@@ -74,11 +67,8 @@ public:
// print to LOG(INFO)
void
printSegTimerStatus
();
void
printBarrierTimerStatus
();
void
printAllStatus
();
void
printStatus
(
const
std
::
string
&
name
);
StatPtr
getStat
(
const
std
::
string
&
name
)
{
{
ReadLockGuard
guard
(
lock_
);
...
...
@@ -93,12 +83,6 @@ public:
return
ret
.
first
->
second
;
}
BarrierStatPtr
getStat
(
uint16_t
numConnThreads
,
const
std
::
string
&
name
,
BarrierStatType
bType
);
void
deleteStat
(
const
std
::
string
&
name
);
// true for showing stats for each thread
// false for showing stats aggragated over threads
void
setThreadInfo
(
const
std
::
string
&
name
,
bool
flag
);
...
...
@@ -120,7 +104,6 @@ public:
private:
std
::
unordered_map
<
std
::
string
,
StatPtr
>
statSet_
;
std
::
unordered_map
<
std
::
string
,
BarrierStatPtr
>
barrierStatSet_
;
const
std
::
string
name_
;
RWLock
lock_
;
};
...
...
paddle/utils/ThreadLocal.h
浏览文件 @
ce0d6704
...
...
@@ -51,7 +51,7 @@ template <class T>
class
ThreadLocal
{
public:
ThreadLocal
()
{
P
CHECK
(
pthread_key_create
(
&
threadSpecificKey_
,
dataDestructor
)
==
0
);
CHECK
(
pthread_key_create
(
&
threadSpecificKey_
,
dataDestructor
)
==
0
);
}
~
ThreadLocal
()
{
pthread_key_delete
(
threadSpecificKey_
);
}
...
...
@@ -65,7 +65,7 @@ public:
if
(
!
p
&&
createLocal
)
{
p
=
new
T
();
int
ret
=
pthread_setspecific
(
threadSpecificKey_
,
p
);
P
CHECK
(
ret
==
0
);
CHECK
(
ret
==
0
);
}
return
p
;
}
...
...
@@ -79,7 +79,7 @@ public:
if
(
T
*
q
=
get
(
false
))
{
dataDestructor
(
q
);
}
P
CHECK
(
pthread_setspecific
(
threadSpecificKey_
,
p
)
==
0
);
CHECK
(
pthread_setspecific
(
threadSpecificKey_
,
p
)
==
0
);
}
/**
...
...
@@ -112,7 +112,7 @@ private:
template
<
class
T
>
class
ThreadLocalD
{
public:
ThreadLocalD
()
{
P
CHECK
(
pthread_key_create
(
&
threadSpecificKey_
,
NULL
)
==
0
);
}
ThreadLocalD
()
{
CHECK
(
pthread_key_create
(
&
threadSpecificKey_
,
NULL
)
==
0
);
}
~
ThreadLocalD
()
{
pthread_key_delete
(
threadSpecificKey_
);
for
(
auto
t
:
threadMap_
)
{
...
...
@@ -127,7 +127,7 @@ public:
T
*
p
=
(
T
*
)
pthread_getspecific
(
threadSpecificKey_
);
if
(
!
p
)
{
p
=
new
T
();
P
CHECK
(
pthread_setspecific
(
threadSpecificKey_
,
p
)
==
0
);
CHECK
(
pthread_setspecific
(
threadSpecificKey_
,
p
)
==
0
);
updateMap
(
p
);
}
return
p
;
...
...
@@ -141,7 +141,7 @@ public:
if
(
T
*
q
=
(
T
*
)
pthread_getspecific
(
threadSpecificKey_
))
{
dataDestructor
(
q
);
}
P
CHECK
(
pthread_setspecific
(
threadSpecificKey_
,
p
)
==
0
);
CHECK
(
pthread_setspecific
(
threadSpecificKey_
,
p
)
==
0
);
updateMap
(
p
);
}
...
...
python/CMakeLists.txt
浏览文件 @
ce0d6704
...
...
@@ -29,7 +29,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.in
add_custom_command
(
OUTPUT
${
OUTPUT_DIR
}
/.timestamp
COMMAND env
${
py_env
}
${
PYTHON_EXECUTABLE
}
setup.py bdist_wheel
COMMAND
${
CMAKE_COMMAND
}
-E touch
${
OUTPUT_DIR
}
/.timestamp
DEPENDS gen_proto_py
${
PY_FILES
}
${
external_project_dependencies
}
${
COPY_PADDLE_MASTER
}
)
DEPENDS gen_proto_py
framework_py_proto
${
PY_FILES
}
${
external_project_dependencies
}
${
COPY_PADDLE_MASTER
}
)
add_custom_target
(
paddle_python ALL DEPENDS
${
OUTPUT_DIR
}
/.timestamp
)
...
...
@@ -43,6 +43,7 @@ if (WITH_TESTING)
add_subdirectory
(
paddle/v2/tests
)
add_subdirectory
(
paddle/v2/reader/tests
)
add_subdirectory
(
paddle/v2/plot/tests
)
add_subdirectory
(
paddle/v2/framework/tests
)
endif
()
endif
()
install
(
DIRECTORY
${
PADDLE_PYTHON_PACKAGE_DIR
}
...
...
python/paddle/trainer/config_parser.py
浏览文件 @
ce0d6704
...
...
@@ -1353,7 +1353,8 @@ class LayerBase(object):
device
=
None
,
active_type
=
""
,
drop_rate
=
0.
,
coeff
=
None
):
coeff
=
None
,
error_clipping_threshold
=
None
):
config_assert
(
'@'
not
in
name
,
"layer name: %s contain special character @"
%
name
)
global
g_current_submodel
...
...
@@ -1387,6 +1388,9 @@ class LayerBase(object):
elif
g_default_device
is
not
None
:
self
.
config
.
device
=
g_default_device
if
error_clipping_threshold
is
not
None
:
self
.
config
.
error_clipping_threshold
=
error_clipping_threshold
for
input_index
in
xrange
(
len
(
self
.
inputs
)):
input
=
self
.
inputs
[
input_index
]
input_config
=
None
...
...
@@ -2466,10 +2470,14 @@ class MaxLayer(LayerBase):
trans_type
=
'non-seq'
,
bias
=
False
,
output_max_index
=
None
,
stride
=-
1
,
**
xargs
):
super
(
MaxLayer
,
self
).
__init__
(
name
,
'max'
,
0
,
inputs
=
inputs
,
**
xargs
)
config_assert
(
len
(
self
.
inputs
)
==
1
,
'MaxLayer must have 1 input'
)
if
trans_type
==
'seq'
:
config_assert
(
stride
==
-
1
,
'subseq does not support stride window'
)
self
.
config
.
trans_type
=
trans_type
self
.
config
.
seq_pool_stride
=
stride
for
input_index
in
xrange
(
len
(
self
.
inputs
)):
input_layer
=
self
.
get_input_layer
(
input_index
)
self
.
set_layer_size
(
input_layer
.
size
)
...
...
@@ -2731,11 +2739,15 @@ class AverageLayer(LayerBase):
average_strategy
=
'average'
,
trans_type
=
'non-seq'
,
bias
=
False
,
stride
=-
1
,
**
xargs
):
super
(
AverageLayer
,
self
).
__init__
(
name
,
'average'
,
0
,
inputs
=
inputs
,
**
xargs
)
self
.
config
.
average_strategy
=
average_strategy
if
trans_type
==
'seq'
:
config_assert
(
stride
==
-
1
,
'subseq does not support stride window'
)
self
.
config
.
trans_type
=
trans_type
self
.
config
.
seq_pool_stride
=
stride
config_assert
(
len
(
inputs
)
==
1
,
'AverageLayer must have 1 input'
)
for
input_index
in
xrange
(
len
(
self
.
inputs
)):
input_layer
=
self
.
get_input_layer
(
input_index
)
...
...
@@ -2774,13 +2786,7 @@ class TensorLayer(LayerBase):
@
config_layer
(
'mixed'
)
class
MixedLayer
(
LayerBase
):
def
__init__
(
self
,
name
,
inputs
,
size
=
0
,
bias
=
True
,
error_clipping_threshold
=
None
,
**
xargs
):
def
__init__
(
self
,
name
,
inputs
,
size
=
0
,
bias
=
True
,
**
xargs
):
config_assert
(
inputs
,
'inputs cannot be empty'
)
super
(
MixedLayer
,
self
).
__init__
(
name
,
'mixed'
,
size
,
inputs
=
inputs
,
**
xargs
)
...
...
@@ -2862,9 +2868,6 @@ class MixedLayer(LayerBase):
self
.
config
.
bias_size
=
psize
self
.
create_bias_parameter
(
bias
,
psize
)
if
error_clipping_threshold
is
not
None
:
self
.
config
.
error_clipping_threshold
=
error_clipping_threshold
# like MixedLayer, but no bias parameter
@
config_func
...
...
python/paddle/trainer_config_helpers/layers.py
浏览文件 @
ce0d6704
...
...
@@ -1246,10 +1246,19 @@ def pooling_layer(input,
name
=
None
,
bias_attr
=
None
,
agg_level
=
AggregateLevel
.
TO_NO_SEQUENCE
,
stride
=-
1
,
layer_attr
=
None
):
"""
Pooling layer for sequence inputs, not used for Image.
If stride > 0, this layer slides a window whose size is determined by stride,
and return the pooling value of the window as the output. Thus, a long sequence
will be shorten.
The parameter stride specifies the intervals at which to apply the pooling
operation. Note that for sequence with sub-sequence, the default value
of stride is -1.
The example usage is:
.. code-block:: python
...
...
@@ -1268,6 +1277,8 @@ def pooling_layer(input,
:param pooling_type: Type of pooling, MaxPooling(default), AvgPooling,
SumPooling, SquareRootNPooling.
:type pooling_type: BasePoolingType|None
:param stride: The step size between successive pooling regions.
:type stride: Int
:param bias_attr: Bias parameter attribute. False if no bias.
:type bias_attr: ParameterAttribute|None|False
:param layer_attr: The Extra Attributes for layer, such as dropout.
...
...
@@ -1285,12 +1296,16 @@ def pooling_layer(input,
extra_dict
[
'output_max_index'
]
=
pooling_type
.
output_max_index
extra_dict
.
update
(
ExtraLayerAttribute
.
to_kwargs
(
layer_attr
))
if
agg_level
==
AggregateLevel
.
TO_SEQUENCE
:
assert
stride
==
-
1
Layer
(
name
=
name
,
type
=
pooling_type
.
name
,
inputs
=
[
Input
(
input
.
name
)],
bias
=
ParamAttr
.
to_bias
(
bias_attr
),
trans_type
=
agg_level
,
stride
=
stride
,
**
extra_dict
)
return
LayerOutput
(
...
...
@@ -1552,7 +1567,7 @@ def last_seq(input,
:type name: basestring
:param input: Input layer name.
:type input: LayerOutput
:param stride:
window size
.
:param stride:
The step size between successive pooling regions
.
:type stride: Int
:param layer_attr: extra layer attributes.
:type layer_attr: ExtraLayerAttribute.
...
...
@@ -1608,7 +1623,7 @@ def first_seq(input,
:type name: basestring
:param input: Input layer name.
:type input: LayerOutput
:param stride:
window size
.
:param stride:
The step size between successive pooling regions
.
:type stride: Int
:param layer_attr: extra layer attributes.
:type layer_attr: ExtraLayerAttribute.
...
...
python/paddle/trainer_config_helpers/tests/configs/protostr/test_sequence_pooling.protostr
浏览文件 @
ce0d6704
...
...
@@ -14,6 +14,7 @@ layers {
input_layer_name: "dat_in"
}
trans_type: "seq"
seq_pool_stride: -1
}
layers {
name: "__seq_pooling_1__"
...
...
@@ -24,6 +25,7 @@ layers {
input_layer_name: "dat_in"
}
trans_type: "non-seq"
seq_pool_stride: -1
}
layers {
name: "__seq_pooling_2__"
...
...
@@ -35,6 +37,7 @@ layers {
}
average_strategy: "average"
trans_type: "seq"
seq_pool_stride: -1
}
layers {
name: "__seq_pooling_3__"
...
...
@@ -46,6 +49,7 @@ layers {
}
average_strategy: "average"
trans_type: "non-seq"
seq_pool_stride: -1
}
layers {
name: "__seq_pooling_4__"
...
...
@@ -57,6 +61,7 @@ layers {
}
average_strategy: "sum"
trans_type: "seq"
seq_pool_stride: -1
}
layers {
name: "__seq_pooling_5__"
...
...
@@ -68,6 +73,7 @@ layers {
}
average_strategy: "sum"
trans_type: "non-seq"
seq_pool_stride: -1
}
layers {
name: "__seq_pooling_6__"
...
...
@@ -77,8 +83,44 @@ layers {
inputs {
input_layer_name: "dat_in"
}
trans_type: "non-seq"
seq_pool_stride: 5
}
layers {
name: "__seq_pooling_7__"
type: "average"
size: 100
active_type: ""
inputs {
input_layer_name: "dat_in"
}
average_strategy: "average"
trans_type: "non-seq"
seq_pool_stride: 5
}
layers {
name: "__seq_pooling_8__"
type: "average"
size: 100
active_type: ""
inputs {
input_layer_name: "dat_in"
}
average_strategy: "sum"
trans_type: "non-seq"
seq_pool_stride: 5
}
layers {
name: "__seq_pooling_9__"
type: "max"
size: 100
active_type: ""
inputs {
input_layer_name: "dat_in"
}
output_max_index: true
trans_type: "non-seq"
seq_pool_stride: -1
}
input_layer_names: "dat_in"
output_layer_names: "__seq_pooling_0__"
...
...
@@ -88,6 +130,9 @@ output_layer_names: "__seq_pooling_3__"
output_layer_names: "__seq_pooling_4__"
output_layer_names: "__seq_pooling_5__"
output_layer_names: "__seq_pooling_6__"
output_layer_names: "__seq_pooling_7__"
output_layer_names: "__seq_pooling_8__"
output_layer_names: "__seq_pooling_9__"
sub_models {
name: "root"
layer_names: "dat_in"
...
...
@@ -98,6 +143,9 @@ sub_models {
layer_names: "__seq_pooling_4__"
layer_names: "__seq_pooling_5__"
layer_names: "__seq_pooling_6__"
layer_names: "__seq_pooling_7__"
layer_names: "__seq_pooling_8__"
layer_names: "__seq_pooling_9__"
input_layer_names: "dat_in"
output_layer_names: "__seq_pooling_0__"
output_layer_names: "__seq_pooling_1__"
...
...
@@ -106,6 +154,9 @@ sub_models {
output_layer_names: "__seq_pooling_4__"
output_layer_names: "__seq_pooling_5__"
output_layer_names: "__seq_pooling_6__"
output_layer_names: "__seq_pooling_7__"
output_layer_names: "__seq_pooling_8__"
output_layer_names: "__seq_pooling_9__"
is_recurrent_layer_group: false
}
python/paddle/trainer_config_helpers/tests/configs/test_sequence_pooling.py
浏览文件 @
ce0d6704
...
...
@@ -14,6 +14,14 @@ for pt in POOL_TYPE:
for
al
in
AGG_LEVEL
:
opts
.
append
(
pooling_layer
(
input
=
din
,
agg_level
=
al
,
pooling_type
=
pt
()))
for
pt
in
POOL_TYPE
:
opts
.
append
(
pooling_layer
(
input
=
din
,
agg_level
=
AggregateLevel
.
TO_NO_SEQUENCE
,
pooling_type
=
pt
(),
stride
=
5
))
opts
.
append
(
pooling_layer
(
input
=
din
,
pooling_type
=
MaxPooling
(
output_max_index
=
True
)))
...
...
python/paddle/v2/framework/__init__.py
0 → 100644
浏览文件 @
ce0d6704
__all__
=
[
'proto'
]
python/paddle/v2/framework/tests/CMakeLists.txt
0 → 100644
浏览文件 @
ce0d6704
add_python_test
(
test_framework test_protobuf.py
)
python/paddle/v2/framework/tests/test_protobuf.py
0 → 100644
浏览文件 @
ce0d6704
import
paddle.v2.framework.proto.op_proto_pb2
import
paddle.v2.framework.proto.attr_type_pb2
import
unittest
class
TestFrameworkProto
(
unittest
.
TestCase
):
def
test_all
(
self
):
op_proto_lib
=
paddle
.
v2
.
framework
.
proto
.
op_proto_pb2
attr_type_lib
=
paddle
.
v2
.
framework
.
proto
.
attr_type_pb2
op_proto
=
op_proto_lib
.
OpProto
()
ipt0
=
op_proto
.
inputs
.
add
()
ipt0
.
name
=
"a"
ipt0
.
comment
=
"the input of cosine op"
ipt1
=
op_proto
.
inputs
.
add
()
ipt1
.
name
=
"b"
ipt1
.
comment
=
"the other input of cosine op"
opt
=
op_proto
.
outputs
.
add
()
opt
.
name
=
"output"
opt
.
comment
=
"the output of cosine op"
op_proto
.
comment
=
"cosine op, output = scale*cos(a, b)"
attr
=
op_proto
.
attrs
.
add
()
attr
.
name
=
"scale"
attr
.
comment
=
"scale of cosine op"
attr
.
type
=
attr_type_lib
.
FLOAT
op_proto
.
type
=
"cos"
self
.
assertTrue
(
op_proto
.
IsInitialized
())
python/paddle/v2/master/client.py
浏览文件 @
ce0d6704
...
...
@@ -26,14 +26,22 @@ class client(object):
holder
[
idx
]
=
c_ptr
lib
.
paddle_set_dataset
(
self
.
c
,
holder
,
len
(
paths
))
# return format: (record, errno)
# errno = 0: ok
# < 0: error
def
next_record
(
self
):
p
=
ctypes
.
c_char_p
()
ret
=
ctypes
.
pointer
(
p
)
size
=
lib
.
paddle_next_record
(
self
.
c
,
ret
)
if
size
<
0
:
# Error
return
None
,
size
if
size
==
0
:
# Empty record
return
""
return
""
,
0
record
=
ret
.
contents
.
value
[:
size
]
# Memory created from C should be freed.
lib
.
mem_free
(
ret
.
contents
)
return
record
return
record
,
0
python/paddle/v2/reader/creator.py
浏览文件 @
ce0d6704
...
...
@@ -57,17 +57,20 @@ def text_file(path):
return
reader
def
recordio
(
path
):
def
recordio
_local
(
paths
,
buf_size
=
100
):
"""
Creates a data reader that outputs record one one by one from given recordio file
:path: path of recordio file
:returns: data reader of recordio file
Creates a data reader from given RecordIO file paths separated by ",",
glob pattern is supported.
:path: path of recordio files.
:returns: data reader of recordio files.
"""
import
recordio
as
rec
import
paddle.v2.reader.decorator
as
dec
def
reader
():
f
=
rec
.
reader
(
path
)
a
=
','
.
join
(
paths
)
f
=
rec
.
reader
(
a
)
while
True
:
r
=
f
.
read
()
if
r
is
None
:
...
...
@@ -75,4 +78,38 @@ def recordio(path):
yield
r
f
.
close
()
return
dec
.
buffered
(
reader
,
buf_size
)
def
recordio
(
paths
,
buf_size
=
100
):
"""
Creates a data reader that outputs record one one by one
from given local or cloud recordio path.
:path: path of recordio files.
:returns: data reader of recordio files.
"""
import
os
import
paddle.v2.master.client
as
cloud
if
"KUBERNETES_SERVICE_HOST"
not
in
os
.
environ
.
keys
():
return
recordio_local
(
paths
)
host_name
=
"MASTER_SERVICE_HOST"
if
host_name
not
in
os
.
environ
.
keys
():
raise
Exception
(
'not find '
+
host_name
+
' in environ.'
)
addr
=
os
.
environ
(
host
)
def
reader
():
c
=
cloud
(
addr
,
buf_size
)
c
.
set_dataset
(
paths
)
while
True
:
r
,
err
=
client
.
next_record
()
if
err
<
0
:
break
yield
r
c
.
close
()
return
reader
python/paddle/v2/reader/tests/creator_test.py
浏览文件 @
ce0d6704
...
...
@@ -38,7 +38,7 @@ class TestRecordIO(unittest.TestCase):
def
test_recordio
(
self
):
path
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
"test_recordio_creator.dat"
)
reader
=
paddle
.
v2
.
reader
.
creator
.
recordio
(
path
)
reader
=
paddle
.
v2
.
reader
.
creator
.
recordio
(
[
path
]
)
for
idx
,
r
in
enumerate
(
reader
()):
self
.
assertSequenceEqual
(
r
,
str
(
idx
))
...
...
python/setup.py.in
浏览文件 @
ce0d6704
...
...
@@ -9,7 +9,9 @@ packages=['paddle',
'paddle.v2.dataset',
'paddle.v2.reader',
'paddle.v2.master',
'paddle.v2.plot']
'paddle.v2.plot',
'paddle.v2.framework',
'paddle.v2.framework.proto']
setup_requires=["requests",
"numpy",
...
...
@@ -27,8 +29,11 @@ setup(name='paddle',
description='Parallel Distributed Deep Learning',
install_requires=setup_requires,
packages=packages,
package_data={'paddle.v2.master': ['
${paddle_master_LIB_NAME}
'], },
package_data={'paddle.v2.master': ['
libpaddle_master.so
'], },
package_dir={
'': '${CMAKE_CURRENT_SOURCE_DIR}'
'': '${CMAKE_CURRENT_SOURCE_DIR}',
# The paddle.v2.framework.proto will be generated while compiling.
# So that package points to other directory.
'paddle.v2.framework.proto': '${CMAKE_BINARY_DIR}/paddle/framework'
},
)
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录