Unverified commit 890f626b, authored by tangwei12, committed by GitHub

Optimize/fleet save (#32817)

* fix cpp lint
* fix save/load with unexpected value
* fix save and user interface
Parent e1a4c83c
@@ -13,9 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include <sstream>
#include "boost/lexical_cast.hpp"
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
@@ -25,7 +25,8 @@ class ValueBlock;
} // namespace distributed
} // namespace paddle
#define PSERVER_SAVE_SUFFIX "_txt"
#define PSERVER_SAVE_SUFFIX ".shard"
using boost::lexical_cast;
namespace paddle {
namespace distributed {
@@ -100,7 +101,7 @@ struct Meta {
};
void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
std::vector<std::vector<float>>* values) {
const int64_t id, std::vector<std::vector<float>>* values) {
auto colunmn_size = columns.size();
auto load_values =
paddle::string::split_string<std::string>(columns[colunmn_size - 1], ",");
@@ -116,8 +117,18 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
"The data format in txt does not meet the field "
"requirements defined in meta"));
std::transform(start, end, std::back_inserter(val),
[](std::string va) { return std::stof(va); });
std::transform(start, end, std::back_inserter(val), [id](std::string va) {
float v = 0.0;
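// parse with boost::lexical_cast; malformed values keep the 0.0 fallback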
try {
v = lexical_cast<float>(va);
} catch (boost::bad_lexical_cast& e) {
VLOG(0) << "id: " << id << " got unexpected value: " << va
<< ", reset to 0.0";
}
return v;
});
values->push_back(val);
offset += meta.dims[x];
}
@@ -126,25 +137,29 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const int mode) {
int64_t save_num = 0;
for (auto& table : block->values_) {
for (auto& value : table) {
if (mode == SaveMode::delta && !value.second->need_save_) {
continue;
}
save_num += 1;
auto* vs = value.second->data_.data();
++save_num;
std::stringstream ss;
auto* vs = value.second->data_.data();
auto id = value.first;
ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_
<< "\t";
for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
for (int i = 0; i < block->value_length_ - 1; i++) {
ss << std::to_string(vs[i]) << ",";
}
ss << std::to_string(vs[block->value_length_ - 1]);
ss << "\n";
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
@@ -170,7 +185,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
auto id = std::stoull(values[0]);
auto id = lexical_cast<int64_t>(values[0]);
if (id % pserver_num != pserver_id) {
VLOG(3) << "will not load " << values[0] << " from " << valuepath
@@ -182,15 +197,17 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
auto block = blocks->at(shard_id);
std::vector<std::vector<float>> kvalues;
ProcessALine(values, meta, &kvalues);
ProcessALine(values, meta, id, &kvalues);
block->Init(id, false);
VALUE* value_instant = block->GetValue(id);
if (values.size() == 5) {
value_instant->count_ = std::stoi(values[1]);
value_instant->unseen_days_ = std::stoi(values[2]);
value_instant->is_entry_ = static_cast<bool>(std::stoi(values[3]));
value_instant->count_ = lexical_cast<int>(values[1]);
value_instant->unseen_days_ = lexical_cast<int>(values[2]);
value_instant->is_entry_ =
static_cast<bool>(lexical_cast<int>(values[3]));
}
std::vector<float*> block_values = block->Get(id, meta.names, meta.dims);
@@ -475,7 +492,7 @@ int32_t CommonSparseTable::pull_sparse_ptr(char** pull_values,
auto* value = block->InitGet(id);
// std::copy_n(value + param_offset_, param_dim_,
// pull_values + param_dim_ * offset);
pull_values[offset] = (char*)value;
pull_values[offset] = reinterpret_cast<char*>(value);
}
return 0;
......
@@ -580,6 +580,49 @@ class Fleet(object):
"""
self._runtime_handle._stop_worker()
def save(self, dirname, feed=[], fetch=[], **configs):
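"""
Save the model into ``dirname``.

When ``feed``/``fetch`` are given, an inference model is exported via
``_save_inference_model``; otherwise persistable variables are saved via
``_save_persistables``. ``configs`` may carry ``mode`` (int, default 0),
which is forwarded as the persistable save mode.
"""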
inference = True
if not feed and not fetch:
inference = False
place = paddle.CPUPlace()
executor = paddle.static.Executor(place)
if inference:
feeded_var_names = []
fetch_var_names = []
for var in feed:
if isinstance(var, str):
feeded_var_names.append(var)
elif isinstance(var, paddle.static.Variable):
feeded_var_names.append(var.name)
else:
raise ValueError("feed must be [str|Variable]")
for var in fetch:
if isinstance(var, str):
fetch_var_names.append(var)
elif isinstance(var, paddle.static.Variable):
fetch_var_names.append(var.name)
else:
raise ValueError("feed must be [str|Variable]")
fetch_vars = [
paddle.static.default_main_program().global_block().var(name)
for name in fetch_var_names
]
self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, fetch_vars, None, True, 0)
else:
increment_mode = 0
if "mode" in configs:
increment_mode = int(configs["mode"])
self._runtime_handle._save_persistables(
executor, dirname, main_program=None, mode=increment_mode)
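
For reference, a minimal usage sketch of the new unified save() entry point (an editor's illustration, not part of this diff; the directory paths are placeholders and avg_cost stands for a loss Variable from an already-built static network, as in the updated unit test below):

import paddle
import paddle.distributed.fleet as fleet

# export an inference model: feed/fetch accept variable names or Variables
fleet.fleet.save(dirname="/tmp/infer_model", feed=["x", "y"], fetch=[avg_cost])

# save persistable parameters only; "mode" is read from configs and
# forwarded to _save_persistables (0 by default)
fleet.fleet.save(dirname="/tmp/persistables", mode=0)
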
def save_inference_model(self,
executor,
dirname,
@@ -607,6 +650,9 @@ class Fleet(object):
fleet.init_server()
"""
# warnings.warn(
#     "'save_inference_model' is deprecated and will be removed after v2.2.0. Please use fleet.save instead."
# )
self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, target_vars, main_program,
@@ -653,6 +699,9 @@ class Fleet(object):
fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())
"""
# warnings.warn(
#     "'save_persistables' is deprecated and will be removed after v2.2.0. Please use fleet.save instead."
# )
self._runtime_handle._save_persistables(executor, dirname, main_program,
mode)
......
@@ -32,7 +32,7 @@ def conv_indent(indent):
return "".join([" "] * indent)
PSERVER_SAVE_SUFFIX = "_txt"
PSERVER_SAVE_SUFFIX = ".shard"
class Accessor:
@@ -916,7 +916,7 @@ class TheOnePSRuntime(RuntimeBase):
self.compiled_strategy.origin_main_program, True)
values = []
for id, names in context.items():
if names not in distributed_varnames:
if names[0] not in distributed_varnames:
# only save sparse param to local
self._worker.recv_and_save_model(id, dirname)
# save sparse & distributed param on server
@@ -953,11 +953,11 @@
TheOnePSRuntime.__exclude_vars(saved_varnames),
main_program.list_vars()))
fluid.io.save_vars(
executor,
main_program=main_program,
dirname=dirname,
vars=remaining_vars)
import paddle
for var in remaining_vars:
tensor = var.get_value()
paddle.save(
tensor, os.path.join(dirname, var.name), use_binary_format=True)
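
As a rough illustration (not part of this commit) of how the per-variable binary files written by the loop above could be restored, assuming paddle.load can read them back and Variable.set_value is available:

import os
import paddle

def load_remaining_vars(dirname, program):
    # hypothetical helper: mirror of the save loop, one binary file per variable
    for var in program.list_vars():
        path = os.path.join(dirname, var.name)
        if not os.path.exists(path):
            continue
        tensor = paddle.load(path, return_numpy=True)
        var.set_value(tensor)
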
def _ps_inference_save_persistables(self,
executor,
@@ -978,20 +978,19 @@
if isinstance(executor, ParallelExecutor):
raise TypeError(
"in fleet.save_persistables() function, executor must be as Executor type, ParallelExecutor is not allowed"
"in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
)
if not isinstance(executor, Executor):
raise TypeError(
"in fleet.save_persistables() function, executor must be as Executor type"
)
"in fleet.save() function, executor must be as Executor type")
if main_program is None:
main_program = self.compiled_strategy.get_origin_ps_main_program()
if isinstance(main_program, CompiledProgram):
raise TypeError(
"in fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed"
"in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
)
# Todo(MrChengmo): Save optimizer status
@@ -1013,36 +1012,35 @@
if isinstance(executor, ParallelExecutor):
raise TypeError(
"in fleet.save_inference_model() function, executor must be as Executor type, ParallelExecutor is not allowed"
"in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
)
if not isinstance(executor, Executor):
raise TypeError(
"in fleet.save_inference_model() function, executor must be as Executor type"
)
"in fleet.save() function, executor must be as Executor type")
if main_program is not None:
if isinstance(main_program, CompiledProgram):
import paddle
program = self.origin_main_program if main_program is None else main_program
if isinstance(program, CompiledProgram):
raise TypeError(
"in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
"in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
)
fluid.io.save_inference_model(dirname, feeded_var_names,
target_vars, executor, main_program,
None, None, export_for_deployment)
else:
fluid.io.save_inference_model(dirname, feeded_var_names,
target_vars, executor,
self.origin_main_program, None, None,
export_for_deployment, True)
model_basename = "__model__"
model_filename = os.path.join(dirname, model_basename)
with open(model_filename, "rb") as f:
program_desc_str = f.read()
feed_vars = [
program.global_block().var(name) for name in feeded_var_names
]
infer_program = paddle.static.normalize_program(program, feed_vars,
target_vars)
infer_program._copy_dist_param_info_from(program)
model_basename = "__model__"
model_basename = os.path.join(dirname, model_basename)
paddle.save(infer_program, model_basename)
program = Program.parse_from_string(program_desc_str)
program._copy_dist_param_info_from(fluid.default_main_program())
self._ps_inference_save_persistables(executor, dirname, program,
self._ps_inference_save_persistables(executor, dirname, infer_program,
mode)
def _save_inference_model(self, *args, **kwargs):
......
@@ -14,6 +14,8 @@
import unittest
import paddle
paddle.enable_static()
import os
import paddle.fluid as fluid
@@ -21,7 +23,6 @@ import paddle.fluid as fluid
class TestFleetBase(unittest.TestCase):
def setUp(self):
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
@@ -30,9 +31,8 @@ class TestFleetBase(unittest.TestCase):
import paddle
import paddle.distributed.fleet as fleet
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "1"
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
@@ -47,24 +47,26 @@ class TestFleetBase(unittest.TestCase):
role = fleet.PaddleCloudRoleMaker(is_collective=False)
fleet.init(role)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = False
strategy.a_sync_configs = {"launch_barrier": False}
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(paddle.static.default_startup_program())
pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name)
compiled_prog = fluid.compiler.CompiledProgram(
fluid.default_main_program())
self.assertRaises(
Exception,
fleet.save_inference_model,
dirname='/tmp/',
feeded_var_names=['x', 'y'],
target_vars=[avg_cost],
executor=pe)
fleet.fleet.save(dirname="/tmp", feed=['x', 'y'], fetch=[avg_cost])
fleet.fleet.save(
dirname="/tmp", feed=[input_x, input_y], fetch=[avg_cost])
fleet.fleet.save(dirname="/tmp")
self.assertRaises(
Exception,
......