Commit 90323594
Repository: Paddle (forked from PaddlePaddle/Paddle)
Authored Feb 26, 2021 by tangwei12; committed via GitHub on Feb 26, 2021.
loglevel adjustment for distributed training (#31205)
Change-Id: I6210ce9c60bed48f3323c47b16500302b66cedf2
Parent: c0bda910

Showing 10 changed files with 26 additions and 32 deletions.
paddle/fluid/distributed/fleet.cc                      +1 -1
paddle/fluid/distributed/service/brpc_ps_server.cc     +2 -5
paddle/fluid/distributed/service/communicator.cc       +1 -1
paddle/fluid/distributed/service/communicator.h        +5 -5
paddle/fluid/distributed/service/heter_client.cc       +4 -4
paddle/fluid/distributed/service/heter_server.cc       +1 -1
paddle/fluid/distributed/service/heter_server.h        +1 -1
paddle/fluid/distributed/table/common_dense_table.cc   +1 -1
paddle/fluid/distributed/table/common_dense_table.h    +2 -5
paddle/fluid/distributed/table/common_sparse_table.cc  +8 -8
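The levels being adjusted are glog-style verbosity levels: VLOG(n) only emits when the runtime verbosity is at least n, and the default verbosity is 0. Moving a message from VLOG(0) to VLOG(1), VLOG(3), or VLOG(4) therefore silences it by default while keeping it available when verbose output is requested. A minimal sketch of the mechanism, assuming standalone glog (which Paddle's VLOG is built on); the message strings are illustrative:

```cpp
// Minimal sketch of glog verbosity semantics (standalone glog, not Paddle's
// wrapper). Build: g++ vlog_demo.cc -lglog -lgflags
#include <glog/logging.h>

int main(int argc, char* argv[]) {
  google::InitGoogleLogging(argv[0]);
  FLAGS_logtostderr = true;  // log to stderr instead of per-severity files

  // Default verbosity is 0, so only VLOG(0) prints unless the user raises
  // it with --v=N (gflags) or the GLOG_v=N environment variable.
  VLOG(0) << "always visible, the level these messages used to have";
  VLOG(1) << "visible at verbosity >= 1 (e.g. 'SyncCommunicator Initialized')";
  VLOG(3) << "visible at verbosity >= 3 (most messages in this commit)";
  VLOG(4) << "visible at verbosity >= 4 (per-shard shrink progress)";

  // VLOG_IS_ON guards let callers skip expensive message construction.
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "only evaluated when verbose logging is enabled";
  }
  return 0;
}
```

With that semantics in mind, every hunk below either moves a routine progress message to a quieter level or deletes it outright.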
paddle/fluid/distributed/fleet.cc

@@ -501,7 +501,7 @@ void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
     if (name.find("batch_sum") != std::string::npos) {
       Variable* var = scope->FindVar(name);
       CHECK(var != nullptr) << "var[" << name << "] not found";
-      VLOG(0) << "prepare shrink dense batch_sum";
+      VLOG(3) << "prepare shrink dense batch_sum";
       LoDTensor* tensor = var->GetMutable<LoDTensor>();
       float* g = tensor->data<float>();
paddle/fluid/distributed/service/brpc_ps_server.cc

@@ -79,16 +79,13 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) {
     }
   }
-  VLOG(0) << "BrpcPsServer::start registe_ps_server";
   _environment->registe_ps_server(ip, port, _rank);
-  VLOG(0) << "BrpcPsServer::start wait";
   cv_.wait(lock, [&] { return stoped_; });
   PSHost host;
   host.ip = ip;
   host.port = port;
   host.rank = _rank;
-  VLOG(0) << "BrpcPsServer::start return host.rank";
   return host.rank;
 }

@@ -464,7 +461,7 @@ int32_t BrpcPsService::save_one_table(Table *table,
   int32_t feasign_size = 0;
-  VLOG(0) << "save one table " << request.params(0) << " " << request.params(1);
+  VLOG(3) << "save table " << request.params(0) << " " << request.params(1);
   feasign_size = table->save(request.params(0), request.params(1));
   if (feasign_size < 0) {
     set_response_code(response, -1, "table save failed");

@@ -507,7 +504,7 @@ int32_t BrpcPsService::shrink_table(Table *table,
     set_response_code(response, -1, "table shrink failed");
     return -1;
   }
-  VLOG(0) << "Pserver Shrink Finished";
+  VLOG(3) << "Pserver Shrink Finished";
   return 0;
 }
paddle/fluid/distributed/service/communicator.cc

@@ -39,7 +39,7 @@ inline double GetCurrentUS() {
 Communicator::Communicator() {}

 void Communicator::init_gflag(const std::string &gflags) {
-  VLOG(0) << "Init With Gflags:" << gflags;
+  VLOG(3) << "Init With Gflags:" << gflags;
   std::vector<std::string> flags = paddle::string::split_string(gflags);
   if (flags.size() < 1) {
     flags.push_back("-max_body_size=314217728");
paddle/fluid/distributed/service/communicator.h

@@ -199,10 +199,10 @@ class Communicator {
   Communicator();

   explicit Communicator(const std::map<std::string, std::string> &envs_) {
-    VLOG(0) << "Communicator Init Envs";
+    VLOG(3) << "Communicator Init Envs";
     for (auto &iter : envs_) {
       envs[iter.first] = iter.second;
-      VLOG(0) << iter.first << ": " << iter.second;
+      VLOG(3) << iter.first << ": " << iter.second;
     }
     barrier_table_id_ = std::stoi(envs.at("barrier_table_id"));
     trainer_id_ = std::stoi(envs.at("trainer_id"));

@@ -436,7 +436,7 @@ class HalfAsyncCommunicator : public AsyncCommunicator {
     need_global_step_ =
         static_cast<bool>(std::stoi(envs.at("need_global_step")));
-    VLOG(0) << "HalfAsyncCommunicator Initialized";
+    VLOG(1) << "HalfAsyncCommunicator Initialized";
   }

   void MainThread() override;

@@ -481,7 +481,7 @@ class SyncCommunicator : public HalfAsyncCommunicator {
     need_global_step_ =
         static_cast<bool>(std::stoi(envs.at("need_global_step")));
-    VLOG(0) << "SyncCommunicator Initialized";
+    VLOG(1) << "SyncCommunicator Initialized";
   }

   void BarrierSend();

@@ -525,7 +525,7 @@ class GeoCommunicator : public AsyncCommunicator {
     // id_queue's size
     max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
     send_queue_size_ = max_merge_var_num_;
-    VLOG(0) << "GeoCommunicator Initialized";
+    VLOG(1) << "GeoCommunicator Initialized";
   }

   void Send(const std::vector<std::string> &var_names,
paddle/fluid/distributed/service/heter_client.cc

@@ -34,7 +34,7 @@ void HeterClient::MainThread() {
 void HeterClient::Stop() {
   running_ = false;
   if (!is_initialized_) {
-    VLOG(0) << "HeterClient is not inited, do nothing";
+    VLOG(3) << "HeterClient is not inited, do nothing";
   } else {
     if (main_thread_) {
       auto status = StopHeterWorker();

@@ -42,20 +42,20 @@ void HeterClient::Stop() {
       main_thread_->join();
       main_thread_.reset(nullptr);
     }
-    VLOG(1) << "HeterClient Stop Done";
+    VLOG(3) << "HeterClient Stop Done";
   }
 }

 void HeterClient::FinalizeWorker() {
   running_ = false;
   if (!is_initialized_) {
-    VLOG(0) << "HeterClient is not inited, do nothing";
+    VLOG(3) << "HeterClient is not inited, do nothing";
   } else {
     if (main_thread_) {
       main_thread_->join();
       main_thread_.reset(nullptr);
     }
-    VLOG(1) << "HeterClient Stop Done";
+    VLOG(3) << "HeterClient Stop Done";
   }
 }
paddle/fluid/distributed/service/heter_server.cc

@@ -89,7 +89,7 @@ int32_t HeterService::stop_heter_worker(const PsRequestMessage& request,
   stop_cpu_worker_set_.insert(client_id);
   if (stop_cpu_worker_set_.size() == fan_in_) {
     is_exit_ = true;
-    VLOG(0) << "Stop heter Service done.";
+    VLOG(3) << "Stop heter Service done.";
   }
   return 0;
 }
paddle/fluid/distributed/service/heter_server.h

@@ -153,7 +153,7 @@ class HeterServer {
   virtual ~HeterServer() {}

   void Stop() {
-    VLOG(0) << "HeterServer Stop()";
+    VLOG(3) << "HeterServer Stop()";
     std::unique_lock<std::mutex> lock(mutex_);
     stoped_ = true;
     cv_.notify_all();
paddle/fluid/distributed/table/common_dense_table.cc

@@ -94,7 +94,7 @@ int32_t CommonDenseTable::initialize_optimizer() {
   } else {
     VLOG(0) << "init optimizer failed";
   }
-  VLOG(0) << "init optimizer " << name << " done";
+  VLOG(3) << "init optimizer " << name << " done";
   return 0;
 }
paddle/fluid/distributed/table/common_dense_table.h

@@ -47,15 +47,12 @@ class CommonDenseTable : public DenseTable {
   virtual int32_t set_global_lr(float* lr) override;

   int32_t load(const std::string& path, const std::string& param) override {
-    VLOG(0) << "Dense table may load by "
-               "paddle.distributed.fleet.init_server";
-    VLOG(0) << "WARNING: dense variables will load on No.0 trainer";
     return 0;
   }

   int32_t save(const std::string& path, const std::string& param) override {
-    VLOG(0) << "Dense table may be saved by "
-               "paddle.distributed.fleet.save_persistables/save_inference_model";
-    VLOG(0) << "WARNING: dense variables will save on No.0 trainer";
     return 0;
   }
paddle/fluid/distributed/table/common_sparse_table.cc

@@ -170,7 +170,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
     auto id = std::stoull(values[0]);

     if (id % pserver_num != pserver_id) {
-      VLOG(0) << "will not load " << values[0] << " from " << valuepath
+      VLOG(3) << "will not load " << values[0] << " from " << valuepath
               << ", please check id distribution";
       continue;
     }

@@ -263,7 +263,7 @@ int32_t CommonSparseTable::initialize_value() {
     }
   }

-  VLOG(0) << "has " << feasigns.size() << " ids need to be pre inited";
+  VLOG(3) << "has " << feasigns.size() << " ids need to be pre inited";

   auto buckets = bucket(feasigns.size(), 10);
   for (int x = 0; x < 10; ++x) {

@@ -295,10 +295,10 @@ int32_t CommonSparseTable::initialize_optimizer() {
     optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
                                         value_offsets_, value_idx_);
   } else {
-    VLOG(0) << "init optimizer failed";
+    VLOG(3) << "init optimizer failed";
   }

-  VLOG(0) << "init optimizer " << name << " done";
+  VLOG(3) << "init optimizer " << name << " done";
   return 0;
 }

@@ -311,7 +311,7 @@ int32_t CommonSparseTable::set_global_lr(float* lr) {
 int32_t CommonSparseTable::load(const std::string& path,
                                 const std::string& param) {
   rwlock_->WRLock();
-  VLOG(0) << "sparse table load with " << path << " with meta " << param;
+  VLOG(3) << "sparse table load with " << path << " with meta " << param;
   LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_,
                &shard_values_);
   rwlock_->UNLock();

@@ -322,7 +322,7 @@ int32_t CommonSparseTable::save(const std::string& dirname,
                                 const std::string& param) {
   rwlock_->WRLock();
   int mode = std::stoi(param);
-  VLOG(0) << "sparse table save: " << dirname << " mode: " << mode;
+  VLOG(3) << "sparse table save: " << dirname << " mode: " << mode;
   auto varname = _config.common().table_name();
   std::string var_store =

@@ -538,11 +538,11 @@ int32_t CommonSparseTable::flush() { return 0; }
 int32_t CommonSparseTable::shrink(const std::string& param) {
   rwlock_->WRLock();
   int threshold = std::stoi(param);
-  VLOG(0) << "sparse table shrink: " << threshold;
+  VLOG(3) << "sparse table shrink: " << threshold;

   for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
     // shrink
-    VLOG(0) << shard_id << " " << task_pool_size_ << " begin shrink";
+    VLOG(4) << shard_id << " " << task_pool_size_ << " begin shrink";
     shard_values_[shard_id]->Shrink(threshold);
   }
   rwlock_->UNLock();
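The quieted messages remain recoverable at runtime. A hedged sketch of how a debugging session might re-enable them per module, assuming glog's SetVLOGLevel API; the module patterns below are hypothetical examples:

```cpp
// Sketch: selectively restore the silenced messages without rebuilding,
// assuming glog's per-module verbosity API. Patterns are matched against
// source file basenames and are illustrative, not taken from this commit.
#include <glog/logging.h>

void EnableDistributedDebugLogs() {
  google::SetVLOGLevel("communicator*", 3);         // Communicator env dump
  google::SetVLOGLevel("common_sparse_table*", 4);  // per-shard shrink logs
  google::SetVLOGLevel("heter_*", 3);               // heter client/server
}
```

Equivalently, launching the job with GLOG_v=3 raises verbosity globally, and GLOG_vmodule=communicator=3 scopes it to a single translation unit.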