From ae75affd34b5308f8456bed4ecb94bba6321cb24 Mon Sep 17 00:00:00 2001 From: 123malin Date: Fri, 15 Jan 2021 11:59:09 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90Cherry-Pick=E3=80=91add=20distributed?= =?UTF-8?q?=5Finfer=20(#30300)=20(#30427)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * test=develop, add distributed_infer (#30300) * test=develop, add distributed_infer * test=develop, fix unittest cmakefile conflict * test=develop, fix test_dist_fleet_base --- .../distributed/service/brpc_ps_server.cc | 1 + .../distributed/table/common_sparse_table.cc | 27 +++-- python/paddle/distributed/fleet/__init__.py | 2 +- .../distributed/fleet/metrics/metric.py | 37 +++--- .../distributed/fleet/runtime/the_one_ps.py | 7 +- .../distributed/fleet/utils/__init__.py | 2 +- .../paddle/distributed/fleet/utils/ps_util.py | 103 ++++++++++++++++- .../fluid/tests/unittests/CMakeLists.txt | 1 + .../fluid/tests/unittests/dist_fleet_ctr.py | 29 +---- .../tests/unittests/test_dist_fleet_base.py | 72 ++++++++---- .../tests/unittests/test_dist_fleet_infer.py | 108 ++++++++++++++++++ .../tests/unittests/test_fleet_metric.py | 17 +-- 12 files changed, 311 insertions(+), 95 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_infer.py diff --git a/paddle/fluid/distributed/service/brpc_ps_server.cc b/paddle/fluid/distributed/service/brpc_ps_server.cc index 92a317d4e48..a6837cd4525 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.cc +++ b/paddle/fluid/distributed/service/brpc_ps_server.cc @@ -103,6 +103,7 @@ int32_t BrpcPsService::initialize() { _service_handler_map[PS_BARRIER] = &BrpcPsService::barrier; _service_handler_map[PS_START_PROFILER] = &BrpcPsService::start_profiler; _service_handler_map[PS_STOP_PROFILER] = &BrpcPsService::stop_profiler; + _service_handler_map[PS_PUSH_GLOBAL_STEP] = &BrpcPsService::push_global_step; // shard初始化,server启动后才可从env获取到server_list的shard信息 initialize_shard_info(); diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc index fffe5eac1d8..98db14e0eca 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.cc +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -94,23 +94,28 @@ struct Meta { void ProcessALine(const std::vector& columns, const Meta& meta, std::vector>* values) { - PADDLE_ENFORCE_EQ(columns.size(), meta.names.size() + 1, + PADDLE_ENFORCE_EQ(columns.size(), 2, paddle::platform::errors::InvalidArgument( - "record in txt do not match meta.")); + "The data format does not meet the requirements. It " + "should look like feasign_id \t params.")); - values->reserve(columns.size() - 1); - - for (int x = 1; x < columns.size(); ++x) { - auto& column = columns[x]; - auto val_ = paddle::string::split_string(column, ","); + auto load_values = paddle::string::split_string(columns[1], ","); + values->reserve(meta.names.size()); + int offset = 0; + for (int x = 0; x < meta.names.size(); ++x) { std::vector val; - std::transform(val_.begin(), val_.end(), std::back_inserter(val), - [](std::string va) { return std::stof(va); }); - PADDLE_ENFORCE_EQ(val.size(), meta.dims[x - 1], + auto start = load_values.begin() + offset; + auto end = load_values.begin() + offset + meta.dims[x]; + PADDLE_ENFORCE_LE(offset + meta.dims[x], load_values.size(), paddle::platform::errors::InvalidArgument( - "record in txt do not match meta.")); + "The data format in txt does not meet the field " + "requirements defined in meta")); + + std::transform(start, end, std::back_inserter(val), + [](std::string va) { return std::stof(va); }); values->push_back(val); + offset += meta.dims[x]; } } diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 6282b9021b4..0b7e8da101b 100644 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -19,7 +19,7 @@ from .base.fleet_base import Fleet from .base.util_factory import UtilBase from .dataset import * from .data_generator import MultiSlotDataGenerator, MultiSlotStringDataGenerator -#from . import metrics +from . import metrics __all__ = [ "DistributedStrategy", diff --git a/python/paddle/distributed/fleet/metrics/metric.py b/python/paddle/distributed/fleet/metrics/metric.py index 00525dfcb96..d057f207314 100644 --- a/python/paddle/distributed/fleet/metrics/metric.py +++ b/python/paddle/distributed/fleet/metrics/metric.py @@ -13,11 +13,10 @@ # limitations under the License. """Fleet Metrics""" -import paddle.fluid as fluid import math import numpy as np -from paddle.fluid.framework import Variable -import paddle.distributed.fleet as fleet +from paddle.static import Variable +import paddle def sum(input, scope=None, util=None): @@ -46,9 +45,9 @@ def sum(input, scope=None, util=None): print("sum array: ", paddle.distributed.fleet.sum(res)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(input, Variable): input = np.array(scope.find_var(input.name).get_tensor()) elif isinstance(input, str): @@ -86,9 +85,9 @@ def max(input, scope=None, util=None): print("max array: ", paddle.distributed.fleet.max(res)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(input, Variable): input = np.array(scope.find_var(input.name).get_tensor()) elif isinstance(input, str): @@ -126,9 +125,9 @@ def min(input, scope=None, util=None): print("min array: ", paddle.distributed.fleet.min(res)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(input, Variable): input = np.array(scope.find_var(input.name).get_tensor()) elif isinstance(input, str): @@ -168,9 +167,9 @@ def auc(stat_pos, stat_neg, scope=None, util=None): print("auc: ", paddle.distributed.fleet.auc(pos, neg)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(stat_pos, Variable): stat_pos = np.array(scope.find_var(stat_pos.name).get_tensor()) @@ -246,9 +245,9 @@ def mae(abserr, total_ins_num, scope=None, util=None): print("mae: ", paddle.distributed.fleet.mae(res, total_ins_num)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(abserr, Variable): abserr = np.array(scope.find_var(abserr.name).get_tensor()) @@ -289,9 +288,9 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None): print("rmse: ", paddle.distributed.fleet.rmse(res, total_ins_num)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(sqrerr, Variable): sqrerr = np.array(scope.find_var(sqrerr.name).get_tensor()) @@ -331,9 +330,9 @@ def mse(sqrerr, total_ins_num, scope=None, util=None): print("mse: ", paddle.distributed.fleet.mse(metric, total_ins_num)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(sqrerr, Variable): sqrerr = np.array(scope.find_var(sqrerr.name).get_tensor()) @@ -384,9 +383,9 @@ def acc(correct, total, scope=None, util=None): print("accuracy: ", paddle.distributed.fleet.acc(correct_num, total_num)) """ if scope is None: - scope = fluid.global_scope() + scope = paddle.static.global_scope() if util is None: - util = fleet.util + util = paddle.distributed.fleet.util if isinstance(correct, Variable): correct = np.array(scope.find_var(correct.name).get_tensor()) diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index e0caae9a2fd..20bf443689e 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -30,6 +30,9 @@ def conv_indent(indent): return "".join([" "] * indent) +PSERVER_SAVE_SUFFIX = "_txt" + + class Accessor: def __init__(self): self.accessor_class = "" @@ -789,9 +792,9 @@ class TheOnePSRuntime(RuntimeBase): begin = time.time() for var_name in load_varnames: table_id = sparse_table_maps[var_name] - path = os.path.join(dirname, var_name, + path = os.path.join(dirname, var_name + PSERVER_SAVE_SUFFIX, "{}.block{}.txt".format(var_name, pserver_id)) - meta = os.path.join(dirname, var_name, + meta = os.path.join(dirname, var_name + PSERVER_SAVE_SUFFIX, "{}.block{}.meta".format(var_name, pserver_id)) self._server.load_sparse(path, meta, table_id) end = time.time() diff --git a/python/paddle/distributed/fleet/utils/__init__.py b/python/paddle/distributed/fleet/utils/__init__.py index ce86c3945cc..774e8db0df5 100644 --- a/python/paddle/distributed/fleet/utils/__init__.py +++ b/python/paddle/distributed/fleet/utils/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. from .fs import LocalFS, HDFSClient -from .ps_util import Distributed +from .ps_util import DistributedInfer diff --git a/python/paddle/distributed/fleet/utils/ps_util.py b/python/paddle/distributed/fleet/utils/ps_util.py index 0fba1c6c552..a409d02c984 100644 --- a/python/paddle/distributed/fleet/utils/ps_util.py +++ b/python/paddle/distributed/fleet/utils/ps_util.py @@ -14,11 +14,104 @@ """Parameter Server utils""" import numpy as np - - -class Distributed: - @staticmethod - def estimate(main_program, varname2tables): +import os +import paddle + + +class DistributedInfer: + """ + Utility class for distributed infer of PaddlePaddle. + """ + + def __init__(self, main_program=None, startup_program=None): + if main_program: + self.origin_main_program = main_program.clone() + else: + self.origin_main_program = paddle.static.default_main_program( + ).clone() + + if startup_program: + self.origin_startup_program = startup_program + else: + self.origin_startup_program = paddle.static.default_startup_program( + ) + self.sparse_table_maps = None + + def init_distributed_infer_env(self, + exe, + loss, + role_maker=None, + dirname=None): + import paddle.distributed.fleet as fleet + + if fleet.fleet._runtime_handle is None: + fleet.init(role_maker=role_maker) + + fake_optimizer = paddle.optimizer.SGD() + strategy = fleet.DistributedStrategy() + strategy.a_sync = True + optimizer = fleet.distributed_optimizer( + fake_optimizer, strategy=strategy) + optimizer.minimize( + loss, startup_program=self.origin_startup_program) + + if fleet.is_server(): + fleet.init_server(dirname=dirname) + fleet.run_server() + else: + exe.run(paddle.static.default_startup_program()) + fleet.init_worker() + self._init_dense_params(exe, dirname) + global_startup_program = paddle.static.default_startup_program() + global_startup_program = self.origin_startup_program + global_main_program = paddle.static.default_main_program() + global_main_program = self.origin_main_program + + def _get_sparse_table_map(self): + import paddle.distributed.fleet as fleet + + if self.sparse_table_maps is None: + self.sparse_table_maps = {} + send_ctx = fleet.fleet._runtime_handle._communicator.send_ctx_ + for gradname, ctx in send_ctx.items(): + if ctx.is_sparse: + param = gradname.strip("@GRAD") + self.sparse_table_maps[param] = ctx.table_id() + else: + continue + return self.sparse_table_maps + + def _init_dense_params(self, exe=None, dirname=None): + import paddle.distributed.fleet as fleet + + sparse_table_maps = self._get_sparse_table_map() + + if dirname is not None and exe is not None: + all_persist_vars = [ + v for v in self.origin_main_program.list_vars() + if paddle.static.io.is_persistable(v) + ] + dense_persist_vars = [(v.name, v) for v in all_persist_vars + if v.name not in sparse_table_maps] + need_load_vars = [ + v[1] for v in dense_persist_vars + if os.path.isfile(os.path.join(dirname, v[0])) + ] + paddle.static.load_vars( + exe, + dirname, + main_program=self.origin_main_program, + vars=need_load_vars) + + def get_dist_infer_program(self): + import paddle.distributed.fleet as fleet + + varname2tables = self._get_sparse_table_map() + convert_program = self._convert_program(self.origin_main_program, + varname2tables) + return convert_program + + def _convert_program(self, main_program, varname2tables): def distributed_ops_pass(program): SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"} diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index c1290ae37ac..5ad0587132d 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -661,6 +661,7 @@ endif() if (WITH_DISTRIBUTE) set_tests_properties(test_communicator_half_async PROPERTIES TIMEOUT 120) + set_tests_properties(test_dist_fleet_infer PROPERTIES TIMEOUT 200) endif() if (WITH_DISTRIBUTE AND NOT APPLE) diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index f974098bbef..3ab93b38795 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -28,7 +28,7 @@ import numpy as np import ctr_dataset_reader from test_dist_fleet_base import runtime_main, FleetDistRunnerBase -from paddle.distributed.fleet.utils.ps_util import Distributed +from paddle.distributed.fleet.utils.ps_util import DistributedInfer import paddle.distributed.fleet as fleet paddle.enable_static() @@ -165,17 +165,11 @@ class TestDistCTR2x2(FleetDistRunnerBase): with open(os.path.join(dirname, "__model__.proto"), "w") as wn: wn.write(str(program)) - def do_distributed_testing(self, args, test_main_program, - test_startup_program): + def do_distributed_testing(self, fleet): """ do distributed """ - device_env = os.getenv("DEVICE", 'cpu') - if device_env == 'cpu': - device = fluid.CPUPlace() - elif device_env == 'gpu': - device = fluid.CUDAPlace(0) - exe = fluid.Executor(device) + exe = self.get_executor() batch_size = 4 test_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size) @@ -188,7 +182,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): try: while True: batch_idx += 1 - loss_val = exe.run(program=test_main_program, + loss_val = exe.run(program=paddle.static.default_main_program(), fetch_list=[self.avg_cost.name]) loss_val = np.mean(loss_val) message = "TEST ---> batch_idx: {} loss: {}\n".format(batch_idx, @@ -207,12 +201,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): Args: fleet(Fleet api): the fleet object of Parameter Server, define distribute training role """ - device_env = os.getenv("DEVICE", 'cpu') - if device_env == 'cpu': - device = fluid.CPUPlace() - elif device_env == 'gpu': - device = fluid.CUDAPlace(0) - exe = fluid.Executor(device) + exe = self.get_executor() exe.run(fluid.default_startup_program()) fleet.init_worker() @@ -250,13 +239,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): def do_dataset_training(self, fleet): train_file_list = ctr_dataset_reader.prepare_fake_data() - device_env = os.getenv("DEVICE", 'cpu') - if device_env == 'cpu': - device = fluid.CPUPlace() - elif device_env == 'gpu': - device = fluid.CUDAPlace(0) - exe = fluid.Executor(device) - + exe = self.get_executor() exe.run(fluid.default_startup_program()) fleet.init_worker() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index aefab47ee7c..03d7251f829 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import print_function -from paddle.distributed.fleet.utils.ps_util import Distributed +from paddle.distributed.fleet.utils.ps_util import DistributedInfer from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory import paddle.distributed.fleet as fleet import paddle.distributed.fleet.base.role_maker as role_maker @@ -37,11 +37,6 @@ import tempfile import unittest import paddle -import paddle.fluid as fluid -import paddle.distributed.fleet.base.role_maker as role_maker -import paddle.distributed.fleet as fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory -from paddle.distributed.fleet.utils.ps_util import Distributed paddle.enable_static() __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] @@ -58,6 +53,9 @@ class FleetDistRunnerBase(object): do training : exe run program """ + def __init__(self): + self._exe = None + def build_role(self, args): if args.role.upper() == "PSERVER": @@ -159,6 +157,16 @@ class FleetDistRunnerBase(object): raise NotImplementedError( "get_model should be implemented by child classes.") + def get_executor(self): + if self._exe is None: + device_env = os.getenv("DEVICE", 'cpu') + if device_env == 'cpu': + device = fluid.CPUPlace() + elif device_env == 'gpu': + device = fluid.CUDAPlace(0) + self._exe = fluid.Executor(device) + return self._exe + def do_dataset_training(self, fleet): raise NotImplementedError( "do_dataset_training should be implemented by child classes.") @@ -193,6 +201,7 @@ class TestFleetBase(unittest.TestCase): self._trainers = 2 self._pservers = 2 self._need_test = 0 + self._model_dir = "" self._port_set = set() global DIST_UT_PORT @@ -290,6 +299,10 @@ class TestFleetBase(unittest.TestCase): self._trainers, self._mode, self._geo_sgd_need_push_nums, self._reader, gloo_path, self._need_test) + if self._model_dir: + tr_cmd += " --model_dir {}".format(self._model_dir) + ps_cmd += " --model_dir {}".format(self._model_dir) + # Run dist train to compare with local results ps0, ps1, ps0_pipe, ps1_pipe = self._start_pserver(ps_cmd, env) tr0, tr1, tr0_pipe, tr1_pipe = self._start_trainer(tr_cmd, env) @@ -381,14 +394,32 @@ def runtime_main(test_class): '--geo_sgd_need_push_nums', type=int, required=False, default=2) parser.add_argument('--reader', type=str, required=False, default='dataset') parser.add_argument('--test', type=int, required=False, default=0) + parser.add_argument('--model_dir', type=str, required=False, default="") args = parser.parse_args() model = test_class() role = model.build_role(args) + + if args.test and args.model_dir != "": + avg_cost = model.net(args, is_train=False) + dist_infer = DistributedInfer() + dist_infer.init_distributed_infer_env( + exe=model.get_executor(), + loss=model.avg_cost, + role_maker=role, + dirname=args.model_dir) + if fleet.is_worker(): + with paddle.static.program_guard( + main_program=dist_infer.get_dist_infer_program()): + model.do_distributed_testing(fleet) + fleet.stop_worker() + return + fleet.init(role) strategy = model.build_strategy(args) avg_cost = model.net(args) model.build_optimizer(avg_cost, strategy) + if args.role == "pserver": model.run_pserver(args) else: @@ -398,26 +429,17 @@ def runtime_main(test_class): model.run_pyreader_trainer(args) if args.test: - test_origin_program = fluid.Program() - test_startup_program = fluid.Program() - with fluid.program_guard( + test_origin_program = paddle.static.Program() + test_startup_program = paddle.static.Program() + with paddle.static.program_guard( main_program=test_origin_program, startup_program=test_startup_program): - with fluid.unique_name.guard(): + with paddle.utils.unique_name.guard(): avg_cost = model.net(args, is_train=False) - send_ctx = fleet.fleet._runtime_handle._communicator.send_ctx_ - varname2tables = {} - for gradname, ctx in send_ctx.items(): - if ctx.is_sparse: - param = gradname.strip("@GRAD") - varname2tables[param] = ctx.table_id() - else: - continue - ps_util = Distributed() - test_main_program = ps_util.estimate(test_origin_program, - varname2tables) - print(str(test_main_program)) - print(str(test_startup_program)) - model.do_distributed_testing(args, test_main_program, - test_startup_program) + dist_infer = DistributedInfer( + main_program=test_origin_program, + startup_program=test_startup_program) + with paddle.static.program_guard( + main_program=dist_infer.get_dist_infer_program()): + model.do_distributed_testing(fleet) fleet.stop_worker() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_infer.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_infer.py new file mode 100644 index 00000000000..3d24328c9d0 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_infer.py @@ -0,0 +1,108 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import shutil +import unittest +import tempfile +import tarfile +from test_dist_fleet_base import TestFleetBase +from paddle.dataset.common import download, DATA_HOME + + +class TestDistCtrInfer(TestFleetBase): + def _setup_config(self): + self._mode = "async" + self._reader = "pyreader" + self._need_test = 1 + + data_url = "https://fleet.bj.bcebos.com/unittest/ctr_saved_params.tar.gz" + data_md5 = "aa7e8286ced566ea8a67410be7482438" + module_name = "ctr_saved_params" + path = download(data_url, module_name, data_md5) + print('ctr_params is downloaded at ' + path) + tar = tarfile.open(path) + unzip_folder = tempfile.mkdtemp() + tar.extractall(unzip_folder) + self._model_dir = unzip_folder + + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}): + required_envs = { + "PATH": os.getenv("PATH", ""), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "FLAGS_rpc_deadline": "30000", # 5sec to fail fast + "http_proxy": "", + "FLAGS_communicator_send_queue_size": "2", + "FLAGS_communicator_max_merge_var_num": "2", + "CPU_NUM": "2", + } + + required_envs.update(need_envs) + + if check_error_log: + required_envs["GLOG_v"] = "3" + required_envs["GLOG_logtostderr"] = "1" + + tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) + + def test_dist_infer(self): + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) + shutil.rmtree(self._model_dir) + + +class TestDistCtrTrainInfer(TestFleetBase): + def _setup_config(self): + self._mode = "async" + self._reader = "pyreader" + self._need_test = 1 + + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}): + required_envs = { + "PATH": os.getenv("PATH", ""), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "FLAGS_rpc_deadline": "30000", # 5sec to fail fast + "http_proxy": "", + "FLAGS_communicator_send_queue_size": "2", + "FLAGS_communicator_max_merge_var_num": "2", + "CPU_NUM": "2", + } + + required_envs.update(need_envs) + + if check_error_log: + required_envs["GLOG_v"] = "3" + required_envs["GLOG_logtostderr"] = "1" + + tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) + + def test_dist_train_infer(self): + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=False) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_metric.py b/python/paddle/fluid/tests/unittests/test_fleet_metric.py index 511b29780cb..aae2d7f3aa5 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_metric.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_metric.py @@ -73,6 +73,7 @@ class TestFleetMetric(unittest.TestCase): pass self.util = FakeUtil(FakeFleet()) + fleet.util = self.util def test_metric_1(self): """Test cases for metrics.""" @@ -104,14 +105,14 @@ class TestFleetMetric(unittest.TestCase): metric.rmse(t1, 3, scope, self.util) metric.mse(t1, 3, scope, self.util) metric.acc(t, t1, scope, self.util) - metric.sum(str(t.name), scope, self.util) - metric.max(str(t.name), scope, self.util) - metric.min(str(t.name), scope, self.util) - metric.auc(str(t1.name), str(t.name), scope, self.util) - metric.mae(str(t1.name), 3, scope, self.util) - metric.rmse(str(t1.name), 3, scope, self.util) - metric.mse(str(t1.name), 3, scope, self.util) - metric.acc(str(t.name), str(t1.name), scope, self.util) + metric.sum(str(t.name)) + metric.max(str(t.name)) + metric.min(str(t.name)) + metric.auc(str(t1.name), str(t.name)) + metric.mae(str(t1.name), 3) + metric.rmse(str(t1.name), 3) + metric.mse(str(t1.name), 3) + metric.acc(str(t.name), str(t1.name)) arr = np.array([1, 2, 3, 4]) metric.sum(arr, util=self.util) metric.max(arr, util=self.util) -- GitLab