Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
0fcdae84
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0fcdae84
编写于
3月 12, 2019
作者:
Q
Qiao Longfei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add communicator_test
上级
9b74707c
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
172 addition
and
62 deletion
+172
-62
paddle/fluid/operators/distributed/CMakeLists.txt
paddle/fluid/operators/distributed/CMakeLists.txt
+1
-0
paddle/fluid/operators/distributed/communicator.cc
paddle/fluid/operators/distributed/communicator.cc
+0
-62
paddle/fluid/operators/distributed/communicator.h
paddle/fluid/operators/distributed/communicator.h
+61
-0
paddle/fluid/operators/distributed/communicator_test.cc
paddle/fluid/operators/distributed/communicator_test.cc
+110
-0
未找到文件。
paddle/fluid/operators/distributed/CMakeLists.txt
浏览文件 @
0fcdae84
...
...
@@ -55,6 +55,7 @@ cc_library(parameter_prefetch SRCS parameter_prefetch.cc DEPS sendrecvop_rpc mem
cc_library
(
parameter_send SRCS parameter_send.cc DEPS sendrecvop_rpc memory
)
cc_library
(
parameter_recv SRCS parameter_recv.cc DEPS sendrecvop_rpc memory
)
cc_library
(
communicator SRCS communicator.cc DEPS scope selected_rows tensor variable_helper selected_rows_functor simple_threadpool parameter_send parameter_recv
)
cc_test
(
communicator_test SRCS communicator_test.cc DEPS communicator
)
if
(
WITH_GPU
)
cc_test
(
collective_server_test SRCS collective_server_test.cc
DEPS sendrecvop_rpc executor
${
RPC_DEPS
}
...
...
paddle/fluid/operators/distributed/communicator.cc
浏览文件 @
0fcdae84
...
...
@@ -24,9 +24,6 @@ limitations under the License. */
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/operators/distributed/parameter_recv.h"
#include "paddle/fluid/operators/distributed/parameter_send.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
DEFINE_bool
(
communicator_independent_recv_thread
,
true
,
"use an independent to recv vars from parameter server"
);
...
...
@@ -43,71 +40,12 @@ namespace paddle {
namespace
operators
{
namespace
distributed
{
template
<
typename
T
,
int
MajorType
=
Eigen
::
RowMajor
,
typename
IndexType
=
Eigen
::
DenseIndex
>
using
EigenVector
=
framework
::
EigenVector
<
T
,
MajorType
,
IndexType
>
;
inline
double
GetCurrentUS
()
{
struct
timeval
time
;
gettimeofday
(
&
time
,
NULL
);
return
1e+6
*
time
.
tv_sec
+
time
.
tv_usec
;
}
static
inline
void
MergeVars
(
const
std
::
string
&
var_name
,
const
std
::
vector
<
std
::
shared_ptr
<
Variable
>>
&
vars
,
Scope
*
scope
)
{
PADDLE_ENFORCE
(
!
vars
.
empty
(),
"should have value to merge!"
);
auto
cpu_place
=
platform
::
CPUPlace
();
auto
&
var0
=
vars
[
0
];
auto
*
out_var
=
scope
->
Var
(
var_name
);
if
(
var0
->
IsType
<
framework
::
LoDTensor
>
())
{
VLOG
(
3
)
<<
"merge "
<<
var_name
<<
" LoDTensor"
<<
var0
->
Get
<
framework
::
LoDTensor
>
().
dims
();
// init output tensor
auto
*
out_t
=
out_var
->
GetMutable
<
framework
::
LoDTensor
>
();
auto
numel
=
out_t
->
numel
();
// check the input dims
for
(
auto
&
var
:
vars
)
{
auto
&
var_t
=
var
->
Get
<
framework
::
LoDTensor
>
();
PADDLE_ENFORCE_EQ
(
var_t
.
numel
(),
numel
,
"should have the same dims"
);
}
// set output tensor to 0.
auto
cpu_ctx
=
paddle
::
platform
::
CPUDeviceContext
();
math
::
SetConstant
<
paddle
::
platform
::
CPUDeviceContext
,
float
>
constant_functor
;
constant_functor
(
cpu_ctx
,
out_t
,
static_cast
<
float
>
(
0
));
// sum all vars to out
auto
result
=
EigenVector
<
float
>::
Flatten
(
*
out_t
);
for
(
auto
&
var
:
vars
)
{
auto
&
in_t
=
var
->
Get
<
framework
::
LoDTensor
>
();
auto
in
=
EigenVector
<
float
>::
Flatten
(
in_t
);
result
.
device
(
*
cpu_ctx
.
eigen_device
())
=
result
+
in
;
}
}
else
if
(
var0
->
IsType
<
framework
::
SelectedRows
>
())
{
auto
&
slr0
=
var0
->
Get
<
framework
::
SelectedRows
>
();
auto
*
out_slr
=
out_var
->
GetMutable
<
framework
::
SelectedRows
>
();
out_slr
->
mutable_rows
()
->
clear
();
out_slr
->
mutable_value
()
->
mutable_data
<
float
>
({{}},
cpu_place
);
std
::
vector
<
const
paddle
::
framework
::
SelectedRows
*>
inputs
;
inputs
.
reserve
(
vars
.
size
());
for
(
auto
&
var
:
vars
)
{
inputs
.
push_back
(
&
var
->
Get
<
framework
::
SelectedRows
>
());
}
math
::
scatter
::
MergeAdd
<
paddle
::
platform
::
CPUDeviceContext
,
float
>
merge_add
;
auto
dev_ctx
=
paddle
::
platform
::
CPUDeviceContext
();
merge_add
(
dev_ctx
,
inputs
,
out_slr
,
false
);
VLOG
(
3
)
<<
"merge "
<<
var_name
<<
" SelectedRows height: "
<<
slr0
.
height
()
<<
" dims: "
<<
slr0
.
value
().
dims
();
}
else
{
PADDLE_THROW
(
"unsupported var type!"
);
}
}
std
::
unique_ptr
<
Communicator
>
Communicator
::
communicator_
(
nullptr
);
std
::
once_flag
Communicator
::
init_flag_
;
...
...
paddle/fluid/operators/distributed/communicator.h
浏览文件 @
0fcdae84
...
...
@@ -24,6 +24,8 @@ limitations under the License. */
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/distributed/rpc_common.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
...
...
@@ -91,6 +93,65 @@ class BlockingQueue {
std
::
condition_variable
cv_
;
};
template
<
typename
T
,
int
MajorType
=
Eigen
::
RowMajor
,
typename
IndexType
=
Eigen
::
DenseIndex
>
using
EigenVector
=
framework
::
EigenVector
<
T
,
MajorType
,
IndexType
>
;
inline
void
MergeVars
(
const
std
::
string
&
var_name
,
const
std
::
vector
<
std
::
shared_ptr
<
Variable
>>&
vars
,
Scope
*
scope
)
{
PADDLE_ENFORCE
(
!
vars
.
empty
(),
"should have value to merge!"
);
auto
cpu_place
=
platform
::
CPUPlace
();
auto
&
var0
=
vars
[
0
];
auto
*
out_var
=
scope
->
Var
(
var_name
);
if
(
var0
->
IsType
<
framework
::
LoDTensor
>
())
{
auto
dims
=
var0
->
Get
<
framework
::
LoDTensor
>
().
dims
();
VLOG
(
3
)
<<
"merge "
<<
var_name
<<
" LoDTensor "
<<
dims
;
// init output tensor
auto
*
out_t
=
out_var
->
GetMutable
<
framework
::
LoDTensor
>
();
out_t
->
mutable_data
<
float
>
(
dims
,
cpu_place
);
// check the input dims
for
(
auto
&
var
:
vars
)
{
auto
&
var_t
=
var
->
Get
<
framework
::
LoDTensor
>
();
PADDLE_ENFORCE_EQ
(
var_t
.
dims
(),
dims
,
"should have the same dims"
);
}
// set output tensor to 0.
auto
cpu_ctx
=
paddle
::
platform
::
CPUDeviceContext
();
math
::
SetConstant
<
paddle
::
platform
::
CPUDeviceContext
,
float
>
constant_functor
;
constant_functor
(
cpu_ctx
,
out_t
,
static_cast
<
float
>
(
0
));
// sum all vars to out
auto
result
=
EigenVector
<
float
>::
Flatten
(
*
out_t
);
for
(
auto
&
var
:
vars
)
{
auto
&
in_t
=
var
->
Get
<
framework
::
LoDTensor
>
();
auto
in
=
EigenVector
<
float
>::
Flatten
(
in_t
);
result
.
device
(
*
cpu_ctx
.
eigen_device
())
=
result
+
in
;
}
}
else
if
(
var0
->
IsType
<
framework
::
SelectedRows
>
())
{
auto
&
slr0
=
var0
->
Get
<
framework
::
SelectedRows
>
();
auto
*
out_slr
=
out_var
->
GetMutable
<
framework
::
SelectedRows
>
();
out_slr
->
mutable_rows
()
->
clear
();
out_slr
->
mutable_value
()
->
mutable_data
<
float
>
({{}},
cpu_place
);
std
::
vector
<
const
paddle
::
framework
::
SelectedRows
*>
inputs
;
inputs
.
reserve
(
vars
.
size
());
for
(
auto
&
var
:
vars
)
{
inputs
.
push_back
(
&
var
->
Get
<
framework
::
SelectedRows
>
());
}
math
::
scatter
::
MergeAdd
<
paddle
::
platform
::
CPUDeviceContext
,
float
>
merge_add
;
auto
dev_ctx
=
paddle
::
platform
::
CPUDeviceContext
();
merge_add
(
dev_ctx
,
inputs
,
out_slr
,
false
);
VLOG
(
3
)
<<
"merge "
<<
var_name
<<
" SelectedRows height: "
<<
slr0
.
height
()
<<
" dims: "
<<
slr0
.
value
().
dims
();
}
else
{
PADDLE_THROW
(
"unsupported var type!"
);
}
}
using
RpcCtxMap
=
std
::
unordered_map
<
std
::
string
,
RpcContext
>
;
class
Communicator
{
...
...
paddle/fluid/operators/distributed/communicator_test.cc
0 → 100644
浏览文件 @
0fcdae84
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <memory>
#include <vector>
#include "paddle/fluid/operators/distributed/communicator.h"
namespace
paddle
{
namespace
operators
{
namespace
distributed
{
using
LoDTensor
=
framework
::
LoDTensor
;
using
SelectedRows
=
framework
::
SelectedRows
;
TEST
(
communicator
,
merge_lod_tensors
)
{
auto
cpu_place
=
platform
::
CPUPlace
();
auto
dims
=
framework
::
make_ddim
({
2
,
3
});
std
::
vector
<
std
::
shared_ptr
<
framework
::
Variable
>>
in_vars
;
float
out_value
=
0
;
for
(
auto
i
=
0
;
i
<
10
;
++
i
)
{
auto
var
=
std
::
make_shared
<
Variable
>
();
in_vars
.
emplace_back
(
var
);
auto
*
tensor
=
var
->
GetMutable
<
LoDTensor
>
();
auto
*
data
=
tensor
->
mutable_data
<
float
>
(
dims
,
cpu_place
);
for
(
auto
j
=
0
;
j
<
tensor
->
numel
();
++
j
)
{
data
[
j
]
=
static_cast
<
float
>
(
i
);
}
out_value
+=
static_cast
<
float
>
(
i
);
}
const
std
::
string
out_name
=
"Out"
;
std
::
unique_ptr
<
framework
::
Scope
>
scope
;
scope
.
reset
(
new
framework
::
Scope
());
scope
->
Var
(
out_name
);
for
(
auto
i
=
0
;
i
<
10
;
++
i
)
{
MergeVars
(
out_name
,
in_vars
,
scope
.
get
());
}
auto
&
out_tensor
=
scope
->
FindVar
(
out_name
)
->
Get
<
LoDTensor
>
();
auto
*
out_data
=
out_tensor
.
data
<
float
>
();
ASSERT_EQ
(
out_tensor
.
dims
(),
dims
);
for
(
auto
i
=
0
;
i
<
out_tensor
.
numel
();
++
i
)
{
ASSERT_EQ
(
out_data
[
i
],
out_value
);
}
}
TEST
(
communicator
,
merge_selected_rows
)
{
auto
cpu_place
=
platform
::
CPUPlace
();
int64_t
width
=
10
;
std
::
vector
<
std
::
shared_ptr
<
framework
::
Variable
>>
in_vars
;
const
int64_t
height
=
100
;
for
(
auto
i
=
0
;
i
<
10
;
++
i
)
{
std
::
vector
<
int64_t
>
rows
;
for
(
auto
k
=
0
;
k
<=
i
;
++
k
)
{
rows
.
push_back
(
k
);
}
auto
var
=
std
::
make_shared
<
Variable
>
();
in_vars
.
emplace_back
(
var
);
auto
*
slr
=
var
->
GetMutable
<
SelectedRows
>
();
slr
->
set_height
(
height
);
slr
->
set_rows
(
rows
);
auto
dims
=
framework
::
make_ddim
({
static_cast
<
int64_t
>
(
rows
.
size
()),
width
});
auto
*
data
=
slr
->
mutable_value
()
->
mutable_data
<
float
>
(
dims
,
cpu_place
);
for
(
auto
i
=
0
;
i
<
rows
.
size
();
++
i
)
{
for
(
auto
j
=
0
;
j
<
width
;
++
j
)
{
data
[
i
*
width
+
j
]
=
static_cast
<
float
>
(
rows
[
i
]);
}
}
}
const
std
::
string
out_name
=
"Out"
;
std
::
unique_ptr
<
framework
::
Scope
>
scope
;
scope
.
reset
(
new
framework
::
Scope
());
scope
->
Var
(
out_name
);
for
(
auto
i
=
0
;
i
<
10
;
++
i
)
{
MergeVars
(
out_name
,
in_vars
,
scope
.
get
());
}
auto
&
out_slr
=
scope
->
FindVar
(
out_name
)
->
Get
<
SelectedRows
>
();
auto
&
out_t
=
out_slr
.
value
();
auto
*
out_data
=
out_t
.
data
<
float
>
();
ASSERT_EQ
(
out_t
.
dims
(),
framework
::
make_ddim
({
10
,
width
}));
std
::
vector
<
float
>
out_values
;
out_values
.
reserve
(
10
);
for
(
auto
i
=
0
;
i
<
10
;
++
i
)
{
out_values
.
push_back
(
static_cast
<
float
>
(
i
*
(
10
-
i
)));
}
for
(
auto
i
=
0
;
i
<
out_slr
.
rows
().
size
();
++
i
)
{
ASSERT_EQ
(
out_slr
.
rows
()[
i
],
i
);
for
(
auto
j
=
0
;
j
<
width
;
++
j
)
{
ASSERT_EQ
(
out_data
[
i
*
width
+
j
],
out_values
[
i
]);
}
}
}
}
// namespace distributed
}
// namespace operators
}
// namespace paddle
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录