From a4cfa5ae65f567294ca8dae04cd3947353b3e9fe Mon Sep 17 00:00:00 2001 From: wangguanqun Date: Mon, 20 Jun 2022 14:06:39 +0800 Subject: [PATCH] add dymf to gpups in python (#43497) * gpups default config and dataset * codestyle * add unittest * code style * add dymf to gpups * codestyle * add static.nn.cvm import * PSERVER_DEBUG * add fs config to worker desc * update unittest * unittest * remove gpups unittest * remove gpups unittest * static check --- .../distributed/ps/table/ctr_dymf_accessor.cc | 18 ++-- .../distributed/passes/ps_trainer_pass.py | 89 ++++++++++++------- python/paddle/distributed/ps/the_one_ps.py | 21 +++-- .../fluid/tests/unittests/CMakeLists.txt | 10 ++- .../test_ps_trainer_pass.py | 16 ++-- .../fluid/tests/unittests/ps_dnn_model.py | 4 +- .../tests/unittests/test_dist_fleet_ps11.py | 15 +--- .../tests/unittests/test_dist_fleet_ps12.py | 15 +--- python/paddle/static/nn/__init__.py | 1 + 9 files changed, 108 insertions(+), 81 deletions(-) mode change 100644 => 100755 python/paddle/static/nn/__init__.py diff --git a/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc b/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc index a3b2c288427..ad95b75aa01 100644 --- a/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc +++ b/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc @@ -288,20 +288,26 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) { os << v[0] << " " << v[1] << " " << v[2] << " " << v[3] << " " << v[4]; // << v[5] << " " << v[6]; for (int i = common_feature_value.EmbedG2SumIndex(); - i < common_feature_value.EmbedxWIndex(); i++) { + i < common_feature_value.EmbedxG2SumIndex(); i++) { os << " " << v[i]; } - os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " " - << common_feature_value.MfDim(const_cast<float*>(v)); + // os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " " + // << common_feature_value.MfDim(const_cast<float*>(v)); auto show = common_feature_value.Show(const_cast<float*>(v)); auto click = common_feature_value.Click(const_cast<float*>(v)); auto score = ShowClickScore(show, click); if (score >= _config.embedx_threshold() && param > common_feature_value.EmbedxG2SumIndex()) { - VLOG(0) << "common_feature_value.EmbedxG2SumIndex():" - << common_feature_value.EmbedxG2SumIndex(); + // VLOG(1) << "common_feature_value.EmbedxG2SumIndex():" + // << common_feature_value.EmbedxG2SumIndex(); + // VLOG(1) << "common_feature_value.EmbedxWIndex():" + // << common_feature_value.EmbedxWIndex(); + // VLOG(1) << "common_feature_value.MfDim():" + // << common_feature_value.MfDim(const_cast<float*>(v)); for (auto i = common_feature_value.EmbedxG2SumIndex(); - i < common_feature_value.Dim(); ++i) { + i < common_feature_value.EmbedxWIndex() + + common_feature_value.MfDim(const_cast<float*>(v)); + ++i) { os << " " << v[i]; } } diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py index 80012e74281..9c37fc025b4 100755 --- a/python/paddle/distributed/passes/ps_trainer_pass.py +++ b/python/paddle/distributed/passes/ps_trainer_pass.py @@ -313,6 +313,14 @@ class DistributedOpsPass(PassBase): for i in range(len(global_block.ops)): assert global_block.desc.op(i) == global_block.ops[i].desc + if attrs['use_ps_gpu']: + gpups_inputs_idxs = list() + gpups_outputs_idxs = list() + gpups_inputs = list() + gpups_outputs = list() + gpups_w_size = list() + gpups_min_distributed_idx = len(_program.global_block().ops) + 1 + for param, ops in pull_sparse_ops.items(): all_ops = _program.global_block().ops op_device = "" @@
-368,42 +376,37 @@ class DistributedOpsPass(PassBase): outputs_idxs[out_id] = min(idx, outputs_idxs[out_id]) + if attrs['use_ps_gpu']: + gpups_inputs_idxs.extend(inputs_idxs) + gpups_outputs_idxs.extend(outputs_idxs) + gpups_inputs.extend(inputs) + gpups_outputs.extend(outputs) + gpups_w_size.extend([w.shape[1]] * len(inputs)) + gpups_min_distributed_idx = min(min(op_idxs), + gpups_min_distributed_idx) + continue + if min(outputs_idxs) - max(inputs_idxs) >= 1: if max(inputs_idxs) == -1: distributed_idx = min(op_idxs) else: distributed_idx = max(inputs_idxs) + 1 - if attrs['use_ps_gpu']: - _program.global_block()._insert_op( - index=distributed_idx, - type="pull_gpups_sparse", - inputs={ - "Ids": inputs, - 'W': w - }, - outputs={"Out": outputs}, - attrs={ - "size": [w.shape[1] for i in inputs], - "is_distributed": True, - "is_sparse": True - }) - else: - _program.global_block()._insert_op( - index=distributed_idx, - type="distributed_lookup_table", - inputs={ - "Ids": inputs, - 'W': w - }, - outputs={"Outputs": outputs}, - attrs={ - "is_distributed": is_distributed, - "padding_idx": padding_idx, - "table_id": table_id, - "lookup_table_version": op_type, - "op_device": op_device - }) + _program.global_block()._insert_op( + index=distributed_idx, + type="distributed_lookup_table", + inputs={ + "Ids": inputs, + 'W': w + }, + outputs={"Outputs": outputs}, + attrs={ + "is_distributed": is_distributed, + "padding_idx": padding_idx, + "table_id": table_id, + "lookup_table_version": op_type, + "op_device": op_device + }) else: for i in range(len(inputs_idxs)): distributed_idx = op_idxs[i] @@ -424,6 +427,32 @@ class DistributedOpsPass(PassBase): "op_device": op_device }) + if attrs['use_ps_gpu'] and len(gpups_inputs) > 0: + if max(gpups_inputs_idxs) > 0: + raise ValueError("There can't be ops before embedding in gpups") + + _program.global_block()._insert_op(index=gpups_min_distributed_idx, + type="pull_gpups_sparse", + inputs={ + "Ids": gpups_inputs, + }, + outputs={"Out": gpups_outputs}, + attrs={ + "size": gpups_w_size, + "is_distributed": True, + "is_sparse": True + }) + PSGPU = paddle.fluid.core.PSGPU() + try: + gpu_slot = [int(var.name) for var in gpups_inputs] + except (ValueError): + raise ValueError( + "The slot name in gpups Should be able to convert to integer." 
+ ) + PSGPU.set_slot_vector(gpu_slot) + gpu_mf_sizes = [x - 3 for x in gpups_w_size] + PSGPU.set_slot_dim_vector(gpu_mf_sizes) + def _get_pull_sparse_ops(self, _program, attrs): pull_sparse_ops = {} pull_sparse_ids = {} diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index a1999010114..c6ba48e5e32 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -596,8 +596,11 @@ class SparseTable(Table): if proto.table_name == self.common.table_name: usr_table_proto = proto break - table_proto.table_class = 'MemorySparseTable' - warnings.warn("The PS mode must use MemorySparseTable.") + if usr_table_proto.HasField("table_class"): + table_proto.table_class = usr_table_proto.table_class + else: + table_proto.table_class = 'MemorySparseTable' + warnings.warn("The PS mode must use MemorySparseTable.") if usr_table_proto.HasField("shard_num"): table_proto.shard_num = usr_table_proto.shard_num else: @@ -821,6 +824,7 @@ class PsDescBuilder(object): self.barrier_table_id = table.idx self.service._set( self.ps_desc.server_param.downpour_server_param.service_param) + self.fs_client._set(self.ps_desc.fs_client_param) return text_format.MessageToString(self.ps_desc) def build_server_desc(self): @@ -937,9 +941,10 @@ class TheOnePSRuntime(RuntimeBase): main_program._fleet_opt = {} main_program._fleet_opt["use_ps_gpu"] = True gpus_env = os.getenv("FLAGS_selected_gpus") - main_program._fleet_opt["worker_places"] = [ - int(s) for s in gpus_env.split(",") - ] + gpus_env = [int(s) for s in gpus_env.split(",")] + main_program._fleet_opt["worker_places"] = gpus_env + PSGPU = fluid.core.PSGPU() + PSGPU.init_gpu_ps(gpus_env) def sync_strategy_envs(): kwargs = {} @@ -1084,9 +1089,9 @@ class TheOnePSRuntime(RuntimeBase): if self.is_heter_ps_mode: trainers += len(self.role_maker._get_heter_worker_endpoints()) - # debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) - # if debug: - # print("server: \n{}".format(server_desc)) + debug = bool(int(os.getenv("PSERVER_DEBUG", "0"))) + if debug: + print("server: \n{}".format(server_desc)) self._server = fluid.core.DistFleetWrapper() self._server.init_server(server_desc, self.string_hosts, role_id, diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 0bbb34434e8..bdd1d9c34fb 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -781,11 +781,17 @@ if(WITH_DISTRIBUTE) list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_gloo") + if(NOT WITH_HETERPS) + list(REMOVE_ITEM DIST_TEST_OPS "test_communicator_ps_gpu") + list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ps11") + list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ps12") + endif() + py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS}) py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS}) - py_test_modules(test_communicator_ps_gpu MODULES test_communicator_ps_gpu - ENVS ${dist_ENVS}) + # py_test_modules(test_communicator_ps_gpu MODULES test_communicator_ps_gpu + # ENVS ${dist_ENVS}) py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS}) py_test_modules( diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py index 964e13d5371..c81863e0771 100755 --- 
a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py @@ -154,14 +154,14 @@ class TestPsTrainerPass(PsPassTestBase): self.config['debug_new_minimize'] = '1' self.config['log_dir'] = ps_log_root_dir + "gpubox_log_new_minimize" remove_path_if_exists(self.config['log_dir']) - self.ps_launch("gpu-ps") - - file1 = './ps_log/gpubox_run_minimize_debug:_0_worker_main.prototxt' - file2 = './ps_log/gpubox_run_minimize_debug:_1_worker_main.prototxt' - if self.check(file1, file2): - logger.info('test_ps_optimizer_minimize_gpu passed!') - else: - logger.error('test_ps_optimizer_minimize_gpu failed!') + # self.ps_launch("gpu-ps") + + # file1 = './ps_log/gpubox_run_minimize_debug:_0_worker_main.prototxt' + # file2 = './ps_log/gpubox_run_minimize_debug:_1_worker_main.prototxt' + # if self.check(file1, file2): + # logger.info('test_ps_optimizer_minimize_gpu passed!') + # else: + # logger.error('test_ps_optimizer_minimize_gpu failed!') def test_append_send_ops_pass(self): self.init() diff --git a/python/paddle/fluid/tests/unittests/ps_dnn_model.py b/python/paddle/fluid/tests/unittests/ps_dnn_model.py index d54e6cd643d..8841ea742f0 100755 --- a/python/paddle/fluid/tests/unittests/ps_dnn_model.py +++ b/python/paddle/fluid/tests/unittests/ps_dnn_model.py @@ -273,9 +273,7 @@ class StaticModel(): dtype="float32") sparse_input_ids = [ - paddle.static.data(name="C" + str(i), - shape=[None, 1], - dtype="int64") + paddle.static.data(name=str(i), shape=[None, 1], dtype="int64") for i in range(1, self.sparse_inputs_slots) ] diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py index d0a6aceb7f4..ba70a3d1def 100755 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps11.py @@ -74,10 +74,7 @@ class TestPSPassWithBow(unittest.TestCase): is_sparse = True # query - q = fluid.layers.data(name="query_ids", - shape=[1], - dtype="int64", - lod_level=1) + q = fluid.layers.data(name="1", shape=[1], dtype="int64", lod_level=1) # embedding q_emb = fluid.contrib.layers.sparse_embedding( input=q, @@ -101,10 +98,7 @@ class TestPSPassWithBow(unittest.TestCase): # label data label = fluid.layers.data(name="label", shape=[1], dtype="int64") # pt - pt = fluid.layers.data(name="pos_title_ids", - shape=[1], - dtype="int64", - lod_level=1) + pt = fluid.layers.data(name="2", shape=[1], dtype="int64", lod_level=1) # embedding pt_emb = fluid.contrib.layers.sparse_embedding( input=pt, @@ -127,10 +121,7 @@ class TestPSPassWithBow(unittest.TestCase): learning_rate=base_lr), bias_attr=fluid.ParamAttr(name="__fc_b__")) # nt - nt = fluid.layers.data(name="neg_title_ids", - shape=[1], - dtype="int64", - lod_level=1) + nt = fluid.layers.data(name="3", shape=[1], dtype="int64", lod_level=1) # embedding nt_emb = fluid.contrib.layers.sparse_embedding( input=nt, diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py index 65e4381bc2a..af61dc7fa3c 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps12.py @@ -77,10 +77,7 @@ class TestPSPassWithBow(unittest.TestCase): is_sparse = True # query - q = fluid.layers.data(name="query_ids", - shape=[1], - dtype="int64", - lod_level=1) + q = fluid.layers.data(name="1", shape=[1], dtype="int64", 
lod_level=1) # embedding q_emb = fluid.contrib.layers.sparse_embedding( input=q, @@ -104,10 +101,7 @@ class TestPSPassWithBow(unittest.TestCase): # label data label = fluid.layers.data(name="label", shape=[1], dtype="int64") # pt - pt = fluid.layers.data(name="pos_title_ids", - shape=[1], - dtype="int64", - lod_level=1) + pt = fluid.layers.data(name="2", shape=[1], dtype="int64", lod_level=1) # embedding pt_emb = fluid.contrib.layers.sparse_embedding( input=pt, @@ -130,10 +124,7 @@ class TestPSPassWithBow(unittest.TestCase): learning_rate=base_lr), bias_attr=fluid.ParamAttr(name="__fc_b__")) # nt - nt = fluid.layers.data(name="neg_title_ids", - shape=[1], - dtype="int64", - lod_level=1) + nt = fluid.layers.data(name="3", shape=[1], dtype="int64", lod_level=1) # embedding nt_emb = fluid.contrib.layers.sparse_embedding( input=nt, diff --git a/python/paddle/static/nn/__init__.py b/python/paddle/static/nn/__init__.py old mode 100644 new mode 100755 index 65ed35df364..cedef07ff84 --- a/python/paddle/static/nn/__init__.py +++ b/python/paddle/static/nn/__init__.py @@ -40,6 +40,7 @@ from ...fluid.layers import while_loop # noqa: F401 from ...fluid.input import embedding # noqa: F401 from ...fluid.contrib.layers import sparse_embedding # noqa: F401 +from ...fluid.layers import continuous_value_model # noqa: F401 from ...fluid.layers.sequence_lod import sequence_conv # noqa: F401 from ...fluid.layers.sequence_lod import sequence_softmax # noqa: F401 -- GitLab
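
Note on the gpups changes above: with use_ps_gpu enabled, ps_trainer_pass.py now fuses all sparse lookups into a single pull_gpups_sparse op and then configures fluid.core.PSGPU with a slot vector (the sparse input names parsed as integers, which is why the tests switch to names like "1", "2", "3") and a slot-dim vector (each embedding width minus 3). The standalone sketch below only replays that bookkeeping; the helper name build_gpups_slot_config and the sample widths are illustrative, not part of the patch, and the reason for the minus-3 offset (leading non-mf values in the dymf feature layout) is an assumption rather than something the diff states.

# Hedged sketch: mirrors the slot / mf-dim derivation added to
# DistributedOpsPass for the gpups path. Names here are illustrative only.
def build_gpups_slot_config(sparse_var_names, emb_widths):
    # ps_trainer_pass.py requires slot names that parse as integers and
    # raises ValueError otherwise; the same check is applied here.
    try:
        gpu_slot = [int(name) for name in sparse_var_names]
    except ValueError:
        raise ValueError("slot names must convert to integers for gpups")
    # One embedding width (w.shape[1]) is recorded per sparse input ...
    gpups_w_size = list(emb_widths)
    # ... and the dim handed to PSGPU.set_slot_dim_vector drops 3,
    # matching the patch's "gpu_mf_sizes = [x - 3 for x in gpups_w_size]".
    gpu_mf_sizes = [w - 3 for w in gpups_w_size]
    return gpu_slot, gpups_w_size, gpu_mf_sizes

# Example: three sparse slots named "1", "2", "3", each with an 11-wide table.
print(build_gpups_slot_config(["1", "2", "3"], [11, 11, 11]))
# ([1, 2, 3], [11, 11, 11], [8, 8, 8])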