未验证 提交 ae75affd 编写于 作者: 1 123malin 提交者: GitHub

【Cherry-Pick】add distributed_infer (#30300) (#30427)

* test=develop, add distributed_infer (#30300)

* test=develop, add distributed_infer

* test=develop, fix unittest cmakefile conflict

* test=develop, fix test_dist_fleet_base
上级 e0e98627
......@@ -103,6 +103,7 @@ int32_t BrpcPsService::initialize() {
_service_handler_map[PS_BARRIER] = &BrpcPsService::barrier;
_service_handler_map[PS_START_PROFILER] = &BrpcPsService::start_profiler;
_service_handler_map[PS_STOP_PROFILER] = &BrpcPsService::stop_profiler;
_service_handler_map[PS_PUSH_GLOBAL_STEP] = &BrpcPsService::push_global_step;
// Initialize shard info; the shard info in server_list can only be read from env after the server has started
initialize_shard_info();
......
......@@ -94,23 +94,28 @@ struct Meta {
void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
std::vector<std::vector<float>>* values) {
PADDLE_ENFORCE_EQ(columns.size(), meta.names.size() + 1,
PADDLE_ENFORCE_EQ(columns.size(), 2,
paddle::platform::errors::InvalidArgument(
"record in txt do not match meta."));
"The data format does not meet the requirements. It "
"should look like feasign_id \t params."));
values->reserve(columns.size() - 1);
for (int x = 1; x < columns.size(); ++x) {
auto& column = columns[x];
auto val_ = paddle::string::split_string<std::string>(column, ",");
auto load_values = paddle::string::split_string<std::string>(columns[1], ",");
values->reserve(meta.names.size());
int offset = 0;
for (int x = 0; x < meta.names.size(); ++x) {
std::vector<float> val;
std::transform(val_.begin(), val_.end(), std::back_inserter(val),
[](std::string va) { return std::stof(va); });
PADDLE_ENFORCE_EQ(val.size(), meta.dims[x - 1],
auto start = load_values.begin() + offset;
auto end = load_values.begin() + offset + meta.dims[x];
PADDLE_ENFORCE_LE(offset + meta.dims[x], load_values.size(),
paddle::platform::errors::InvalidArgument(
"record in txt do not match meta."));
"The data format in txt does not meet the field "
"requirements defined in meta"));
std::transform(start, end, std::back_inserter(val),
[](std::string va) { return std::stof(va); });
values->push_back(val);
offset += meta.dims[x];
}
}
......
......@@ -19,7 +19,7 @@ from .base.fleet_base import Fleet
from .base.util_factory import UtilBase
from .dataset import *
from .data_generator import MultiSlotDataGenerator, MultiSlotStringDataGenerator
#from . import metrics
from . import metrics
__all__ = [
"DistributedStrategy",
......
......@@ -13,11 +13,10 @@
# limitations under the License.
"""Fleet Metrics"""
import paddle.fluid as fluid
import math
import numpy as np
from paddle.fluid.framework import Variable
import paddle.distributed.fleet as fleet
from paddle.static import Variable
import paddle
def sum(input, scope=None, util=None):
......@@ -46,9 +45,9 @@ def sum(input, scope=None, util=None):
print("sum array: ", paddle.distributed.fleet.sum(res))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(input, Variable):
input = np.array(scope.find_var(input.name).get_tensor())
elif isinstance(input, str):
......@@ -86,9 +85,9 @@ def max(input, scope=None, util=None):
print("max array: ", paddle.distributed.fleet.max(res))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(input, Variable):
input = np.array(scope.find_var(input.name).get_tensor())
elif isinstance(input, str):
......@@ -126,9 +125,9 @@ def min(input, scope=None, util=None):
print("min array: ", paddle.distributed.fleet.min(res))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(input, Variable):
input = np.array(scope.find_var(input.name).get_tensor())
elif isinstance(input, str):
......@@ -168,9 +167,9 @@ def auc(stat_pos, stat_neg, scope=None, util=None):
print("auc: ", paddle.distributed.fleet.auc(pos, neg))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(stat_pos, Variable):
stat_pos = np.array(scope.find_var(stat_pos.name).get_tensor())
......@@ -246,9 +245,9 @@ def mae(abserr, total_ins_num, scope=None, util=None):
print("mae: ", paddle.distributed.fleet.mae(res, total_ins_num))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(abserr, Variable):
abserr = np.array(scope.find_var(abserr.name).get_tensor())
......@@ -289,9 +288,9 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None):
print("rmse: ", paddle.distributed.fleet.rmse(res, total_ins_num))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(sqrerr, Variable):
sqrerr = np.array(scope.find_var(sqrerr.name).get_tensor())
......@@ -331,9 +330,9 @@ def mse(sqrerr, total_ins_num, scope=None, util=None):
print("mse: ", paddle.distributed.fleet.mse(metric, total_ins_num))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(sqrerr, Variable):
sqrerr = np.array(scope.find_var(sqrerr.name).get_tensor())
......@@ -384,9 +383,9 @@ def acc(correct, total, scope=None, util=None):
print("accuracy: ", paddle.distributed.fleet.acc(correct_num, total_num))
"""
if scope is None:
scope = fluid.global_scope()
scope = paddle.static.global_scope()
if util is None:
util = fleet.util
util = paddle.distributed.fleet.util
if isinstance(correct, Variable):
correct = np.array(scope.find_var(correct.name).get_tensor())
......
......@@ -30,6 +30,9 @@ def conv_indent(indent):
return "".join([" "] * indent)
PSERVER_SAVE_SUFFIX = "_txt"
class Accessor:
def __init__(self):
self.accessor_class = ""
......@@ -789,9 +792,9 @@ class TheOnePSRuntime(RuntimeBase):
begin = time.time()
for var_name in load_varnames:
table_id = sparse_table_maps[var_name]
path = os.path.join(dirname, var_name,
path = os.path.join(dirname, var_name + PSERVER_SAVE_SUFFIX,
"{}.block{}.txt".format(var_name, pserver_id))
meta = os.path.join(dirname, var_name,
meta = os.path.join(dirname, var_name + PSERVER_SAVE_SUFFIX,
"{}.block{}.meta".format(var_name, pserver_id))
self._server.load_sparse(path, meta, table_id)
end = time.time()
......
......@@ -13,4 +13,4 @@
# limitations under the License.
from .fs import LocalFS, HDFSClient
from .ps_util import Distributed
from .ps_util import DistributedInfer
......@@ -14,11 +14,104 @@
"""Parameter Server utils"""
import numpy as np
class Distributed:
@staticmethod
def estimate(main_program, varname2tables):
import os
import paddle
class DistributedInfer:
"""
Utility class for distributed infer of PaddlePaddle.
"""
def __init__(self, main_program=None, startup_program=None):
    """Remember the programs that distributed inference will work on.

    Args:
        main_program: program to clone and keep as ``origin_main_program``;
            when falsy, a clone of ``paddle.static.default_main_program()``
            is kept instead.
        startup_program: program kept (not cloned) as
            ``origin_startup_program``; when falsy,
            ``paddle.static.default_startup_program()`` is kept instead.
    """
    self.origin_main_program = (
        main_program or paddle.static.default_main_program()).clone()
    self.origin_startup_program = (
        startup_program or paddle.static.default_startup_program())
    # Filled on first call to _get_sparse_table_map().
    self.sparse_table_maps = None
def init_distributed_infer_env(self,
exe,
loss,
role_maker=None,
dirname=None):
# Prepare fleet for distributed inference using the calls below: init fleet
# when it has no runtime handle, minimize `loss` through a distributed
# optimizer, then start either the server side or the worker side.
#   exe        - executor used to run the default startup program on workers
#   loss       - variable handed to optimizer.minimize()
#   role_maker - forwarded to fleet.init()
#   dirname    - forwarded to fleet.init_server() and self._init_dense_params()
# NOTE(review): this listing lost its indentation, so which of the statements
# below sit inside the `_runtime_handle is None` branch cannot be told from
# here - confirm against the repository before editing.
import paddle.distributed.fleet as fleet
if fleet.fleet._runtime_handle is None:
fleet.init(role_maker=role_maker)
# A plain SGD optimizer is wrapped only so that fleet.distributed_optimizer()
# and minimize() can be called with an a_sync strategy.
fake_optimizer = paddle.optimizer.SGD()
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fleet.distributed_optimizer(
fake_optimizer, strategy=strategy)
optimizer.minimize(
loss, startup_program=self.origin_startup_program)
if fleet.is_server():
fleet.init_server(dirname=dirname)
fleet.run_server()
else:
# Worker side: run the default startup program, start the worker, then load
# dense parameters via self._init_dense_params().
exe.run(paddle.static.default_startup_program())
fleet.init_worker()
self._init_dense_params(exe, dirname)
# NOTE(review): the four assignments below only rebind local names that are
# never read afterwards; as written they do not replace the default programs.
# Confirm whether swapping the global programs was the intent.
global_startup_program = paddle.static.default_startup_program()
global_startup_program = self.origin_startup_program
global_main_program = paddle.static.default_main_program()
global_main_program = self.origin_main_program
def _get_sparse_table_map(self):
    """Return the mapping of sparse parameter name -> table id.

    The mapping is built on first use from the communicator's send contexts
    (``fleet.fleet._runtime_handle._communicator.send_ctx_``), stored on
    ``self.sparse_table_maps`` and returned as-is on later calls.

    Returns:
        dict: parameter name (gradient name without its ``@GRAD`` suffix)
        mapped to ``ctx.table_id()``.
    """
    import paddle.distributed.fleet as fleet

    grad_suffix = "@GRAD"
    if self.sparse_table_maps is None:
        self.sparse_table_maps = {}
        send_ctx = fleet.fleet._runtime_handle._communicator.send_ctx_
        for gradname, ctx in send_ctx.items():
            # NOTE(review): is_sparse is read as an attribute while table_id
            # is called; if is_sparse is actually a method this check is
            # always truthy - confirm against the communicator binding.
            if not ctx.is_sparse:
                continue
            # Remove only the trailing suffix. str.strip("@GRAD") would drop
            # any of the characters @, G, R, A, D from BOTH ends and mangle
            # names that begin or end with those letters.
            if gradname.endswith(grad_suffix):
                param = gradname[:-len(grad_suffix)]
            else:
                param = gradname
            self.sparse_table_maps[param] = ctx.table_id()
    return self.sparse_table_maps
def _init_dense_params(self, exe=None, dirname=None):
    """Load dense persistable variables of the main program from ``dirname``.

    A variable is loaded when it is persistable, its name is not a key of the
    sparse table map, and a file with its name exists directly under
    ``dirname``. Nothing is loaded when ``exe`` or ``dirname`` is ``None``.

    Args:
        exe: executor passed to ``paddle.static.load_vars``.
        dirname: directory holding one file per saved variable.
    """
    import paddle.distributed.fleet as fleet

    # Looked up before the guard, exactly as before, so the map is always built.
    sparse_names = self._get_sparse_table_map()
    if dirname is None or exe is None:
        return

    loadable = []
    for var in self.origin_main_program.list_vars():
        if not paddle.static.io.is_persistable(var):
            continue
        if var.name in sparse_names:
            continue
        if os.path.isfile(os.path.join(dirname, var.name)):
            loadable.append(var)

    paddle.static.load_vars(
        exe,
        dirname,
        main_program=self.origin_main_program,
        vars=loadable)
def get_dist_infer_program(self):
    """Return ``origin_main_program`` converted by ``self._convert_program``.

    The sparse table map from ``self._get_sparse_table_map()`` is passed as
    the second argument of the conversion.
    """
    import paddle.distributed.fleet as fleet

    sparse_tables = self._get_sparse_table_map()
    return self._convert_program(self.origin_main_program, sparse_tables)
def _convert_program(self, main_program, varname2tables):
def distributed_ops_pass(program):
SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
......
......@@ -661,6 +661,7 @@ endif()
if (WITH_DISTRIBUTE)
set_tests_properties(test_communicator_half_async PROPERTIES TIMEOUT 120)
set_tests_properties(test_dist_fleet_infer PROPERTIES TIMEOUT 200)
endif()
if (WITH_DISTRIBUTE AND NOT APPLE)
......
......@@ -28,7 +28,7 @@ import numpy as np
import ctr_dataset_reader
from test_dist_fleet_base import runtime_main, FleetDistRunnerBase
from paddle.distributed.fleet.utils.ps_util import Distributed
from paddle.distributed.fleet.utils.ps_util import DistributedInfer
import paddle.distributed.fleet as fleet
paddle.enable_static()
......@@ -165,17 +165,11 @@ class TestDistCTR2x2(FleetDistRunnerBase):
with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
wn.write(str(program))
def do_distributed_testing(self, args, test_main_program,
test_startup_program):
def do_distributed_testing(self, fleet):
"""
do distributed
"""
device_env = os.getenv("DEVICE", 'cpu')
if device_env == 'cpu':
device = fluid.CPUPlace()
elif device_env == 'gpu':
device = fluid.CUDAPlace(0)
exe = fluid.Executor(device)
exe = self.get_executor()
batch_size = 4
test_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
......@@ -188,7 +182,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
try:
while True:
batch_idx += 1
loss_val = exe.run(program=test_main_program,
loss_val = exe.run(program=paddle.static.default_main_program(),
fetch_list=[self.avg_cost.name])
loss_val = np.mean(loss_val)
message = "TEST ---> batch_idx: {} loss: {}\n".format(batch_idx,
......@@ -207,12 +201,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
Args:
fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
"""
device_env = os.getenv("DEVICE", 'cpu')
if device_env == 'cpu':
device = fluid.CPUPlace()
elif device_env == 'gpu':
device = fluid.CUDAPlace(0)
exe = fluid.Executor(device)
exe = self.get_executor()
exe.run(fluid.default_startup_program())
fleet.init_worker()
......@@ -250,13 +239,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
def do_dataset_training(self, fleet):
train_file_list = ctr_dataset_reader.prepare_fake_data()
device_env = os.getenv("DEVICE", 'cpu')
if device_env == 'cpu':
device = fluid.CPUPlace()
elif device_env == 'gpu':
device = fluid.CUDAPlace(0)
exe = fluid.Executor(device)
exe = self.get_executor()
exe.run(fluid.default_startup_program())
fleet.init_worker()
......
......@@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import print_function
from paddle.distributed.fleet.utils.ps_util import Distributed
from paddle.distributed.fleet.utils.ps_util import DistributedInfer
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
......@@ -37,11 +37,6 @@ import tempfile
import unittest
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.distributed.fleet.utils.ps_util import Distributed
paddle.enable_static()
__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']
......@@ -58,6 +53,9 @@ class FleetDistRunnerBase(object):
do training : exe run program
"""
def __init__(self):
# Executor cache; starts empty and is read/assigned by get_executor().
self._exe = None
def build_role(self, args):
if args.role.upper() == "PSERVER":
......@@ -159,6 +157,16 @@ class FleetDistRunnerBase(object):
raise NotImplementedError(
"get_model should be implemented by child classes.")
def get_executor(self):
    """Return the cached executor, creating it on first use.

    The place is chosen from the ``DEVICE`` environment variable (``'cpu'``
    when unset): ``'cpu'`` selects ``fluid.CPUPlace()`` and ``'gpu'`` selects
    ``fluid.CUDAPlace(0)``.

    Returns:
        The executor stored in ``self._exe``.

    Raises:
        ValueError: if ``DEVICE`` holds any value other than ``'cpu'`` or
            ``'gpu'`` (previously this surfaced as an UnboundLocalError).
    """
    if self._exe is None:
        device_env = os.getenv("DEVICE", 'cpu')
        if device_env == 'cpu':
            device = fluid.CPUPlace()
        elif device_env == 'gpu':
            device = fluid.CUDAPlace(0)
        else:
            raise ValueError(
                "Unsupported DEVICE {!r}; expected 'cpu' or 'gpu'.".format(
                    device_env))
        self._exe = fluid.Executor(device)
    return self._exe
def do_dataset_training(self, fleet):
raise NotImplementedError(
"do_dataset_training should be implemented by child classes.")
......@@ -193,6 +201,7 @@ class TestFleetBase(unittest.TestCase):
self._trainers = 2
self._pservers = 2
self._need_test = 0
self._model_dir = ""
self._port_set = set()
global DIST_UT_PORT
......@@ -290,6 +299,10 @@ class TestFleetBase(unittest.TestCase):
self._trainers, self._mode, self._geo_sgd_need_push_nums,
self._reader, gloo_path, self._need_test)
if self._model_dir:
tr_cmd += " --model_dir {}".format(self._model_dir)
ps_cmd += " --model_dir {}".format(self._model_dir)
# Run dist train to compare with local results
ps0, ps1, ps0_pipe, ps1_pipe = self._start_pserver(ps_cmd, env)
tr0, tr1, tr0_pipe, tr1_pipe = self._start_trainer(tr_cmd, env)
......@@ -381,14 +394,32 @@ def runtime_main(test_class):
'--geo_sgd_need_push_nums', type=int, required=False, default=2)
parser.add_argument('--reader', type=str, required=False, default='dataset')
parser.add_argument('--test', type=int, required=False, default=0)
parser.add_argument('--model_dir', type=str, required=False, default="")
args = parser.parse_args()
model = test_class()
role = model.build_role(args)
if args.test and args.model_dir != "":
avg_cost = model.net(args, is_train=False)
dist_infer = DistributedInfer()
dist_infer.init_distributed_infer_env(
exe=model.get_executor(),
loss=model.avg_cost,
role_maker=role,
dirname=args.model_dir)
if fleet.is_worker():
with paddle.static.program_guard(
main_program=dist_infer.get_dist_infer_program()):
model.do_distributed_testing(fleet)
fleet.stop_worker()
return
fleet.init(role)
strategy = model.build_strategy(args)
avg_cost = model.net(args)
model.build_optimizer(avg_cost, strategy)
if args.role == "pserver":
model.run_pserver(args)
else:
......@@ -398,26 +429,17 @@ def runtime_main(test_class):
model.run_pyreader_trainer(args)
if args.test:
test_origin_program = fluid.Program()
test_startup_program = fluid.Program()
with fluid.program_guard(
test_origin_program = paddle.static.Program()
test_startup_program = paddle.static.Program()
with paddle.static.program_guard(
main_program=test_origin_program,
startup_program=test_startup_program):
with fluid.unique_name.guard():
with paddle.utils.unique_name.guard():
avg_cost = model.net(args, is_train=False)
send_ctx = fleet.fleet._runtime_handle._communicator.send_ctx_
varname2tables = {}
for gradname, ctx in send_ctx.items():
if ctx.is_sparse:
param = gradname.strip("@GRAD")
varname2tables[param] = ctx.table_id()
else:
continue
ps_util = Distributed()
test_main_program = ps_util.estimate(test_origin_program,
varname2tables)
print(str(test_main_program))
print(str(test_startup_program))
model.do_distributed_testing(args, test_main_program,
test_startup_program)
dist_infer = DistributedInfer(
main_program=test_origin_program,
startup_program=test_startup_program)
with paddle.static.program_guard(
main_program=dist_infer.get_dist_infer_program()):
model.do_distributed_testing(fleet)
fleet.stop_worker()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import shutil
import unittest
import tempfile
import tarfile
from test_dist_fleet_base import TestFleetBase
from paddle.dataset.common import download, DATA_HOME
class TestDistCtrInfer(TestFleetBase):
    """Cluster run of ``dist_fleet_ctr.py`` in test mode with saved parameters.

    ``_setup_config`` fetches an archive of saved CTR parameters, unpacks it
    into a temporary directory and stores that path in ``self._model_dir``.
    """

    def _setup_config(self):
        """Select async/pyreader test mode and unpack the saved parameters."""
        self._mode = "async"
        self._reader = "pyreader"
        self._need_test = 1

        data_url = "https://fleet.bj.bcebos.com/unittest/ctr_saved_params.tar.gz"
        data_md5 = "aa7e8286ced566ea8a67410be7482438"
        module_name = "ctr_saved_params"
        path = download(data_url, module_name, data_md5)
        print('ctr_params is downloaded at ' + path)

        unzip_folder = tempfile.mkdtemp()
        # Use the archive as a context manager so its file handle is closed
        # as soon as extraction finishes.
        # NOTE(review): extractall() trusts member paths inside the archive;
        # acceptable only while the archive source is trusted.
        with tarfile.open(path) as tar:
            tar.extractall(unzip_folder)
        self._model_dir = unzip_folder

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs=None):
        """Build the process environment and start the cluster run.

        Args:
            model_file: script name forwarded to ``self._run_cluster``.
            delta: accepted for signature compatibility; not used here.
            check_error_log: when true, sets ``GLOG_v=3`` and
                ``GLOG_logtostderr=1`` in the environment.
            need_envs: optional mapping merged over the default environment.
        """
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            # NOTE(review): the earlier note here said "5sec to fail fast",
            # which does not match 30000 - confirm the unit (presumably
            # milliseconds, i.e. 30 s).
            "FLAGS_rpc_deadline": "30000",
            "http_proxy": "",
            "FLAGS_communicator_send_queue_size": "2",
            "FLAGS_communicator_max_merge_var_num": "2",
            "CPU_NUM": "2",
        }

        if need_envs:
            required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        # The unpacking doubles as a check that two results come back.
        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)

    def test_dist_infer(self):
        """Run the cluster test, then remove the unpacked parameter directory."""
        try:
            self.check_with_place(
                "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
        finally:
            # Clean up the temp directory even when the run fails.
            shutil.rmtree(self._model_dir)
class TestDistCtrTrainInfer(TestFleetBase):
    """Cluster run of ``dist_fleet_ctr.py`` with the test flag set and no model dir."""

    def _setup_config(self):
        """Select async mode, the pyreader reader, and enable the test phase."""
        self._mode = "async"
        self._reader = "pyreader"
        self._need_test = 1

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs=None):
        """Build the process environment and start the cluster run.

        Args:
            model_file: script name forwarded to ``self._run_cluster``.
            delta: accepted for signature compatibility; not used here.
            check_error_log: when true, sets ``GLOG_v=3`` and
                ``GLOG_logtostderr=1`` in the environment.
            need_envs: optional mapping merged over the default environment.
        """
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            # NOTE(review): the earlier note here said "5sec to fail fast",
            # which does not match 30000 - confirm the unit (presumably
            # milliseconds, i.e. 30 s).
            "FLAGS_rpc_deadline": "30000",
            "http_proxy": "",
            "FLAGS_communicator_send_queue_size": "2",
            "FLAGS_communicator_max_merge_var_num": "2",
            "CPU_NUM": "2",
        }

        if need_envs:
            required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"

        # The unpacking doubles as a check that two results come back.
        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)

    def test_dist_train_infer(self):
        """Run the train-then-infer cluster test."""
        self.check_with_place(
            "dist_fleet_ctr.py", delta=1e-5, check_error_log=False)
# Run the unittest runner when this file is executed as a script.
if __name__ == "__main__":
unittest.main()
......@@ -73,6 +73,7 @@ class TestFleetMetric(unittest.TestCase):
pass
self.util = FakeUtil(FakeFleet())
fleet.util = self.util
def test_metric_1(self):
"""Test cases for metrics."""
......@@ -104,14 +105,14 @@ class TestFleetMetric(unittest.TestCase):
metric.rmse(t1, 3, scope, self.util)
metric.mse(t1, 3, scope, self.util)
metric.acc(t, t1, scope, self.util)
metric.sum(str(t.name), scope, self.util)
metric.max(str(t.name), scope, self.util)
metric.min(str(t.name), scope, self.util)
metric.auc(str(t1.name), str(t.name), scope, self.util)
metric.mae(str(t1.name), 3, scope, self.util)
metric.rmse(str(t1.name), 3, scope, self.util)
metric.mse(str(t1.name), 3, scope, self.util)
metric.acc(str(t.name), str(t1.name), scope, self.util)
metric.sum(str(t.name))
metric.max(str(t.name))
metric.min(str(t.name))
metric.auc(str(t1.name), str(t.name))
metric.mae(str(t1.name), 3)
metric.rmse(str(t1.name), 3)
metric.mse(str(t1.name), 3)
metric.acc(str(t.name), str(t1.name))
arr = np.array([1, 2, 3, 4])
metric.sum(arr, util=self.util)
metric.max(arr, util=self.util)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册