Unverified · Commit 0e101c4f authored by Chengmo, committed by GitHub

Fix test dist fleet heter ctr (#27513)

* fix test_dist_fleet_heter_ctr & performance update
Parent 42065ba3
@@ -97,6 +97,7 @@ message AsyncConfig {
   optional int32 thread_pool_size = 6 [ default = 1 ];
   optional int32 send_wait_times = 7 [ default = 1 ];
   optional bool runtime_split_send_recv = 8 [ default = false ];
+  optional bool launch_barrier = 9 [ default = true ];
 }
 message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; }
...
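The new `launch_barrier` flag defaults to true, so trainers block until the parameter servers (and, in ps-heter mode, the heter workers) report ready; the updated unit tests below switch it off through `DistributedStrategy`. A minimal sketch of toggling the flag from the Python side, mirroring what those tests do:

```python
import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()

strategy = fleet.DistributedStrategy()
strategy.a_sync = True
# launch_barrier defaults to True (see the proto above); single-process unit
# tests turn it off so they do not block waiting for servers that never start.
strategy.a_sync_configs = {"launch_barrier": False}

# The strategy is then handed to the fleet optimizer as usual, e.g.:
# optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
# optimizer.minimize(avg_cost)
```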
@@ -175,10 +175,6 @@ void RecvGeoSparseRecords(const CommContext &rpc_ctx,
 template <typename T>
 void RecvLodTensor(const CommContext &rpc_ctx, const framework::Scope &scope) {
-  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
-  auto cpu_place = platform::CPUPlace();
-  auto &cpu_ctx = *pool.Get(cpu_place);
   distributed::RPCClient *rpc_client =
       distributed::RPCClient::GetInstance<RPCCLIENT_T>(rpc_ctx.trainer_id);
@@ -188,8 +184,13 @@ void RecvLodTensor(const CommContext &rpc_ctx, const framework::Scope &scope) {
   if (rpc_ctx.origin_varnames.size() == 1 &&
       rpc_ctx.splited_varnames.size() == 1) {
     auto varname = rpc_ctx.origin_varnames[0];
-    VLOG(4) << "recv " << varname << " from " << rpc_ctx.epmap[0];
-    rets.push_back(rpc_client->AsyncGetVarNoBarrier(rpc_ctx.epmap[0], cpu_ctx,
-                                                    scope, varname, varname));
+    const auto place =
+        scope.FindVar(varname)->Get<framework::LoDTensor>().place();
+    platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+    auto &ctx = *pool.Get(place);
+    VLOG(4) << "recv " << varname << " from " << rpc_ctx.epmap[0] << " in gpu? "
+            << platform::is_gpu_place(place);
+    rets.push_back(rpc_client->AsyncGetVarNoBarrier(rpc_ctx.epmap[0], ctx,
+                                                    scope, varname, varname));
     for (size_t i = 0; i < rets.size(); i++) {
...
@@ -495,7 +495,7 @@ class RoleMakerBase(object):
         Returns:
             string: all heter_trainers'endpoints
         """
-        assert self._heter_trainer_endpoints != []
+        assert self._heter_trainer_endpoints != [], "Heter Worker Endpoints Not initialized"
         return self._heter_trainer_endpoints
     def _get_heter_worker_endpoint(self):
@@ -505,10 +505,10 @@ class RoleMakerBase(object):
         e.g: if we have 4 cpu-trainer(default), 2 gpu-trainer(heter)
             then No.0 and No.2 cpu-trainer will work with No.0 gpu-trainer
-            and No.1 and No.3 cpu-trainer will work with No.1 gpu-trainerr
+            and No.1 and No.3 cpu-trainer will work with No.1 gpu-trainer
         """
-        assert self._heter_trainer_endpoints != []
-        return self._heter_trainer_endpoints[(self._current_id + 1) %
+        assert self._heter_trainer_endpoints != [], "Heter Worker Endpoints Not initialized"
+        return self._heter_trainer_endpoints[(self._current_id) %
                                              self._heter_worker_num()]
     def _get_heter_worker_device(self):
...
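With this change each cpu-trainer pairs with the heter worker at index `current_id % heter_worker_num` instead of `(current_id + 1) % heter_worker_num`, matching the docstring above (trainers 0 and 2 go to heter worker 0, trainers 1 and 3 to heter worker 1). A standalone sketch of that mapping, using hypothetical endpoint values only for illustration:

```python
# Hypothetical endpoints, for illustration only.
heter_trainer_endpoints = ["127.0.0.1:36001", "127.0.0.1:36002"]

def heter_worker_endpoint(current_id, endpoints=heter_trainer_endpoints):
    """Round-robin pairing of a cpu-trainer with a heter worker."""
    assert endpoints != [], "Heter Worker Endpoints Not initialized"
    return endpoints[current_id % len(endpoints)]

# 4 cpu-trainers, 2 heter workers:
# trainers 0 and 2 -> 127.0.0.1:36001, trainers 1 and 3 -> 127.0.0.1:36002
for trainer_id in range(4):
    print(trainer_id, heter_worker_endpoint(trainer_id))
```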
@@ -23,6 +23,7 @@ from paddle.fluid.executor import Executor
 from paddle.fluid.parallel_executor import ParallelExecutor
 from .runtime_base import RuntimeBase
+from ..base.private_helper_function import wait_server_ready
 class ParameterServerRuntime(RuntimeBase):
@@ -94,8 +95,8 @@ class ParameterServerRuntime(RuntimeBase):
             return False
         if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
                 var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
                 var.desc.type() == core.VarDesc.VarType.READER:
             return False
         return var.persistable
@@ -161,6 +162,17 @@ class ParameterServerRuntime(RuntimeBase):
         trainer_config = self.async_strategy.get_trainer_runtime_config()
+        dist_strategy = self.context["valid_strategy"]
+        launch_barrier = dist_strategy.a_sync_configs["launch_barrier"]
+        if launch_barrier:
+            # for trainer wait server ready
+            wait_server_ready(self.role_maker._get_pserver_endpoints())
+            # for ps-heter mode, wait heter worker ready
+            if self.role_maker._is_heter_parameter_server_mode and self.role_maker._is_worker(
+            ):
+                wait_server_ready(self.role_maker._get_heter_worker_endpoints())
         lrs = _has_global_step(_get_lr_ops(self.origin_main_program))
         if lrs:
@@ -312,7 +324,7 @@ class ParameterServerRuntime(RuntimeBase):
         opts = _get_optimize_ops(self.origin_main_program)
         for op in opts:
             if "Param" in op.input_names and \
                     "LearningRate" in op.input_names and op.input("Param")[0] == param_name:
                 return op
     def _save_dense_params(self, executor, dirname, context, main_program):
...
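With `launch_barrier` enabled, trainers now call `wait_server_ready` on the pserver endpoints, and additionally on the heter-worker endpoints in ps-heter mode, before training starts. The sketch below is not Paddle's implementation, only an assumption of the kind of TCP readiness probe such a helper performs; the real one is imported from `..base.private_helper_function`.

```python
import socket
import time

def wait_endpoints_ready(endpoints, retry_interval=3):
    """Block until every "ip:port" endpoint accepts a TCP connection.

    Illustrative only -- an assumption about how a launch barrier can be
    realised, not the actual wait_server_ready implementation.
    """
    pending = list(endpoints)
    while pending:
        still_down = []
        for ep in pending:
            ip, port = ep.split(":")
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                if sock.connect_ex((ip, int(port))) != 0:
                    still_down.append(ep)
        if still_down:
            print("not ready endpoints:", still_down)
            time.sleep(retry_interval)
        pending = still_down
```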
@@ -153,7 +153,7 @@ def gen_fake_line(dnn_data_num=7,
     return line
-def prepare_fake_data(file_nums=9, file_lines=1000):
+def prepare_fake_data(file_nums=6, file_lines=1000):
    """
    Create fake data with same type as avazu_ctr_data
    """
...
@@ -206,13 +206,6 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase):
             debug=int(os.getenv("Debug", "0")))
         pass_time = time.time() - pass_start
         print("do_dataset_training done. using time {}".format(pass_time))
-        if os.getenv("SAVE_MODEL") == "1":
-            model_dir = tempfile.mkdtemp()
-            fleet.save_inference_model(exe, model_dir,
-                                       [feed.name for feed in self.feeds],
-                                       self.avg_cost)
-            self.check_model_right(model_dir)
-            shutil.rmtree(model_dir)
         fleet.stop_worker()
         print("do_dataset_training stop worker.")
...
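Dropping the `SAVE_MODEL` branch keeps model export out of the timed training path. Saving is still available through the fleet API; the sketch below follows the removed branch, where `exe`, `feeds`, and `avg_cost` are the runner's own objects and are shown here only for illustration.

```python
import shutil
import tempfile

import paddle.distributed.fleet as fleet

def export_model(exe, feeds, avg_cost):
    # Mirrors the removed SAVE_MODEL branch: write to a temp dir, then clean up.
    model_dir = tempfile.mkdtemp()
    fleet.save_inference_model(exe, model_dir,
                               [feed.name for feed in feeds], avg_cost)
    shutil.rmtree(model_dir)
```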
@@ -113,6 +113,7 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
         strategy.a_sync_configs = {"k_steps": 100}
+        strategy.a_sync_configs = {"launch_barrier": False}
         if training_role == "TRAINER":
             self.run_trainer(role, strategy)
...
@@ -51,6 +51,7 @@ class TestCommunicator(unittest.TestCase):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = False
+        strategy.a_sync_configs = {"launch_barrier": False}
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
         optimizer.minimize(avg_cost)
...
@@ -52,6 +52,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
+        strategy.a_sync_configs = {"launch_barrier": False}
         optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
@@ -92,6 +93,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
+        strategy.a_sync_configs = {"launch_barrier": False}
         optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
...
@@ -44,6 +44,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = False
+        strategy.a_sync_configs = {"launch_barrier": False}
         optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
...
@@ -312,9 +312,6 @@ class TestFleetBase(unittest.TestCase):
                 "========================Error tr1_err end==========================="
             )
-        self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check")
-        self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check")
         # close trainer file
         tr0_pipe.close()
         tr1_pipe.close()
@@ -325,6 +322,8 @@ class TestFleetBase(unittest.TestCase):
         ps1.terminate()
         shutil.rmtree(gloo_path)
+        self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check")
+        self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check")
         return 0, 0
     def check_with_place(self,
...
@@ -81,7 +81,7 @@ class FleetDistHeterRunnerBase(object):
     def build_strategy(self, args):
         self.strategy = paddle.distributed.fleet.DistributedStrategy()
         self.strategy.a_sync = True
+        self.strategy.a_sync_configs = {"launch_barrier": True}
         return self.strategy
     def build_optimizer(self, avg_cost, strategy):
@@ -237,7 +237,10 @@ class TestFleetHeterBase(unittest.TestCase):
         return heter0_proc, heter1_proc, heter0_pipe, heter1_pipe
     def _run_cluster(self, model, envs):
-        env = {'GRAD_CLIP': str(self._grad_clip_mode)}
+        env = {
+            'GRAD_CLIP': str(self._grad_clip_mode),
+            'FLAGS_eager_delete_tensor_gb': str(-1)
+        }
         python_path = self._python_interp
         gloo_path = tempfile.mkdtemp()
@@ -286,27 +289,6 @@ class TestFleetHeterBase(unittest.TestCase):
         tr0_ret = tr0.returncode
         tr1_ret = tr0.returncode
-        print("tr get returncode: {}".format(tr0_ret))
-        if tr0_ret != 0:
-            print(
-                "========================Error tr0_err begin==========================="
-            )
-            os.system("cat {}".format(tempfile.gettempdir() + "/tr0_err.log"))
-            print(
-                "========================Error tr0_err end==========================="
-            )
-        if tr1_ret != 0:
-            print(
-                "========================Error tr1_err begin==========================="
-            )
-            os.system("cat {}".format(tempfile.gettempdir() + "/tr1_err.log"))
-            print(
-                "========================Error tr1_err end==========================="
-            )
-        self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check")
-        self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check")
         # close trainer file
         tr0_pipe.close()
@@ -320,7 +302,8 @@ class TestFleetHeterBase(unittest.TestCase):
         ps1.terminate()
         heter0.terminate()
         heter1.terminate()
+        self.assertEqual(tr0_ret, 0, "something wrong in tr0, please check")
+        self.assertEqual(tr1_ret, 0, "something wrong in tr1, please check")
         shutil.rmtree(gloo_path)
         return 0, 0
...
@@ -23,38 +23,6 @@ import paddle
 paddle.enable_static()
-class TestDistHeterDatasetAsync2x2(TestFleetHeterBase):
-    def _setup_config(self):
-        self._mode = "async"
-        self._reader = "dataset"
-    def check_with_place(self,
-                         model_file,
-                         delta=1e-3,
-                         check_error_log=False,
-                         need_envs={}):
-        required_envs = {
-            "PATH": os.getenv("PATH", ""),
-            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
-            "http_proxy": "",
-            "CPU_NUM": "3"
-        }
-        required_envs.update(need_envs)
-        if check_error_log:
-            required_envs["GLOG_v"] = "3"
-            required_envs["GLOG_logtostderr"] = "1"
-        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
-    def test_dist_train(self):
-        self.check_with_place(
-            "dist_fleet_heter_ctr.py", delta=1e-5, check_error_log=True)
 class TestDistHeterPyreaderAsync2x2(TestFleetHeterBase):
     def _setup_config(self):
         self._mode = "async"
...