From 890f626b24242f8c67c61721f2d1951479d9f178 Mon Sep 17 00:00:00 2001
From: tangwei12
Date: Wed, 12 May 2021 12:24:49 +0800
Subject: [PATCH] Optimize/fleet save (#32817)

* fix cpp lint

* fix save/load with unexpected value

* fix save and user interface
---
 .../distributed/table/common_sparse_table.cc | 49 ++++++++-----
 .../distributed/fleet/base/fleet_base.py     | 49 +++++++++++++
 .../distributed/fleet/runtime/the_one_ps.py  | 70 +++++++++----------
 .../tests/unittests/test_fleet_base_2.py     | 26 +++----
 4 files changed, 130 insertions(+), 64 deletions(-)

diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc
index 718fce9950..a4f672c296 100644
--- a/paddle/fluid/distributed/table/common_sparse_table.cc
+++ b/paddle/fluid/distributed/table/common_sparse_table.cc
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 #include "paddle/fluid/distributed/table/common_sparse_table.h"
-
 #include <sstream>
 
+#include "boost/lexical_cast.hpp"
 #include "glog/logging.h"
 #include "paddle/fluid/platform/enforce.h"
 
@@ -25,7 +25,8 @@ class ValueBlock;
 }  // namespace distributed
 }  // namespace paddle
 
-#define PSERVER_SAVE_SUFFIX "_txt"
+#define PSERVER_SAVE_SUFFIX ".shard"
+using boost::lexical_cast;
 
 namespace paddle {
 namespace distributed {
@@ -100,7 +101,7 @@ struct Meta {
 };
 
 void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
-                  std::vector<std::vector<float>>* values) {
+                  const int64_t id, std::vector<std::vector<float>>* values) {
   auto colunmn_size = columns.size();
   auto load_values =
       paddle::string::split_string<std::string>(columns[colunmn_size - 1], ",");
@@ -116,8 +117,18 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
         "The data format in txt does not meet the field "
         "requirements defined in meta"));
 
-    std::transform(start, end, std::back_inserter(val),
-                   [](std::string va) { return std::stof(va); });
+    std::transform(start, end, std::back_inserter(val), [id](std::string va) {
+      float v = 0.0;
+
+      try {
+        v = lexical_cast<float>(va);
+      } catch (boost::bad_lexical_cast& e) {
+        VLOG(0) << "id: " << id << " get unexpected value: " << va
+                << " and be reset to: 0.0";
+      }
+      return v;
+    });
+
     values->push_back(val);
     offset += meta.dims[x];
   }
@@ -126,25 +137,29 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
 int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
                    const int mode) {
   int64_t save_num = 0;
+
   for (auto& table : block->values_) {
     for (auto& value : table) {
      if (mode == SaveMode::delta && !value.second->need_save_) {
        continue;
      }
 
-      save_num += 1;
-      auto* vs = value.second->data_.data();
+      ++save_num;
+
+      std::stringstream ss;
+      auto* vs = value.second->data_.data();
+
+      auto id = value.first;
+
+      ss << id << "\t" << value.second->count_ << "\t"
+         << value.second->unseen_days_ << "\t" << value.second->is_entry_
+         << "\t";
 
-      for (int i = 0; i < block->value_length_; i++) {
-        ss << vs[i];
-        ss << ",";
+      for (int i = 0; i < block->value_length_ - 1; i++) {
+        ss << std::to_string(vs[i]) << ",";
      }
 
+      ss << std::to_string(vs[block->value_length_ - 1]);
      ss << "\n";
 
      os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
@@ -170,7 +185,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
 
   while (std::getline(file, line)) {
     auto values = paddle::string::split_string<std::string>(line, "\t");
-    auto id = std::stoull(values[0]);
+    auto id = lexical_cast<uint64_t>(values[0]);
 
     if (id % pserver_num != pserver_id) {
       VLOG(3) << "will not load " << values[0] << " from " << valuepath
@@ -182,15 +197,17 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
     auto block = blocks->at(shard_id);
 
     std::vector<std::vector<float>> kvalues;
-    ProcessALine(values, meta, &kvalues);
+    ProcessALine(values, meta, id, &kvalues);
 
     block->Init(id, false);
 
     VALUE* value_instant = block->GetValue(id);
+
     if (values.size() == 5) {
-      value_instant->count_ = std::stoi(values[1]);
-      value_instant->unseen_days_ = std::stoi(values[2]);
-      value_instant->is_entry_ = static_cast<bool>(std::stoi(values[3]));
+      value_instant->count_ = lexical_cast<int>(values[1]);
+      value_instant->unseen_days_ = lexical_cast<int>(values[2]);
+      value_instant->is_entry_ =
+          static_cast<bool>(lexical_cast<int>(values[3]));
     }
 
     std::vector<std::vector<float>> block_values = block->Get(id, meta.names, meta.dims);
@@ -475,7 +492,7 @@ int32_t CommonSparseTable::pull_sparse_ptr(char** pull_values,
       auto* value = block->InitGet(id);
       // std::copy_n(value + param_offset_, param_dim_,
       //             pull_values + param_dim_ * offset);
-      pull_values[offset] = (char*)value;
+      pull_values[offset] = reinterpret_cast<char*>(value);
     }
 
     return 0;
diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index a7564a23a7..15ee047b1a 100644
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ b/python/paddle/distributed/fleet/base/fleet_base.py
@@ -580,6 +580,49 @@ class Fleet(object):
         """
         self._runtime_handle._stop_worker()
 
+    def save(self, dirname, feed=[], fetch=[], **configs):
+        inference = True
+
+        if not feed and not fetch:
+            inference = False
+
+        place = paddle.CPUPlace()
+        executor = paddle.static.Executor(place)
+
+        if inference:
+            feeded_var_names = []
+            fetch_var_names = []
+
+            for var in feed:
+                if isinstance(var, str):
+                    feeded_var_names.append(var)
+                elif isinstance(var, paddle.static.Variable):
+                    feeded_var_names.append(var.name)
+                else:
+                    raise ValueError("feed must be [str|Variable]")
+
+            for var in fetch:
+                if isinstance(var, str):
+                    fetch_var_names.append(var)
+                elif isinstance(var, paddle.static.Variable):
+                    fetch_var_names.append(var.name)
+                else:
+                    raise ValueError("fetch must be [str|Variable]")
+
+            fetch_vars = [
+                paddle.static.default_main_program().global_block().var(name)
+                for name in fetch_var_names
+            ]
+
+            self._runtime_handle._save_inference_model(
+                executor, dirname, feeded_var_names, fetch_vars, None, True, 0)
+        else:
+            increment_mode = 0
+            if "mode" in configs:
+                increment_mode = int(configs["mode"])
+            self._runtime_handle._save_persistables(
+                executor, dirname, main_program=None, mode=increment_mode)
+
     def save_inference_model(self,
                              executor,
                              dirname,
@@ -607,6 +650,9 @@ class Fleet(object):
             fleet.init_server()
 
         """
+        # warnings.warn(
+        #     "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
+        # )
 
         self._runtime_handle._save_inference_model(
             executor, dirname, feeded_var_names, target_vars, main_program,
@@ -653,6 +699,9 @@ class Fleet(object):
             fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())
 
         """
+        # warnings.warn(
+        #     "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
+        # )
 
         self._runtime_handle._save_persistables(executor, dirname, main_program,
                                                 mode)
 
diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py
index ce68eb9a1f..d31fa549ad 100644
--- a/python/paddle/distributed/fleet/runtime/the_one_ps.py
+++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -32,7 +32,7 @@ def conv_indent(indent):
     return "".join(["    "] * indent)
 
 
-PSERVER_SAVE_SUFFIX = "_txt"
+PSERVER_SAVE_SUFFIX = ".shard"
 
 
 class Accessor:
@@ -916,7 +916,7 @@ class TheOnePSRuntime(RuntimeBase):
             self.compiled_strategy.origin_main_program, True)
         values = []
         for id, names in context.items():
-            if names not in distributed_varnames:
+            if names[0] not in distributed_varnames:
                 # only save sparse param to local
                 self._worker.recv_and_save_model(id, dirname)
             # save sparse & distributed param on server
@@ -953,11 +953,11 @@ class TheOnePSRuntime(RuntimeBase):
                 TheOnePSRuntime.__exclude_vars(saved_varnames),
                 main_program.list_vars()))
 
-        fluid.io.save_vars(
-            executor,
-            main_program=main_program,
-            dirname=dirname,
-            vars=remaining_vars)
+        import paddle
+        for var in remaining_vars:
+            tensor = var.get_value()
+            paddle.save(
+                tensor, os.path.join(dirname, var.name), use_binary_format=True)
 
     def _ps_inference_save_persistables(self,
                                         executor,
@@ -978,20 +978,19 @@ class TheOnePSRuntime(RuntimeBase):
 
         if isinstance(executor, ParallelExecutor):
             raise TypeError(
-                "in fleet.save_persistables() function, executor must be as Executor type, ParallelExecutor is not allowed"
+                "in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
             )
 
         if not isinstance(executor, Executor):
             raise TypeError(
-                "in fleet.save_persistables() function, executor must be as Executor type"
-            )
+                "in fleet.save() function, executor must be as Executor type")
 
         if main_program is None:
             main_program = self.compiled_strategy.get_origin_ps_main_program()
 
         if isinstance(main_program, CompiledProgram):
             raise TypeError(
-                "in fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed"
+                "in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
             )
 
         # Todo(MrChengmo): Save optimizer status
@@ -1013,37 +1012,36 @@ class TheOnePSRuntime(RuntimeBase):
 
         if isinstance(executor, ParallelExecutor):
             raise TypeError(
-                "in fleet.save_inference_model() function, executor must be as Executor type, ParallelExecutor is not allowed"
+                "in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
             )
 
         if not isinstance(executor, Executor):
             raise TypeError(
-                "in fleet.save_inference_model() function, executor must be as Executor type"
+                "in fleet.save() function, executor must be as Executor type")
+
+        import paddle
+        program = self.origin_main_program if main_program is None else main_program
+
+        if isinstance(program, CompiledProgram):
+            raise TypeError(
+                "in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
             )
 
-        if main_program is not None:
-            if isinstance(main_program, CompiledProgram):
-                raise TypeError(
-                    "in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
-                )
-            fluid.io.save_inference_model(dirname, feeded_var_names,
-                                          target_vars, executor, main_program,
-                                          None, None, export_for_deployment)
-        else:
-            fluid.io.save_inference_model(dirname, feeded_var_names,
-                                          target_vars, executor,
-                                          self.origin_main_program, None, None,
-                                          export_for_deployment, True)
-            model_basename = "__model__"
-            model_filename = os.path.join(dirname, model_basename)
-
-            with open(model_filename, "rb") as f:
-                program_desc_str = f.read()
-
-            program = Program.parse_from_string(program_desc_str)
-            program._copy_dist_param_info_from(fluid.default_main_program())
-            self._ps_inference_save_persistables(executor, dirname, program,
-                                                 mode)
+        feed_vars = [
+            program.global_block().var(name) for name in feeded_var_names
+        ]
+
+        infer_program = paddle.static.normalize_program(program, feed_vars,
+                                                        target_vars)
+
+        infer_program._copy_dist_param_info_from(program)
+
+        model_basename = "__model__"
+        model_basename = os.path.join(dirname, model_basename)
+        paddle.save(infer_program, model_basename)
+
+        self._ps_inference_save_persistables(executor, dirname, infer_program,
+                                             mode)
 
     def _save_inference_model(self, *args, **kwargs):
         self._ps_inference_save_inference_model(*args, **kwargs)
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base_2.py b/python/paddle/fluid/tests/unittests/test_fleet_base_2.py
index d666ea6740..7ca08bcb9d 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_base_2.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_base_2.py
@@ -14,6 +14,8 @@
 
 import unittest
 import paddle
+paddle.enable_static()
+
 import os
 import paddle.fluid as fluid
 
@@ -21,18 +23,16 @@ import paddle.fluid as fluid
 class TestFleetBase(unittest.TestCase):
     def setUp(self):
         os.environ["POD_IP"] = "127.0.0.1"
-        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
         os.environ["PADDLE_TRAINERS_NUM"] = "2"
         os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
-            "127.0.0.1:36001,127.0.0.2:36001"
+            "127.0.0.1:36001,127.0.0.2:36001"
 
     def test_ps_minimize(self):
         import paddle
         import paddle.distributed.fleet as fleet
 
-        os.environ["TRAINING_ROLE"] = "PSERVER"
-        os.environ["POD_IP"] = "127.0.0.1"
-        os.environ["PADDLE_PORT"] = "36001"
+        os.environ["TRAINING_ROLE"] = "TRAINER"
+        os.environ["PADDLE_TRAINER_ID"] = "1"
 
         input_x = paddle.fluid.layers.data(
             name="x", shape=[32], dtype='float32')
@@ -47,24 +47,26 @@ class TestFleetBase(unittest.TestCase):
 
         role = fleet.PaddleCloudRoleMaker(is_collective=False)
         fleet.init(role)
+
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = False
+        strategy.a_sync_configs = {"launch_barrier": False}
+
         optimizer = paddle.optimizer.SGD(learning_rate=0.001)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
 
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
+        exe.run(paddle.static.default_startup_program())
         pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name)
         compiled_prog = fluid.compiler.CompiledProgram(
             fluid.default_main_program())
-        self.assertRaises(
-            Exception,
-            fleet.save_inference_model,
-            dirname='/tmp/',
-            feeded_var_names=['x', 'y'],
-            target_vars=[avg_cost],
-            executor=pe)
+
+        fleet.fleet.save(dirname="/tmp", feed=['x', 'y'], fetch=[avg_cost])
+        fleet.fleet.save(
+            dirname="/tmp", feed=[input_x, input_y], fetch=[avg_cost])
+        fleet.fleet.save(dirname="/tmp")
 
         self.assertRaises(
            Exception,
--
GitLab
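
Usage note (illustrative sketch, not part of the committed diff): the snippet below shows one way a trainer process might call the unified fleet.fleet.save() interface added in fleet_base.py above. The network, output directory, and role environment values are hypothetical placeholders modeled on test_fleet_base_2.py; adjust them for a real parameter-server cluster.

    # Illustrative only: single-trainer sketch of the new fleet.save() interface.
    import os
    import paddle
    import paddle.fluid as fluid
    import paddle.distributed.fleet as fleet

    paddle.enable_static()

    # Role/endpoint settings mirror the unit test; a real job gets these from the launcher.
    os.environ["TRAINING_ROLE"] = "TRAINER"
    os.environ["PADDLE_TRAINER_ID"] = "0"
    os.environ["PADDLE_TRAINERS_NUM"] = "1"
    os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"

    role = fleet.PaddleCloudRoleMaker(is_collective=False)
    fleet.init(role)

    # A tiny classification network in the static graph.
    input_x = paddle.fluid.layers.data(name="x", shape=[32], dtype='float32')
    input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
    fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
    prediction = paddle.fluid.layers.fc(input=fc_1, size=2, act='softmax')
    cost = paddle.fluid.layers.cross_entropy(input=prediction, label=input_y)
    avg_cost = paddle.fluid.layers.mean(x=cost)

    strategy = paddle.distributed.fleet.DistributedStrategy()
    strategy.a_sync = False
    strategy.a_sync_configs = {"launch_barrier": False}
    optimizer = fleet.distributed_optimizer(
        paddle.optimizer.SGD(learning_rate=0.001), strategy=strategy)
    optimizer.minimize(avg_cost)

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(paddle.static.default_startup_program())
    # ... feed data and run training steps here ...

    # With feed/fetch given (as names or Variables) the runtime exports an inference
    # program; with neither, only persistables are saved and "mode" picks the increment mode.
    fleet.fleet.save(dirname="/tmp/fleet_model", feed=['x', 'y'], fetch=[avg_cost])
    fleet.fleet.save(dirname="/tmp/fleet_model", mode=0)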