From b5c6342336c7579df32cd6eaa5f02cd3841350bf Mon Sep 17 00:00:00 2001
From: 123malin
Date: Mon, 30 Nov 2020 19:14:04 +0800
Subject: [PATCH] Update ps gpu (#29209)

* fix parameter prefetch & device guard

Co-authored-by: MrChengmo
Co-authored-by: chengmo
---
 .../distributed/parameter_prefetch.cc         | 22 ++++---
 .../fleet/parameter_server/ir/public.py       | 26 ++++-----
 .../tests/unittests/ctr_dataset_reader.py     | 29 +++++++++-
 .../fluid/tests/unittests/dist_fleet_ctr.py   | 21 +++++--
 .../unittests/test_dist_fleet_ps_gpu_ctr.py   | 58 +++++++++++++++++++
 5 files changed, 125 insertions(+), 31 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py

diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc
index 67aef609865..df47422fc05 100644
--- a/paddle/fluid/operators/distributed/parameter_prefetch.cc
+++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc
@@ -250,7 +250,6 @@ void prefetchs(const std::vector<std::string> &id_var_names,
   for (size_t i = 0; i < table_names.size(); i++) {
     tables.push_back(std::make_pair(table_names[i], endpoints[i]));
   }
-
   std::unordered_map<int64_t, std::vector<float>> recved_vec_map;
   prefetch_core(ids_union, tables, context, scope, is_distributed,
                 &recved_vec_map);
@@ -283,23 +282,22 @@ void prefetchs(const std::vector<std::string> &id_var_names,
       }
     } else {
 #ifdef PADDLE_WITH_CUDA
+      std::vector<float> ids_value_vec(ids_size * vec_dim_1);
       for (auto idx = 0; idx < static_cast<int64_t>(ids_size); idx++) {
         const auto &id = ids[idx];
-        auto stream = context.cuda_device_context().stream();
         if (padding_idx != distributed::kNoPadding && id == padding_idx) {
-          platform::GpuMemsetAsync(out_d + idx * vec_dim_1, 0,
-                                   sizeof(float) * vec_dim_1, stream);
+          memset(&ids_value_vec[idx * vec_dim_1], 0, sizeof(float) * vec_dim_1);
         } else {
-          auto &cpu_place =
-              BOOST_GET_CONST(platform::CPUPlace,
-                              paddle::platform::CPUDeviceContext().GetPlace());
-          auto &gpu_place =
-              BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
-          memory::Copy(gpu_place, out_d + idx * vec_dim_1, cpu_place,
-                       &recved_vec_map[id][0], sizeof(float) * vec_dim_1,
-                       stream);
+          memcpy(&ids_value_vec[idx * vec_dim_1], &recved_vec_map[id][0],
+                 sizeof(float) * vec_dim_1);
         }
       }
+      auto &gpu_place = BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
+      auto &cpu_place = BOOST_GET_CONST(
+          platform::CPUPlace, paddle::platform::CPUDeviceContext().GetPlace());
+      auto stream = context.cuda_device_context().stream();
+      memory::Copy(gpu_place, out_d, cpu_place, &ids_value_vec[0],
+                   sizeof(float) * ids_size * vec_dim_1, stream);
 #else
       PADDLE_ENFORCE(true, platform::errors::PermissionDenied(
                                "Paddle is not compiled with GPU!"));
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
index fe2ba38ee00..fecbb8fd4da 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
@@ -129,7 +129,7 @@ def Singleton(cls):
 @Singleton
 class CompileTimeStrategy(object):
     def __init__(self, main_program, startup_program, strategy, role_maker):
-        self.min_block_size = 8192
+        self.min_block_size = 81920
 
         self.origin_main_program = main_program
         self.origin_startup_program = startup_program
@@ -677,16 +677,16 @@ class CompileTimeStrategy(object):
 
         split_count = 1
 
-        # if min_block_size == -1:
-        #     split_count = 1
-        # else:
-        #     split_count = slice_count
-        #     max_pserver_count = int(
-        #         math.floor(var_numel / float(min_block_size)))
-        #     if max_pserver_count == 0:
-        #         max_pserver_count = 1
-        #     if max_pserver_count < slice_count:
-        #         split_count = max_pserver_count
+        if min_block_size == -1:
+            split_count = 1
+        else:
+            split_count = slice_count
+            max_pserver_count = int(
+                math.floor(var_numel / float(min_block_size)))
+            if max_pserver_count == 0:
+                max_pserver_count = 1
+            if max_pserver_count < slice_count:
+                split_count = max_pserver_count
 
         block_size = int(math.ceil(var_numel / float(split_count)))
         if len(var.shape) >= 2:
@@ -758,8 +758,8 @@ class CompileTimeStrategy(object):
         # 3. grad_param_mapping : grad.blockx->param.blockx
         # 4. param_grad_ep_mapping : ep->{"params" : [], "grads" : [] }
 
-        dps, dgs = self._get_param_grad_blocks(self.merged_dense_pairs, -1,
-                                               False)
+        dps, dgs = self._get_param_grad_blocks(self.merged_dense_pairs,
+                                               self.min_block_size, False)
         sps, sgs = self._get_param_grad_blocks(self.merged_sparse_pairs,
                                                self.min_block_size, True)
 
diff --git a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
index 93ca21f5276..9e3f0b7d912 100644
--- a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
+++ b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
@@ -153,6 +153,32 @@ def gen_fake_line(dnn_data_num=7,
     return line
 
 
+def gen_zero_line(dnn_data_num=7, lr_data_num=5):
+    # for embedding zero padding test
+    line = ""
+
+    # for deep data
+    for index in range(dnn_data_num):
+        data = str(0)
+        if index < dnn_data_num - 1:
+            data += " "
+        line += data
+    line += "\t"
+
+    # for wide data
+    for index in range(lr_data_num):
+        data = str(0) + ":" + str(1)
+        if index < lr_data_num - 1:
+            data += " "
+        line += data
+    line += "\t"
+
+    # for label
+    line += str(random.randint(0, 1))
+    line += "\n"
+    return line
+
+
 def prepare_fake_data(file_nums=6, file_lines=1000):
     """
     Create fake data with same type as avazu_ctr_data
@@ -165,7 +191,8 @@
                              "ctr_train_data_part_{}".format(file_index)),
                 'w+') as fin:
             file_str = ""
-            for line_index in range(file_lines):
+            file_str += gen_zero_line()
+            for line_index in range(file_lines - 1):
                 file_str += gen_fake_line()
             fin.write(file_str)
         warnings.warn("Write done ctr_train_data_part_{}".format(
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
index f650dd0f7e9..b9e2da28df0 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -101,7 +101,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             param_attr=fluid.ParamAttr(
                 name="deep_embedding",
                 initializer=fluid.initializer.Constant(value=0.01)),
-            is_sparse=True)
+            is_sparse=True,
+            padding_idx=0)
         dnn_pool = fluid.layers.sequence_pool(
             input=dnn_embedding, pool_type="sum")
         dnn_out = dnn_pool
@@ -123,7 +124,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             param_attr=fluid.ParamAttr(
                 name="wide_embedding",
                 initializer=fluid.initializer.Constant(value=0.01)),
-            is_sparse=True)
+            is_sparse=True,
+            padding_idx=0)
         lr_pool = fluid.layers.sequence_pool(input=lr_embbding, pool_type="sum")
 
         merge_layer = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)
@@ -160,8 +162,12 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             Args:
                 fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
         """
-
-        exe = fluid.Executor(fluid.CPUPlace())
+        device_env = os.getenv("DEVICE", 'cpu')
+        if device_env == 'cpu':
+            device = fluid.CPUPlace()
+        elif device_env == 'gpu':
+            device = fluid.CUDAPlace(0)
+        exe = fluid.Executor(device)
         exe.run(fluid.default_startup_program())
         fleet.init_worker()
 
@@ -201,7 +207,12 @@ class TestDistCTR2x2(FleetDistRunnerBase):
     def do_dataset_training(self, fleet):
         train_file_list = ctr_dataset_reader.prepare_fake_data()
 
-        exe = fluid.Executor(fluid.CPUPlace())
+        device_env = os.getenv("DEVICE", 'cpu')
+        if device_env == 'cpu':
+            device = fluid.CPUPlace()
+        elif device_env == 'gpu':
+            device = fluid.CUDAPlace(0)
+        exe = fluid.Executor(device)
         exe.run(fluid.default_startup_program())
         fleet.init_worker()
 
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py
new file mode 100644
index 00000000000..9308a3e4792
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py
@@ -0,0 +1,58 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import unittest
+import tempfile
+from test_dist_fleet_base import TestFleetBase
+
+
+class TestPsGPUAsyncDataset2x2(TestFleetBase):
+    def _setup_config(self):
+        self._mode = "async"
+        self._reader = "dataset"
+
+    def check_with_place(self,
+                         model_file,
+                         delta=1e-3,
+                         check_error_log=False,
+                         need_envs={}):
+        required_envs = {
+            "PATH": os.getenv("PATH", ""),
+            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
+            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
+            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
+            "http_proxy": "",
+            "SAVE_MODEL": "1",
+            "Debug": "1",
+            "DEVICE": "gpu"
+        }
+
+        required_envs.update(need_envs)
+
+        if check_error_log:
+            required_envs["GLOG_v"] = "3"
+            required_envs["GLOG_logtostderr"] = "1"
+
+        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
+
+    def test_dist_train(self):
+        self.check_with_place(
+            "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
-- 
GitLab
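
Note (not part of the patch): a standalone sketch of the copy strategy that the parameter_prefetch.cc hunk above switches to. Rows fetched from the parameter server are first gathered into one contiguous host staging buffer, with zero-fill for the padding id, and then moved to the destination in a single bulk copy rather than one copy (or GpuMemsetAsync) per id. The ids, the embedding width, and the std::vector standing in for the GPU output are illustrative only; in the patch the final step is memory::Copy(...) issued on a CUDA stream.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
  const int64_t kNoPadding = -1;
  const int64_t padding_idx = 0;  // embedding row reserved for padding
  const size_t vec_dim_1 = 4;     // embedding width (illustrative)

  // Rows "received" from the parameter server, keyed by id (illustrative data).
  std::unordered_map<int64_t, std::vector<float>> recved_vec_map = {
      {7, {0.1f, 0.2f, 0.3f, 0.4f}}, {9, {1.0f, 1.0f, 1.0f, 1.0f}}};

  const std::vector<int64_t> ids = {7, 0, 9};  // id 0 hits the padding row
  const size_t ids_size = ids.size();

  // 1. Gather every looked-up row into one contiguous host staging buffer.
  std::vector<float> ids_value_vec(ids_size * vec_dim_1);
  for (size_t idx = 0; idx < ids_size; ++idx) {
    const int64_t id = ids[idx];
    if (padding_idx != kNoPadding && id == padding_idx) {
      std::memset(&ids_value_vec[idx * vec_dim_1], 0,
                  sizeof(float) * vec_dim_1);
    } else {
      std::memcpy(&ids_value_vec[idx * vec_dim_1], recved_vec_map[id].data(),
                  sizeof(float) * vec_dim_1);
    }
  }

  // 2. One bulk copy to the destination buffer (stand-in for the single
  //    memory::Copy onto the CUDA stream in the patched code).
  std::vector<float> device_out(ids_size * vec_dim_1);
  std::memcpy(device_out.data(), ids_value_vec.data(),
              sizeof(float) * ids_size * vec_dim_1);

  for (float v : device_out) std::cout << v << " ";
  std::cout << std::endl;
  return 0;
}

Batching the transfer this way replaces per-row host-to-device launches with a single copy of size ids_size * vec_dim_1, which is the overhead the original per-id loop incurred.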