diff --git a/python/paddle/distributed/fleet/metrics/metric.py b/python/paddle/distributed/fleet/metrics/metric.py
index 39284fa9f5a3f151747547b42409385d470571cd..999ab6f0af126f489a57aba806ec00763d7e062a 100644
--- a/python/paddle/distributed/fleet/metrics/metric.py
+++ b/python/paddle/distributed/fleet/metrics/metric.py
@@ -38,11 +38,11 @@ def sum(input, scope=None, util=None):
         .. code-block:: python

           # in model.py
-          input = fluid.layers.cast(some_input, dtype='float32')
+          input = paddle.cast(some_input, dtype='float32')
           cnt = paddle.sum(input)
           global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
           tmp = paddle.add(cnt, global_cnt)
-          fluid.layers.assign(tmp, global_cnt)
+          paddle.assign(tmp, global_cnt)

           # in train.py, after train or infer
           res = np.array(scope.find_var(global_cnt.name).get_tensor())
@@ -78,11 +78,11 @@ def max(input, scope=None, util=None):
         .. code-block:: python

           # in model.py
-          input = fluid.layers.cast(some_input, dtype='float32')
+          input = paddle.cast(some_input, dtype='float32')
           cnt = paddle.sum(input)
           global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
           tmp = paddle.maximum(cnt, global_cnt)
-          fluid.layers.assign(tmp, global_cnt)
+          paddle.assign(tmp, global_cnt)

           # in train.py, after train or infer
           res = np.array(scope.find_var(global_cnt.name).get_tensor())
@@ -118,11 +118,11 @@ def min(input, scope=None, util=None):
         .. code-block:: python

           # in model.py
-          input = fluid.layers.cast(some_input, dtype='float32')
+          input = paddle.cast(some_input, dtype='float32')
           cnt = paddle.sum(input)
           global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
           tmp = paddle.minimum(cnt, global_cnt)
-          fluid.layers.assign(tmp, global_cnt)
+          paddle.assign(tmp, global_cnt)

           # in train.py, after train or infer
           res = np.array(scope.find_var(global_cnt.name).get_tensor())
@@ -159,9 +159,9 @@ def auc(stat_pos, stat_neg, scope=None, util=None):
         .. code-block:: python

           # in model.py
-          similarity_norm = fluid.layers.sigmoid(paddle.clip(output, min=-15.0, max=15.0))
-          binary_predict = fluid.layers.concat(
-              input=[paddle.subtract(fluid.layers.ceil(similarity_norm), similarity_norm), similarity_norm], axis=1)
+          similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(output, min=-15.0, max=15.0))
+          binary_predict = paddle.concat(
+              input=[paddle.subtract(paddle.ceil(similarity_norm), similarity_norm), similarity_norm], axis=1)
           self.auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, stat_neg] = paddle.static.auc(input=binary_predict, label=label, curve='ROC', num_thresholds=4096)
@@ -231,7 +231,7 @@ def mae(abserr, total_ins_num, scope=None, util=None):
     distributed mae in fleet

     Args:
-        abserr(numpy.array|Variable|string): abserr in output of fluid.contrib.layers.ctr_metric_bundle
+        abserr(numpy.array|Variable|string): abserr in output of paddle.static.ctr_metric_bundle
         total_ins_num(numpy.array|Variable|string): total variable
         scope(Scope): specific scope

@@ -242,7 +242,7 @@
         .. code-block:: python

           # in model.py
-          sqrerr, abserr, prob, q, pos, total = fluid.contrib.layers.ctr_metric_bundle(similarity_norm, fluid.layers.cast(x=label, dtype='float32'))
+          sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32'))

           # in train.py, after train or infer
           res = np.array(scope.find_var(abserr.name).get_tensor())
@@ -281,7 +281,7 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None):
     distributed rmse in fleet

     Args:
-        sqrerr(numpy.array|Variable|string): sqrerr in output of fluid.contrib.layers.ctr_metric_bundle
+        sqrerr(numpy.array|Variable|string): sqrerr in output of paddle.static.ctr_metric_bundle
         total_ins_num(numpy.array|Variable|string): total variable
         scope(Scope): specific scope

@@ -292,7 +292,7 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None):
         .. code-block:: python

           # in model.py
-          sqrerr, abserr, prob, q, pos, total = fluid.contrib.layers.ctr_metric_bundle(similarity_norm, fluid.layers.cast(x=label, dtype='float32'))
+          sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32'))

           # in train.py, after train or infer
           res = np.array(scope.find_var(sqrerr.name).get_tensor())
@@ -331,7 +331,7 @@ def mse(sqrerr, total_ins_num, scope=None, util=None):
     distributed mse in fleet

     Args:
-        sqrerr(numpy.array|Variable|string): sqrerr in output of fluid.contrib.layers.ctr_metric_bundle
+        sqrerr(numpy.array|Variable|string): sqrerr in output of paddle.static.ctr_metric_bundle
         total_ins_num(numpy.array|Variable|string): total variable
         scope(Scope): specific scope

@@ -342,7 +342,7 @@ def mse(sqrerr, total_ins_num, scope=None, util=None):
         .. code-block:: python

           # in model.py
-          sqrerr, abserr, prob, q, pos, total = fluid.contrib.layers.ctr_metric_bundle(similarity_norm, fluid.layers.cast(x=label, dtype='float32'))
+          sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32'))

           # in train.py, after train or infer
           metric = np.array(scope.find_var(sqrerr.name).get_tensor())
@@ -393,15 +393,15 @@ def acc(correct, total, scope=None, util=None):
           # in model.py
           correct = paddle.static.create_global_var(dtype='float32', shape=[1], value=0)
           total = paddle.static.create_global_var(dtype='float32', shape=[1], value=0)
-          acc = fluid.layers.acc(predict, label, k=1, correct=correct, total=total)
+          acc = paddle.metric.accuracy(predict, label, k=1, correct=correct, total=total)

           global_correct = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
           tmp1 = paddle.minimum(correct, global_correct)
-          fluid.layers.assign(tmp1, global_correct)
+          paddle.assign(tmp1, global_correct)

           global_total = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0)
           tmp2 = paddle.minimum(total, global_total)
-          fluid.layers.assign(tmp2, global_total)
+          paddle.assign(tmp2, global_total)

           # in train.py, after train or infer
           correct_num = np.array(scope.find_var(correct.name).get_tensor())
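For reference, a minimal runnable sketch (not part of the patch) of how the updated sum() docstring example above behaves after the fluid-to-paddle migration. The static-graph program setup, the feed name 'x', and the dummy data are assumptions added for illustration; only the accumulation lines mirror the docstring.

    # assumed setup: static graph mode, CPU executor, dummy int64 input named 'x'
    import numpy as np
    import paddle

    paddle.enable_static()
    main_prog, startup_prog = paddle.static.Program(), paddle.static.Program()
    with paddle.static.program_guard(main_prog, startup_prog):
        some_input = paddle.static.data(name='x', shape=[None, 1], dtype='int64')
        input = paddle.cast(some_input, dtype='float32')   # was fluid.layers.cast
        cnt = paddle.sum(input)
        global_cnt = paddle.static.create_global_var(
            persistable=True, dtype='float32', shape=[1], value=0)
        tmp = paddle.add(cnt, global_cnt)
        paddle.assign(tmp, global_cnt)                      # was fluid.layers.assign

    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(startup_prog)
    exe.run(main_prog, feed={'x': np.ones((4, 1), dtype='int64')})

    # after train or infer: read the accumulated counter from the global scope
    scope = paddle.static.global_scope()
    res = np.array(scope.find_var(global_cnt.name).get_tensor())
    print("local sum:", res)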
diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
index 21086b1802516d17c43b7d491166818e4b301f79..61e853a4435a6556e879e0b6281f058814f34ac8 100644
--- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
+++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
@@ -15,12 +15,19 @@
 import os
 import warnings

+import paddle
 import paddle.fluid as fluid
 from paddle.fluid import core
-from paddle.fluid.compiler import CompiledProgram
-from paddle.fluid.executor import Executor
-from paddle.fluid.framework import Program, Variable
-from paddle.fluid.parallel_executor import ParallelExecutor
+from paddle.static import (
+    CompiledProgram,
+    Executor,
+    ParallelExecutor,
+    Program,
+    Variable,
+    default_main_program,
+    default_startup_program,
+    save_inference_model,
+)

 from ..base.private_helper_function import wait_server_ready
 from .runtime_base import RuntimeBase
@@ -90,7 +97,7 @@ class ParameterServerRuntime(RuntimeBase):
             return var.name in varnames

         load_vars = list(
-            filter(_in_varnames, fluid.default_main_program().list_vars())
+            filter(_in_varnames, default_main_program().list_vars())
         )
         if main_program is None:
             main_program = self.origin_main_program
@@ -130,7 +137,7 @@ class ParameterServerRuntime(RuntimeBase):
         executor.run(load_prog)

     def _load_distributed_params(self, dirname, varnames):
-        from paddle.fluid.communicator import LargeScaleKV
+        from paddle.distributed.communicator import LargeScaleKV
         from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
             _get_varname_parts,
         )
@@ -202,7 +209,7 @@ class ParameterServerRuntime(RuntimeBase):

         if len(dist_varnames) != 0:
             raise ValueError(
-                "GeoStrategy can not support large scale embeding now, please use fluid.layers.embedding"
+                "GeoStrategy can not support large scale embeding now, please use paddle.static.nn.embedding"
             )

         init_attrs = []
@@ -284,7 +291,7 @@ class ParameterServerRuntime(RuntimeBase):
             recv_type=1
         )

-        from paddle.fluid.communicator import Communicator
+        from paddle.distributed.communicator import Communicator

         self._communicator = Communicator(
             trainer_config.mode, kwargs, trainer_config.get_communicator_flags()
@@ -297,7 +304,7 @@ class ParameterServerRuntime(RuntimeBase):
             warnings.warn("communicator has been initialized, skip")

     def _get_executor(self):
-        executor = fluid.Executor(fluid.CPUPlace())
+        executor = Executor(paddle.CPUPlace())
         if self.role_maker._is_heter_parameter_server_mode:
             heter_worker_device_guard = (
                 self.context["valid_strategy"]
@@ -313,13 +320,13 @@ class ParameterServerRuntime(RuntimeBase):
             if self.role_maker._is_heter_worker():
                 if heter_worker_device_guard == "GPU":
                     executor = Executor(
-                        fluid.CUDAPlace(
+                        paddle.CUDAPlace(
                             int(os.getenv("FLAGS_selected_gpus", "0"))
                         )
                     )
                 elif heter_worker_device_guard == "XPU":
                     executor = Executor(
-                        fluid.XPUPlace(
+                        paddle.XPUPlace(
                             int(os.getenv("FLAGS_selected_xpus", "0"))
                         )
                     )
@@ -340,7 +347,7 @@ class ParameterServerRuntime(RuntimeBase):
         ):
             # for heter trainer wait server ready
             wait_server_ready(self.role_maker._get_pserver_endpoints())
-        executor.run(fluid.default_startup_program())
+        executor.run(default_startup_program())

         if self.role_maker._is_heter_worker():
             self._init_worker()
@@ -375,7 +382,7 @@ class ParameterServerRuntime(RuntimeBase):
                     + sparse_related_optimize_varnames
                     + distributed_related_optimize_varnames
                 ),
-                fluid.default_main_program().list_vars(),
+                default_main_program().list_vars(),
             )
         )

@@ -386,9 +393,9 @@
             raise ValueError("There is no directory named '%s'", model_dirname)

         # load dense
-        fluid.io.load_vars(
+        paddle.static.load_vars(
             executor,
-            main_program=fluid.default_main_program(),
+            main_program=default_main_program(),
             dirname=model_dirname,
             vars=remaining_vars,
         )
@@ -409,7 +416,7 @@

     def _run_server(self):
         executor = self._get_executor()
-        executor.run(fluid.default_main_program())
+        executor.run(default_main_program())

     def _stop_worker(self):
         self._communicator.stop()
@@ -671,7 +678,7 @@ class ParameterServerRuntime(RuntimeBase):
                 )
             )

-        fluid.io.save_vars(
+        paddle.static.save_vars(
             executor,
             main_program=main_program,
             dirname=dirname,
@@ -743,7 +750,7 @@ class ParameterServerRuntime(RuntimeBase):
                 raise TypeError(
                     "in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
                 )
-            fluid.io.save_inference_model(
+            save_inference_model(
                 dirname,
                 feeded_var_names,
                 target_vars,
@@ -754,7 +761,7 @@ class ParameterServerRuntime(RuntimeBase):
                 export_for_deployment,
             )
         else:
-            fluid.io.save_inference_model(
+            save_inference_model(
                 dirname,
                 feeded_var_names,
                 target_vars,
@@ -773,7 +780,7 @@ class ParameterServerRuntime(RuntimeBase):
                 program_desc_str = f.read()
                 program = Program.parse_from_string(program_desc_str)

-            program._copy_dist_param_info_from(fluid.default_main_program())
+            program._copy_dist_param_info_from(default_main_program())
             self._ps_inference_save_persistables(
                 executor, dirname, program, mode=0
             )
diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py
index 588578514393cec64304b8f6e7b98f7e2f528e77..5bb15e40854a40ab58cf28865e26ca6fe108b578 100755
--- a/python/paddle/distributed/passes/ps_trainer_pass.py
+++ b/python/paddle/distributed/passes/ps_trainer_pass.py
@@ -17,10 +17,10 @@ import os
 from _collections import defaultdict

 import paddle
+import paddle.fluid.framework as framework
 from paddle.distributed.passes.pass_base import PassBase, register_pass
-from paddle.fluid.framework import Parameter
 from paddle.framework import core
-from paddle.static import Program
+from paddle.static import Parameter, Program

 from ..ps.utils.collective_transpiler import SingleProcessMultiThread
 from ..ps.utils.public import *  # noqa: F403
@@ -757,7 +757,7 @@ class PsGpuPass(PassBase):
             )
             new_op_desc.copy_from(op_desc)
             new_op_desc._set_attr(op_role_attr_name, backward)
-            new_op = paddle.fluid.framework.Operator(
+            new_op = paddle.static.Operator(
                 program.global_block(), new_op_desc
             )
             program.global_block().ops.insert(insert_index + 1, new_op)
diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py
index 02712e31d949da7590a489bcf2fb43b8a458a9e9..74bb68b46af0cdc4c2707e0fd404ec5e42fde3bf 100755
--- a/python/paddle/distributed/ps/utils/ps_program_builder.py
+++ b/python/paddle/distributed/ps/utils/ps_program_builder.py
@@ -472,7 +472,7 @@ class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
         if not self.is_server:
             self._build_trainer_programs()
             fluid.framework.switch_startup_program(self.cloned_startup)
-            fluid.framework.switch_main_program(self.cloned_main)
+            paddle.framework.switch_main_program(self.cloned_main)
             print(
                 "paddle.static.default_startup_program: {}".format(
                     paddle.static.default_startup_program()._heter_pipeline_opt
@@ -483,4 +483,4 @@ class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
             fluid.framework.switch_startup_program(
                 self.attrs['_startup_server']
             )
-            fluid.framework.switch_main_program(self.attrs['_main_server'])
+            paddle.framework.switch_main_program(self.attrs['_main_server'])
diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py
index d797591f96921dc66132bfab7dc0cbc35c385aa0..89551714b390d5d975af74c053c43c439b722573 100755
--- a/python/paddle/distributed/ps/utils/public.py
+++ b/python/paddle/distributed/ps/utils/public.py
@@ -18,8 +18,8 @@ import os
 import warnings
 from functools import reduce

-import paddle.fluid as fluid
-import paddle.fluid.framework as framework
+from paddle.distributed.io import is_persistable
+from paddle.fluid.framework import generate_control_dev_var_name
 from paddle.framework import core

 # logging.basicConfig(
@@ -1253,7 +1253,7 @@ def screen_persistables(program, var_list):
         else:
             var = program.global_block().vars[var_name]

-        if fluid.io.is_persistable(var):
+        if is_persistable(var):
             need_remove.append(var_name)

     for var_name in need_remove:
@@ -1676,9 +1676,7 @@ def add_send_op(program, block, _vars):
             table_dict[table_id]['var_list'].append(persistable_var)

     for table_id in table_dict:
-        dummy_output = block.create_var(
-            name=framework.generate_control_dev_var_name()
-        )
+        dummy_output = block.create_var(name=generate_control_dev_var_name())
         send_input_vars = [
             block.vars[union_var]
             for union_var in table_dict[table_id]['var_list']