diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc
index 67aef6098654c23accb82ab7994f39add258ced2..df47422fc059f562da644efd79a39919229ebca5 100644
--- a/paddle/fluid/operators/distributed/parameter_prefetch.cc
+++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc
@@ -250,7 +250,6 @@ void prefetchs(const std::vector<std::string> &id_var_names,
   for (size_t i = 0; i < table_names.size(); i++) {
     tables.push_back(std::make_pair(table_names[i], endpoints[i]));
   }
-
   std::unordered_map<int64_t, std::vector<float>> recved_vec_map;
   prefetch_core(ids_union, tables, context, scope, is_distributed,
                 &recved_vec_map);
@@ -283,23 +282,22 @@ void prefetchs(const std::vector<std::string> &id_var_names,
       }
     } else {
 #ifdef PADDLE_WITH_CUDA
+      std::vector<float> ids_value_vec(ids_size * vec_dim_1);
       for (auto idx = 0; idx < static_cast<int64_t>(ids_size); idx++) {
         const auto &id = ids[idx];
-        auto stream = context.cuda_device_context().stream();
         if (padding_idx != distributed::kNoPadding && id == padding_idx) {
-          platform::GpuMemsetAsync(out_d + idx * vec_dim_1, 0,
-                                   sizeof(float) * vec_dim_1, stream);
+          memset(&ids_value_vec[idx * vec_dim_1], 0, sizeof(float) * vec_dim_1);
         } else {
-          auto &cpu_place =
-              BOOST_GET_CONST(platform::CPUPlace,
-                              paddle::platform::CPUDeviceContext().GetPlace());
-          auto &gpu_place =
-              BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
-          memory::Copy(gpu_place, out_d + idx * vec_dim_1, cpu_place,
-                       &recved_vec_map[id][0], sizeof(float) * vec_dim_1,
-                       stream);
+          memcpy(&ids_value_vec[idx * vec_dim_1], &recved_vec_map[id][0],
+                 sizeof(float) * vec_dim_1);
         }
       }
+      auto &gpu_place = BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
+      auto &cpu_place = BOOST_GET_CONST(
+          platform::CPUPlace, paddle::platform::CPUDeviceContext().GetPlace());
+      auto stream = context.cuda_device_context().stream();
+      memory::Copy(gpu_place, out_d, cpu_place, &ids_value_vec[0],
+                   sizeof(float) * ids_size * vec_dim_1, stream);
 #else
       PADDLE_ENFORCE(true, platform::errors::PermissionDenied(
                                "Paddle is not compiled with GPU!"));
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
index fe2ba38ee00b6aaea382b6262d963cc8df8f0cdd..fecbb8fd4da98c99552b040f6b916bc53b4ff9ec 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
@@ -129,7 +129,7 @@ def Singleton(cls):
 @Singleton
 class CompileTimeStrategy(object):
     def __init__(self, main_program, startup_program, strategy, role_maker):
-        self.min_block_size = 8192
+        self.min_block_size = 81920
 
         self.origin_main_program = main_program
         self.origin_startup_program = startup_program
@@ -677,16 +677,16 @@ class CompileTimeStrategy(object):
 
         split_count = 1
 
-        # if min_block_size == -1:
-        #     split_count = 1
-        # else:
-        #     split_count = slice_count
-        #     max_pserver_count = int(
-        #         math.floor(var_numel / float(min_block_size)))
-        #     if max_pserver_count == 0:
-        #         max_pserver_count = 1
-        #     if max_pserver_count < slice_count:
-        #         split_count = max_pserver_count
+        if min_block_size == -1:
+            split_count = 1
+        else:
+            split_count = slice_count
+            max_pserver_count = int(
+                math.floor(var_numel / float(min_block_size)))
+            if max_pserver_count == 0:
+                max_pserver_count = 1
+            if max_pserver_count < slice_count:
+                split_count = max_pserver_count
 
         block_size = int(math.ceil(var_numel / float(split_count)))
         if len(var.shape) >= 2:
@@ -758,8 +758,8 @@ class CompileTimeStrategy(object):
         # 3. grad_param_mapping : grad.blockx->param.blockx
         # 4. param_grad_ep_mapping : ep->{"params" : [], "grads" : [] }
 
-        dps, dgs = self._get_param_grad_blocks(self.merged_dense_pairs, -1,
-                                               False)
+        dps, dgs = self._get_param_grad_blocks(self.merged_dense_pairs,
+                                               self.min_block_size, False)
         sps, sgs = self._get_param_grad_blocks(self.merged_sparse_pairs,
                                                self.min_block_size, True)
 
diff --git a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
index 93ca21f5276ca7e7716e7e8eb96ab711c0d641f9..9e3f0b7d9126e5fdb6767a5be4344943209d4c67 100644
--- a/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
+++ b/python/paddle/fluid/tests/unittests/ctr_dataset_reader.py
@@ -153,6 +153,32 @@ def gen_fake_line(dnn_data_num=7,
     return line
 
 
+def gen_zero_line(dnn_data_num=7, lr_data_num=5):
+    # for embedding zero padding test
+    line = ""
+
+    # for deep data
+    for index in range(dnn_data_num):
+        data = str(0)
+        if index < dnn_data_num - 1:
+            data += " "
+        line += data
+    line += "\t"
+
+    # for wide data
+    for index in range(lr_data_num):
+        data = str(0) + ":" + str(1)
+        if index < lr_data_num - 1:
+            data += " "
+        line += data
+    line += "\t"
+
+    # for label
+    line += str(random.randint(0, 1))
+    line += "\n"
+    return line
+
+
 def prepare_fake_data(file_nums=6, file_lines=1000):
     """
     Create fake data with same type as avazu_ctr_data
@@ -165,7 +191,8 @@ def prepare_fake_data(file_nums=6, file_lines=1000):
                              "ctr_train_data_part_{}".format(file_index)),
                 'w+') as fin:
             file_str = ""
-            for line_index in range(file_lines):
+            file_str += gen_zero_line()
+            for line_index in range(file_lines - 1):
                 file_str += gen_fake_line()
             fin.write(file_str)
             warnings.warn("Write done ctr_train_data_part_{}".format(
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
index f650dd0f7e9824ce2c2c495f1fb85fe1fc421d08..b9e2da28df003d637b696ea5c099e1cc9f19645b 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -101,7 +101,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             param_attr=fluid.ParamAttr(
                 name="deep_embedding",
                 initializer=fluid.initializer.Constant(value=0.01)),
-            is_sparse=True)
+            is_sparse=True,
+            padding_idx=0)
         dnn_pool = fluid.layers.sequence_pool(
             input=dnn_embedding, pool_type="sum")
         dnn_out = dnn_pool
@@ -123,7 +124,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             param_attr=fluid.ParamAttr(
                 name="wide_embedding",
                 initializer=fluid.initializer.Constant(value=0.01)),
-            is_sparse=True)
+            is_sparse=True,
+            padding_idx=0)
         lr_pool = fluid.layers.sequence_pool(input=lr_embbding, pool_type="sum")
 
         merge_layer = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)
@@ -160,8 +162,12 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         Args:
             fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
         """
-
-        exe = fluid.Executor(fluid.CPUPlace())
+        device_env = os.getenv("DEVICE", 'cpu')
+        if device_env == 'cpu':
+            device = fluid.CPUPlace()
+        elif device_env == 'gpu':
+            device = fluid.CUDAPlace(0)
+        exe = fluid.Executor(device)
 
         exe.run(fluid.default_startup_program())
         fleet.init_worker()
@@ -201,7 +207,12 @@ class TestDistCTR2x2(FleetDistRunnerBase):
     def do_dataset_training(self, fleet):
         train_file_list = ctr_dataset_reader.prepare_fake_data()
 
-        exe = fluid.Executor(fluid.CPUPlace())
+        device_env = os.getenv("DEVICE", 'cpu')
+        if device_env == 'cpu':
+            device = fluid.CPUPlace()
+        elif device_env == 'gpu':
+            device = fluid.CUDAPlace(0)
+        exe = fluid.Executor(device)
 
         exe.run(fluid.default_startup_program())
         fleet.init_worker()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py
new file mode 100644
index 0000000000000000000000000000000000000000..9308a3e4792f38cc65b9ef54a013556210321a75
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps_gpu_ctr.py
@@ -0,0 +1,58 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import unittest
+import tempfile
+from test_dist_fleet_base import TestFleetBase
+
+
+class TestPsGPUAsyncDataset2x2(TestFleetBase):
+    def _setup_config(self):
+        self._mode = "async"
+        self._reader = "dataset"
+
+    def check_with_place(self,
+                         model_file,
+                         delta=1e-3,
+                         check_error_log=False,
+                         need_envs={}):
+        required_envs = {
+            "PATH": os.getenv("PATH", ""),
+            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
+            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
+            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
+            "http_proxy": "",
+            "SAVE_MODEL": "1",
+            "Debug": "1",
+            "DEVICE": "gpu"
+        }
+
+        required_envs.update(need_envs)
+
+        if check_error_log:
+            required_envs["GLOG_v"] = "3"
+            required_envs["GLOG_logtostderr"] = "1"
+
+        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
+
+    def test_dist_train(self):
+        self.check_with_place(
+            "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
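For reference, below is a minimal, self-contained sketch (not part of the patch) of the staging-buffer pattern the parameter_prefetch.cc hunk switches to: padding rows are zeroed and fetched rows are packed into one contiguous host vector, and a single bulk copy then replaces the per-row GpuMemsetAsync / memory::Copy calls. `FillOutputRows` and `CopyToDevice` are hypothetical stand-ins written as plain host code, not Paddle APIs; the real code issues the bulk transfer with memory::Copy on the stream taken from the CUDA device context.

```cpp
// Standalone illustration of the staging-buffer pattern (assumed names,
// plain host code instead of the real CUDA copy).
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <unordered_map>
#include <vector>

constexpr int64_t kNoPadding = -1;

// Stand-in for the real asynchronous host-to-device copy; here it is just a
// host-side memcpy so the sketch runs anywhere.
void CopyToDevice(float* dst, const float* src, size_t bytes) {
  std::memcpy(dst, src, bytes);
}

void FillOutputRows(
    const std::vector<int64_t>& ids, int64_t vec_dim_1, int64_t padding_idx,
    const std::unordered_map<int64_t, std::vector<float>>& recved,
    float* out_d) {
  // Stage every row in one contiguous host buffer.
  std::vector<float> staging(ids.size() * vec_dim_1);
  for (size_t idx = 0; idx < ids.size(); ++idx) {
    const int64_t id = ids[idx];
    if (padding_idx != kNoPadding && id == padding_idx) {
      // Padding rows become zeros on the host instead of a per-row GPU memset.
      std::memset(&staging[idx * vec_dim_1], 0, sizeof(float) * vec_dim_1);
    } else {
      std::memcpy(&staging[idx * vec_dim_1], recved.at(id).data(),
                  sizeof(float) * vec_dim_1);
    }
  }
  // One bulk copy instead of ids.size() small copies.
  CopyToDevice(out_d, staging.data(), sizeof(float) * staging.size());
}

int main() {
  const int64_t dim = 4;
  std::unordered_map<int64_t, std::vector<float>> recved = {
      {7, {1, 1, 1, 1}}, {9, {2, 2, 2, 2}}};
  std::vector<int64_t> ids = {7, 0, 9};  // id 0 is the padding id
  std::vector<float> out(ids.size() * dim);
  FillOutputRows(ids, dim, /*padding_idx=*/0, recved, out.data());
  for (float v : out) std::printf("%g ", v);
  std::printf("\n");  // expected: 1 1 1 1 0 0 0 0 2 2 2 2
  return 0;
}
```

The intended benefit is fewer, larger host-to-device transfers: a memset/memcpy per id on the host is cheap, while launching a separate device copy (or memset) for every looked-up id does not amortize the per-call overhead when the id count is large.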