Commit 7de9420a (unverified)
[Fluid clean]clean distributed fluid API (#49795)
Authored Jan 16, 2023 by wangxiaoning; committed via GitHub on Jan 16, 2023.
Parent: a3f58b70
Changes: 5 changed files with 54 additions and 49 deletions (+54 -49)

  python/paddle/distributed/fleet/metrics/metric.py                    +18 -18
  python/paddle/distributed/fleet/runtime/parameter_server_runtime.py  +27 -20
  python/paddle/distributed/passes/ps_trainer_pass.py                   +3  -3
  python/paddle/distributed/ps/utils/ps_program_builder.py              +2  -2
  python/paddle/distributed/ps/utils/public.py                          +4  -6
python/paddle/distributed/fleet/metrics/metric.py  (+18 -18)

@@ -38,11 +38,11 @@ def sum(input, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            input = fluid.layers.cast(some_input, dtype='float32')
+            input = paddle.cast(some_input, dtype='float32')
             cnt = paddle.sum(input)
             global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
             tmp = paddle.add(cnt, global_cnt)
-            fluid.layers.assign(tmp, global_cnt)
+            paddle.assign(tmp, global_cnt)

             # in train.py, after train or infer
             res = np.array(scope.find_var(global_cnt.name).get_tensor())

@@ -78,11 +78,11 @@ def max(input, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            input = fluid.layers.cast(some_input, dtype='float32')
+            input = paddle.cast(some_input, dtype='float32')
             cnt = paddle.sum(input)
             global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
             tmp = paddle.maximum(cnt, global_cnt)
-            fluid.layers.assign(tmp, global_cnt)
+            paddle.assign(tmp, global_cnt)

             # in train.py, after train or infer
             res = np.array(scope.find_var(global_cnt.name).get_tensor())

@@ -118,11 +118,11 @@ def min(input, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            input = fluid.layers.cast(some_input, dtype='float32')
+            input = paddle.cast(some_input, dtype='float32')
             cnt = paddle.sum(input)
             global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
             tmp = paddle.minimum(cnt, global_cnt)
-            fluid.layers.assign(tmp, global_cnt)
+            paddle.assign(tmp, global_cnt)

             # in train.py, after train or infer
             res = np.array(scope.find_var(global_cnt.name).get_tensor())

@@ -159,9 +159,9 @@ def auc(stat_pos, stat_neg, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            similarity_norm = fluid.layers.sigmoid(paddle.clip(output, min=-15.0, max=15.0))
-            binary_predict = fluid.layers.concat(input=[paddle.subtract(fluid.layers.ceil(similarity_norm), similarity_norm), similarity_norm], axis=1)
+            similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(output, min=-15.0, max=15.0))
+            binary_predict = paddle.concat(input=[paddle.subtract(paddle.ceil(similarity_norm), similarity_norm), similarity_norm], axis=1)
             self.auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, stat_neg] = paddle.static.auc(input=binary_predict, label=label, curve='ROC', num_thresholds=4096)

@@ -231,7 +231,7 @@ def mae(abserr, total_ins_num, scope=None, util=None):
     distributed mae in fleet

     Args:
-        abserr(numpy.array|Variable|string): abserr in output of fluid.contrib.layers.ctr_metric_bundle
+        abserr(numpy.array|Variable|string): abserr in output of paddle.static.ctr_metric_bundle
         total_ins_num(numpy.array|Variable|string): total variable
         scope(Scope): specific scope

@@ -242,7 +242,7 @@ def mae(abserr, total_ins_num, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            sqrerr, abserr, prob, q, pos, total = fluid.contrib.layers.ctr_metric_bundle(similarity_norm, fluid.layers.cast(x=label, dtype='float32'))
+            sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32'))

             # in train.py, after train or infer
             res = np.array(scope.find_var(abserr.name).get_tensor())

@@ -281,7 +281,7 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None):
     distributed rmse in fleet

     Args:
-        sqrerr(numpy.array|Variable|string): sqrerr in output of fluid.contrib.layers.ctr_metric_bundle
+        sqrerr(numpy.array|Variable|string): sqrerr in output of paddle.static.ctr_metric_bundle
         total_ins_num(numpy.array|Variable|string): total variable
         scope(Scope): specific scope

@@ -292,7 +292,7 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            sqrerr, abserr, prob, q, pos, total = fluid.contrib.layers.ctr_metric_bundle(similarity_norm, fluid.layers.cast(x=label, dtype='float32'))
+            sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32'))

             # in train.py, after train or infer
             res = np.array(scope.find_var(sqrerr.name).get_tensor())

@@ -331,7 +331,7 @@ def mse(sqrerr, total_ins_num, scope=None, util=None):
     distributed mse in fleet

     Args:
-        sqrerr(numpy.array|Variable|string): sqrerr in output of fluid.contrib.layers.ctr_metric_bundle
+        sqrerr(numpy.array|Variable|string): sqrerr in output of paddle.static.ctr_metric_bundle
         total_ins_num(numpy.array|Variable|string): total variable
         scope(Scope): specific scope

@@ -342,7 +342,7 @@ def mse(sqrerr, total_ins_num, scope=None, util=None):
         .. code-block:: python

             # in model.py
-            sqrerr, abserr, prob, q, pos, total = fluid.contrib.layers.ctr_metric_bundle(similarity_norm, fluid.layers.cast(x=label, dtype='float32'))
+            sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32'))

             # in train.py, after train or infer
             metric = np.array(scope.find_var(sqrerr.name).get_tensor())

@@ -393,15 +393,15 @@ def acc(correct, total, scope=None, util=None):
             # in model.py
             correct = paddle.static.create_global_var(dtype='float32', shape=[1], value=0)
             total = paddle.static.create_global_var(dtype='float32', shape=[1], value=0)
-            acc = fluid.layers.acc(predict, label, k=1, correct=correct, total=total)
+            acc = paddle.metric.accuracy(predict, label, k=1, correct=correct, total=total)

             global_correct = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
             tmp1 = paddle.minimum(correct, global_correct)
-            fluid.layers.assign(tmp1, global_correct)
+            paddle.assign(tmp1, global_correct)

             global_total = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
             tmp2 = paddle.minimum(total, global_total)
-            fluid.layers.assign(tmp2, global_total)
+            paddle.assign(tmp2, global_total)

             # in train.py, after train or infer
             correct_num = np.array(scope.find_var(correct.name).get_tensor())
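Every hunk in this file makes the same mechanical substitution inside docstring examples: fluid.layers.cast/assign become paddle.cast/paddle.assign, fluid.contrib.layers.ctr_metric_bundle becomes paddle.static.ctr_metric_bundle, and fluid.layers.acc becomes paddle.metric.accuracy. Below is a minimal, self-contained sketch (not part of the commit) of the migrated counter pattern from the sum() docstring; the toy feed data and the 'x' placeholder name are illustrative assumptions.

import numpy as np
import paddle

paddle.enable_static()

main_prog, startup_prog = paddle.static.Program(), paddle.static.Program()
with paddle.static.program_guard(main_prog, startup_prog):
    some_input = paddle.static.data(name='x', shape=[-1, 1], dtype='int64')
    # was: fluid.layers.cast(some_input, dtype='float32')
    input = paddle.cast(some_input, dtype='float32')
    cnt = paddle.sum(input)
    global_cnt = paddle.static.create_global_var(
        persistable=True, dtype='float32', shape=[1], value=0
    )
    tmp = paddle.add(cnt, global_cnt)
    # was: fluid.layers.assign(tmp, global_cnt)
    paddle.assign(tmp, global_cnt)

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup_prog)
exe.run(main_prog, feed={'x': np.ones((4, 1), dtype='int64')})
# Read the accumulated counter back out of the global scope, as in the docstring.
res = np.array(paddle.static.global_scope().find_var(global_cnt.name).get_tensor())
print(res)  # [4.]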
python/paddle/distributed/fleet/runtime/parameter_server_runtime.py  (+27 -20)

@@ -15,12 +15,19 @@
 import os
 import warnings

 import paddle
-import paddle.fluid as fluid
 from paddle.fluid import core
-from paddle.fluid.compiler import CompiledProgram
-from paddle.fluid.executor import Executor
-from paddle.fluid.framework import Program, Variable
-from paddle.fluid.parallel_executor import ParallelExecutor
+from paddle.static import (
+    CompiledProgram,
+    Executor,
+    ParallelExecutor,
+    Program,
+    Variable,
+    default_main_program,
+    default_startup_program,
+    save_inference_model,
+)

 from ..base.private_helper_function import wait_server_ready
 from .runtime_base import RuntimeBase

@@ -90,7 +97,7 @@ class ParameterServerRuntime(RuntimeBase):
                 return var.name in varnames

             load_vars = list(
-                filter(_in_varnames, fluid.default_main_program().list_vars())
+                filter(_in_varnames, default_main_program().list_vars())
             )
             if main_program is None:
                 main_program = self.origin_main_program

@@ -130,7 +137,7 @@ class ParameterServerRuntime(RuntimeBase):
         executor.run(load_prog)

     def _load_distributed_params(self, dirname, varnames):
-        from paddle.fluid.communicator import LargeScaleKV
+        from paddle.distributed.communicator import LargeScaleKV
         from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
             _get_varname_parts,
         )

@@ -202,7 +209,7 @@ class ParameterServerRuntime(RuntimeBase):
             if len(dist_varnames) != 0:
                 raise ValueError(
-                    "GeoStrategy can not support large scale embeding now, please use fluid.layers.embedding"
+                    "GeoStrategy can not support large scale embeding now, please use paddle.static.nn.embedding"
                 )

             init_attrs = []

@@ -284,7 +291,7 @@ class ParameterServerRuntime(RuntimeBase):
             recv_type=1,
         )

-        from paddle.fluid.communicator import Communicator
+        from paddle.distributed.communicator import Communicator

         self._communicator = Communicator(
             trainer_config.mode,
             kwargs,
             trainer_config.get_communicator_flags(),
         )

@@ -297,7 +304,7 @@ class ParameterServerRuntime(RuntimeBase):
             warnings.warn("communicator has been initialized, skip")

     def _get_executor(self):
-        executor = fluid.Executor(fluid.CPUPlace())
+        executor = Executor(paddle.CPUPlace())
         if self.role_maker._is_heter_parameter_server_mode:
             heter_worker_device_guard = (
                 self.context["valid_strategy"]

@@ -313,13 +320,13 @@ class ParameterServerRuntime(RuntimeBase):
             if self.role_maker._is_heter_worker():
                 if heter_worker_device_guard == "GPU":
                     executor = Executor(
-                        fluid.CUDAPlace(
+                        paddle.CUDAPlace(
                             int(os.getenv("FLAGS_selected_gpus", "0"))
                         )
                     )
                 elif heter_worker_device_guard == "XPU":
                     executor = Executor(
-                        fluid.XPUPlace(
+                        paddle.XPUPlace(
                             int(os.getenv("FLAGS_selected_xpus", "0"))
                         )
                     )

@@ -340,7 +347,7 @@ class ParameterServerRuntime(RuntimeBase):
         ):
             # for heter trainer wait server ready
             wait_server_ready(self.role_maker._get_pserver_endpoints())
-        executor.run(fluid.default_startup_program())
+        executor.run(default_startup_program())

         if self.role_maker._is_heter_worker():
             self._init_worker()

@@ -375,7 +382,7 @@ class ParameterServerRuntime(RuntimeBase):
                     + sparse_related_optimize_varnames
                     + distributed_related_optimize_varnames
                 ),
-                fluid.default_main_program().list_vars(),
+                default_main_program().list_vars(),
             )
         )

@@ -386,9 +393,9 @@ class ParameterServerRuntime(RuntimeBase):
             raise ValueError("There is no directory named '%s'", model_dirname)

         # load dense
-        fluid.io.load_vars(
+        paddle.static.load_vars(
             executor,
-            main_program=fluid.default_main_program(),
+            main_program=default_main_program(),
             dirname=model_dirname,
             vars=remaining_vars,
         )

@@ -409,7 +416,7 @@ class ParameterServerRuntime(RuntimeBase):
     def _run_server(self):
         executor = self._get_executor()
-        executor.run(fluid.default_main_program())
+        executor.run(default_main_program())

     def _stop_worker(self):
         self._communicator.stop()

@@ -671,7 +678,7 @@ class ParameterServerRuntime(RuntimeBase):
                 )
             )

-        fluid.io.save_vars(
+        paddle.static.save_vars(
             executor,
             main_program=main_program,
             dirname=dirname,

@@ -743,7 +750,7 @@ class ParameterServerRuntime(RuntimeBase):
                 raise TypeError(
                     "in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
                 )
-            fluid.io.save_inference_model(
+            save_inference_model(
                 dirname,
                 feeded_var_names,
                 target_vars,

@@ -754,7 +761,7 @@ class ParameterServerRuntime(RuntimeBase):
                 export_for_deployment,
             )
         else:
-            fluid.io.save_inference_model(
+            save_inference_model(
                 dirname,
                 feeded_var_names,
                 target_vars,

@@ -773,7 +780,7 @@ class ParameterServerRuntime(RuntimeBase):
             program_desc_str = f.read()

         program = Program.parse_from_string(program_desc_str)
-        program._copy_dist_param_info_from(fluid.default_main_program())
+        program._copy_dist_param_info_from(default_main_program())

         self._ps_inference_save_persistables(
             executor, dirname, program, mode=0
         )
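The import hunk is the core of this file's change: every fluid entry point used here (CompiledProgram, Executor, ParallelExecutor, Program, Variable, default_main_program, default_startup_program, save_inference_model) has a re-export under paddle.static, so the later hunks can drop the fluid prefix. A short sketch, not from the commit, showing the replacements behave as drop-ins; the toy fc network is an illustrative assumption.

import numpy as np
import paddle
from paddle.static import Executor, default_main_program, default_startup_program

paddle.enable_static()

x = paddle.static.data(name='x', shape=[-1, 2], dtype='float32')
y = paddle.static.nn.fc(x, size=1)

# was: executor = fluid.Executor(fluid.CPUPlace())
executor = Executor(paddle.CPUPlace())
# was: executor.run(fluid.default_startup_program())
executor.run(default_startup_program())

# Run the default main program exactly as the fluid spelling would have.
out = executor.run(
    default_main_program(),
    feed={'x': np.random.rand(4, 2).astype('float32')},
    fetch_list=[y],
)
print(out[0].shape)  # (4, 1)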
python/paddle/distributed/passes/ps_trainer_pass.py  (+3 -3)

@@ -17,10 +17,10 @@ import os
 from _collections import defaultdict

 import paddle
-import paddle.fluid.framework as framework
 from paddle.distributed.passes.pass_base import PassBase, register_pass
-from paddle.fluid.framework import Parameter
 from paddle.framework import core
-from paddle.static import Program
+from paddle.static import Parameter, Program

 from ..ps.utils.collective_transpiler import SingleProcessMultiThread
 from ..ps.utils.public import *  # noqa: F403

@@ -757,7 +757,7 @@ class PsGpuPass(PassBase):
             )
             new_op_desc.copy_from(op_desc)
             new_op_desc._set_attr(op_role_attr_name, backward)
-            new_op = paddle.fluid.framework.Operator(
+            new_op = paddle.static.Operator(
                 program.global_block(), new_op_desc
             )
             program.global_block().ops.insert(insert_index + 1, new_op)
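Both hunks re-point framework types to their paddle.static spellings: Parameter and Program are now imported together from paddle.static, and the low-level Operator constructor is referenced as paddle.static.Operator. A small sketch of the consolidated import, not part of the commit; the fc layer and parameter names are illustrative.

import paddle
from paddle.static import Parameter, Program

paddle.enable_static()

prog = Program()
with paddle.static.program_guard(prog):
    x = paddle.static.data(name='x', shape=[-1, 4], dtype='float32')
    y = paddle.static.nn.fc(x, size=2)

# Trainable weights created by the fc layer are Parameter instances,
# a subclass of the ordinary static-graph Variable.
params = [v for v in prog.list_vars() if isinstance(v, Parameter)]
print([p.name for p in params])  # e.g. ['fc_0.w_0', 'fc_0.b_0']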
python/paddle/distributed/ps/utils/ps_program_builder.py  (+2 -2)

@@ -472,7 +472,7 @@ class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
         if not self.is_server:
             self._build_trainer_programs()
             fluid.framework.switch_startup_program(self.cloned_startup)
-            fluid.framework.switch_main_program(self.cloned_main)
+            paddle.framework.switch_main_program(self.cloned_main)
             print(
                 "paddle.static.default_startup_program: {}".format(
                     paddle.static.default_startup_program()._heter_pipeline_opt

@@ -483,4 +483,4 @@ class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
             fluid.framework.switch_startup_program(self.attrs['_startup_server'])
-            fluid.framework.switch_main_program(self.attrs['_main_server'])
+            paddle.framework.switch_main_program(self.attrs['_main_server'])
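A sketch of what the re-pointed call does, not from the commit: switch_main_program swaps the process-global default main program, which is how FlPsProgramBuilder activates its cloned trainer or server programs. My understanding is that it returns the previously active program, so the original can be restored afterwards.

import paddle

paddle.enable_static()

cloned_main = paddle.static.default_main_program().clone()

# was: fluid.framework.switch_main_program(cloned_main)
prev = paddle.framework.switch_main_program(cloned_main)
assert paddle.static.default_main_program() is cloned_main

paddle.framework.switch_main_program(prev)  # restore the original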
python/paddle/distributed/ps/utils/public.py  (+4 -6)

@@ -18,8 +18,8 @@ import os
 import warnings
 from functools import reduce

-import paddle.fluid as fluid
-import paddle.fluid.framework as framework
+from paddle.distributed.io import is_persistable
+from paddle.fluid.framework import generate_control_dev_var_name
 from paddle.framework import core

 # logging.basicConfig(

@@ -1253,7 +1253,7 @@ def screen_persistables(program, var_list):
         else:
             var = program.global_block().vars[var_name]

-        if fluid.io.is_persistable(var):
+        if is_persistable(var):
             need_remove.append(var_name)

     for var_name in need_remove:

@@ -1676,9 +1676,7 @@ def add_send_op(program, block, _vars):
         table_dict[table_id]['var_list'].append(persistable_var)

     for table_id in table_dict:
-        dummy_output = block.create_var(
-            name=framework.generate_control_dev_var_name()
-        )
+        dummy_output = block.create_var(name=generate_control_dev_var_name())
         send_input_vars = [
             block.vars[union_var]
             for union_var in table_dict[table_id]['var_list']
         ]
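The helper that moved here, paddle.distributed.io.is_persistable, presumably mirrors the old fluid.io version: it returns a variable's persistable flag, treating feed/fetch/reader variables as non-persistable, which is what screen_persistables() relies on above. A minimal sketch, not from the commit; variable names are illustrative.

import paddle
from paddle.distributed.io import is_persistable

paddle.enable_static()

prog = paddle.static.Program()
with paddle.static.program_guard(prog):
    x = paddle.static.data(name='x', shape=[-1, 1], dtype='float32')
    g = paddle.static.create_global_var(
        shape=[1], value=0.0, dtype='float32', persistable=True
    )

print(is_persistable(prog.global_block().vars['x']))     # False: plain data var
print(is_persistable(prog.global_block().vars[g.name]))  # True: persistable global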