Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
0c321fe3
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0c321fe3
编写于
4月 12, 2018
作者:
_青葱
浏览文件
操作
浏览文件
下载
差异文件
Merge branch develop
上级
ac78cc04
4c55a602
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
504 addition
and
150 deletion
+504
-150
paddle/fluid/framework/block_desc.h
paddle/fluid/framework/block_desc.h
+1
-1
paddle/fluid/framework/details/computation_op_handle.cc
paddle/fluid/framework/details/computation_op_handle.cc
+3
-1
paddle/fluid/framework/details/fetch_op_handle.cc
paddle/fluid/framework/details/fetch_op_handle.cc
+7
-1
paddle/fluid/framework/details/op_handle_base.h
paddle/fluid/framework/details/op_handle_base.h
+2
-0
paddle/fluid/framework/details/ssa_graph_executor.h
paddle/fluid/framework/details/ssa_graph_executor.h
+3
-1
paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
...le/fluid/framework/details/threaded_ssa_graph_executor.cc
+0
-30
paddle/fluid/framework/details/threaded_ssa_graph_executor.h
paddle/fluid/framework/details/threaded_ssa_graph_executor.h
+0
-3
paddle/fluid/framework/operator.cc
paddle/fluid/framework/operator.cc
+9
-4
paddle/fluid/framework/parallel_executor.cc
paddle/fluid/framework/parallel_executor.cc
+39
-8
paddle/fluid/operators/concat_op.cc
paddle/fluid/operators/concat_op.cc
+5
-1
paddle/fluid/operators/detail/grpc_server.cc
paddle/fluid/operators/detail/grpc_server.cc
+1
-0
paddle/fluid/operators/listen_and_serv_op.cc
paddle/fluid/operators/listen_and_serv_op.cc
+29
-16
paddle/fluid/operators/listen_and_serv_op.h
paddle/fluid/operators/listen_and_serv_op.h
+2
-0
paddle/fluid/operators/lookup_table_op.cc
paddle/fluid/operators/lookup_table_op.cc
+3
-0
paddle/fluid/operators/prefetch_op.cc
paddle/fluid/operators/prefetch_op.cc
+4
-4
paddle/fluid/operators/send_recv_op_test.cc
paddle/fluid/operators/send_recv_op_test.cc
+14
-12
paddle/fluid/operators/send_vars_op.cc
paddle/fluid/operators/send_vars_op.cc
+2
-2
paddle/fluid/operators/sgd_op.cc
paddle/fluid/operators/sgd_op.cc
+2
-2
paddle/fluid/operators/split_ids_op.cc
paddle/fluid/operators/split_ids_op.cc
+8
-6
paddle/fluid/operators/split_ids_op.h
paddle/fluid/operators/split_ids_op.h
+49
-21
paddle/fluid/operators/sum_op.cc
paddle/fluid/operators/sum_op.cc
+6
-1
python/paddle/fluid/distribute_transpiler.py
python/paddle/fluid/distribute_transpiler.py
+309
-34
python/paddle/fluid/layers/nn.py
python/paddle/fluid/layers/nn.py
+6
-2
未找到文件。
paddle/fluid/framework/block_desc.h
浏览文件 @
0c321fe3
...
...
@@ -92,7 +92,7 @@ class BlockDesc {
/*
* Remove Op and its input/output variables.
* Note that for either input or ouput variable, if it is also an input or
* Note that for either input or ou
t
put variable, if it is also an input or
* output variable of other ops, we should remain it.
*/
void
RemoveOp
(
size_t
s
,
size_t
e
);
...
...
paddle/fluid/framework/details/computation_op_handle.cc
浏览文件 @
0c321fe3
...
...
@@ -14,6 +14,8 @@
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include <string>
namespace
paddle
{
namespace
framework
{
namespace
details
{
...
...
@@ -33,7 +35,7 @@ void ComputationOpHandle::RunImpl() {
}
}
op_
->
Run
(
*
scope_
->
FindVar
(
"@TMP_SCOPE@"
)
->
Get
<
Scope
*>
(),
place_
);
op_
->
Run
(
*
scope_
->
FindVar
(
kLocalExecScopeName
)
->
Get
<
Scope
*>
(),
place_
);
}
std
::
string
ComputationOpHandle
::
Name
()
const
{
return
op_
->
Type
();
}
...
...
paddle/fluid/framework/details/fetch_op_handle.cc
浏览文件 @
0c321fe3
...
...
@@ -14,6 +14,9 @@
#include "paddle/fluid/framework/details/fetch_op_handle.h"
#include <string>
#include <vector>
namespace
paddle
{
namespace
framework
{
namespace
details
{
...
...
@@ -57,7 +60,10 @@ void FetchOpHandle::RunImpl() {
for
(
size_t
i
=
0
;
i
<
scopes
.
size
();
++
i
)
{
auto
&
scope
=
scopes
[
i
];
auto
&
t
=
scope
->
FindVar
(
var_name
)
->
Get
<
framework
::
LoDTensor
>
();
auto
&
t
=
scope
->
FindVar
(
kLocalExecScopeName
)
->
Get
<
Scope
*>
()
->
FindVar
(
var_name
)
->
Get
<
framework
::
LoDTensor
>
();
if
(
platform
::
is_gpu_place
(
var
->
place_
))
{
#ifdef PADDLE_WITH_CUDA
TensorCopy
(
t
,
cpu
,
*
dev_ctxes_
[
t
.
place
()],
&
tensors_
[
i
]);
...
...
paddle/fluid/framework/details/op_handle_base.h
浏览文件 @
0c321fe3
...
...
@@ -24,6 +24,8 @@ namespace paddle {
namespace
framework
{
namespace
details
{
constexpr
char
kLocalExecScopeName
[]
=
"@LCOAL_SCOPE@"
;
class
OpHandleBase
{
private:
DISABLE_COPY_AND_ASSIGN
(
OpHandleBase
);
...
...
paddle/fluid/framework/details/ssa_graph_executor.h
浏览文件 @
0c321fe3
...
...
@@ -15,13 +15,15 @@
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/ssa_graph.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
namespace
paddle
{
namespace
framework
{
namespace
details
{
class
SSAGraphExecutor
{
DISABLE_COPY_AND_ASSIGN
(
SSAGraphExecutor
);
...
...
paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
浏览文件 @
0c321fe3
...
...
@@ -136,12 +136,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
ready_ops
.
clear
();
};
// Create local scopes.
for
(
auto
&
scope
:
local_scopes_
)
{
auto
&
local_scope
=
scope
->
NewScope
();
*
scope
->
Var
(
"@TMP_SCOPE@"
)
->
GetMutable
<
Scope
*>
()
=
&
local_scope
;
}
// Step 3. Execution
while
(
!
pending_vars
.
empty
()
||
!
ready_ops
.
empty
()
||
!
delayed_ops
.
empty
())
{
// 1. Run All Ready ops
...
...
@@ -189,34 +183,10 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
PADDLE_ENFORCE
(
ready_ops
.
empty
());
PADDLE_ENFORCE
(
delayed_ops
.
empty
());
PADDLE_ENFORCE
(
blocked_by_delayed_ops
.
empty
());
++
computation_count_
;
auto
sync_computation
=
[
&
]
{
computation_count_
=
0
;
// Wait All computational streams
for
(
auto
p
:
this
->
places_
)
{
platform
::
DeviceContextPool
::
Instance
().
Get
(
p
)
->
Wait
();
}
for
(
auto
&
scope
:
local_scopes_
)
{
scope
->
DropKids
();
}
};
// Wait FetchOps.
if
(
!
fetch_ops
.
empty
())
{
fetch_ops
.
clear
();
sync_computation
();
}
if
(
computation_count_
==
max_async_computation
)
{
sync_computation
();
}
// NOTE: the temp scope can be dropped lazily if needed.
// Drop tmp scopes;
for
(
auto
&
scope
:
local_scopes_
)
{
auto
&
kid
=
*
scope
->
Var
(
"@TMP_SCOPE@"
)
->
GetMutable
<
Scope
*>
();
kid
=
nullptr
;
}
return
fetch_data
;
...
...
paddle/fluid/framework/details/threaded_ssa_graph_executor.h
浏览文件 @
0c321fe3
...
...
@@ -99,9 +99,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std
::
unique_ptr
<
platform
::
EnforceNotMet
>
exception_
;
std
::
atomic
<
int
>
running_ops_
;
bool
allow_op_delay_
;
size_t
computation_count_
{
0
};
size_t
max_async_computation
{
100
};
};
}
// namespace details
...
...
paddle/fluid/framework/operator.cc
浏览文件 @
0c321fe3
...
...
@@ -46,7 +46,8 @@ proto::VarType::Type GetDataTypeOfVar(const Variable* var) {
}
}
static
DDim
GetDims
(
const
Scope
&
scope
,
const
std
::
string
&
name
)
{
static
DDim
GetDims
(
const
Scope
&
scope
,
const
std
::
string
&
name
,
bool
get_actual_dim
=
false
)
{
Variable
*
var
=
scope
.
FindVar
(
name
);
if
(
var
==
nullptr
)
{
return
DDim
({
-
1
});
...
...
@@ -55,7 +56,11 @@ static DDim GetDims(const Scope& scope, const std::string& name) {
if
(
var
->
IsType
<
LoDTensor
>
())
{
return
var
->
Get
<
LoDTensor
>
().
dims
();
}
else
if
(
var
->
IsType
<
SelectedRows
>
())
{
return
var
->
Get
<
SelectedRows
>
().
GetCompleteDims
();
if
(
get_actual_dim
)
{
return
var
->
Get
<
SelectedRows
>
().
value
().
dims
();
}
else
{
return
var
->
Get
<
SelectedRows
>
().
GetCompleteDims
();
}
}
else
{
return
DDim
({
-
1
});
}
...
...
@@ -129,7 +134,7 @@ std::string OperatorBase::DebugStringEx(const Scope* scope) const {
for
(
size_t
i
=
0
;
i
<
input
.
second
.
size
();
++
i
)
{
ss
<<
input
.
second
[
i
];
if
(
scope
)
{
ss
<<
"["
<<
GetDims
(
*
scope
,
input
.
second
[
i
])
<<
"]"
;
ss
<<
"["
<<
GetDims
(
*
scope
,
input
.
second
[
i
]
,
true
)
<<
"]"
;
ss
<<
"("
<<
GetLoD
(
*
scope
,
input
.
second
[
i
])
<<
")"
;
}
if
(
i
!=
input
.
second
.
size
()
-
1
)
{
...
...
@@ -149,7 +154,7 @@ std::string OperatorBase::DebugStringEx(const Scope* scope) const {
for
(
size_t
i
=
0
;
i
<
output
.
second
.
size
();
++
i
)
{
ss
<<
output
.
second
[
i
];
if
(
scope
)
{
ss
<<
"["
<<
GetDims
(
*
scope
,
output
.
second
[
i
])
<<
"]"
;
ss
<<
"["
<<
GetDims
(
*
scope
,
output
.
second
[
i
]
,
true
)
<<
"]"
;
ss
<<
"("
<<
GetLoD
(
*
scope
,
output
.
second
[
i
])
<<
")"
;
}
if
(
i
!=
output
.
second
.
size
()
-
1
)
{
...
...
paddle/fluid/framework/parallel_executor.cc
浏览文件 @
0c321fe3
...
...
@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/parallel_executor.h"
#include <string>
#include <tuple>
#include <vector>
#ifdef PADDLE_WITH_CUDA
...
...
@@ -41,6 +42,8 @@ class ParallelExecutorPrivate {
#ifdef PADDLE_WITH_CUDA
std
::
unique_ptr
<
platform
::
NCCLContextMap
>
nccl_ctxs_
;
#endif
std
::
vector
<
std
::
tuple
<
std
::
string
,
proto
::
VarType
::
Type
,
bool
>>
var_types_
;
};
std
::
vector
<
Scope
*>
&
ParallelExecutor
::
GetLocalScopes
()
{
...
...
@@ -97,14 +100,9 @@ ParallelExecutor::ParallelExecutor(
allow_op_delay
));
// Step 3. Create vars in each scope;
for
(
auto
*
scope
:
member_
->
local_scopes_
)
{
for
(
auto
*
var
:
main_program
.
Block
(
0
).
AllVars
())
{
if
(
scope
->
FindVar
(
var
->
Name
())
!=
nullptr
)
{
continue
;
}
InitializeVariable
(
scope
->
Var
(
var
->
Name
()),
var
->
GetType
());
}
for
(
auto
*
var
:
main_program
.
Block
(
0
).
AllVars
())
{
member_
->
var_types_
.
emplace_back
(
var
->
Name
(),
var
->
GetType
(),
var
->
Persistable
());
}
}
...
...
@@ -163,9 +161,42 @@ void ParallelExecutor::Run(
const
std
::
unordered_map
<
std
::
string
,
LoDTensor
>
&
feed_tensors
)
{
platform
::
RecordBlock
b
(
0
);
SplitTensorToPlaces
(
feed_tensors
);
// Create local scopes.
for
(
auto
&
scope
:
member_
->
local_scopes_
)
{
Scope
&
local_scope
=
scope
->
NewScope
();
*
scope
->
Var
(
details
::
kLocalExecScopeName
)
->
GetMutable
<
Scope
*>
()
=
&
local_scope
;
for
(
auto
&
name_type_pair
:
member_
->
var_types_
)
{
if
(
scope
->
FindVar
(
std
::
get
<
0
>
(
name_type_pair
))
!=
nullptr
)
{
continue
;
}
if
(
std
::
get
<
2
>
(
name_type_pair
))
{
// Persistable
InitializeVariable
(
scope
->
Var
(
std
::
get
<
0
>
(
name_type_pair
)),
std
::
get
<
1
>
(
name_type_pair
));
}
else
{
InitializeVariable
(
scope
->
Var
(
std
::
get
<
0
>
(
name_type_pair
)),
std
::
get
<
1
>
(
name_type_pair
));
}
}
}
auto
fetch_data
=
member_
->
executor_
->
Run
(
fetch_tensors
);
*
member_
->
global_scope_
->
Var
(
fetched_var_name
)
->
GetMutable
<
FeedFetchList
>
()
=
fetch_data
;
// Wait All computational streams
for
(
auto
p
:
member_
->
places_
)
{
platform
::
DeviceContextPool
::
Instance
().
Get
(
p
)
->
Wait
();
}
for
(
auto
&
scope
:
member_
->
local_scopes_
)
{
auto
&
local_scope
=
*
scope
->
Var
(
details
::
kLocalExecScopeName
)
->
GetMutable
<
Scope
*>
();
scope
->
DeleteScope
(
local_scope
);
local_scope
=
nullptr
;
}
}
void
ParallelExecutor
::
SplitTensorToPlaces
(
...
...
paddle/fluid/operators/concat_op.cc
浏览文件 @
0c321fe3
...
...
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/concat_op.h"
#include <string>
#include <vector>
...
...
@@ -34,7 +35,10 @@ class ConcatOp : public framework::OperatorWithKernel {
size_t
axis
=
static_cast
<
size_t
>
(
ctx
->
Attrs
().
Get
<
int
>
(
"axis"
));
const
size_t
n
=
ins
.
size
();
PADDLE_ENFORCE_GT
(
n
,
1
,
"Input tensors count should > 1."
);
PADDLE_ENFORCE_GT
(
n
,
0
,
"Input tensors count should > 0."
);
if
(
n
==
1
)
{
VLOG
(
3
)
<<
"Warning: concat op have only one input, may waste memory"
;
}
auto
out_dims
=
ins
[
0
];
size_t
in_zero_dims_size
=
out_dims
.
size
();
...
...
paddle/fluid/operators/detail/grpc_server.cc
浏览文件 @
0c321fe3
...
...
@@ -161,6 +161,7 @@ class RequestPrefetch final : public RequestBase {
::
grpc
::
ByteBuffer
reply
;
std
::
string
var_name
=
request_
->
OutVarname
();
VLOG
(
3
)
<<
"prefetch var "
<<
var_name
;
auto
var_desc
=
program_
->
Block
(
0
).
FindVar
(
var_name
);
framework
::
Scope
*
local_scope
=
&
scope_
->
NewScope
();
auto
*
var
=
local_scope
->
FindVar
(
var_name
);
...
...
paddle/fluid/operators/listen_and_serv_op.cc
浏览文件 @
0c321fe3
...
...
@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include <ostream>
#include <thread>
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/operators/listen_and_serv_op.h"
...
...
@@ -88,8 +89,9 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
auto
ins
=
Inputs
(
"X"
);
auto
fan_in
=
Attr
<
int
>
(
"Fanin"
);
auto
*
block
=
Attr
<
framework
::
BlockDesc
*>
(
kOptimizeBlock
);
auto
*
program
=
block
->
Program
();
auto
*
optimize_block
=
Attr
<
framework
::
BlockDesc
*>
(
kOptimizeBlock
);
auto
*
prefetch_block
=
Attr
<
framework
::
BlockDesc
*>
(
kPrefetchBlock
);
auto
*
program
=
optimize_block
->
Program
();
size_t
num_blocks
=
program
->
Size
();
PADDLE_ENFORCE_GE
(
num_blocks
,
2
,
"server program should have at least 2 blocks"
);
...
...
@@ -97,18 +99,25 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
framework
::
Executor
executor
(
dev_place
);
std
::
vector
<
int
>
block_list
;
for
(
size_t
blkid
=
1
;
blkid
<
num_blocks
;
++
blkid
)
{
block_list
.
push_back
(
blkid
);
if
(
blkid
!=
prefetch_block
->
ID
())
{
block_list
.
push_back
(
blkid
);
}
}
auto
prepared
=
executor
.
Prepare
(
*
program
,
block_list
);
auto
optimize_
prepared
=
executor
.
Prepare
(
*
program
,
block_list
);
// Insert placeholder for block0 which holds current op itself.
prepared
.
insert
(
prepared
.
begin
(),
std
::
shared_ptr
<
framework
::
ExecutorPrepareContext
>
(
nullptr
));
optimize_prepared
.
insert
(
optimize_prepared
.
begin
(),
std
::
shared_ptr
<
framework
::
ExecutorPrepareContext
>
(
nullptr
));
rpc_service_
->
SetScope
(
&
recv_scope
);
rpc_service_
->
SetDevCtx
(
&
dev_ctx
);
// TODO(qiao) set proper fields for table lookup and update
rpc_service_
->
SetExecutor
(
&
executor
);
rpc_service_
->
SetPrefetchBlkdId
(
0
);
VLOG
(
3
)
<<
"prefetch block id is "
<<
prefetch_block
->
ID
();
auto
prefetch_prepared
=
executor
.
Prepare
(
*
program
,
prefetch_block
->
ID
());
rpc_service_
->
SetPrefetchBlkdId
(
prefetch_block
->
ID
());
rpc_service_
->
SetPrefetchPreparedCtx
(
prefetch_prepared
.
get
());
prefetch_prepared
.
release
();
rpc_service_
->
SetProgram
(
program
);
// start the server listening after all member initialized.
server_thread_
.
reset
(
new
std
::
thread
(
RunServer
,
rpc_service_
));
...
...
@@ -166,16 +175,18 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
parallel_blkids
.
push_back
(
1
);
double
ts
=
detail
::
GetTimestamp
();
for
(
size_t
blkid
=
2
;
blkid
<
num_blocks
;
++
blkid
)
{
if
(
program
->
Block
(
blkid
).
Parent
()
!=
last_parent_blkid
)
{
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
prepared
,
program
,
&
recv_scope
);
parallel_blkids
.
clear
();
last_parent_blkid
=
program
->
Block
(
blkid
).
Parent
();
if
(
blkid
!=
prefetch_block
->
ID
())
{
if
(
program
->
Block
(
blkid
).
Parent
()
!=
last_parent_blkid
)
{
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
optimize_prepared
,
program
,
&
recv_scope
);
parallel_blkids
.
clear
();
last_parent_blkid
=
program
->
Block
(
blkid
).
Parent
();
}
parallel_blkids
.
push_back
(
blkid
);
}
parallel_blkids
.
push_back
(
blkid
);
}
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
prepared
,
program
,
&
recv_scope
);
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
optimize_prepared
,
program
,
&
recv_scope
);
VLOG
(
2
)
<<
"run all blocks spent "
<<
detail
::
GetTimestamp
()
-
ts
<<
"(ms)"
;
// Reset the received sparse variables, the sum operator would not
...
...
@@ -211,6 +222,8 @@ from send_op and send back variables to recv_op.
.
AddCustomChecker
([](
const
std
::
string
&
ip
)
{
return
!
ip
.
empty
();
});
AddAttr
<
framework
::
BlockDesc
*>
(
kOptimizeBlock
,
"BlockID to run on server side."
);
AddAttr
<
framework
::
BlockDesc
*>
(
kPrefetchBlock
,
"prefetch block to run on server side."
);
AddAttr
<
int
>
(
"Fanin"
,
"How many clients send to this server."
)
.
SetDefault
(
1
);
}
...
...
paddle/fluid/operators/listen_and_serv_op.h
浏览文件 @
0c321fe3
...
...
@@ -16,6 +16,7 @@ limitations under the License. */
#include <stdint.h>
#include <ostream>
#include <string>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
...
...
@@ -27,6 +28,7 @@ namespace paddle {
namespace
operators
{
constexpr
char
kOptimizeBlock
[]
=
"OptimizeBlock"
;
constexpr
char
kPrefetchBlock
[]
=
"PrefetchBlock"
;
void
RunServer
(
std
::
shared_ptr
<
detail
::
AsyncGRPCServer
>
service
);
...
...
paddle/fluid/operators/lookup_table_op.cc
浏览文件 @
0c321fe3
...
...
@@ -78,6 +78,9 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
"(boolean, default false) "
"Sparse update."
)
.
SetDefault
(
false
);
AddAttr
<
bool
>
(
"is_distributed"
,
"(boolean, default false) distributed lookup table."
)
.
SetDefault
(
false
);
AddAttr
<
int64_t
>
(
"padding_idx"
,
"(int64, default -1) "
"If the value is -1, it makes no effect to lookup. "
...
...
paddle/fluid/operators/prefetch_op.cc
浏览文件 @
0c321fe3
...
...
@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future>
#include <future>
// NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
...
...
@@ -50,8 +50,8 @@ class PrefetchOp : public framework::OperatorBase {
for
(
size_t
i
=
0
;
i
<
ins
.
size
();
i
++
)
{
if
(
NeedSend
(
scope
,
ins
[
i
]))
{
VLOG
(
3
)
<<
"sending "
<<
ins
[
i
]
<<
" to "
<<
epmap
[
i
]
<<
"to get "
<<
outs
[
i
]
<<
"back"
;
VLOG
(
3
)
<<
"sending "
<<
ins
[
i
]
<<
" to "
<<
epmap
[
i
]
<<
"
to get "
<<
outs
[
i
]
<<
"
back"
;
rpc_client
->
AsyncPrefetchVariable
(
epmap
[
i
],
ctx
,
scope
,
ins
[
i
],
outs
[
i
]);
}
else
{
...
...
@@ -71,7 +71,7 @@ class PrefetchOpMaker : public framework::OpProtoAndCheckerMaker {
"(RPCClient) The RPC client object which will be"
"initialized at most once."
);
AddOutput
(
"Out"
,
"(
SelectedRows
) result "
"(
LoDTensor
) result "
"to be fetched from parameter server"
)
.
AsDuplicable
();
AddAttr
<
std
::
vector
<
std
::
string
>>
(
...
...
paddle/fluid/operators/send_recv_op_test.cc
浏览文件 @
0c321fe3
...
...
@@ -14,7 +14,7 @@ limitations under the License. */
#include <unistd.h>
#include <string>
#include <thread>
#include <thread>
// NOLINT
#include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h"
...
...
@@ -37,11 +37,11 @@ namespace m = paddle::operators::math;
std
::
unique_ptr
<
f
::
OperatorBase
>
listen_and_serv_op
;
int
selected_port
;
void
InitTensorsInScope
(
f
::
Scope
&
scope
,
p
::
CPUPlace
&
plac
e
)
{
void
InitTensorsInScope
(
const
p
::
CPUPlace
&
place
,
f
::
Scope
*
scop
e
)
{
p
::
CPUDeviceContext
ctx
(
place
);
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
auto
var_name
=
paddle
::
string
::
Sprintf
(
"x%d"
,
i
);
auto
var
=
scope
.
Var
(
var_name
);
auto
var
=
scope
->
Var
(
var_name
);
auto
tensor
=
var
->
GetMutable
<
f
::
LoDTensor
>
();
tensor
->
Resize
({
10
,
10
});
float
*
expect
=
tensor
->
mutable_data
<
float
>
(
place
);
...
...
@@ -50,20 +50,20 @@ void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
}
}
auto
out_var
=
scope
.
Var
(
"Out"
);
auto
out_var
=
scope
->
Var
(
"Out"
);
auto
out_tensor
=
out_var
->
GetMutable
<
f
::
LoDTensor
>
();
out_tensor
->
Resize
({
10
,
10
});
out_tensor
->
mutable_data
<
float
>
(
place
);
// allocate
}
void
InitSelectedRowsInScope
(
f
::
Scope
&
scope
,
p
::
CPUPlace
&
plac
e
)
{
void
InitSelectedRowsInScope
(
const
p
::
CPUPlace
&
place
,
f
::
Scope
*
scop
e
)
{
p
::
CPUDeviceContext
ctx
(
place
);
int64_t
height
=
10
;
int64_t
row_numel
=
10
;
m
::
SetConstant
<
p
::
CPUDeviceContext
,
float
>
set_one
;
// init x0
std
::
vector
<
int64_t
>
rows0
{
0
,
4
,
7
};
auto
x0_var
=
scope
.
Var
(
"x0"
);
auto
x0_var
=
scope
->
Var
(
"x0"
);
auto
x0
=
x0_var
->
GetMutable
<
f
::
SelectedRows
>
();
x0
->
set_rows
(
rows0
);
x0
->
set_height
(
height
);
...
...
@@ -74,7 +74,7 @@ void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) {
// init x1
std
::
vector
<
int64_t
>
rows1
{
2
,
9
};
auto
x1_var
=
scope
.
Var
(
"x1"
);
auto
x1_var
=
scope
->
Var
(
"x1"
);
auto
x1
=
x1_var
->
GetMutable
<
f
::
SelectedRows
>
();
x1
->
set_rows
(
rows1
);
x1
->
set_height
(
height
);
...
...
@@ -83,7 +83,7 @@ void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) {
f
::
make_ddim
({
static_cast
<
int64_t
>
(
rows1
.
size
()),
row_numel
}),
place
);
set_one
(
ctx
,
x1_value
,
1.0
);
auto
out_var
=
scope
.
Var
(
"Out"
);
auto
out_var
=
scope
->
Var
(
"Out"
);
auto
out
=
out_var
->
GetMutable
<
f
::
SelectedRows
>
();
auto
out_value
=
out
->
mutable_value
();
out
->
set_height
(
height
);
...
...
@@ -117,15 +117,16 @@ void StartServerNet(bool is_sparse) {
f
::
Scope
scope
;
p
::
CPUPlace
place
;
if
(
is_sparse
)
{
InitSelectedRowsInScope
(
scope
,
plac
e
);
InitSelectedRowsInScope
(
place
,
&
scop
e
);
}
else
{
InitTensorsInScope
(
scope
,
plac
e
);
InitTensorsInScope
(
place
,
&
scop
e
);
}
// sub program run in listen_and_serv_op, for simple test we use sum
f
::
ProgramDesc
program
;
const
auto
&
root_block
=
program
.
Block
(
0
);
auto
*
optimize_block
=
program
.
AppendBlock
(
root_block
);
auto
*
prefetch_block
=
program
.
AppendBlock
(
root_block
);
// X for server side tensors, RX for received tensers, must be of same shape.
AddOp
(
"sum"
,
{{
"X"
,
{
"x0"
,
"x1"
}}},
{{
"Out"
,
{
"Out"
}}},
{},
optimize_block
);
...
...
@@ -135,6 +136,7 @@ void StartServerNet(bool is_sparse) {
attrs
.
insert
({
"ParamList"
,
std
::
vector
<
std
::
string
>
({
"Out"
})});
attrs
.
insert
({
"GradList"
,
std
::
vector
<
std
::
string
>
({
"x1"
})});
attrs
.
insert
({
"OptimizeBlock"
,
optimize_block
});
attrs
.
insert
({
"PrefetchBlock"
,
prefetch_block
});
listen_and_serv_op
=
f
::
OpRegistry
::
CreateOp
(
"listen_and_serv"
,
{{
"X"
,
{
"x1"
}}},
{},
attrs
);
LOG
(
INFO
)
<<
"selected port before run "
<<
selected_port
;
...
...
@@ -148,7 +150,7 @@ TEST(SendRecvOp, CPUDense) {
// local net
f
::
Scope
scope
;
p
::
CPUPlace
place
;
InitTensorsInScope
(
scope
,
plac
e
);
InitTensorsInScope
(
place
,
&
scop
e
);
// create rpc client var
scope
.
Var
(
"RPC_CLIENT_VAR"
);
...
...
@@ -191,7 +193,7 @@ TEST(SendRecvOp, CPUSparse) {
f
::
Scope
scope
;
p
::
CPUPlace
place
;
p
::
CPUDeviceContext
ctx
(
place
);
InitSelectedRowsInScope
(
scope
,
plac
e
);
InitSelectedRowsInScope
(
place
,
&
scop
e
);
scope
.
Var
(
"RPC_CLIENT_VAR"
);
f
::
AttributeMap
attrs
;
selected_port
=
static_cast
<
paddle
::
operators
::
ListenAndServOp
*>
(
...
...
paddle/fluid/operators/send_vars_op.cc
浏览文件 @
0c321fe3
...
...
@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <future>
#include <future>
// NOLINT
#include <ostream>
#include "paddle/fluid/framework/data_type.h"
...
...
@@ -36,7 +36,7 @@ class SendVarsOp : public framework::OperatorBase {
auto
ins
=
Inputs
(
"X"
);
std
::
vector
<
std
::
string
>
epmap
=
Attr
<
std
::
vector
<
std
::
string
>>
(
"epmap"
);
int
sync_send
=
Attr
<
int
>
(
"sync_sen
t
"
);
int
sync_send
=
Attr
<
int
>
(
"sync_sen
d
"
);
platform
::
DeviceContextPool
&
pool
=
platform
::
DeviceContextPool
::
Instance
();
auto
&
ctx
=
*
pool
.
Get
(
place
);
...
...
paddle/fluid/operators/sgd_op.cc
浏览文件 @
0c321fe3
...
...
@@ -35,8 +35,8 @@ class SGDOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE_EQ
(
framework
::
product
(
lr_dims
),
1
,
"Learning rate should have 1 element"
);
auto
param_dim
=
ctx
->
GetInputDim
(
"Param"
);
// TODO(qijun): check dimensions of Param and Grad at comp
li
e
// and run
time.
// TODO(qijun): check dimensions of Param and Grad at comp
il
e
// and runtime.
ctx
->
SetOutputDim
(
"ParamOut"
,
param_dim
);
}
...
...
paddle/fluid/operators/split_ids_op.cc
浏览文件 @
0c321fe3
...
...
@@ -48,11 +48,11 @@ class SplitIdsOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE
(
ctx
->
HasOutputs
(
"Out"
),
"SplitIdsOp must has output Out."
);
auto
ids_var_type
=
ctx
->
GetInputsVarType
(
"Ids"
).
front
();
PADDLE_ENFORCE_EQ
(
ids_var_type
,
framework
::
proto
::
VarType
::
LOD_TENSOR
);
auto
ids_dims
=
ctx
->
GetInputDim
(
"Ids"
);
PADDLE_ENFORCE_EQ
(
ids_dims
.
size
(),
2
);
PADDLE_ENFORCE_EQ
(
ids_dims
[
1
],
1
);
if
(
ids_var_type
==
framework
::
proto
::
VarType
::
LOD_TENSOR
)
{
PADDLE_ENFORCE_EQ
(
ids_dims
.
size
(),
2
);
PADDLE_ENFORCE_EQ
(
ids_dims
[
1
],
1
);
}
}
};
...
...
@@ -60,8 +60,9 @@ class SplitIdsOpInferVarType : public framework::VarTypeInference {
public:
void
operator
()(
const
framework
::
OpDesc
&
op_desc
,
framework
::
BlockDesc
*
block
)
const
override
{
auto
*
input_var
=
block
->
Var
(
op_desc
.
Input
(
"Ids"
)[
0
]);
for
(
auto
&
out_var
:
op_desc
.
Output
(
"Out"
))
{
block
->
Var
(
out_var
)
->
SetType
(
framework
::
proto
::
VarType
::
LOD_TENSOR
);
block
->
Var
(
out_var
)
->
SetType
(
input_var
->
GetType
()
);
}
}
};
...
...
@@ -73,4 +74,5 @@ namespace ops = paddle::operators;
REGISTER_OPERATOR
(
split_ids
,
ops
::
SplitIdsOp
,
ops
::
SplitIdsOpMaker
,
ops
::
SplitIdsOpInferVarType
);
REGISTER_OP_CPU_KERNEL
(
split_ids
,
ops
::
SplitIdsOpKernel
<
paddle
::
platform
::
CPUPlace
,
int64_t
>
);
split_ids
,
ops
::
SplitIdsOpKernel
<
paddle
::
platform
::
CPUPlace
,
int64_t
>
,
ops
::
SplitIdsOpKernel
<
paddle
::
platform
::
CPUPlace
,
float
>
);
paddle/fluid/operators/split_ids_op.h
浏览文件 @
0c321fe3
...
...
@@ -24,35 +24,63 @@ namespace operators {
template
<
typename
DeviceContext
,
typename
T
>
class
SplitIdsOpKernel
:
public
framework
::
OpKernel
<
T
>
{
public:
void
Compute
(
const
framework
::
ExecutionContext
&
ctx
)
const
override
{
void
Compute
(
const
framework
::
ExecutionContext
&
ctx
)
const
override
{
auto
place
=
ctx
.
GetPlace
();
if
(
!
platform
::
is_cpu_place
(
place
))
{
PADDLE_THROW
(
"SplitIds do not support GPU kernel"
);
}
auto
&
ids_dims
=
ctx
.
Input
<
framework
::
LoDTensor
>
(
"Ids"
)
->
dims
();
const
T
*
ids
=
ctx
.
Input
<
framework
::
LoDTensor
>
(
"Ids"
)
->
data
<
T
>
();
auto
outs
=
ctx
.
MultiOutput
<
framework
::
LoDTensor
>
(
"Out"
);
const
size_t
shard_num
=
outs
.
size
();
const
auto
*
ids_var
=
ctx
.
InputVar
(
"Ids"
);
if
(
ids_var
->
IsType
<
framework
::
LoDTensor
>
())
{
const
auto
&
ids_dims
=
ctx
.
Input
<
framework
::
LoDTensor
>
(
"Ids"
)
->
dims
();
const
T
*
ids
=
ctx
.
Input
<
framework
::
LoDTensor
>
(
"Ids"
)
->
data
<
T
>
();
auto
outs
=
ctx
.
MultiOutput
<
framework
::
LoDTensor
>
(
"Out"
);
const
size_t
shard_num
=
outs
.
size
();
std
::
vector
<
std
::
vector
<
T
>>
out_ids
;
out_ids
.
resize
(
outs
.
size
());
std
::
vector
<
std
::
vector
<
T
>>
out_ids
;
out_ids
.
resize
(
outs
.
size
());
// split id by their shard_num.
for
(
int
i
=
0
;
i
<
ids_dims
[
0
];
++
i
)
{
T
id
=
ids
[
i
];
size_t
shard_id
=
static_cast
<
size_t
>
(
id
)
%
shard_num
;
out_ids
[
shard_id
].
push_back
(
id
);
}
// split id by their shard_num.
for
(
int
i
=
0
;
i
<
ids_dims
[
0
];
++
i
)
{
T
id
=
ids
[
i
];
size_t
shard_id
=
static_cast
<
size_t
>
(
id
)
%
shard_num
;
out_ids
[
shard_id
].
push_back
(
id
);
}
// create tensor for each shard and send to parameter server
for
(
size_t
i
=
0
;
i
<
out_ids
.
size
();
++
i
)
{
auto
*
shard_t
=
outs
[
i
];
std
::
vector
<
T
>
ids
=
out_ids
[
i
];
auto
*
shard_data
=
shard_t
->
mutable_data
<
T
>
(
framework
::
make_ddim
({
static_cast
<
int64_t
>
(
ids
.
size
()),
1
}),
place
);
for
(
size_t
i
=
0
;
i
<
ids
.
size
();
++
i
)
{
shard_data
[
i
]
=
ids
[
i
];
}
}
}
else
if
(
ids_var
->
IsType
<
framework
::
SelectedRows
>
())
{
const
auto
*
ids_selected_rows
=
ctx
.
Input
<
framework
::
SelectedRows
>
(
"Ids"
);
auto
&
ids_dims
=
ids_selected_rows
->
value
().
dims
();
PADDLE_ENFORCE_EQ
(
ids_dims
[
0
],
ids_selected_rows
->
rows
().
size
(),
""
);
const
T
*
ids
=
ids_selected_rows
->
value
().
data
<
T
>
();
const
auto
&
ids_rows
=
ids_selected_rows
->
rows
();
auto
outs
=
ctx
.
MultiOutput
<
framework
::
SelectedRows
>
(
"Out"
);
const
size_t
shard_num
=
outs
.
size
();
// get rows for outputs
for
(
auto
&
id
:
ids_rows
)
{
size_t
shard_id
=
static_cast
<
size_t
>
(
id
)
%
shard_num
;
outs
[
shard_id
]
->
mutable_rows
()
->
push_back
(
id
);
}
// create tensor for each shard and send to parameter server
for
(
size_t
i
=
0
;
i
<
out_ids
.
size
();
++
i
)
{
auto
*
shard_t
=
outs
[
i
];
std
::
vector
<
T
>
ids
=
out_ids
[
i
];
auto
*
shard_data
=
shard_t
->
mutable_data
<
T
>
(
framework
::
make_ddim
({
static_cast
<
int64_t
>
(
ids
.
size
()),
1
}),
place
);
for
(
size_t
i
=
0
;
i
<
ids
.
size
();
++
i
)
{
shard_data
[
i
]
=
ids
[
i
];
int64_t
row_width
=
ids_dims
[
1
];
for
(
auto
&
out
:
outs
)
{
out
->
set_height
(
ids_selected_rows
->
height
());
framework
::
DDim
ddim
=
framework
::
make_ddim
(
{
static_cast
<
int64_t
>
(
out
->
rows
().
size
()),
row_width
});
T
*
output
=
out
->
mutable_value
()
->
mutable_data
<
T
>
(
ddim
,
place
);
for
(
size_t
i
=
0
;
i
<
ddim
[
0
];
++
i
)
{
memcpy
(
output
+
i
*
row_width
,
ids
+
out
->
rows
()[
i
]
*
row_width
,
row_width
*
sizeof
(
T
));
}
}
}
}
...
...
paddle/fluid/operators/sum_op.cc
浏览文件 @
0c321fe3
...
...
@@ -10,9 +10,11 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/sum_op.h"
#include <algorithm>
#include <string>
#include <vector>
#include "paddle/fluid/framework/var_type_inference.h"
#include "paddle/fluid/operators/detail/safe_ref.h"
...
...
@@ -37,7 +39,10 @@ class SumOp : public framework::OperatorWithKernel {
auto
x_dims
=
ctx
->
GetInputsDim
(
"X"
);
size_t
N
=
x_dims
.
size
();
PADDLE_ENFORCE_GT
(
N
,
1
,
"Input tensors count should > 1."
);
PADDLE_ENFORCE_GT
(
N
,
0
,
"Input tensors count should > 0."
);
if
(
N
==
1
)
{
VLOG
(
3
)
<<
"Warning: sum have only one input, may waste memory"
;
}
framework
::
DDim
in_dim
({
0
});
for
(
auto
&
x_dim
:
x_dims
)
{
...
...
python/paddle/fluid/distribute_transpiler.py
浏览文件 @
0c321fe3
...
...
@@ -13,14 +13,17 @@
# limitations under the License.
from
__future__
import
print_function
import
framework
from
framework
import
Program
,
default_main_program
,
default_startup_program
,
Parameter
,
Variable
import
optimizer
from
layer_helper
import
LayerHelper
import
distributed_splitter
as
splitter
import
math
import
distributed_splitter
as
splitter
import
framework
from
framework
import
Program
,
default_main_program
,
Variable
from
.
import
core
import
debuger
LOOKUP_TABLE_TYPE
=
"lookup_table"
LOOKUP_TABLE_GRAD_TYPE
=
"lookup_table_grad"
RPC_CLIENT_VAR_NAME
=
"RPC_CLIENT_VAR"
class
VarBlock
:
...
...
@@ -35,9 +38,9 @@ class VarBlock:
class
UnionFind
(
object
):
""" Union-find data struct.
""" Union-find data struct
ure
.
Union-find is a data struct that keeps track of a set of elements partitioned
Union-find is a data struct
ure
that keeps track of a set of elements partitioned
into a number of disjoint (non-overlapping) subsets.
Reference:
...
...
@@ -185,19 +188,66 @@ class DistributeTranspiler:
assert
(
callable
(
split_method
))
if
program
is
None
:
program
=
default_main_program
()
self
.
program
=
program
self
.
trainer
s
=
trainers
self
.
origin_
program
=
program
self
.
trainer
_num
=
trainers
self
.
optimize_ops
=
optimize_ops
# TODO(typhoonzero): currently trainer_id is fetched from cluster system
# like Kubernetes, we should port this to use etcd later when developing
# fluid distributed training with fault-tolerance.
self
.
trainer_id
=
trainer_id
pserver_endpoints
=
pservers
.
split
(
","
)
self
.
pserver_endpoints
=
pserver_endpoints
# process lookup_table_op
# 1. check all lookup_table_op is distributed
# 2. check all lookup_table_op share the same table.
distributed_lookup_table_ops
=
[]
# support only one distributed_lookup_table now
self
.
table_name
=
None
for
op
in
program
.
global_block
().
ops
:
if
op
.
type
==
LOOKUP_TABLE_TYPE
:
if
op
.
attrs
[
'is_distributed'
]
is
True
:
if
self
.
table_name
is
None
:
self
.
table_name
=
op
.
input
(
"W"
)[
0
]
if
self
.
table_name
!=
op
.
input
(
"W"
)[
0
]:
raise
RuntimeError
(
"all distributed lookup_table_ops"
" should have only one table"
)
distributed_lookup_table_ops
.
append
(
op
)
else
:
if
self
.
table_name
is
not
None
:
assert
op
.
input
(
"W"
)[
0
]
!=
self
.
table_name
self
.
has_distributed_lookup_table
=
len
(
distributed_lookup_table_ops
)
>
0
# step1: For large parameters and gradients, split them into smaller
# blocks.
param_list
=
[
pg
[
0
]
for
pg
in
params_grads
]
grad_list
=
[
pg
[
1
]
for
pg
in
params_grads
]
if
self
.
has_distributed_lookup_table
:
param_list
=
[
param
for
param
in
param_list
if
param
.
name
!=
self
.
table_name
]
grad_list
=
[
grad
for
grad
in
grad_list
if
grad
.
name
!=
framework
.
grad_var_name
(
self
.
table_name
)
]
self
.
table_param_grad
=
[
param_grad
for
param_grad
in
params_grads
if
param_grad
[
0
].
name
==
self
.
table_name
][
0
]
table_grad_var
=
self
.
table_param_grad
[
1
]
self
.
table_grad_list
=
[
program
.
global_block
().
create_var
(
name
=
"%s.trainer_%d.pserver_%d"
%
(
table_grad_var
.
name
,
trainer_id
,
index
),
type
=
table_grad_var
.
type
,
shape
=
table_grad_var
.
shape
,
dtype
=
table_grad_var
.
dtype
)
for
index
in
range
(
len
(
self
.
pserver_endpoints
))
]
grad_blocks
=
split_dense_variable
(
grad_list
,
len
(
pserver_endpoints
))
param_blocks
=
split_dense_variable
(
param_list
,
len
(
pserver_endpoints
))
# step2: Create new vars for the parameters and gradients blocks and
...
...
@@ -229,7 +279,7 @@ class DistributeTranspiler:
self
.
param_grad_ep_mapping
[
ep
][
"grads"
].
append
(
grad
)
rpc_client_var
=
program
.
global_block
().
create_var
(
name
=
"RPC_CLIENT_VAR"
,
name
=
RPC_CLIENT_VAR_NAME
,
persistable
=
True
,
type
=
core
.
VarDesc
.
VarType
.
RAW
)
...
...
@@ -252,13 +302,19 @@ class DistributeTranspiler:
outputs
=
{
"Out"
:
[
orig_param
]},
attrs
=
{
"axis"
:
0
})
if
self
.
has_distributed_lookup_table
:
self
.
_replace_lookup_table_op_with_prefetch
(
program
,
rpc_client_var
,
eplist
)
self
.
_split_table_grad_and_add_send_vars
(
program
,
rpc_client_var
,
pserver_endpoints
)
def
get_trainer_program
(
self
):
# remove optimize ops and add a send op to main_program
self
.
program
.
global_block
().
delete_ops
(
self
.
optimize_ops
)
self
.
program
.
sync_with_cpp
()
self
.
origin_
program
.
global_block
().
delete_ops
(
self
.
optimize_ops
)
self
.
origin_
program
.
sync_with_cpp
()
# FIXME(typhoonzero): serialize once will fix error occurs when clone.
self
.
program
.
__str__
()
return
self
.
program
self
.
origin_
program
.
__str__
()
return
self
.
origin_
program
def
get_pserver_program
(
self
,
endpoint
):
"""
...
...
@@ -294,8 +350,8 @@ class DistributeTranspiler:
type
=
v
.
type
,
dtype
=
v
.
dtype
,
shape
=
v
.
shape
)
if
self
.
trainer
s
>
1
:
for
trainer_id
in
xrange
(
self
.
trainer
s
):
if
self
.
trainer
_num
>
1
:
for
trainer_id
in
xrange
(
self
.
trainer
_num
):
var
=
pserver_program
.
global_block
().
create_var
(
name
=
"%s.trainer_%d"
%
(
orig_var_name
,
trainer_id
),
persistable
=
False
,
...
...
@@ -309,7 +365,7 @@ class DistributeTranspiler:
# step3
optimize_block
=
pserver_program
.
create_block
(
0
)
# step 4
# Create a union-find data struct from optimize ops,
# Create a union-find data struct
ure
from optimize ops,
# If two ops are connected, we could add these two ops
# into one set.
ufind
=
self
.
_create_ufind
(
self
.
optimize_ops
)
...
...
@@ -384,6 +440,23 @@ class DistributeTranspiler:
# __append_optimize_op__(glb_op, optimize_block)
# break
# process distributed lookup_table
prefetch_block
=
None
if
self
.
has_distributed_lookup_table
:
pserver_index
=
self
.
pserver_endpoints
.
index
(
endpoint
)
self
.
_create_table_optimize_block
(
pserver_index
,
pserver_program
,
append_block
)
prefetch_block
=
self
.
_create_prefetch_block
(
pserver_index
,
pserver_program
,
optimize_block
)
# NOTE: if has_distributed_lookup_table is False, then prefetch_block will
# not be executed, so it's safe to use optimize_block to hold the place
if
self
.
has_distributed_lookup_table
:
assert
prefetch_block
is
not
None
else
:
assert
prefetch_block
is
None
prefetch_block
=
pserver_program
.
global_block
()
# step5 append the listen_and_serv op
pserver_program
.
global_block
().
append_op
(
type
=
"listen_and_serv"
,
...
...
@@ -392,8 +465,10 @@ class DistributeTranspiler:
attrs
=
{
"OptimizeBlock"
:
optimize_block
,
"endpoint"
:
endpoint
,
"Fanin"
:
self
.
trainers
"Fanin"
:
self
.
trainer_num
,
"PrefetchBlock"
:
prefetch_block
})
pserver_program
.
sync_with_cpp
()
return
pserver_program
...
...
@@ -451,6 +526,197 @@ class DistributeTranspiler:
attrs
=
op
.
attrs
)
return
s_prog
# transpiler function for dis lookup_table
def
_replace_lookup_table_op_with_prefetch
(
self
,
program
,
rpc_client_var
,
eplist
):
# 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op
self
.
prefetch_input_vars
=
None
self
.
prefetch_output_vars
=
None
continue_search_lookup_table_op
=
True
while
continue_search_lookup_table_op
:
continue_search_lookup_table_op
=
False
all_ops
=
program
.
global_block
().
ops
for
op
in
all_ops
:
if
op
.
type
==
LOOKUP_TABLE_TYPE
:
continue_search_lookup_table_op
=
True
op_index
=
list
(
all_ops
).
index
(
op
)
ids_name
=
op
.
input
(
"Ids"
)
out_name
=
op
.
output
(
"Out"
)
if
self
.
prefetch_input_vars
is
None
:
ids_var
=
program
.
global_block
().
vars
[
ids_name
[
0
]]
self
.
prefetch_input_vars
=
self
.
create_splited_vars
(
source_var
=
ids_var
,
block
=
program
.
global_block
(),
tag
=
"_prefetch_in_"
)
if
self
.
prefetch_output_vars
is
None
:
out_var
=
program
.
global_block
().
vars
[
out_name
[
0
]]
self
.
prefetch_output_vars
=
self
.
create_splited_vars
(
source_var
=
out_var
,
block
=
program
.
global_block
(),
tag
=
"_prefetch_out_"
)
# insert split_ids_op
program
.
global_block
().
insert_op
(
index
=
op_index
,
type
=
"split_ids"
,
inputs
=
{
'Ids'
:
[
program
.
global_block
().
vars
[
varname
]
for
varname
in
ids_name
]
},
outputs
=
{
"Out"
:
self
.
prefetch_input_vars
})
# insert prefetch_op
program
.
global_block
().
insert_op
(
index
=
op_index
+
1
,
type
=
"prefetch"
,
inputs
=
{
'X'
:
self
.
prefetch_input_vars
},
outputs
=
{
"Out"
:
self
.
prefetch_output_vars
,
"RPCClient"
:
rpc_client_var
},
attrs
=
{
"epmap"
:
eplist
})
# insert concat_op
program
.
global_block
().
insert_op
(
index
=
op_index
+
2
,
type
=
"concat"
,
inputs
=
{
'X'
:
self
.
prefetch_output_vars
},
outputs
=
{
"Out"
:
[
program
.
global_block
().
vars
[
varname
]
for
varname
in
out_name
]
},
attrs
=
{
"axis"
:
0
})
# delete lookup_table_op
program
.
global_block
().
delete_ops
([
op
])
program
.
sync_with_cpp
()
# break for loop
break
def
_split_table_grad_and_add_send_vars
(
self
,
program
,
rpc_client_var
,
pserver_endpoints
):
# 2. add split_ids_op and send_vars_op to send gradient to pservers
# there should only be one table_name
all_ops
=
program
.
global_block
().
ops
table_grad_name
=
framework
.
grad_var_name
(
self
.
table_name
)
for
op
in
all_ops
:
if
table_grad_name
in
op
.
output_arg_names
:
op_index
=
list
(
all_ops
).
index
(
op
)
# insert split_ids_op
program
.
global_block
().
insert_op
(
index
=
op_index
+
1
,
type
=
"split_ids"
,
inputs
=
{
'Ids'
:
[
program
.
global_block
().
vars
[
table_grad_name
]]
},
outputs
=
{
"Out"
:
self
.
table_grad_list
})
program
.
global_block
().
insert_op
(
index
=
op_index
+
2
,
type
=
"send_vars"
,
inputs
=
{
'X'
:
self
.
table_grad_list
},
outputs
=
{
"RPCClient"
:
rpc_client_var
},
attrs
=
{
"sync_send"
:
True
,
"epmap"
:
pserver_endpoints
})
break
def
_create_prefetch_block
(
self
,
pserver_index
,
pserver_program
,
optimize_block
):
# STEP: create prefetch block
table_var
=
pserver_program
.
global_block
().
vars
[
self
.
table_name
]
prefetch_block
=
pserver_program
.
create_block
(
optimize_block
.
idx
)
trainer_ids
=
self
.
prefetch_input_vars
[
pserver_index
]
pserver_ids
=
pserver_program
.
global_block
().
create_var
(
name
=
trainer_ids
.
name
,
type
=
trainer_ids
.
type
,
shape
=
trainer_ids
.
shape
,
dtype
=
trainer_ids
.
dtype
)
trainer_out
=
self
.
prefetch_output_vars
[
pserver_index
]
pserver_out
=
pserver_program
.
global_block
().
create_var
(
name
=
trainer_out
.
name
,
type
=
trainer_out
.
type
,
shape
=
trainer_out
.
shape
,
dtype
=
trainer_out
.
dtype
)
prefetch_block
.
append_op
(
type
=
LOOKUP_TABLE_TYPE
,
inputs
=
{
'Ids'
:
pserver_ids
,
"W"
:
table_var
},
outputs
=
{
"Out"
:
pserver_out
},
attrs
=
{
"is_sparse"
:
True
,
# has no effect on lookup_table op
"is_distributed"
:
True
,
"padding_idx"
:
-
1
})
return
prefetch_block
def
_create_table_optimize_block
(
self
,
pserver_index
,
pserver_program
,
append_block
):
def
_clone_var
(
block
,
var
,
persistable
=
True
):
assert
isinstance
(
var
,
Variable
)
return
block
.
create_var
(
name
=
var
.
name
,
shape
=
var
.
shape
,
dtype
=
var
.
dtype
,
type
=
var
.
type
,
persistable
=
persistable
)
# STEP: create table optimize block
# create table param and grad var in pserver program
param_var
=
_clone_var
(
pserver_program
.
global_block
(),
self
.
origin_program
.
global_block
().
vars
[
self
.
table_name
])
grad_var
=
_clone_var
(
pserver_program
.
global_block
(),
self
.
origin_program
.
global_block
().
vars
[
framework
.
grad_var_name
(
self
.
table_name
)],
persistable
=
False
)
# create grad vars in pserver program
table_grad_var
=
self
.
table_param_grad
[
1
]
table_grad_list
=
[
pserver_program
.
global_block
().
create_var
(
name
=
"%s.trainer_%d.pserver_%d"
%
(
table_grad_var
.
name
,
index
,
pserver_index
),
type
=
table_grad_var
.
type
,
shape
=
table_grad_var
.
shape
,
dtype
=
table_grad_var
.
dtype
)
for
index
in
range
(
self
.
trainer_num
)
]
# create table optimize block in pserver program
table_opt_op
=
[
op
for
op
in
self
.
optimize_ops
if
op
.
input
(
"Param"
)[
0
]
==
self
.
table_name
][
0
]
table_opt_block
=
pserver_program
.
create_block
(
append_block
.
idx
)
# only support sgd now
assert
table_opt_op
.
type
==
"sgd"
# append sum op for table_grad_list
table_opt_block
.
append_op
(
type
=
"sum"
,
inputs
=
{
"X"
:
table_grad_list
},
outputs
=
{
"Out"
:
[
grad_var
]})
lr_var
=
pserver_program
.
global_block
().
vars
[
table_opt_op
.
input
(
"LearningRate"
)[
0
]]
inputs
=
{
"Param"
:
[
param_var
],
"Grad"
:
[
grad_var
],
"LearningRate"
:
[
lr_var
]
}
outputs
=
{
"ParamOut"
:
[
param_var
]}
table_opt_block
.
append_op
(
type
=
table_opt_op
.
type
,
inputs
=
inputs
,
outputs
=
outputs
,
attrs
=
table_opt_op
.
attrs
)
# ====================== private transpiler functions =====================
def
_create_vars_from_blocklist
(
self
,
program
,
...
...
@@ -512,7 +778,17 @@ class DistributeTranspiler:
program
.
global_block
().
sync_with_cpp
()
return
var_mapping
def
_clone_var
(
self
,
block
,
var
):
def
create_splited_vars
(
self
,
source_var
,
block
,
tag
):
return
[
block
.
create_var
(
name
=
str
(
source_var
.
name
+
tag
+
str
(
index
)),
type
=
source_var
.
type
,
shape
=
source_var
.
shape
,
dtype
=
source_var
.
dtype
)
for
index
in
range
(
len
(
self
.
pserver_endpoints
))
]
def
_clone_var
(
self
,
block
,
var
,
persistable
=
True
):
assert
isinstance
(
var
,
Variable
)
return
block
.
create_var
(
name
=
var
.
name
,
...
...
@@ -520,12 +796,12 @@ class DistributeTranspiler:
dtype
=
var
.
dtype
,
type
=
var
.
type
,
lod_level
=
var
.
lod_level
,
persistable
=
Tru
e
)
persistable
=
persistabl
e
)
def
_append_split_op
(
self
,
program
,
gradblocks
):
# Split variables that need to be split and append respective ops
add_suffix
=
False
if
self
.
trainer
s
>
1
:
if
self
.
trainer
_num
>
1
:
add_suffix
=
True
var_mapping
=
self
.
_create_vars_from_blocklist
(
program
,
gradblocks
,
add_trainer_suffix
=
add_suffix
)
...
...
@@ -616,9 +892,9 @@ class DistributeTranspiler:
return
merged_var
=
\
pserver_block
.
vars
[
self
.
_orig_varname
(
grad_block
.
name
)]
if
self
.
trainer
s
>
1
:
if
self
.
trainer
_num
>
1
:
vars2merge
=
[]
for
i
in
xrange
(
self
.
trainer
s
):
for
i
in
xrange
(
self
.
trainer
_num
):
per_trainer_name
=
"%s.trainer_%d"
%
\
(
self
.
_orig_varname
(
grad_block
.
name
),
i
)
vars2merge
.
append
(
pserver_block
.
vars
[
per_trainer_name
])
...
...
@@ -633,7 +909,7 @@ class DistributeTranspiler:
type
=
"scale"
,
inputs
=
{
"X"
:
merged_var
},
outputs
=
{
"Out"
:
merged_var
},
attrs
=
{
"scale"
:
1.0
/
float
(
self
.
trainer
s
)})
attrs
=
{
"scale"
:
1.0
/
float
(
self
.
trainer
_num
)})
new_inputs
[
key
]
=
merged_var
elif
key
==
"Param"
:
# param is already created on global program
...
...
@@ -669,7 +945,7 @@ class DistributeTranspiler:
new_shape
=
None
if
key
in
[
"Param"
,
"Grad"
,
"LearningRate"
]:
continue
var
=
self
.
program
.
global_block
().
vars
[
opt_op
.
input
(
key
)[
0
]]
var
=
self
.
origin_
program
.
global_block
().
vars
[
opt_op
.
input
(
key
)[
0
]]
# update accumulator variable shape
param_shape
=
new_inputs
[
"Param"
].
shape
new_shape
=
self
.
_get_optimizer_input_shape
(
opt_op
.
type
,
key
,
...
...
@@ -682,8 +958,8 @@ class DistributeTranspiler:
new_inputs
[
key
]
=
tmpvar
# change output's ParamOut variable
outputs
=
self
.
_get_output_map_from_op
(
self
.
program
.
global_block
().
vars
,
opt_op
)
outputs
=
self
.
_get_output_map_from_op
(
self
.
origin_program
.
global_block
().
vars
,
opt_op
)
outputs
[
"ParamOut"
]
=
new_inputs
[
"Param"
]
optimize_block
.
append_op
(
...
...
@@ -695,8 +971,8 @@ class DistributeTranspiler:
def
_append_pserver_non_opt_ops
(
self
,
optimize_block
,
opt_op
):
program
=
optimize_block
.
program
# Append the ops for parameters that do not need to be optimized/updated
inputs
=
self
.
_get_input_map_from_op
(
self
.
program
.
global_block
().
vars
,
opt_op
)
inputs
=
self
.
_get_input_map_from_op
(
self
.
origin_program
.
global_block
().
vars
,
opt_op
)
for
varlist
in
inputs
.
itervalues
():
if
not
isinstance
(
varlist
,
list
):
varlist
=
[
varlist
]
...
...
@@ -709,8 +985,8 @@ class DistributeTranspiler:
dtype
=
var
.
dtype
,
shape
=
var
.
shape
)
outputs
=
self
.
_get_output_map_from_op
(
self
.
program
.
global_block
().
vars
,
opt_op
)
outputs
=
self
.
_get_output_map_from_op
(
self
.
origin_program
.
global_block
().
vars
,
opt_op
)
for
varlist
in
outputs
.
itervalues
():
if
not
isinstance
(
varlist
,
list
):
...
...
@@ -783,7 +1059,6 @@ class DistributeTranspiler:
if
same_or_split_var
(
n
,
param
)
and
n
!=
param
:
return
True
return
False
return
False
def
_get_input_map_from_op
(
self
,
varmap
,
op
):
"""Returns a dict from op input name to the vars in varmap."""
...
...
@@ -821,7 +1096,7 @@ class DistributeTranspiler:
find_ops
=
[]
# find ops which output is lr var
block
=
self
.
program
.
global_block
()
block
=
self
.
origin_
program
.
global_block
()
for
op
in
block
.
ops
:
if
set
(
op
.
output_arg_names
)
&
lr_vars
:
find_ops
.
append
(
op
)
...
...
python/paddle/fluid/layers/nn.py
浏览文件 @
0c321fe3
...
...
@@ -218,6 +218,7 @@ def fc(input,
def
embedding
(
input
,
size
,
is_sparse
=
False
,
is_distributed
=
False
,
padding_idx
=
None
,
param_attr
=
None
,
dtype
=
'float32'
):
...
...
@@ -268,8 +269,11 @@ def embedding(input,
inputs
=
{
'Ids'
:
input
,
'W'
:
w
},
outputs
=
{
'Out'
:
tmp
},
attrs
=
{
'is_sparse'
:
is_sparse
,
'padding_idx'
:
padding_idx
})
attrs
=
{
'is_sparse'
:
is_sparse
,
'is_distributed'
:
is_distributed
,
'padding_idx'
:
padding_idx
})
return
tmp
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录