机器未来 / Paddle (forked from PaddlePaddle / Paddle)
Commit c49560da (unverified)
Authored Sep 02, 2020 by Chengmo; committed by GitHub on Sep 02, 2020
supplement bug fix of parameter server (#26217) (#26909)
* fix fluid.embedding
Parent: 1b60f7f6
Showing 14 changed files with 255 additions and 340 deletions (+255 −340)
paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc    +16  −9
paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h     +21  −0
python/paddle/distributed/fleet/runtime/parameter_server_runtime.py      +4   −3
python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py         +7   −5
python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py   +7   −4
python/paddle/fluid/tests/unittests/CMakeLists.txt                       +0   −2
python/paddle/fluid/tests/unittests/dist_fleet_ctr.py                    +2   −5
python/paddle/fluid/tests/unittests/dist_fleet_simnet_bow.py             +105 −112
python/paddle/fluid/tests/unittests/simnet_dataset_reader.py             +33  −0
python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py               +0   −35
python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py               +2   −2
python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py         +2   −2
python/paddle/fluid/tests/unittests/test_dist_fleet_simnet.py            +56  −0
python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py              +0   −161
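Background for the fix (editorial note, not part of the commit): fluid.layers.embedding generates a lookup_table op, whose ids must carry a trailing dimension of 1 and whose output is 2-D, while fluid.embedding generates lookup_table_v2, whose ids drop the trailing 1 and whose output keeps a separate embedding axis. The distributed lookup-table path previously assumed only the v1 shape contract; the changes below teach it the v2 contract. A minimal sketch of the two contracts (dims are illustrative, not from the commit):

# Illustrative sketch only: the shape contracts the distributed
# lookup-table kernel must now distinguish.
emb_dim = 128

# lookup_table (fluid.layers.embedding): ids need a trailing 1.
v1_ids = (32, 1)            # [batch, 1]
v1_out = (32, emb_dim)      # [batch, emb_dim]

# lookup_table_v2 (fluid.embedding): no trailing 1; output keeps both id axes.
v2_ids = (32, 5)            # [batch, seq_len]
v2_out = (32, 5, emb_dim)   # [batch, seq_len, emb_dim]

# After a flat row-wise prefetch, the v2 output must be reshaped back:
assert v2_out == (v2_ids[0], v2_ids[1], emb_dim)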
paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
@@ -25,25 +25,32 @@ class DistributedLookupTableOp : public framework::OperatorWithKernel {
   using framework::OperatorWithKernel::OperatorWithKernel;

   void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInputs("Ids"),
-                   "Input(Ids) of LookupTableOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasInput("W"),
-                   "Input(W) of LookupTableOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasOutputs("Outputs"),
-                   "Output(Outs) of LookupTableOp should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasInputs("Ids"), true,
+                      platform::errors::InvalidArgument(
+                          "Input(Ids) of LookupTableOp should not be null."));
+    PADDLE_ENFORCE_EQ(ctx->HasInput("W"), true,
+                      platform::errors::InvalidArgument(
+                          "Input(W) of LookupTableOp should not be null."));
+    PADDLE_ENFORCE_EQ(ctx->HasOutputs("Outputs"), true,
+                      platform::errors::InvalidArgument(
+                          "Output(Outs) of LookupTableOp should not be null."));

     auto ids_dims = ctx->GetInputsDim("Ids");
     auto table_dims = ctx->GetInputDim("W");

-    PADDLE_ENFORCE_EQ(table_dims.size(), 2,
-                      "Only 2 dimensions of the 'Embedding' is supported.");
+    PADDLE_ENFORCE_EQ(
+        table_dims.size(), 2,
+        platform::errors::InvalidArgument(
+            "Only 2 dimensions of the 'Embedding' is supported."));

     for (auto &ids_dim : ids_dims) {
       PADDLE_ENFORCE_EQ(ids_dim.size(), 2,
-                        "The dimension of the 'Ids' tensor must be 2.");
+                        platform::errors::InvalidArgument(
+                            "The dimension of the 'Ids' tensor must be 2."));
     }

     auto endpoints = ctx->Attrs().Get<std::vector<std::string>>("endpoints");
+
+    // for fluid.embedding
+    auto lookup_table_version =
+        ctx->Attrs().Get<std::string>("lookup_table_version");
paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h
@@ -35,9 +35,30 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
     auto endpoints = context.Attr<std::vector<std::string>>("endpoints");
     auto is_distributed = context.Attr<bool>("is_distributed");

+    auto lookup_table_version =
+        context.Attr<std::string>("lookup_table_version");
+
     operators::distributed::prefetchs(id_names, out_names, embedding_name,
                                       is_distributed, lookup_tables, endpoints,
                                       context, context.scope());
+
+    if (lookup_table_version == "lookup_table_v2") {
+      auto &scope = context.scope();
+      auto emb_dim =
+          scope.FindVar(embedding_name)->Get<framework::LoDTensor>().dims()[1];
+
+      for (size_t i = 0; i < id_names.size(); ++i) {
+        auto *id_var = scope.FindVar(id_names[i]);
+        auto *out_var = scope.FindVar(out_names[i]);
+        auto *id_tensor = id_var->GetMutable<framework::LoDTensor>();
+        auto *out_tensor = out_var->GetMutable<framework::LoDTensor>();
+
+        auto id_dims = id_tensor->dims();
+        out_tensor->Resize(framework::make_ddim(
+            {static_cast<int64_t>(id_dims[0]), static_cast<int64_t>(id_dims[1]),
+             static_cast<int64_t>(emb_dim)}));
+      }
+    }
   }
 };
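Editorial sketch (not part of the commit) of what the new lookup_table_v2 branch above computes, in numpy terms and with assumed shapes:

import numpy as np

emb_dim = 8                              # W.dims()[1], read from the scope
ids = np.zeros((4, 5), dtype=np.int64)   # one Ids input, shape [d0, d1]

# prefetchs() leaves the output as [d0 * d1, emb_dim] rows; for
# lookup_table_v2 the kernel Resizes it back to [d0, d1, emb_dim]:
flat_out = np.zeros((ids.shape[0] * ids.shape[1], emb_dim), dtype=np.float32)
out = flat_out.reshape(ids.shape[0], ids.shape[1], emb_dim)
assert out.shape == (4, 5, 8)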
python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
@@ -154,15 +154,16 @@ class ParameterServerRuntime(RuntimeBase):
                 kwargs["sparse_attrs"] = get_sparse_attrs()
                 return kwargs

-        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops
+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops, _has_global_step
         from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import \
             SyncStrategy, GeoStrategy

         trainer_config = self.async_strategy.get_trainer_runtime_config()

-        lrs = _get_lr_ops(self.origin_main_program)
-
-        if len(lrs) > 0:
+        lrs = _has_global_step(_get_lr_ops(self.origin_main_program))
+
+        if lrs:
             kwargs = {"need_global_step": "1"}
         else:
             kwargs = {"need_global_step": "0"}
python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
@@ -42,6 +42,9 @@ op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
 LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
 OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize

+SPARSE_OP_LIST = ["lookup_table", "lookup_table_v2"]
+SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
+

 def _get_lr_ops(program):
     lr_ops = []
@@ -66,7 +69,7 @@ def _has_global_step(lr_ops):

 def is_sparse_op(op):
-    if op.type == "lookup_table" and op.attr('is_sparse') is True and op.attr(
+    if op.type in SPARSE_OP_LIST and op.attr('is_sparse') is True and op.attr(
             'is_distributed') is False:
         return True
@@ -78,7 +81,7 @@ def is_sparse_op(op):

 def is_distributed_sparse_op(op):
-    if op.type == "lookup_table" and op.attr('is_distributed') is True:
+    if op.type in SPARSE_OP_LIST and op.attr('is_distributed') is True:
         return True

     if op.type == "distributed_lookup_table" and op.attr(
@@ -802,11 +805,10 @@ class CompileTimeStrategy(object):
         def _get_sparse_varnames():
             varnames = []
-            op_types = {"lookup_table": "W"}
             for op in origin_program.global_block().ops:
-                if op.type in op_types.keys() \
+                if op.type in SPARSE_OP_TYPE_DICT.keys() \
                         and op.attr('remote_prefetch') is True:
-                    param_name = op.input(op_types[op.type])[0]
+                    param_name = op.input(SPARSE_OP_TYPE_DICT[op.type])[0]
                     varnames.append(param_name)

             return list(set(varnames))
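Editorial sketch of the classification helpers after this change; FakeOp is a hypothetical stand-in for a real op descriptor, used only to show the predicate:

class FakeOp(object):
    # Hypothetical stand-in: only .type and .attr() matter to the helpers.
    def __init__(self, op_type, attrs):
        self.type = op_type
        self._attrs = attrs

    def attr(self, name):
        return self._attrs.get(name)


SPARSE_OP_LIST = ["lookup_table", "lookup_table_v2"]


def is_sparse_op(op):
    return (op.type in SPARSE_OP_LIST and op.attr('is_sparse') is True and
            op.attr('is_distributed') is False)


# Before the fix only "lookup_table" matched; fluid.embedding's
# "lookup_table_v2" now classifies the same way.
v2_op = FakeOp("lookup_table_v2", {"is_sparse": True, "is_distributed": False})
assert is_sparse_op(v2_op)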
python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -40,6 +40,8 @@ LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
 OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
 op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()

+SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
+
 DEVICE_LIST = ["cpu", "gpu", "xpu"]
 COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"]
 DEFAULT_DEVICE = 'cpu'
@@ -81,11 +83,10 @@ def distributed_ops_pass(program, config):
     def _get_pull_sparse_ops(_program):
         pull_sparse_ops = {}
-        op_types = {"lookup_table": "W"}
         for op in _program.global_block().ops:
-            if op.type in op_types.keys() \
+            if op.type in SPARSE_OP_TYPE_DICT.keys() \
                     and op.attr('remote_prefetch') is True:
-                param_name = op.input(op_types[op.type])[0]
+                param_name = op.input(SPARSE_OP_TYPE_DICT[op.type])[0]
                 ops = pull_sparse_ops.get(param_name, [])
                 ops.append(op)
                 pull_sparse_ops[param_name] = ops
@@ -101,6 +102,7 @@ def distributed_ops_pass(program, config):
         w = program.global_block().vars[ops[0].input("W")[0]]
         padding_idx = ops[0].attr("padding_idx")
         is_distributed = ops[0].attr("is_distributed")
+        op_type = ops[0].type
         outputs = [
             program.global_block().vars[op.output("Out")[0]] for op in ops
         ]
@@ -149,7 +151,8 @@ def distributed_ops_pass(program, config):
                 "is_distributed": is_distributed,
                 "pserver_num": len(pserver_endpoints),
                 "padding_idx": padding_idx,
-                "trainer_id": trainer_id
+                "trainer_id": trainer_id,
+                "lookup_table_version": op_type
             })
         else:
             raise ValueError(
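The pass now records which embedding variant produced the fused op, so the C++ kernel (first two files of this commit) knows whether to apply the v2 reshape. Roughly, the attribute dict it assembles looks like this (sketch with placeholder values):

op_type = "lookup_table_v2"   # ops[0].type, i.e. the originating embedding op
attrs = {
    "is_distributed": False,
    "pserver_num": 2,         # len(pserver_endpoints)
    "padding_idx": -1,
    "trainer_id": 0,
    "lookup_table_version": op_type,   # the attribute added by this commit
}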
python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -432,8 +432,6 @@ if(WITH_DISTRIBUTE)
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_lars")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_train")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_save_load")
-    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_simnet_bow")
-    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ctr")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_text_classification")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_train")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_word2vec")
python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -196,8 +196,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         fleet.stop_worker()

     def do_dataset_training(self, fleet):
-        dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data(
-        )
+        train_file_list = ctr_dataset_reader.prepare_fake_data()

         exe = fluid.Executor(fluid.CPUPlace())
@@ -206,9 +205,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         thread_num = 2
         batch_size = 128
-        filelist = []
-        for _ in range(thread_num):
-            filelist.append(train_file_path)
+        filelist = train_file_list

         # config dataset
         dataset = paddle.distributed.fleet.DatasetFactory().create_dataset()
python/paddle/fluid/tests/unittests/dist_simnet_bow.py → python/paddle/fluid/tests/unittests/dist_fleet_simnet_bow.py
@@ -19,6 +19,8 @@ import argparse
 import time
 import math
+import random
+import shutil
 import tempfile

 import paddle
 import paddle.fluid as fluid
@@ -29,7 +31,8 @@ from multiprocessing import Process
 import os
 import signal
 from functools import reduce

-from test_dist_base import TestDistRunnerBase, runtime_main
+from test_dist_fleet_base import runtime_main, FleetDistRunnerBase
+from paddle.distributed.fleet.base.util_factory import fleet_util

 DTYPE = "int64"
 DATA_URL = 'http://paddle-dist-ce-data.bj.bcebos.com/simnet.train.1000'
@@ -49,6 +52,18 @@ fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1


+def fake_simnet_reader():
+    def reader():
+        for _ in range(1000):
+            q = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            label = np.random.random_integers(0, 1, size=1).tolist()
+            pt = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            nt = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            yield [q, label, pt, nt]
+
+    return reader
+
+
 def get_acc(cos_q_nt, cos_q_pt, batch_size):
     cond = fluid.layers.less_than(cos_q_nt, cos_q_pt)
     cond = fluid.layers.cast(cond, dtype='float64')
@@ -75,34 +90,40 @@ def get_loss(cos_q_pt, cos_q_nt):
     return avg_cost


 def get_optimizer(op="sgd"):
     if op.upper() == "sgd".upper():
         optimizer = fluid.optimizer.SGD(learning_rate=base_lr)
     elif op.upper() == "adam".upper():
         optimizer = fluid.optimizer.Adam(learning_rate=base_lr)
     else:
         optimizer = fluid.optimizer.SGD(learning_rate=base_lr)
     return optimizer


 def train_network(batch_size,
                   is_distributed=False,
                   is_sparse=False,
-                  is_self_contained_lr=False):
+                  is_self_contained_lr=False,
+                  is_pyreader=False):
     # query
     q = fluid.layers.data(
         name="query_ids", shape=[1], dtype="int64", lod_level=1)
+    # label data
+    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+    # pt
+    pt = fluid.layers.data(
+        name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
+    # nt
+    nt = fluid.layers.data(
+        name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
+    datas = [q, label, pt, nt]
+
+    reader = None
+    if is_pyreader:
+        reader = fluid.io.PyReader(
+            feed_list=datas, capacity=64, iterable=False,
+            use_double_buffer=False)
+
     # embedding
     q_emb = fluid.embedding(
         input=q,
         is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-                initializer=fluid.initializer.Constant(value=0.01),
-                name="__emb__"),
+            initializer=fluid.initializer.Constant(value=0.01),
+            name="__emb__"),
         is_sparse=is_sparse)
+    q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim])
     # vsum
@@ -115,12 +136,8 @@ def train_network(batch_size,
         param_attr=fluid.ParamAttr(
             initializer=fluid.initializer.Constant(value=0.01),
             name="__q_fc__",
-            learning_rate=base_lr))
-    # label data
-    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
-    # pt
-    pt = fluid.layers.data(
-        name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
+            learning_rate=base_lr), )
     # embedding
     pt_emb = fluid.embedding(
         input=pt,
@@ -129,9 +146,7 @@ def train_network(batch_size,
         param_attr=fluid.ParamAttr(
             initializer=fluid.initializer.Constant(value=0.01),
             name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-                initializer=fluid.initializer.Constant(value=0.01),
-                name="__emb__"),
+            learning_rate=emb_lr),
         is_sparse=is_sparse)
+    pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim])
     # vsum
@@ -142,24 +157,16 @@ def train_network(batch_size,
         input=pt_ss,
         size=hid_dim,
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__fc__",
-            learning_rate=base_lr),
+            initializer=fluid.initializer.Constant(value=0.01),
+            name="__fc__"),
         bias_attr=fluid.ParamAttr(name="__fc_b__"))
-    # nt
-    nt = fluid.layers.data(
-        name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
     # embedding
     nt_emb = fluid.embedding(
         input=nt,
         is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-                initializer=fluid.initializer.Constant(value=0.01),
-                name="__emb__"),
+            initializer=fluid.initializer.Constant(value=0.01),
+            name="__emb__"),
         is_sparse=is_sparse)
+    nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim])
     # vsum
@@ -170,9 +177,7 @@ def train_network(batch_size,
         input=nt_ss,
         size=hid_dim,
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__fc__",
-            learning_rate=base_lr),
+            initializer=fluid.initializer.Constant(value=0.01),
+            name="__fc__"),
         bias_attr=fluid.ParamAttr(name="__fc_b__"))
     cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc)
     cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc)
@@ -180,79 +185,67 @@ def train_network(batch_size,
     avg_cost = get_loss(cos_q_pt, cos_q_nt)
     # acc
     acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
-    return [avg_cost, acc, cos_q_pt]
-
-
-def combination(x, y):
-    res = [[[xi, yi] for yi in y] for xi in x]
-    return res[0]
-
-
-def get_one_data(file_list):
-    for file in file_list:
-        contents = []
-        with open(file, "r") as fin:
-            for i in fin:
-                contents.append(i.strip())
-            for index, q in enumerate(contents):
-                try:
-                    one_data = [[int(j) for j in i.split(" ")]
-                                for i in q.split(";")[:-1]]
-                    if one_data[1][0] + one_data[1][1] != len(one_data) - 3:
-                        q = fin.readline()
-                        continue
-                    tmp = combination(one_data[3:3 + one_data[1][0]],
-                                      one_data[3 + one_data[1][0]:])
-                except Exception as e:
-                    continue
-                for each in tmp:
-                    yield [one_data[2], 0, each[0], each[1]]
-
-
-def get_batch_reader(file_list, batch_size):
-    def batch_reader():
-        res = []
-        for i in get_one_data(file_list):
-            if random.random() <= sample_rate:
-                res.append(i)
-            if len(res) >= batch_size:
-                yield res
-                res = []
-
-    return batch_reader
-
-
-def get_train_reader(batch_size):
-    # The training data set.
-    train_file = os.path.join(paddle.dataset.common.DATA_HOME, "simnet",
-                              "train")
-    train_reader = get_batch_reader([train_file], batch_size)
-    train_feed = ["query_ids", "pos_title_ids", "neg_title_ids", "label"]
-    return train_reader, train_feed
+    return avg_cost, acc, cos_q_pt, reader


-class TestDistSimnetBow2x2(TestDistRunnerBase):
-    def get_model(self, batch_size=2):
-        # Train program
-        avg_cost, acc, predict = \
-            train_network(batch_size,
-                          bool(int(os.environ["IS_DISTRIBUTED"])),
-                          bool(int(os.environ["IS_SPARSE"])),
-                          bool(int(os.environ["IS_SELF_CONTAINED_LR"])))
-
-        inference_program = fluid.default_main_program().clone()
-
-        # Optimization
-        opt = os.getenv('OPTIMIZER', 'sgd')
-        opt = get_optimizer(opt)
-        opt.minimize(avg_cost)
-
-        # Reader
-        train_reader, _ = get_train_reader(batch_size)
-        return inference_program, avg_cost, train_reader, train_reader, acc, predict
+class TestDistSimnetBow2x2(FleetDistRunnerBase):
+    """
+    For test SimnetBow model, use Fleet api
+    """
+
+    def net(self, args, batch_size=4, lr=0.01):
+        avg_cost, _, predict, self.reader = \
+            train_network(batch_size=batch_size, is_distributed=False,
+                          is_sparse=True, is_self_contained_lr=False,
+                          is_pyreader=(args.reader == "pyreader"))
+        self.avg_cost = avg_cost
+        self.predict = predict
+        return avg_cost
+
+    def check_model_right(self, dirname):
+        model_filename = os.path.join(dirname, "__model__")
+        with open(model_filename, "rb") as f:
+            program_desc_str = f.read()
+        program = fluid.Program.parse_from_string(program_desc_str)
+        with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
+            wn.write(str(program))
+
+    def do_pyreader_training(self, fleet):
+        """
+        do training using dataset, using fetch handler to catch variable
+        Args:
+            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
+        """
+        exe = fluid.Executor(fluid.CPUPlace())
+        fleet.init_worker()
+        exe.run(fluid.default_startup_program())
+        batch_size = 4
+        # reader
+        train_reader = paddle.batch(fake_simnet_reader(), batch_size=batch_size)
+        self.reader.decorate_sample_list_generator(train_reader)
+        for epoch_id in range(1):
+            self.reader.start()
+            try:
+                pass_start = time.time()
+                while True:
+                    loss_val = exe.run(program=fluid.default_main_program(),
+                                       fetch_list=[self.avg_cost.name])
+                    loss_val = np.mean(loss_val)
+                    message = "TRAIN ---> pass: {} loss: {}\n".format(epoch_id,
+                                                                      loss_val)
+                    fleet_util.print_on_rank(message, 0)
+                pass_time = time.time() - pass_start
+            except fluid.core.EOFException:
+                self.reader.reset()
+        fleet.stop_worker()
+
+    def do_dataset_training(self, fleet):
+        pass


 if __name__ == "__main__":
     paddle.dataset.common.download(DATA_URL, 'simnet', DATA_MD5, "train")
     runtime_main(TestDistSimnetBow2x2)
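A standalone sketch (not from the commit) of the feeding pattern the rewritten test uses: batch the fake reader with paddle.batch and hand it to the PyReader. Note that np.random.random_integers, used in the test above, has long been deprecated in NumPy; np.random.randint is the equivalent here:

import numpy as np
import paddle


def fake_simnet_reader(num_samples=8, dict_dim=1500):
    def reader():
        for _ in range(num_samples):
            q = np.random.randint(0, dict_dim, size=1).tolist()
            label = np.random.randint(0, 2, size=1).tolist()
            pt = np.random.randint(0, dict_dim, size=1).tolist()
            nt = np.random.randint(0, dict_dim, size=1).tolist()
            yield [q, label, pt, nt]

    return reader


# paddle.batch groups samples into lists of batch_size, which is the format
# PyReader.decorate_sample_list_generator() expects.
batched = paddle.batch(fake_simnet_reader(), batch_size=4)
for batch in batched():
    print(len(batch), batch[0])   # 4 samples; each is [q, label, pt, nt]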
python/paddle/fluid/tests/unittests/simnet_dataset_reader.py
new file 100644
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import os
import logging
import tarfile
import random

import paddle
import paddle.fluid.incubate.data_generator as data_generator

logging.basicConfig()
logger = logging.getLogger("paddle")
logger.setLevel(logging.INFO)


class DatasetSimnetReader(data_generator.MultiSlotDataGenerator):
    def generate_sample(self, line):
        pass
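The committed reader is a stub (generate_sample is pass). For orientation, a MultiSlotDataGenerator subclass normally returns a generator that yields (slot_name, values) pairs per sample; the line format and slot names below are assumed for illustration, not taken from the commit:

import paddle.fluid.incubate.data_generator as data_generator


class DatasetSimnetReaderSketch(data_generator.MultiSlotDataGenerator):
    def generate_sample(self, line):
        def reader():
            # Assumed whitespace-separated line format: "qid pos_id neg_id label"
            # (hypothetical; the commit leaves this method unimplemented).
            q, pt, nt, label = line.strip().split(" ")
            yield [("query_ids", [int(q)]),
                   ("pos_title_ids", [int(pt)]),
                   ("neg_title_ids", [int(nt)]),
                   ("label", [int(label)])]

        return reader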
python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
@@ -156,40 +156,5 @@ class TestDistCtrHalfAsync2x2(TestFleetBase):
             "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)


-class TestDistCtrPsGpuPyreaderAsync2x2(TestFleetBase):
-    def _setup_config(self):
-        self._mode = "async"
-        self._reader = "pyreader"
-
-    def check_with_place(self,
-                         model_file,
-                         delta=1e-3,
-                         check_error_log=False,
-                         need_envs={}):
-        required_envs = {
-            "PATH": os.getenv("PATH", ""),
-            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-            "FLAGS_rpc_deadline": "30000",  # 5sec to fail fast
-            "http_proxy": "",
-            "FLAGS_communicator_send_queue_size": "2",
-            "FLAGS_communicator_max_merge_var_num": "2",
-            "CPU_NUM": "2",
-            "SAVE_MODEL": "1"
-        }
-
-        required_envs.update(need_envs)
-
-        if check_error_log:
-            required_envs["GLOG_v"] = "3"
-            required_envs["GLOG_logtostderr"] = "1"
-
-        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
-
-    def test_dist_train(self):
-        self.check_with_place(
-            "dist_fleet_ctr_ps_gpu.py", delta=1e-5, check_error_log=True)
-
-
 if __name__ == "__main__":
     unittest.main()
python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py
@@ -21,7 +21,7 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
 from test_dist_fleet_base import TestFleetBase
-from dist_simnet_bow import train_network
+from dist_fleet_simnet_bow import train_network


 class TestDistGeoCtr_2x2(TestFleetBase):
@@ -72,7 +72,7 @@ class TestGeoSgdTranspiler(unittest.TestCase):
         strategy = StrategyFactory.create_geo_strategy(5)

-        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)

         optimizer = fluid.optimizer.SGD(0.1)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py
@@ -21,7 +21,7 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
 from test_dist_fleet_base import TestFleetBase
-from dist_simnet_bow import train_network
+from dist_fleet_simnet_bow import train_network


 @unittest.skip(reason="Skip unstable ut, add it after PR 22957 merged")
@@ -44,7 +44,7 @@ class TestDistGeoClipByGlobalNormTranspiler(unittest.TestCase):
         strategy.geo_sgd_mode = True
         strategy.geo_sgd_need_push_nums = 5

-        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)

         fluid.clip.set_gradient_clip(
             clip=fluid.clip.GradientClipByGlobalNorm(2.0))
python/paddle/fluid/tests/unittests/test_dist_fleet_simnet.py
new file 100644
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import os
import unittest
import tempfile
from test_dist_fleet_base import TestFleetBase


class TestDistSimnetASync2x2(TestFleetBase):
    def _setup_config(self):
        self._mode = "async"
        self._reader = "pyreader"

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
            "http_proxy": "",
            "CPU_NUM": "2"
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)

    def test_dist_train(self):
        self.check_with_place(
            "dist_fleet_simnet_bow.py", delta=1e-5, check_error_log=True)


if __name__ == "__main__":
    unittest.main()
python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py
deleted 100644 → 0
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import os
import unittest

from test_dist_base import TestDistBase

import os

flag_name = os.path.splitext(__file__)[0]


class TestDistSimnetBowDense2x2(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._enforce_place = "CPU"

    def test_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '0',
            'IS_SELF_CONTAINED_LR': '1'
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=1e-5,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


class TestDistSimnetBow2x2DenseAsync(TestDistBase):
    def _setup_config(self):
        self._sync_mode = False
        self._enforce_place = "CPU"

    # FIXME(typhoonzero): fix async tests later
    def notest_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '0',
            'IS_SELF_CONTAINED_LR': '1',
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=100,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


class TestDistSimnetBowSparse2x2(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._enforce_place = "CPU"

    def test_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '1',
            'IS_SELF_CONTAINED_LR': '1'
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=1e-5,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


class TestDistSimnetBow2x2SparseAsync(TestDistBase):
    def _setup_config(self):
        self._sync_mode = False
        self._enforce_place = "CPU"

    def test_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '1',
            'IS_SELF_CONTAINED_LR': '1'
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=100,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


# FIXME(tangwei): Learningrate variable is not created on pserver.
class TestDistSimnetBow2x2LookupTableSync(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._enforce_place = "CPU"

    def test_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '1',
            'IS_SELF_CONTAINED_LR': '1'
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=1e-5,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


class TestDistSimnetBow2x2LookupTableAsync(TestDistBase):
    def _setup_config(self):
        self._sync_mode = False
        self._enforce_place = "CPU"

    def test_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '1',
            'IS_SELF_CONTAINED_LR': '1'
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=100,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._enforce_place = "CPU"

    def test_simnet_bow(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '1',
            'IS_SELF_CONTAINED_LR': '0'
        }
        self.check_with_place(
            "dist_simnet_bow.py",
            delta=1e-5,
            check_error_log=True,
            need_envs=need_envs,
            log_name=flag_name)


if __name__ == "__main__":
    unittest.main()