PaddlePaddle / Paddle
Commit 330aea6e
Authored by Chengmo on Jan 12, 2021; committed via GitHub on Jan 12, 2021

cherry pick tensor table (#30221)

Parent: b207b8a7

43 changed files, 1158 additions and 228 deletions (+1158 −228)
Changed files:

paddle/fluid/distributed/communicator_common.h  +7  -2
paddle/fluid/distributed/fleet.cc  +5  -4
paddle/fluid/distributed/fleet.h  +4  -2
paddle/fluid/distributed/ps.proto  +5  -6
paddle/fluid/distributed/service/brpc_ps_client.cc  +28  -0
paddle/fluid/distributed/service/brpc_ps_client.h  +3  -1
paddle/fluid/distributed/service/brpc_ps_server.cc  +22  -0
paddle/fluid/distributed/service/brpc_ps_server.h  +3  -0
paddle/fluid/distributed/service/communicator.cc  +53  -3
paddle/fluid/distributed/service/communicator.h  +3  -4
paddle/fluid/distributed/service/ps_client.h  +3  -0
paddle/fluid/distributed/service/sendrecv.proto  +1  -0
paddle/fluid/distributed/service/server.cc  +14  -2
paddle/fluid/distributed/service/server.h  +21  -2
paddle/fluid/distributed/service/service.cc  +5  -4
paddle/fluid/distributed/service/service.h  +4  -3
paddle/fluid/distributed/table/CMakeLists.txt  +3  -2
paddle/fluid/distributed/table/common_dense_table.cc  +9  -0
paddle/fluid/distributed/table/common_dense_table.h  +1  -0
paddle/fluid/distributed/table/common_sparse_table.cc  +10  -0
paddle/fluid/distributed/table/common_sparse_table.h  +2  -0
paddle/fluid/distributed/table/depends/dense.h  +9  -2
paddle/fluid/distributed/table/depends/sparse.h  +10  -4
paddle/fluid/distributed/table/table.cc  +4  -1
paddle/fluid/distributed/table/table.h  +21  -1
paddle/fluid/distributed/table/tensor_table.cc  +92  -53
paddle/fluid/distributed/table/tensor_table.h  +119  -98
paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc  +5  -2
paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc  +8  -5
paddle/fluid/framework/distributed_strategy.proto  +1  -0
paddle/fluid/operators/pscore/send_op.cc  +3  -2
paddle/fluid/pybind/fleet_py.cc  +5  -3
python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py  +11  -0
python/paddle/distributed/fleet/runtime/the_one_ps.py  +98  -6
python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py  +127  -11
python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py  +15  -2
python/paddle/fluid/tests/unittests/ctr_dataset_reader.py  +1  -1
python/paddle/fluid/tests/unittests/test_dist_fleet_base.py  +14  -2
python/paddle/fluid/tests/unittests/test_dist_fleet_decay.py  +80  -0
python/paddle/fluid/tests/unittests/test_dist_fleet_ps10.py  +83  -0
python/paddle/fluid/tests/unittests/test_dist_fleet_ps7.py  +82  -0
python/paddle/fluid/tests/unittests/test_dist_fleet_ps8.py  +82  -0
python/paddle/fluid/tests/unittests/test_dist_fleet_ps9.py  +82  -0
paddle/fluid/distributed/communicator_common.h

@@ -30,7 +30,8 @@ struct CommContext {
              const std::vector<int64_t> &sections,
              const std::vector<std::string> &origin_names, int id,
              bool merge_add_ = true, bool is_sparse_ = true,
-             bool is_distributed_ = false, int table_id_ = -1)
+             bool is_distributed_ = false, int table_id_ = -1,
+             bool is_tensor_table_ = false)
       : var_name(name), splited_varnames(names), epmap(emap),

@@ -40,7 +41,8 @@ struct CommContext {
         merge_add(merge_add_), is_sparse(is_sparse_),
         is_distributed(is_distributed_),
-        table_id(table_id_) {}
+        table_id(table_id_), is_tensor_table(is_tensor_table_) {}

   CommContext(const CommContext &ctx) {
     var_name = ctx.var_name;

@@ -53,6 +55,7 @@ struct CommContext {
     origin_varnames = ctx.origin_varnames;
     is_distributed = ctx.is_distributed;
     table_id = ctx.table_id;
+    is_tensor_table = ctx.is_tensor_table;
   }

   std::string print() const {

@@ -75,6 +78,7 @@ struct CommContext {
     ss << " is_sparse: " << is_sparse;
     ss << " is_distributed: " << is_distributed << "\n";
     ss << " table_id: " << table_id << "\n";
+    ss << " is_tensor_table: " << is_tensor_table << "\n";

     return ss.str();
   }

@@ -89,6 +93,7 @@ struct CommContext {
   bool is_sparse;
   bool is_distributed;
   int table_id;
+  bool is_tensor_table;
 };

 }  // namespace distributed
paddle/fluid/distributed/fleet.cc

@@ -53,15 +53,16 @@ void FleetWrapper::LoadSparseOnServer(const std::string& path,
   pserver_ptr_->_server_ptr->table(table_id)->load(path, meta);
 }

-void FleetWrapper::InitServer(const std::string& dist_desc,
-                              const std::vector<std::string>& host_sign_list,
-                              int index) {
+void FleetWrapper::InitServer(
+    const std::string& dist_desc,
+    const std::vector<std::string>& host_sign_list, int index,
+    const std::vector<framework::ProgramDesc>& server_sub_program) {
   if (!is_initialized_) {
     VLOG(3) << "Going to init server";
     pserver_ptr_ = std::shared_ptr<paddle::distributed::PSCore>(
         new paddle::distributed::PSCore());
     pserver_ptr_->init_server(dist_desc, &host_sign_list, host_sign_list.size(),
-                              index);
+                              index, server_sub_program);
     is_initialized_ = true;
   } else {
     VLOG(3) << "Server can be initialized only once";
paddle/fluid/distributed/fleet.h

@@ -154,8 +154,10 @@ class FleetWrapper {
   // init server
   // void InitServer(const std::string& dist_desc,
   //                 const std::vector<uint64_t>& host_sign_list, int index);
-  void InitServer(const std::string& dist_desc,
-                  const std::vector<std::string>& host_sign_list, int index);
+  void InitServer(
+      const std::string& dist_desc,
+      const std::vector<std::string>& host_sign_list, int index,
+      const std::vector<framework::ProgramDesc>& server_sub_program = {});
   // init trainer
   void InitWorker(const std::string& dist_desc,
                   const std::vector<std::string>& host_sign_list, Scope* scope,
paddle/fluid/distributed/ps.proto

@@ -126,12 +126,11 @@ message TableAccessorParameter {
 }

 message TensorAccessorParameter {
-  optional string tensor_class = 1;
-  optional uint32 fea_dim = 2;
-  optional uint32 emb_dim = 3;
-  optional string param = 4;
-  optional string grad = 5;
-  optional string common_block_map = 6;
+  optional string feed_var_name = 1;
+  optional string fetch_var_name = 2;
+  optional int64 startup_program_id = 3;
+  optional int64 main_program_id = 4;
+  optional string tensor_table_class = 6;
 }

 message CommonAccessorParameter {
paddle/fluid/distributed/service/brpc_ps_client.cc

@@ -719,6 +719,34 @@ std::future<int32_t> BrpcPsClient::push_dense_raw_gradient(
   return fut;
 }

+std::future<int32_t> BrpcPsClient::push_global_step(int table_id,
+                                                    int64_t *total_send_data,
+                                                    void *done) {
+  size_t request_call_num = _server_channels.size();
+  DownpourBrpcClosure *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
+  auto promise = std::make_shared<std::promise<int32_t>>();
+  closure->add_promise(promise);
+  std::future<int> fut = promise->get_future();
+  for (size_t i = 0; i < request_call_num; ++i) {
+    closure->request(i)->set_cmd_id(PS_PUSH_GLOBAL_STEP);
+    closure->request(i)->set_table_id(table_id);
+    closure->request(i)->set_client_id(_client_id);
+    auto *push_data = closure->request(i)->mutable_data();
+    push_data->clear();
+    int32_t num_per_shard = 1;
+    push_data->resize(sizeof(uint32_t) + num_per_shard * sizeof(int64_t));
+    char *push_data_ptr = const_cast<char *>(push_data->data());
+    memcpy(push_data_ptr, &num_per_shard, sizeof(uint32_t));
+    memcpy(push_data_ptr + sizeof(uint32_t), total_send_data,
+           num_per_shard * sizeof(int64_t));
+
+    PsService_Stub rpc_stub(get_dense_channel(i));
+    rpc_stub.service(closure->cntl(i), closure->request(i),
+                     closure->response(i), closure);
+  }
+  return fut;
+}
+
 std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
                                                size_t table_id,
                                                const uint64_t *keys,
paddle/fluid/distributed/service/brpc_ps_client.h

@@ -140,7 +140,9 @@ class BrpcPsClient : public PSClient {
                                               std::vector<float> *values,
                                               std::vector<uint64_t> *keys,
                                               int pserver_idx);
+  virtual std::future<int32_t> push_global_step(int table_id,
+                                                int64_t *total_send_data,
+                                                void *done);
   virtual std::future<int32_t> flush();

   virtual std::future<int32_t> send_client2client_msg(
paddle/fluid/distributed/service/brpc_ps_server.cc

@@ -100,6 +100,7 @@ int32_t PsService::initialize() {
   _service_handler_map[PS_BARRIER] = &PsService::barrier;
   _service_handler_map[PS_START_PROFILER] = &PsService::start_profiler;
   _service_handler_map[PS_STOP_PROFILER] = &PsService::stop_profiler;
+  _service_handler_map[PS_PUSH_GLOBAL_STEP] = &PsService::push_global_step;

   // shard初始化,server启动后才可从env获取到server_list的shard信息
   initialize_shard_info();

@@ -526,5 +527,26 @@ int32_t PsService::start_profiler(Table *table, const PsRequestMessage &request,
   return 0;
 }

+int32_t PsService::push_global_step(Table *table,
+                                    const PsRequestMessage &request,
+                                    PsResponseMessage &response,
+                                    brpc::Controller *cntl) {
+  CHECK_TABLE_EXIST(table, request, response);
+  auto req_buffer_size = request.data().size();
+  if (req_buffer_size < 1) {
+    set_response_code(response, 0, "run_program data is empty");
+    return 0;
+  }
+  uint32_t num = *(const uint32_t *)(request.data().data());
+  const int64_t *values =
+      (const int64_t *)(request.data().data() + sizeof(uint32_t));
+  auto trainer_id = request.client_id();
+  if (table->push_dense(values, trainer_id) != 0) {
+    set_response_code(response, -1, "run_program failed");
+  }
+  return 0;
+}
+
 }  // namespace distributed
 }  // namespace paddle
paddle/fluid/distributed/service/brpc_ps_server.h

@@ -110,6 +110,9 @@ class PsService : public PsBaseService {
   int32_t print_table_stat(Table *table, const PsRequestMessage &request,
                            PsResponseMessage &response,
                            brpc::Controller *cntl);
+  int32_t push_global_step(Table *table, const PsRequestMessage &request,
+                           PsResponseMessage &response,
+                           brpc::Controller *cntl);

   bool _is_initialize_shard_info;
   std::mutex _initialize_shard_mutex;
   std::unordered_map<int32_t, serviceHandlerFunc> _service_handler_map;
paddle/fluid/distributed/service/communicator.cc

@@ -34,6 +34,9 @@ limitations under the License. */
 #include "paddle/fluid/string/printf.h"
 #include "paddle/fluid/string/split.h"

+#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@"
+#define STEP_COUNTER "@PS_STEP_COUNTER@"
+
 namespace paddle {
 namespace distributed {

@@ -377,6 +380,37 @@ void Communicator::RpcProfilerControl() {
   }
 }

+void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
+                                  Scope *send_scope) {
+  if (batches == 0) {
+    return;
+  }
+  auto &table_id = ctx.table_id;
+  size_t request_call_num = _worker_ptr->get_server_nums();
+
+  auto &var_name = STEP_COUNTER;
+  auto *out_var = send_scope->Var(var_name);
+  auto *out_t = out_var->GetMutable<framework::LoDTensor>();
+  auto *data = out_t->mutable_data<int64_t>({1}, platform::CPUPlace());
+  data[0] = static_cast<int64_t>(batches);
+  VLOG(3) << "Communicator::SendGlobalStep send: " << batches;
+  DownpourBrpcClosure *closure = new DownpourBrpcClosure(
+      request_call_num, [this, request_call_num](void *done) {
+        int ret = 0;
+        auto *closure = (DownpourBrpcClosure *)done;
+        for (size_t i = 0; i < request_call_num; ++i) {
+          if (closure->check_response(i, PS_PUSH_GLOBAL_STEP) != 0) {
+            ret = -1;
+            break;
+          }
+        }
+        closure->set_promise_value(ret);
+      });
+  auto status = _worker_ptr->push_global_step(table_id, data, closure);
+  status.wait();
+  return;
+}
+
 void AsyncCommunicator::RecvThread() {
   if (!independent_recv_) return;
   VLOG(3) << "Independent RecvThread Start and Wait";

@@ -465,10 +499,16 @@ void AsyncCommunicator::SendByCommunicator() {
     for (size_t i = 0; i < var_nums; i++) {
       auto &var_name = varnames[i];
-      MergeVars<float>(var_name, vars[i], send_scope_.get(), 1);
+      if (var_name == STEP_COUNTER) {
+        MergeVars<int64_t>(var_name, vars[i], send_scope_.get(), 1);
+      } else {
+        MergeVars<float>(var_name, vars[i], send_scope_.get(), 1);
+      }
     }

-    if (ctx.is_sparse) {
+    if (ctx.is_tensor_table) {
+      SendGlobalStep(ctx, merged_var_num, send_scope_.get());
+    } else if (ctx.is_sparse) {
       PADDLE_ENFORCE_EQ(
           varnames.size(), 1,
           platform::errors::InvalidArgument(

@@ -599,8 +639,18 @@ bool AsyncCommunicator::Check(const std::vector<std::string> &var_tables) {
       platform::errors::InvalidArgument("var_tables.size() == 1 is permitted"));

   auto table_name = var_tables[0];
-  if (send_varname_to_ctx_.find(table_name) == send_varname_to_ctx_.end())
+  if (send_varname_to_ctx_.find(table_name) == send_varname_to_ctx_.end()) {
     return false;
+  }
+
+  if (table_name == STEP_COUNTER) {
+    VLOG(3) << "send step_counter into queue";
+    auto tmp_var = std::make_shared<Variable>();
+    auto *tensor = tmp_var->GetMutable<framework::LoDTensor>();
+    tensor->Resize(framework::make_ddim({1}));
+    auto *out_d = tensor->mutable_data<int64_t>(platform::CPUPlace());
+    out_d[0] = 1;
+    send_varname_to_queue_[table_name]->Push(tmp_var);
+  }

   return true;
 }
paddle/fluid/distributed/service/communicator.h

@@ -223,6 +223,9 @@ class Communicator {
   // 6. recv sparse param
   virtual void RpcRecvSparse(const std::string &varname, int table_id,
                              Scope *scope);
+  // 7. send gloabl step
+  virtual void SendGlobalStep(const CommContext &ctx, int batches,
+                              Scope *send_scope);

   virtual ~Communicator() {}
   virtual void RpcProfilerControl();

@@ -376,8 +379,6 @@ class AsyncCommunicator : public Communicator {
   virtual void SendByCommunicator();

-  virtual void SendGlobalStep(int batches) {}
-
   virtual void RecvByCommunicator();

   virtual void RecvNoBarrier();

@@ -527,8 +528,6 @@ class GeoCommunicator : public AsyncCommunicator {
   void SendByCommunicator() { return; }

-  void SendGlobalStep(int batches) override { return; }
-
   void RecvByCommunicator() override { return; }

   inline std::string GradToParam(const std::string var_name) {
paddle/fluid/distributed/service/ps_client.h

@@ -131,6 +131,9 @@ class PSClient {
                                               std::vector<uint64_t> *keys,
                                               int pserver_idx) = 0;

+  virtual std::future<int32_t> push_global_step(int table_id,
+                                                int64_t *total_send_data,
+                                                void *done) = 0;
   virtual void finalize_worker() = 0;
   // client to client, 消息发送
   virtual std::future<int32_t> send_client2client_msg(int msg_type,
paddle/fluid/distributed/service/sendrecv.proto

@@ -47,6 +47,7 @@ enum PsCmdID {
   PS_PUSH_SPARSE_PARAM = 26;
   PS_START_PROFILER = 27;
   PS_STOP_PROFILER = 28;
+  PS_PUSH_GLOBAL_STEP = 29;
 }

 message PsRequestMessage {
paddle/fluid/distributed/service/server.cc

@@ -53,8 +53,10 @@ PSServer *PSServerFactory::create(const PSParameter &ps_config) {
   return server;
 }

-int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
-                            size_t server_rank) {
+int32_t PSServer::configure(
+    const PSParameter &config, PSEnvironment &env, size_t server_rank,
+    const std::vector<framework::ProgramDesc> &server_sub_program) {
+  scope_.reset(new framework::Scope());
   _config = config.server_param();
   _rank = server_rank;
   _environment = &env;

@@ -65,6 +67,7 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
   const auto &downpour_param = _config.downpour_server_param();

   uint32_t barrier_table = UINT32_MAX;
+  uint32_t global_step_table = UINT32_MAX;

   for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) {
     auto *table = CREATE_CLASS(

@@ -74,6 +77,12 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
         "BarrierTable") {
       barrier_table = downpour_param.downpour_table_param(i).table_id();
     }
+    if (downpour_param.downpour_table_param(i).table_class() ==
+        "GlobalStepTable") {
+      global_step_table = downpour_param.downpour_table_param(i).table_id();
+    }
+
+    table->set_program_env(scope_.get(), place_, &server_sub_program);
     table->set_shard(_rank, shard_num);
     table->initialize(downpour_param.downpour_table_param(i),
                       config.fs_client_param());

@@ -83,6 +92,9 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
   if (barrier_table != UINT32_MAX) {
     _table_map[barrier_table]->set_table_map(&_table_map);
   }
+  if (global_step_table != UINT32_MAX) {
+    _table_map[global_step_table]->set_table_map(&_table_map);
+  }

   return initialize();
 }
paddle/fluid/distributed/service/server.h

@@ -27,6 +27,20 @@
 #include "paddle/fluid/distributed/service/env.h"
 #include "paddle/fluid/distributed/service/sendrecv.pb.h"
 #include "paddle/fluid/framework/channel.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/platform/device_context.h"
+#include "paddle/fluid/platform/place.h"
+
+namespace paddle {
+namespace framework {
+class Executor;
+class ProgramDesc;
+class Scope;
+}  // namespace framework
+namespace platform {
+class DeviceContext;
+}  // namespace platform
+}  // namespace paddle

 namespace paddle {
 namespace distributed {

@@ -40,8 +54,9 @@ class PSServer {
   PSServer(PSServer &&) = delete;
   PSServer(const PSServer &) = delete;

-  virtual int32_t configure(const PSParameter &config, PSEnvironment &env,
-                            size_t server_rank) final;
+  virtual int32_t configure(
+      const PSParameter &config, PSEnvironment &env, size_t server_rank,
+      const std::vector<framework::ProgramDesc> &server_sub_program = {}) final;

   // return server_ip
   virtual std::string ip() { return butil::my_ip_cstr(); }

@@ -86,6 +101,10 @@ class PSServer {
   PSEnvironment *_environment;
   std::unordered_map<uint32_t, std::shared_ptr<Table>> _table_map;
   std::unordered_map<int32_t, MsgHandlerFunc> _msg_handler_map;
+
+ protected:
+  std::shared_ptr<framework::Scope> scope_;
+  platform::Place place_ = platform::CPUPlace();
 };

 REGISTER_REGISTERER(PSServer);
paddle/fluid/distributed/service/service.cc

@@ -66,9 +66,10 @@ void PSCore::init_gflag(const std::string& gflags) {
   ::google::ParseCommandLineFlags(&params_cnt, &params_ptr, true);
 }

-int PSCore::init_server(const std::string& dist_desc,
-                        const std::vector<std::string>* host_sign_list,
-                        int node_num, int index) {
+int PSCore::init_server(
+    const std::string& dist_desc,
+    const std::vector<std::string>* host_sign_list, int node_num, int index,
+    const std::vector<framework::ProgramDesc>& server_sub_program) {
   google::protobuf::TextFormat::ParseFromString(dist_desc, &_ps_param);
   init_gflag(_ps_param.init_gflags());
   _ps_env = paddle::distributed::PaddlePSEnvironment();

@@ -76,7 +77,7 @@ int PSCore::init_server(const std::string& dist_desc,
   int ret = 0;
   _server_ptr = std::shared_ptr<paddle::distributed::PSServer>(
       paddle::distributed::PSServerFactory::create(_ps_param));
-  ret = _server_ptr->configure(_ps_param, _ps_env, index);
+  ret = _server_ptr->configure(_ps_param, _ps_env, index, server_sub_program);
   CHECK(ret == 0) << "failed to configure server";
   return ret;
 }
paddle/fluid/distributed/service/service.h

@@ -33,9 +33,10 @@ class PSCore {
   explicit PSCore() {}
   virtual ~PSCore() {}

-  virtual int init_server(const std::string& dist_desc,
-                          const std::vector<std::string>* host_sign_list,
-                          int node_num, int index);
+  virtual int init_server(
+      const std::string& dist_desc,
+      const std::vector<std::string>* host_sign_list, int node_num, int index,
+      const std::vector<framework::ProgramDesc>& server_sub_program = {});
   virtual int init_worker(
       const std::string& dist_desc,
       const std::map<uint64_t, std::vector<paddle::distributed::Region>>&
paddle/fluid/distributed/table/CMakeLists.txt

@@ -11,8 +11,9 @@ cc_library(common_table SRCS common_sparse_table.cc common_dense_table.cc sparse
 set_source_files_properties(tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+set_source_files_properties(tensor_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
 cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context)
+cc_library(tensor_table SRCS tensor_table.cc DEPS eigen3 ps_framework_proto executor scope device_context tensor ${TABLE_DEPS})

 set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
-cc_library(table SRCS table.cc DEPS common_table tensor_accessor ps_framework_proto string_helper device_context gflags glog boost)
+cc_library(table SRCS table.cc DEPS common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost)
paddle/fluid/distributed/table/common_dense_table.cc

@@ -42,6 +42,7 @@ int32_t CommonDenseTable::initialize() {
   sync = _config.common().sync();
   VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
+  _global_lr = new float(1.0);

   initialize_value();
   initialize_optimizer();

@@ -81,8 +82,10 @@ int32_t CommonDenseTable::initialize_optimizer() {
   if (name == "sgd") {
     optimizer_ = std::make_shared<DSGD>(common, &values_);
+    optimizer_->set_global_lr(_global_lr);
   } else if (name == "adam") {
     optimizer_ = std::make_shared<DAdam>(common, &values_);
+    optimizer_->set_global_lr(_global_lr);
   } else if (name == "sum") {
     optimizer_ = std::make_shared<DSUM>(common, &values_);
   } else {

@@ -92,6 +95,12 @@ int32_t CommonDenseTable::initialize_optimizer() {
   return 0;
 }

+int32_t CommonDenseTable::set_global_lr(float* lr) {
+  _global_lr = lr;
+  optimizer_->set_global_lr(_global_lr);
+  return 0;
+}
+
 int32_t CommonDenseTable::pull_dense(float* pull_values, size_t num) {
   std::copy(values_[param_idx_].begin(), values_[param_idx_].end(),
             pull_values);
paddle/fluid/distributed/table/common_dense_table.h

@@ -42,6 +42,7 @@ class CommonDenseTable : public DenseTable {
   virtual int32_t push_dense_param(const float* values, size_t num) override;
   virtual int32_t push_dense(const float* values, size_t num) override;
   virtual int32_t pour() override;
+  virtual int32_t set_global_lr(float* lr) override;

   int32_t load(const std::string& path, const std::string& param) override {
     VLOG(0) << "Dense table may load by "
paddle/fluid/distributed/table/common_sparse_table.cc

@@ -175,6 +175,8 @@ int32_t CommonSparseTable::initialize() {
   sync = _config.common().sync();
   VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;

+  _global_lr = new float(1.0);
+
   auto common = _config.common();
   int size = static_cast<int>(common.params().size());

@@ -249,9 +251,11 @@ int32_t CommonSparseTable::initialize_optimizer() {
   if (name == "sgd") {
     optimizer_ = std::make_shared<SSGD>(value_names_, value_dims_,
                                         value_offsets_, value_idx_);
+    optimizer_->set_global_lr(_global_lr);
   } else if (name == "adam") {
     optimizer_ = std::make_shared<SAdam>(value_names_, value_dims_,
                                          value_offsets_, value_idx_);
+    optimizer_->set_global_lr(_global_lr);
   } else if (name == "sum") {
     optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
                                         value_offsets_, value_idx_);

@@ -263,6 +267,12 @@ int32_t CommonSparseTable::initialize_optimizer() {
   return 0;
 }

+int32_t CommonSparseTable::set_global_lr(float* lr) {
+  _global_lr = lr;
+  optimizer_->set_global_lr(_global_lr);
+  return 0;
+}
+
 int32_t CommonSparseTable::load(const std::string& path,
                                 const std::string& param) {
   rwlock_->WRLock();
paddle/fluid/distributed/table/common_sparse_table.h

@@ -69,6 +69,8 @@ class CommonSparseTable : public SparseTable {
   virtual int32_t push_sparse_param(const uint64_t* keys, const float* values,
                                     size_t num);

+  virtual int32_t set_global_lr(float* lr) override;
+
   virtual int32_t pour();
   virtual int32_t flush();
   virtual int32_t shrink();
paddle/fluid/distributed/table/depends/dense.h

@@ -36,6 +36,10 @@ class DenseOptimizer {
                  std::vector<std::vector<float>>* values) {}
   virtual void update(const float* update_values, size_t num, int begin,
                       int end) = 0;
+  virtual void set_global_lr(float* lr) { global_learning_rate_ = lr; }
+
+ protected:
+  float* global_learning_rate_;
 };

 // sum calc for dense tensor

@@ -84,8 +88,10 @@ class DSGD : public DenseOptimizer {
     grads.resize(update_numel);

     auto blas = GetBlas<float>();
+    float lr = *(global_learning_rate_) * (*learning_rate);
+    VLOG(4) << "DSGD LearningRate: " << lr;
     blas.VCOPY(update_numel, update_values + begin, grads.data());
-    blas.SCAL(update_numel, *learning_rate, grads.data());
+    blas.SCAL(update_numel, lr, grads.data());
     blas.VSUB(update_numel, param + begin, grads.data(), param + begin);
   }

@@ -150,7 +156,8 @@ class DAdam : public DenseOptimizer {
     beta1_pow[0] = beta1_pow[0] * beta1;
     beta2_pow[0] = beta2_pow[0] * beta2;

-    float lr_ = learning_rate[0];
+    float lr_ = *(global_learning_rate_) * learning_rate[0];
+    VLOG(4) << "DAdam LearningRate: " << lr_;
     lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);

     float* tmp_ = tmp.data();
paddle/fluid/distributed/table/depends/sparse.h

@@ -44,12 +44,17 @@ class SparseOptimizer {
                       size_t num, const std::vector<uint64_t>& offsets,
                       ValueBlock* block) = 0;

+  virtual void set_global_lr(float* lr) { global_learning_rate_ = lr; }
+
   const std::vector<std::string>& value_names_;
   const std::vector<int>& value_dims_;
   const std::vector<int>& value_offsets_;
   const std::unordered_map<std::string, int>& value_idx_;
   int param_offset = 0;
   int update_numel = 0;
+
+ protected:
+  float* global_learning_rate_;
 };

 // sum calc for sparse tensor

@@ -102,13 +107,14 @@ class SSGD : public SparseOptimizer {
       auto id = keys[x];
       auto* value = block->Get(id);

-      float* learning_rate = value + lr_offset;
+      float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0];
+      VLOG(4) << "SSGD LearningRate: " << learning_rate;
       float* param = value + param_offset;

       std::vector<float> grads;
       grads.resize(update_numel);
       blas.VCOPY(update_numel, update_values + x * update_numel, grads.data());
-      blas.SCAL(update_numel, learning_rate[0], grads.data());
+      blas.SCAL(update_numel, learning_rate, grads.data());
       blas.VSUB(update_numel, param, grads.data(), param);
     }
   }

@@ -156,7 +162,8 @@ class SAdam : public SparseOptimizer {
     for (auto x : offsets) {
       auto id = keys[x];
       auto* values = block->Get(id);
-      float* learning_rate = values + lr_offset;
+      float lr_ = *(global_learning_rate_) * (values + lr_offset)[0];
+      VLOG(4) << "SAdam LearningRate: " << lr_;
       float* param = values + param_offset;
       float* moment1 = values + m1_offset;
       float* moment2 = values + m2_offset;

@@ -166,7 +173,6 @@ class SAdam : public SparseOptimizer {
       beta1_pow[0] = beta1_pow[0] * beta1;
       beta2_pow[0] = beta2_pow[0] * beta2;

-      float lr_ = learning_rate[0];
       lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);

       std::vector<float> grad, grad2, tmp;
paddle/fluid/distributed/table/table.cc

@@ -22,6 +22,7 @@
 #include "paddle/fluid/distributed/table/common_sparse_table.h"
 #include "paddle/fluid/distributed/table/sparse_geo_table.h"
 #include "paddle/fluid/distributed/table/tensor_accessor.h"
+#include "paddle/fluid/distributed/table/tensor_table.h"

 namespace paddle {
 namespace distributed {

@@ -30,7 +31,9 @@ REGISTER_CLASS(Table, CommonDenseTable);
 REGISTER_CLASS(Table, CommonSparseTable);
 REGISTER_CLASS(Table, SparseGeoTable);
 REGISTER_CLASS(Table, BarrierTable);
+REGISTER_CLASS(Table, TensorTable);
+REGISTER_CLASS(Table, DenseTensorTable);
+REGISTER_CLASS(Table, GlobalStepTable);

 REGISTER_CLASS(ValueAccessor, CommMergeAccessor);

 int32_t TableManager::initialize() {
paddle/fluid/distributed/table/table.h

@@ -20,8 +20,11 @@
 #include <memory>
 #include <string>
 #include <utility>
 #include "paddle/fluid/distributed/table/accessor.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/platform/device_context.h"
+#include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/string/string_helper.h"

 namespace paddle {

@@ -35,6 +38,10 @@ class Table {
   virtual int32_t pull_dense(float *values, size_t num) = 0;
   virtual int32_t push_dense(const float *values, size_t num) = 0;
+  // for push global_step
+  virtual int32_t push_dense(const int64_t *values, const int32_t trainer_id) {
+    return 0;
+  }
   virtual int32_t push_dense_param(const float *values, size_t num) {
     return 0;
   }

@@ -67,6 +74,18 @@ class Table {
     return 0;
   }

+  // only for tensor table
+  virtual int32_t set_program_env(
+      framework::Scope *scope, platform::Place place,
+      const std::vector<framework::ProgramDesc> *sub_program) {
+    return 0;
+  }
+
+  virtual int32_t set_global_lr(float *lr) {
+    _global_lr = lr;
+    return 0;
+  }
+
   virtual int32_t pour() { return 0; }

   virtual void clear() = 0;

@@ -105,6 +124,7 @@ class Table {
   size_t _shard_idx;  // table 分片编号
   size_t _shard_num;  // table 分片总数
   TableParameter _config;
+  float *_global_lr = nullptr;
   std::shared_ptr<ValueAccessor> _value_accesor;
 };
 REGISTER_REGISTERER(Table);
paddle/fluid/distributed/table/tensor_table.cc

@@ -13,81 +13,120 @@
 // limitations under the License.

 #include "paddle/fluid/distributed/table/tensor_table.h"
-#include <chrono>  // NOLINT
-#include <map>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-#include "paddle/fluid/distributed/common/utils.h"
+
+DECLARE_double(eager_delete_tensor_gb);

 namespace paddle {
 namespace distributed {

-int32_t DenseTensorTable::initialize() {
-  _shards_task_pool.resize(10);
-  for (int i = 0; i < _shards_task_pool.size(); ++i) {
-    _shards_task_pool[i].reset(new ::ThreadPool(1));
-  }
-  return 0;
-}
-
-int32_t DenseTensorTable::initialize_tensor(framework::Scope *scope,
-                                            framework::ProgramDesc *program,
-                                            framework::Executor *executor) {
-  scope_ = scope;
-  program_ = program;
-  executor_ = executor;
-
-  auto tensor_config = _config.tensor();
-  if (tensor_config.has_common_block_map()) {
-    auto block_maps =
-        paddle::string::split_string(tensor_config.common_block_map(), "#");
-    for (auto &block_map : block_maps) {
-      auto block = paddle::string::split_string(block_map, ":");
-      auto block_id = std::stoi(block[0]);
-      std::vector<int> block_ids{block_id};
-      auto block_cmd = block[1];
-      auto prepared = executor_->Prepare(*program_, block_ids);
-      (*prepared_ctx_)[block_cmd] = prepared[0];
-    }
-  }
-  return 0;
-}
-
-int32_t DenseTensorTable::pull_dense(float *values, size_t numel) {
-  PADDLE_ENFORCE_EQ(
-      numel, _data.numel(),
-      paddle::platform::errors::PreconditionNotMet(
-          "pull dense error, excepted numel %d, but actually %d.",
-          _data.numel(), numel));
-  GetBlas<float>().VCOPY(numel, _data.data<float>(), values);
-  return 0;
-}
-
-int32_t DenseTensorTable::push_dense(const float *values, size_t numel) {
-  auto varname = _config.tensor().grad();
-  auto local_scope = scope_->NewTmpScope();
-  auto *var = local_scope->Var(varname);
-  auto *t = var->GetMutable<framework::LoDTensor>();
-  auto dims = paddle::framework::make_ddim({});
-
-  auto ctx = paddle::platform::CPUDeviceContext();
-  t->mutable_data<float>(_data.dims(), ctx.GetPlace());
-
-  GetBlas<float>().VCOPY(numel, values, t->data<float>());
-  executor_->RunPreparedContext((*prepared_ctx_)["push"].get(),
-                                local_scope.get());
-}
-
-int32_t DenseTensorTable::push_dense_param(const float *values, size_t numel) {
-  auto ctx = paddle::platform::CPUDeviceContext();
-  if (_data.IsInitialized()) {
-    PADDLE_ENFORCE_EQ(
-        numel, _data.numel(),
-        paddle::platform::errors::PreconditionNotMet(
-            "pull dense error, excepted numel %d, but actually %d.",
-            _data.numel(), numel));
-  } else {
-    _data.mutable_data<float>(
-        framework::make_ddim({static_cast<int64_t>(numel), 1}), ctx.GetPlace());
-  }
-  GetBlas<float>().VCOPY(numel, values, _data.data<float>());
-  return 0;
-}
+int32_t TensorTable::set_program_env(
+    framework::Scope *scope, platform::Place place,
+    const std::vector<framework::ProgramDesc> *sub_program) {
+  scope_ = scope;
+  place_ = place;
+  executor_ = new framework::Executor(place_);
+  sub_program_ = sub_program;
+  return 0;
+}
+
+int32_t GlobalStepTable::initialize() {
+  auto _program_config = _config.tensor();
+  auto trainers_ = _config.common().trainer_num();
+  FLAGS_eager_delete_tensor_gb = -1;
+  // Get Config
+  if (_program_config.has_startup_program_id()) {
+    startup_program_id_ = _program_config.startup_program_id();
+  }
+  if (_program_config.has_main_program_id()) {
+    main_program_id_ = _program_config.main_program_id();
+  }
+  if (_program_config.has_feed_var_name()) {
+    feed_var_name_ = _program_config.feed_var_name();
+  }
+  if (_program_config.has_fetch_var_name()) {
+    fetch_var_name_ = _program_config.fetch_var_name();
+  }
+
+  // Run startup program
+  if (startup_program_id_ != -1) {
+    std::map<std::string, const framework::LoDTensor *> fake_feed;
+    std::map<std::string, framework::FetchType *> fake_fetch;
+    auto startup_program_desc = sub_program_->at(startup_program_id_);
+    auto ctx = executor_->Prepare(startup_program_desc, 0);
+    executor_->RunPreparedContext(ctx.get(), scope_, false);
+  }
+
+  if (main_program_id_ != -1) {
+    // Run main porgram, if program is used for learning decay
+    auto main_program_desc = sub_program_->at(main_program_id_);
+    auto main_ctx = executor_->Prepare(main_program_desc, 0);
+    exec_context_ = std::move(main_ctx);
+    executor_->RunPreparedContext(exec_context_.get(), scope_, false);
+    // init decay_counters
+    decay_counters_.reserve(trainers_);
+    for (int32_t i = 0; i < trainers_; ++i) {
+      decay_counters_[i] = 0;
+    }
+  }
+  return 0;
+}
+
+int32_t GlobalStepTable::set_table_map(
+    std::unordered_map<uint32_t, std::shared_ptr<Table>> *table_map) {
+  auto *lr_var = scope_->FindVar(fetch_var_name_);
+  auto *lr_tensor = lr_var->GetMutable<framework::LoDTensor>();
+  auto *lr_value = lr_tensor->mutable_data<float>(platform::CPUPlace());
+  VLOG(3) << "GlobalStepTable::set_table_map set global lr: " << *lr_value;
+
+  for (auto iter = table_map->begin(); iter != table_map->end(); iter++) {
+    auto table_id = iter->first;
+    if (table_id == _config.table_id()) {
+      continue;
+    }
+    iter->second->set_global_lr(lr_value);
+  }
+  return 0;
+}
+
+int32_t GlobalStepTable::push_dense(const int64_t *values,
+                                    const int32_t trainer_id) {
+  return _run_program(values, trainer_id);
+}
+
+int32_t GlobalStepTable::_run_program(const int64_t *values,
+                                      const uint32_t trainer_id) {
+  FLAGS_eager_delete_tensor_gb = -1;
+  auto counter = decay_counters_.at(trainer_id);
+  counter += int(values[0]);
+  decay_counters_.at(trainer_id) = counter;
+
+  auto *global_step_var = scope_->FindVar(feed_var_name_);
+  auto *tensor = global_step_var->GetMutable<framework::LoDTensor>();
+  auto *value = tensor->mutable_data<int64_t>(platform::CPUPlace());
+
+  auto global_counter = 0;
+  for (auto &trainer_counter : decay_counters_) {
+    global_counter += trainer_counter.second;
+  }
+
+  // Todo: hard code for increment op
+  value[0] = global_counter - 1;
+  VLOG(3) << "GlobalStepTable::_run_program global_counter " << value[0];
+
+  executor_->RunPreparedContext(exec_context_.get(), scope_, false, false);
+  auto *lr_var = scope_->FindVar(fetch_var_name_);
+  auto *lr_tensor = lr_var->GetMutable<framework::LoDTensor>();
+  auto *lr_value = lr_tensor->mutable_data<float>(platform::CPUPlace());
+  VLOG(3) << "GlobalStepTable::LR value: " << lr_value[0];
+  return 0;
+}

 }  // namespace distributed
 }  // namespace paddle
paddle/fluid/distributed/table/tensor_table.h

@@ -14,166 +14,187 @@
 #pragma once

 #include <algorithm>
 #include <condition_variable>  // NOLINT
 #include <memory>
 #include <mutex>  // NOLINT
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>

 #include <ThreadPool.h>
 #include "paddle/fluid/distributed/common/utils.h"
 #include "paddle/fluid/distributed/table/table.h"
+#include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/tensor.h"
 #include "paddle/fluid/operators/math/blas.h"
+#include "paddle/fluid/platform/device_context.h"
+#include "paddle/fluid/platform/enforce.h"

 namespace paddle {
 namespace distributed {

+#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@"
+#define STEP_COUNTER "@PS_STEP_COUNTER@"
+
 class TensorTable : public Table {
  public:
-  TensorTable() : Table() {}
+  TensorTable() {}
   virtual ~TensorTable() {}

-  virtual int32_t initialize() { return 0; }
-
-  virtual int32_t pull_dense(float *values, size_t num) override { return 0; };
-  virtual int32_t push_dense(const float *values, size_t num) override {
-    return 0;
-  };
-  virtual int32_t pull_sparse(float *values, const uint64_t *keys,
-                              size_t num) override {
-    return 0;
-  };
-  virtual int32_t push_sparse(const uint64_t *keys, const float *values,
-                              size_t num) override {
-    return 0;
-  };
-  virtual int32_t push_dense_param(const float *values, size_t num) {
-    return 0;
-  }
-  virtual void *get_shard(size_t shard_idx) { return 0; }
-
-  std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
+  int32_t pull_dense(float *values, size_t num) override { return 0; }
+
+  int32_t push_dense(const float *values, size_t num) override { return 0; }
+
+  int32_t pull_sparse(float *values, const uint64_t *keys,
+                      size_t num) override {
+    return 0;
+  }
+  int32_t push_sparse(const uint64_t *keys, const float *values,
+                      size_t num) override {
+    return 0;
+  }
+  int32_t shrink() override { return 0; }
+
+  virtual void *get_shard(size_t shard_idx) override { return 0; }
+
+  virtual int32_t initialize_shard() override { return 0; }
+
+  virtual int32_t flush() { return 0; }
+
+  virtual int32_t load(const std::string &path, const std::string &param) {
+    return 0;
+  }
+  virtual int32_t save(const std::string &path, const std::string &param) {
+    return 0;
+  }
+
+  virtual void clear() {}
+
+  virtual int32_t initialize() override { return 0; }
+
+  virtual int32_t push_dense(const int64_t *values,
+                             const int32_t trainer_id) override {
+    return 0;
+  }
+
+  virtual int32_t set_program_env(
+      framework::Scope *scope, platform::Place place,
+      const std::vector<framework::ProgramDesc> *sub_program) override;
+
+ protected:
+  framework::Executor *executor_;
+  framework::Scope *scope_;
+  platform::Place place_ = platform::CPUPlace();
+  const std::vector<framework::ProgramDesc> *sub_program_;
+  paddle::distributed::TensorAccessorParameter program_config_;
+  std::shared_ptr<framework::ExecutorPrepareContext> exec_context_ = nullptr;
 };

 class DenseTensorTable : public TensorTable {
  public:
-  DenseTensorTable() : TensorTable() {}
-  ~DenseTensorTable() {}
-  virtual int32_t initialize();
-
-  void *get_shard(size_t shard_idx) { return 0; }
-  int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) {
-    return 0;
-  }
-  int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) {
-    return 0;
-  }
-  int32_t shrink() { return 0; }
-
-  int32_t pull_dense(float *values, size_t num) override;
-  int32_t push_dense_param(const float *values, size_t num) override;
-  int32_t push_dense(const float *values, size_t num) override;
-
-  virtual int32_t initialize_shard() { return 0; }
-  virtual void clear() {}
-  virtual int32_t flush() { return 0; }
-
-  //指定加载路径
-  virtual int32_t load(const std::string &path, const std::string &converter) {
-    return 0;
-  }
-  //指定保存路径
-  virtual int32_t save(const std::string &path, const std::string &converter) {
-    return 0;
-  }
-
-  virtual int32_t initialize_tensor(paddle::framework::Scope *scope,
-                                    paddle::framework::ProgramDesc *program,
-                                    paddle::framework::Executor *executor);
-
- protected:
-  framework::Tensor _data;
-  framework::Executor *executor_;
-  framework::Scope *scope_;
-  framework::ProgramDesc *program_;
-  std::unordered_map<std::string,
-                     std::shared_ptr<framework::ExecutorPrepareContext>>
-      *prepared_ctx_;
+  DenseTensorTable() {}
+  virtual ~DenseTensorTable() {}
+
+  int32_t pull_sparse(float *values, const uint64_t *keys,
+                      size_t num) override {
+    return 0;
+  }
+  int32_t push_sparse(const uint64_t *keys, const float *values,
+                      size_t num) override {
+    return 0;
+  }
+  int32_t shrink() override { return 0; }
+
+  virtual void *get_shard(size_t shard_idx) { return 0; }
+
+  virtual int32_t initialize_shard() { return 0; }
+
+  virtual int32_t flush() { return 0; }
+
+  virtual void clear() {}
+
+  // Todo: Support program Load & Save
+  virtual int32_t load(const std::string &path, const std::string &param) {
+    return 0;
+  }
+  virtual int32_t save(const std::string &path, const std::string &param) {
+    return 0;
+  }
+
+  // Todo: Support pull dense
+  int32_t pull_dense(float *values, size_t num) override { return 0; }
+
+  /*----------------------------------------------------------------------*/
+
+  virtual int32_t initialize() override { return 0; }
+
+  int32_t push_dense(const float *values, size_t num) override { return 0; }
+
+  int32_t push_dense(const int64_t *values, const int32_t trainer_id) {
+    return 0;
+  }
+
+ protected:
+  virtual int32_t _run_program(const float *values, size_t num,
+                               const uint32_t trainer_id) {
+    return 0;
+  }
+
+  int startup_program_id_ = -1;
+  int main_program_id_ = -1;
+  std::string feed_var_name_ = "";
+  std::string fetch_var_name_ = "";
 };

+class GlobalStepTable : public DenseTensorTable {
+ public:
+  GlobalStepTable() {}
+  virtual ~GlobalStepTable() {}
+
+  int32_t pull_sparse(float *values, const uint64_t *keys,
+                      size_t num) override {
+    return 0;
+  }
+  int32_t push_sparse(const uint64_t *keys, const float *values,
+                      size_t num) override {
+    return 0;
+  }
+  int32_t shrink() override { return 0; }
+
+  virtual void *get_shard(size_t shard_idx) { return 0; }
+
+  virtual int32_t initialize_shard() { return 0; }
+
+  virtual int32_t flush() { return 0; }
+
+  virtual void clear() {}
+
+  virtual int32_t load(const std::string &path, const std::string &param) {
+    return 0;
+  }
+  virtual int32_t save(const std::string &path, const std::string &param) {
+    return 0;
+  }
+
+  int32_t pull_dense(float *values, size_t num) override { return 0; }
+
+  /*----------------------------------------------------------------------*/
+
+  int32_t initialize() override;
+
+  int32_t push_dense(const float *values, size_t num) override { return 0; }
+
+  int32_t push_dense(const int64_t *values, const int32_t trainer_id);
+
+  int32_t set_table_map(std::unordered_map<uint32_t, std::shared_ptr<Table>>
+                            *table_map) override;
+
+ private:
+  virtual int32_t _run_program(const int64_t *values,
+                               const uint32_t trainer_id);
+
+ private:
+  std::unordered_map<int, int64_t> decay_counters_;
+  int32_t trainers_;
+};

 //
 //// common sparse table [0, N) with out large scale
 // class SparseTensorTable : public TensorTable {
 //  void *get_shard(size_t shard_idx) { return 0; }
 //
 //  int32_t pull_sparse(float *values, const uint64_t *keys, size_t num)
 //  override;
 //  int32_t push_sparse(const uint64_t *keys, const float *values, size_t num)
 //  override ;
 //  int32_t shrink() { return 0; }
 //  void *get_shard(size_t shard_idx) { return 0; };
 //
 //  int32_t pull_dense(float *values, size_t num) { return 0; };
 //  int32_t push_dense_param(const float *values, size_t num) { return 0; };
 //  int32_t push_dense(const float *values, size_t num) { return 0; };
 //
 // protected:
 //  framework::Tensor _data;
 //};

 //// for Large scale kv tensor [0, int64] do not use specific optimizer
 // class KvTensorTable : public TensorTable {
 //  int32_t pull_dense(float *values, size_t num) { return 0; };
 //  int32_t push_dense_param(const float *values, size_t num) { return 0; };
 //  int32_t push_dense(const float *values, size_t num) { return 0; };
 //
 //  void *get_shard(size_t shard_idx) override;
 //  int32_t pull_sparse(float *values, const uint64_t *keys, size_t num)
 //  override;
 //  int32_t push_sparse(const uint64_t *keys, const float *values,
 //                      size_t num) override;
 //  int32_t shrink() override;
 //  void *get_shard(size_t shard_idx) override;
 //};

 //// for Geo sparse handle
 // class GeoSparseTensorTable : public TensorTable {};

 }  // namespace distributed
 }  // namespace paddle
paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc

@@ -20,10 +20,10 @@ limitations under the License. */
 #include "google/protobuf/text_format.h"
 #include "gtest/gtest.h"
 #include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/tensor_util.h"
 #include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/operators/math/math_function.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/string/printf.h"

@@ -157,7 +157,10 @@ void RunServer() {
   pserver_ptr_ = std::shared_ptr<paddle::distributed::PSServer>(
       paddle::distributed::PSServerFactory::create(server_proto));
   LOG(INFO) << "RUN configure";
-  pserver_ptr_->configure(server_proto, _ps_env, 0);
+  std::vector<framework::ProgramDesc> empty_vec;
+  framework::ProgramDesc empty_prog;
+  empty_vec.push_back(empty_prog);
+  pserver_ptr_->configure(server_proto, _ps_env, 0, empty_vec);
   LOG(INFO) << "RUN start";
   pserver_ptr_->start(ip_, port_);
   LOG(INFO) << "End start";
paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc

@@ -24,10 +24,6 @@ limitations under the License. */
 #include "paddle/fluid/framework/tensor_util.h"
 #include "paddle/fluid/framework/variable.h"
-#include "paddle/fluid/operators/math/math_function.h"
-#include "paddle/fluid/platform/place.h"
-#include "paddle/fluid/string/printf.h"

 #include "paddle/fluid/distributed/ps.pb.h"
 #include "paddle/fluid/distributed/service/brpc_ps_client.h"
 #include "paddle/fluid/distributed/service/brpc_ps_server.h"

@@ -35,6 +31,10 @@ limitations under the License. */
 #include "paddle/fluid/distributed/service/ps_client.h"
 #include "paddle/fluid/distributed/service/sendrecv.pb.h"
 #include "paddle/fluid/distributed/service/service.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/operators/math/math_function.h"
+#include "paddle/fluid/platform/place.h"
+#include "paddle/fluid/string/printf.h"

 namespace framework = paddle::framework;
 namespace platform = paddle::platform;

@@ -155,7 +155,10 @@ void RunServer() {
   _ps_env.set_ps_servers(&host_sign_list_, 1);
   pserver_ptr_ = std::shared_ptr<paddle::distributed::PSServer>(
       paddle::distributed::PSServerFactory::create(server_proto));
-  pserver_ptr_->configure(server_proto, _ps_env, 0);
+  std::vector<framework::ProgramDesc> empty_vec;
+  framework::ProgramDesc empty_prog;
+  empty_vec.push_back(empty_prog);
+  pserver_ptr_->configure(server_proto, _ps_env, 0, empty_vec);
   pserver_ptr_->start(ip_, port_);
 }
paddle/fluid/framework/distributed_strategy.proto

@@ -108,6 +108,7 @@ message AsyncConfig {
   optional bool runtime_split_send_recv = 8 [ default = false ];
   optional bool launch_barrier = 9 [ default = true ];
   optional string heter_worker_device_guard = 10 [ default = 'cpu' ];
+  optional int32 lr_decay_steps = 11 [ default = 10 ];
 }

 message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; }
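For context on how the new `lr_decay_steps` field is consumed: the parameter-server meta optimizer further below reads it from `DistributedStrategy.a_sync_configs`. The following is a minimal user-side sketch under that assumption, not part of this commit; the value 10 simply mirrors the proto default.

    import paddle.distributed.fleet as fleet

    # Sketch only: configure async parameter-server training and override the
    # LR-decay step interval introduced by the AsyncConfig change above.
    strategy = fleet.DistributedStrategy()
    strategy.a_sync = True
    strategy.a_sync_configs = {"lr_decay_steps": 10}  # proto default is 10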
paddle/fluid/operators/pscore/send_op.cc

@@ -52,8 +52,9 @@ class SendOp : public framework::OperatorBase {
     auto send_varnames = Attr<std::vector<std::string>>("send_varnames");

     auto* communicator = paddle::distributed::Communicator::GetInstance();
-    communicator->Check(send_varnames);
-    communicator->Send(ins, scope);
+    if (communicator->Check(send_varnames)) {
+      communicator->Send(ins, scope);
+    }

     // auto fleet = paddle::distributed::FleetWrapper::GetInstance();
     // if (is_sparse == 0) {
paddle/fluid/pybind/fleet_py.cc

@@ -62,7 +62,7 @@ void BindDistFleetWrapper(py::module* m) {
       .def("stop_server", &FleetWrapper::StopServer)
       .def("stop_worker", &FleetWrapper::FinalizeWorker)
       .def("barrier", &FleetWrapper::BarrierWithTable);
-}  // end BindDistFleetWrapper
+}

 void BindPSHost(py::module* m) {
   py::class_<distributed::PSHost>(*m, "PSHost")

@@ -79,8 +79,8 @@ void BindCommunicatorContext(py::module* m) {
       .def(py::init<const std::string&, const std::vector<std::string>&,
                     const std::vector<std::string>&, const std::vector<int64_t>&,
-                    const std::vector<std::string>&, int, bool, bool, bool,
-                    int>())
+                    const std::vector<std::string>&, int, bool, bool, bool, int,
+                    bool>())
       .def("var_name",
           [](const CommContext& self) { return self.var_name; })
       .def("trainer_id",
          [](const CommContext& self) { return self.trainer_id; })

@@ -97,6 +97,8 @@ void BindCommunicatorContext(py::module* m) {
           [](const CommContext& self) { return self.is_distributed; })
      .def("origin_varnames",
          [](const CommContext& self) { return self.origin_varnames; })
+     .def("is_tensor_table",
+         [](const CommContext& self) { return self.is_tensor_table; })
      .def("__str__", [](const CommContext& self) { return self.print(); });
 }
python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py

@@ -64,6 +64,11 @@ class ParameterServerOptimizer(MetaOptimizerBase):
         _main = compiled_config.origin_main_program.clone()
         _startup = compiled_config.origin_startup_program.clone()

+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass
+        _add_lr_decay_table_pass(
+            _main, compiled_config,
+            self.user_defined_strategy.a_sync_configs["lr_decay_steps"])
+
         if not compiled_config.is_geo_mode():
             # for main program
             _main = worker.delete_optimizer_pass(_main, compiled_config)

@@ -128,6 +133,12 @@ class ParameterServerOptimizer(MetaOptimizerBase):
         if len(ops) == 0:
             return _main, _startup

+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass
+        lr_decay_steps = self.user_defined_strategy.a_sync_configs[
+            "lr_decay_steps"]
+        _add_lr_decay_table_pass(main_program, compiled_config,
+                                 lr_decay_steps)
+
         for op in ops:
             if op.type in ["sgd", "adam"]:
                 is_sgd_adam = True
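The `_add_lr_decay_table_pass` call above is what moves a decayed learning-rate schedule onto the server-side tensor table. A rough user-level sketch of the kind of program this serves is shown below; it sticks to standard fleet/fluid calls, but the toy network, feed names, and hyper-parameters are placeholders rather than anything taken from this commit.

    import paddle
    import paddle.distributed.fleet as fleet
    import paddle.fluid as fluid

    # Sketch: async parameter-server job whose optimizer uses an LR-decay
    # schedule; the per-trainer step counter is what GlobalStepTable aggregates.
    paddle.enable_static()
    fleet.init(is_collective=False)

    x = fluid.data(name="x", shape=[None, 13], dtype="float32")
    y = fluid.data(name="y", shape=[None, 1], dtype="float32")
    pred = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.reduce_mean(fluid.layers.square_error_cost(pred, y))

    strategy = fleet.DistributedStrategy()
    strategy.a_sync = True
    strategy.a_sync_configs = {"lr_decay_steps": 10}

    lr = fluid.layers.exponential_decay(
        learning_rate=0.1, decay_steps=10, decay_rate=0.9, staircase=True)
    optimizer = fluid.optimizer.SGD(learning_rate=lr)
    optimizer = fleet.distributed_optimizer(optimizer, strategy)
    optimizer.minimize(loss)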
python/paddle/distributed/fleet/runtime/the_one_ps.py
浏览文件 @
330aea6e
...
...
@@ -206,6 +206,28 @@ class CommonAccessor:
conv_indent
(
indent
),
attrs
,
conv_indent
(
indent
))
class
Tensor
:
def
__init__
(
self
):
self
.
main_program_id
=
None
self
.
startup_program_id
=
None
self
.
feed_var_name
=
None
self
.
fetch_var_name
=
None
self
.
tensor_table_class
=
False
def
to_string
(
self
,
indent
):
program_str
=
"{}tensor {{{}
\n
{}}}"
attrs
=
""
attrs
+=
"feed_var_name:
\"
{}
\"
"
.
format
(
str
(
self
.
feed_var_name
))
attrs
+=
"fetch_var_name:
\"
{}
\"
"
.
format
(
str
(
self
.
fetch_var_name
))
attrs
+=
"startup_program_id: {} "
.
format
(
str
(
self
.
startup_program_id
))
attrs
+=
"main_program_id: {} "
.
format
(
str
(
self
.
main_program_id
))
attrs
+=
"tensor_table_class:
\"
{}
\"
"
.
format
(
str
(
self
.
tensor_table_class
))
attrs
+=
"
\n
"
return
program_str
.
format
(
conv_indent
(
indent
),
attrs
,
conv_indent
(
indent
))
class
Table
:
def
__init__
(
self
):
self
.
id
=
-
1
...
...
@@ -214,6 +236,7 @@ class Table:
self
.
type
=
None
self
.
accessor
=
None
self
.
common
=
None
self
.
tensor
=
None
def
to_string
(
self
,
indent
):
table_str
=
"{}downpour_table_param {{{}
\n
{}}}"
...
...
@@ -230,6 +253,10 @@ class Table:
attrs
+=
self
.
accessor
.
to_string
(
indent
)
attrs
+=
"
\n
"
if
self
.
tensor
is
not
None
:
attrs
+=
self
.
tensor
.
to_string
(
indent
)
attrs
+=
"
\n
"
if
self
.
common
is
not
None
:
attrs
+=
self
.
common
.
to_string
(
indent
)
attrs
+=
"
\n
"
...
...
@@ -355,6 +382,7 @@ class TheOnePSRuntime(RuntimeBase):
self
.
_communicator
=
None
self
.
_server
=
None
self
.
_worker
=
fluid
.
core
.
DistFleetWrapper
()
self
.
_server_sub_program
=
[]
self
.
_heter_client
=
None
def
_set_basic_info
(
self
,
context
):
...
...
@@ -569,17 +597,73 @@ class TheOnePSRuntime(RuntimeBase):
            table.common = common
            return table

        def _build_tensor_table(idx, tensor_dict):
            table = Table()
            table.id = idx
            table.type = "PS_OTHER_TABLE"
            table.table_class = tensor_dict["tensor_table_class"]
            table.shard_num = 256

            accessor = Accessor()
            accessor.accessor_class = "CommMergeAccessor"
            accessor.optimizer = None
            accessor.feature_dim = 0
            accessor.embedding_dim = 0
            table.accessor = accessor

            common = CommonAccessor()
            common.table_name = tensor_dict["feed_var_name"]
            common.trainer_num = self.compiled_strategy.get_trainers()
            common.attrs = ""
            common.dims = []
            common.params = []
            table.common = common

            tensor = Tensor()
            tensor.main_program_id = tensor_dict["main_program_id"]
            tensor.startup_program_id = tensor_dict["startup_program_id"]
            tensor.feed_var_name = tensor_dict["feed_var_name"]
            tensor.fetch_var_name = tensor_dict["fetch_var_name"]
            tensor.tensor_table_class = tensor_dict["tensor_table_class"]
            table.tensor = tensor

            return table

        def _add_tensor_table(tables):
            tensor_table_dict = self.compiled_strategy.get_tensor_table_dict()
            program_idx = 0
            for table_name in tensor_table_dict:
                if tensor_table_dict[table_name]["startup_program"] != None:
                    tensor_table_dict[table_name][
                        "startup_program_id"] = program_idx
                    self._server_sub_program.append(
                        tensor_table_dict[table_name]["startup_program"].desc)
                    program_idx += 1
                if tensor_table_dict[table_name]["main_program"] != None:
                    tensor_table_dict[table_name][
                        "main_program_id"] = program_idx
                    self._server_sub_program.append(
                        tensor_table_dict[table_name]["main_program"].desc)
                    program_idx += 1
                # Todo: Hard code for lr_decay table apply table id
                new_table = _build_tensor_table(
                    len(tables), tensor_table_dict[table_name])
                tables.append(new_table)
            return tables

        def _get_tables():
            send_ctx = self.compiled_strategy.get_the_one_send_context(
                use_origin_program=True,
                split_dense_table=self.role_maker.
                _is_heter_parameter_server_mode)
-           tables = [i for i in range(len(send_ctx) + 1)]
+           tables = []
            for idx, (name, ctx) in enumerate(send_ctx.items()):
                table = Table()
                table.id = ctx.table_id()

                if ctx.is_tensor_table():
                    continue

                if ctx.is_sparse():
                    if len(ctx.origin_varnames()) < 1:
                        continue
...
...
@@ -619,10 +703,17 @@ class TheOnePSRuntime(RuntimeBase):
                accessor = _build_merge_accessor(ctx)
                table.accessor = accessor
-               tables[table.id] = table
+               tables.append(table)

+           tensor_table_dict = self.compiled_strategy.get_tensor_table_dict()
+           if len(tensor_table_dict) > 0:
+               tables = _add_tensor_table(tables)
+           else:
+               empty_porgram = Program()
+               self._server_sub_program.append(empty_porgram.desc)

-           barrier_table = _build_barrier_table(len(send_ctx))
-           tables[-1] = barrier_table
+           barrier_table = _build_barrier_table(len(tables))
+           tables.append(barrier_table)
            return tables

        if is_server:
...
...
@@ -667,7 +758,8 @@ class TheOnePSRuntime(RuntimeBase):
            string_hosts.append(pshost.serialize_to_string())

        self._server = fluid.core.DistFleetWrapper()
-       self._server.init_server(proto_txt, string_hosts, role_id)
+       self._server.init_server(proto_txt, string_hosts, role_id,
+                                self._server_sub_program)

        from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames
...
...
python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
View file @ 330aea6e
...
...
@@ -19,7 +19,7 @@ import collections
import math
import os
-import warnings
+import logging

import six
import paddle.fluid as fluid
from paddle.fluid import core
...
...
@@ -162,6 +162,8 @@ class CompileTimeStrategy(object):
        self._build_var_distributed()

        self.tensor_table_dict = {}

        # for heter-ps save variables
        self.origin_merged_variables_pairs = list(self.merged_variables_pairs)
        self.origin_merged_dense_pairs = list(self.merged_dense_pairs)
...
...
@@ -240,6 +242,24 @@ class CompileTimeStrategy(object):
    def get_origin_ps_startup_program(self):
        return self.origin_ps_startup_program

    def add_tensor_table(self,
                         feed_var_name,
                         fetch_var_name="",
                         startup_program=None,
                         main_program=None,
                         tensor_table_class=""):
        self.tensor_table_dict[feed_var_name] = {}
        self.tensor_table_dict[feed_var_name]["feed_var_name"] = feed_var_name
        self.tensor_table_dict[feed_var_name]["fetch_var_name"] = fetch_var_name
        self.tensor_table_dict[feed_var_name]["startup_program"] = startup_program
        self.tensor_table_dict[feed_var_name]["main_program"] = main_program
        self.tensor_table_dict[feed_var_name][
            "tensor_table_class"] = tensor_table_class

    def get_tensor_table_dict(self):
        return self.tensor_table_dict

    def get_sparse_varname_on_ps(self, is_distributed, endpoint=None):
        if not endpoint:
            endpoint = self.get_ps_endpoint()
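`add_tensor_table` registers the feed/fetch names and the server-side programs that back a tensor table. A minimal sketch of the call that `_add_lr_decay_table_pass` (added further down in this file) makes for the LR decay counter; `lr_name`, `lr_decay_startup_program`, and `lr_decay_main_program` are assumed to come from `_get_lr_sheduler_program`:

    # Sketch only: register the LR decay programs as a "GlobalStepTable".
    compiled_config.add_tensor_table(
        "@LR_DECAY_COUNTER@", lr_name, lr_decay_startup_program,
        lr_decay_main_program, "GlobalStepTable")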
...
...
@@ -523,9 +543,10 @@ class CompileTimeStrategy(object):
                                                   grad.merged_var.name]
            var_numel = reduce(lambda x, y: x * y, var.shape[1:])

-           sparse_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
-                                    [var_numel], [grad_name], trainer_id, True,
-                                    True, is_distributed, idx)
+           sparse_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
+                                    [var_numel], [grad_name], trainer_id, True,
+                                    True, is_distributed, idx, False)

            idx += 1
            send_ctx[sparse_ctx.var_name()] = sparse_ctx
...
...
@@ -533,6 +554,10 @@ class CompileTimeStrategy(object):
                raise ValueError(
                    "GeoSGD require sparse parameters in your net.")

            if len(self.tensor_table_dict) > 0 and self.role_maker._is_worker():
                name, ctx = self._step_ctx(idx)
                send_ctx[name] = ctx

            return send_ctx
        else:
            return self.get_the_one_send_context(split_dense_table)
...
...
@@ -559,7 +584,7 @@ class CompileTimeStrategy(object):
                aggregate = True
                dense_ctx = CommContext(grad_name, [grad_name],
                                        ["127.0.0.1:6071"], [var_numel],
                                        origin_varnames, trainer_id,
-                                       aggregate, False, False, idx)
+                                       aggregate, False, False, idx, False)
                send_ctx[grad_name] = dense_ctx
                idx += 1
            else:
...
...
@@ -571,9 +596,10 @@ class CompileTimeStrategy(object):
                var_numel = reduce(lambda x, y: x * y, var.shape)
                grad_name = origin_varname
                aggregate = True
-               dense_ctx = CommContext(grad_name, [grad_name],
-                                       ["127.0.0.1:6071"], [var_numel],
-                                       [origin_varname], trainer_id, aggregate,
-                                       False, False, idx)
+               dense_ctx = CommContext(grad_name, [grad_name],
+                                       ["127.0.0.1:6071"], [var_numel],
+                                       [origin_varname], trainer_id, aggregate,
+                                       False, False, idx, False)
                send_ctx[grad_name] = dense_ctx
                idx += 1
        return idx
...
...
@@ -615,10 +641,15 @@ class CompileTimeStrategy(object):
            sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape,
                                     [grad_name], trainer_id, True, True,
-                                    is_distributed, idx)
+                                    is_distributed, idx, False)

            idx += 1
            send_ctx[sparse_ctx.var_name()] = sparse_ctx

        if len(self.tensor_table_dict) > 0 and self.role_maker._is_worker():
            name, ctx = self._step_ctx(idx)
            send_ctx[name] = ctx

        return send_ctx

    def get_the_one_recv_context(self,
...
...
@@ -633,6 +664,8 @@ class CompileTimeStrategy(object):
            for idx, (name, ctx) in enumerate(send_ctx.items()):
                if ctx.is_sparse():
                    continue
                if ctx.is_tensor_table():
                    continue

                origin_grad_varnames = ctx.origin_varnames()
...
...
@@ -679,14 +712,14 @@ class CompileTimeStrategy(object):
                var_distributed.append((g.name, ep, g.shape[0]))
        return var_distributed

-   def _step_ctx(self):
+   def _step_ctx(self, idx):
        name = STEP_COUNTER
        trainer_id = self.get_role_id()
        endpoints = self.get_ps_endpoints()
        sections = [1] * len(endpoints)
        names = [name] * len(endpoints)
-       ctx = CommContext(name, names, endpoints, sections, [name], trainer_id,
-                         True, False, False)
+       ctx = CommContext(name, names, endpoints, sections, [name], trainer_id,
+                         True, False, False, idx, True)
        return name, ctx

    def _create_vars_from_blocklist(self, block_list):
...
...
@@ -1118,6 +1151,89 @@ def _get_optimize_ops(_program):
    return opt_ops


def _add_lr_decay_table_pass(main_program, compiled_config, lr_decay_steps):
    if hasattr(compiled_config.origin_main_program, 'lr_sheduler'):
        from paddle.optimizer.lr import LRScheduler
        assert isinstance(compiled_config.origin_main_program.lr_sheduler,
                          LRScheduler), "must be LRScheduler"
        ops = _get_optimize_ops(compiled_config.origin_main_program)
        lr_param_dict = _get_lr_param_dict(ops)
        lr_decay_main_program, lr_decay_startup_program, lr_name = _get_lr_sheduler_program(
            compiled_config.origin_main_program.lr_sheduler, lr_param_dict,
            lr_decay_steps)
        compiled_config.add_tensor_table(
            "@LR_DECAY_COUNTER@", lr_name, lr_decay_startup_program,
            lr_decay_main_program, "GlobalStepTable")


def _get_lr_param_dict(opt_ops):
    lr_param_dict = {}
    for op in opt_ops:
        lr_name = op.input("LearningRate")[0]
        param_name = op.input("Param")[0]
        if lr_name not in lr_param_dict:
            lr_param_dict[lr_name] = []
        lr_param_dict[lr_name].append(param_name)
    return lr_param_dict


def _get_lr_sheduler_program(lr_sheduler, lr_param_dict, lr_decay_steps):
    schedler_decay = [
        'NoamDecay', 'NaturalExpDecay', 'InverseTimeDecay', 'ExponentialDecay'
    ]

    from paddle.optimizer.lr import ExponentialDecay, NoamDecay, PiecewiseDecay, NaturalExpDecay, InverseTimeDecay
    from paddle.fluid.layers.learning_rate_scheduler import exponential_decay, noam_decay, piecewise_decay, natural_exp_decay, inverse_time_decay

    decay_main_program = fluid.framework.Program()
    decay_startup_program = fluid.framework.Program()
    lr_name = ""

    if isinstance(lr_sheduler, ExponentialDecay):
        with fluid.program_guard(decay_main_program, decay_startup_program):
            lr = exponential_decay(1.0, lr_decay_steps, lr_sheduler.gamma, True)
            lr_name = lr.name
            logging.warn(
                "ExponentialDecay is set, staircase = True, global learning rate decay step is [ %d ], Change decay steps as follow: \n"
                "\t strategy = paddle.distributed.fleet.DistributedStrategy() \n"
                "\t strategy.a_sync = True \n"
                "\t strategy.a_sync_configs= { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
                % lr_decay_steps)
    elif isinstance(lr_sheduler, NoamDecay):
        with fluid.program_guard(decay_main_program, decay_startup_program):
            lr = noam_decay(lr_sheduler.d_model, lr_sheduler.warmup_steps, 1.0)
            lr_name = lr.name
            logging.warn("NoamDecay is set, warmup steps is [ %d ]" %
                         lr_sheduler.warmup_steps)
    elif isinstance(lr_sheduler, NaturalExpDecay):
        with fluid.program_guard(decay_main_program, decay_startup_program):
            lr = natural_exp_decay(1.0, lr_decay_steps, lr_sheduler.gamma, True)
            lr_name = lr.name
            logging.warn(
                "NaturalExpDecay is set, staircase = True, global learning rate decay step is [ %d ], Change decay steps as follow: \n"
                "\t strategy = paddle.distributed.fleet.DistributedStrategy() \n"
                "\t strategy.a_sync = True \n"
                "\t strategy.a_sync_configs= { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
                % lr_decay_steps)
    elif isinstance(lr_sheduler, InverseTimeDecay):
        with fluid.program_guard(decay_main_program, decay_startup_program):
            lr = inverse_time_decay(1.0, lr_decay_steps, lr_sheduler.gamma,
                                    True)
            lr_name = lr.name
            logging.warn(
                "InverseTimeDecay is set, staircase = True, global learning rate decay step is [ %d ], Change decay steps as follow: \n"
                "\t strategy = paddle.distributed.fleet.DistributedStrategy() \n"
                "\t strategy.a_sync = True \n"
                "\t strategy.a_sync_configs= { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
                % lr_decay_steps)
    else:
        raise ValueError(
            "Not supported current LearningRate strategy, please use follow decay strategy: {}".
            format(schedler_decay))

    return decay_main_program, decay_startup_program, lr_name


def _get_varname_parts(varname):
    # returns origin, blockid, trainerid
    orig_var_name = ""
...
...
python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
View file @ 330aea6e
...
...
@@ -34,7 +34,6 @@ from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode

OP_NAME_SCOPE = "op_namescope"
CLIP_OP_NAME_SCOPE = "@CLIP"
STEP_COUNTER = "@PS_STEP_COUNTER@"
OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
...
...
@@ -43,7 +42,6 @@ OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()

SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
DEVICE_LIST = ["cpu", "gpu", "xpu"]
COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"]
DEFAULT_DEVICE = 'cpu'
...
...
@@ -72,11 +70,26 @@ def delete_optimizer_pass(program, config):
            if _program.global_block().has_var(var):
                _program.global_block()._remove_var(var)

    def _add_lr_var(main_program, compiled_config):
        # Todo: hard code for pe
        lr_var = compiled_config.origin_main_program.global_block().vars[
            "learning_rate_0"]
        main_program.global_block().create_var(
            name=lr_var.name,
            shape=lr_var.shape,
            dtype=lr_var.dtype,
            type=lr_var.type,
            lod_level=lr_var.lod_level,
            persistable=True)

    optimizer_ops = _get_optimize_ops(program)
    lr_ops = _get_lr_ops(program)
    optimizer_ops.extend(lr_ops)
    _delete_optimizer_op_and_vars(program, optimizer_ops)

    if hasattr(config.origin_main_program, 'lr_sheduler'):
        _add_lr_var(program, config)

    return program
...
...
python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
View file @ 330aea6e
...
...
@@ -179,7 +179,7 @@ def gen_zero_line(dnn_data_num=7, lr_data_num=5):
    return line


-def prepare_fake_data(file_nums=6, file_lines=1000):
+def prepare_fake_data(file_nums=4, file_lines=500):
    """
    Create fake data with same type as avazu_ctr_data
    """
...
...
python/paddle/fluid/tests/unittests/test_dist_fleet_base.py
View file @ 330aea6e
...
...
@@ -13,6 +13,11 @@
# limitations under the License.

from __future__ import print_function

from paddle.distributed.fleet.utils.ps_util import Distributed
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
"""
high level unit test for distribute fleet.
"""
...
...
@@ -37,6 +42,7 @@ import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.distributed.fleet.utils.ps_util import Distributed

paddle.enable_static()

__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']
...
...
@@ -120,14 +126,20 @@ class FleetDistRunnerBase(object):
            fluid.clip.set_gradient_clip(
                clip=fluid.clip.GradientClipByGlobalNorm(2.0))

-       use_decay = int(os.getenv("DECAY", "0"))
+       use_decay = int(os.getenv("USE_DECAY", "0"))
        if use_decay:
            scheduler = paddle.optimizer.lr.ExponentialDecay(
                learning_rate=LEARNING_RATE, gamma=0.999, verbose=True)
            optimizer = fluid.optimizer.SGD(scheduler)
            """
            # learning rate decay method before 2.0
            optimizer = fluid.optimizer.SGD(
                learning_rate=fluid.layers.exponential_decay(
                    learning_rate=LEARNING_RATE,
                    decay_steps=500,
                    decay_rate=0.969,
                    staircase=True))
            """
        else:
            optimizer = fluid.optimizer.SGD(LEARNING_RATE)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
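The decay branch of the test runner is now gated by the `USE_DECAY` environment variable instead of `DECAY`. A hedged sketch of enabling it before the runner builds its optimizer:

    import os

    # Sketch only: any non-zero value turns on the ExponentialDecay branch above.
    os.environ["USE_DECAY"] = "1"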
...
...
python/paddle/fluid/tests/unittests/test_dist_fleet_decay.py
0 → 100644
View file @ 330aea6e
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()

# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4


class TestNoamDecay(unittest.TestCase):
    def net(self):
        input_data = paddle.static.data(
            name="sparse_input", shape=[None, 1], dtype="int64")
        input_label = paddle.static.data(
            name="label", shape=[None, 1], dtype="int64")
        label = paddle.cast(input_label, dtype="float32")

        embedding = paddle.static.nn.embedding(
            input_data, is_sparse=True, size=[1000, 128])

        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
        label = paddle.cast(label, dtype="int64")
        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
        paddle.static.Print(cost, message="heter_cost")
        return cost

    def test(self):
        endpoints = [
            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
            "127.0.0.1:36007"
        ]

        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.WORKER,
            worker_num=2,
            server_endpoints=endpoints)

        fleet.init(role)
        loss = self.net()
        scheduler = paddle.optimizer.lr.NoamDecay(
            d_model=0.01, warmup_steps=100, verbose=True)
        optimizer = fluid.optimizer.Adam(scheduler)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        strategy.a_sync_configs = {"launch_barrier": False}
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)


if __name__ == '__main__':
    unittest.main()
python/paddle/fluid/tests/unittests/test_dist_fleet_ps10.py
0 → 100644
View file @ 330aea6e
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
import unittest
import paddle
import os
paddle.enable_static()

# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4


class TestExponentialDecay(unittest.TestCase):
    def net(self):
        input_data = paddle.static.data(
            name="sparse_input", shape=[None, 1], dtype="int64")
        input_label = paddle.static.data(
            name="label", shape=[None, 1], dtype="int64")
        label = paddle.cast(input_label, dtype="float32")

        embedding = paddle.static.nn.embedding(
            input_data, is_sparse=True, size=[1000, 128])

        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
        label = paddle.cast(label, dtype="int64")
        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
        return cost

    def test(self):
        endpoints = [
            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
            "127.0.0.1:36007"
        ]

        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.SERVER,
            worker_num=2,
            server_endpoints=endpoints)

        fleet.init(role)
        loss = self.net()
        scheduler = paddle.optimizer.lr.InverseTimeDecay(
            learning_rate=base_lr, gamma=0.999, verbose=True)
        optimizer = fluid.optimizer.Adam(scheduler)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)
        fleet.init_server()


if __name__ == '__main__':
    os.environ["GLOG_v"] = "4"
    os.environ["GLOG_logtostderr"] = "1"
    unittest.main()
python/paddle/fluid/tests/unittests/test_dist_fleet_ps7.py
0 → 100644
View file @ 330aea6e
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()

# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4


class TestNaturalExpDecay(unittest.TestCase):
    def net(self):
        input_data = paddle.static.data(
            name="sparse_input", shape=[None, 1], dtype="int64")
        input_label = paddle.static.data(
            name="label", shape=[None, 1], dtype="int64")
        label = paddle.cast(input_label, dtype="float32")

        embedding = paddle.static.nn.embedding(
            input_data, is_sparse=True, size=[1000, 128])

        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
        label = paddle.cast(label, dtype="int64")
        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
        paddle.static.Print(cost, message="heter_cost")
        return cost

    def test(self):
        endpoints = [
            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
            "127.0.0.1:36007"
        ]

        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.SERVER,
            worker_num=2,
            server_endpoints=endpoints)

        fleet.init(role)
        loss = self.net()
        scheduler = paddle.optimizer.lr.NaturalExpDecay(
            learning_rate=base_lr, gamma=0.999, verbose=True)
        optimizer = fluid.optimizer.Adam(scheduler)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)
        fleet.init_server()


if __name__ == '__main__':
    os.environ["GLOG_v"] = "4"
    os.environ["GLOG_logtostderr"] = "1"
    unittest.main()
python/paddle/fluid/tests/unittests/test_dist_fleet_ps8.py
0 → 100644
View file @ 330aea6e
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()

# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4


class TestNoamDecay(unittest.TestCase):
    def net(self):
        input_data = paddle.static.data(
            name="sparse_input", shape=[None, 1], dtype="int64")
        input_label = paddle.static.data(
            name="label", shape=[None, 1], dtype="int64")
        label = paddle.cast(input_label, dtype="float32")

        embedding = paddle.static.nn.embedding(
            input_data, is_sparse=True, size=[1000, 128])

        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
        label = paddle.cast(label, dtype="int64")
        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
        paddle.static.Print(cost, message="heter_cost")
        return cost

    def test(self):
        endpoints = [
            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
            "127.0.0.1:36007"
        ]

        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.SERVER,
            worker_num=2,
            server_endpoints=endpoints)

        fleet.init(role)
        loss = self.net()
        scheduler = paddle.optimizer.lr.NoamDecay(
            d_model=0.01, warmup_steps=100, verbose=True)
        optimizer = fluid.optimizer.Adam(scheduler)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)
        fleet.init_server()


if __name__ == '__main__':
    os.environ["GLOG_v"] = "4"
    os.environ["GLOG_logtostderr"] = "1"
    unittest.main()
python/paddle/fluid/tests/unittests/test_dist_fleet_ps9.py
0 → 100644
View file @ 330aea6e
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()

# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4


class TestExponentialDecay(unittest.TestCase):
    def net(self):
        input_data = paddle.static.data(
            name="sparse_input", shape=[None, 1], dtype="int64")
        input_label = paddle.static.data(
            name="label", shape=[None, 1], dtype="int64")
        label = paddle.cast(input_label, dtype="float32")

        embedding = paddle.static.nn.embedding(
            input_data, is_sparse=True, size=[1000, 128])

        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
        label = paddle.cast(label, dtype="int64")
        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
        paddle.static.Print(cost, message="heter_cost")
        return cost

    def test(self):
        endpoints = [
            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
            "127.0.0.1:36007"
        ]

        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.SERVER,
            worker_num=2,
            server_endpoints=endpoints)

        fleet.init(role)
        loss = self.net()
        scheduler = paddle.optimizer.lr.ExponentialDecay(
            learning_rate=base_lr, gamma=0.999, verbose=True)
        optimizer = fluid.optimizer.Adam(scheduler)

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)
        fleet.init_server()


if __name__ == '__main__':
    os.environ["GLOG_v"] = "4"
    os.environ["GLOG_logtostderr"] = "1"
    unittest.main()