未验证 提交 b5c63423 编写于 作者: 1 123malin 提交者: GitHub

Update ps gpu (#29209)

* fix paramete prefetch & device guard
Co-authored-by: NMrChengmo <cmchengmo@163.com>
Co-authored-by: Nchengmo <chengmo@baidu.com>
上级 865a4598
......@@ -250,7 +250,6 @@ void prefetchs(const std::vector<std::string> &id_var_names,
for (size_t i = 0; i < table_names.size(); i++) {
tables.push_back(std::make_pair(table_names[i], endpoints[i]));
}
std::unordered_map<int64_t, std::vector<float>> recved_vec_map;
prefetch_core(ids_union, tables, context, scope, is_distributed,
&recved_vec_map);
......@@ -283,23 +282,22 @@ void prefetchs(const std::vector<std::string> &id_var_names,
}
} else {
#ifdef PADDLE_WITH_CUDA
std::vector<float> ids_value_vec(ids_size * vec_dim_1);
for (auto idx = 0; idx < static_cast<int>(ids_size); idx++) {
const auto &id = ids[idx];
auto stream = context.cuda_device_context().stream();
if (padding_idx != distributed::kNoPadding && id == padding_idx) {
platform::GpuMemsetAsync(out_d + idx * vec_dim_1, 0,
sizeof(float) * vec_dim_1, stream);
memset(&ids_value_vec[idx * vec_dim_1], 0, sizeof(float) * vec_dim_1);
} else {
auto &cpu_place =
BOOST_GET_CONST(platform::CPUPlace,
paddle::platform::CPUDeviceContext().GetPlace());
auto &gpu_place =
BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
memory::Copy(gpu_place, out_d + idx * vec_dim_1, cpu_place,
&recved_vec_map[id][0], sizeof(float) * vec_dim_1,
stream);
memcpy(&ids_value_vec[idx * vec_dim_1], &recved_vec_map[id][0],
sizeof(float) * vec_dim_1);
}
}
auto &gpu_place = BOOST_GET_CONST(platform::CUDAPlace, out_t->place());
auto &cpu_place = BOOST_GET_CONST(
platform::CPUPlace, paddle::platform::CPUDeviceContext().GetPlace());
auto stream = context.cuda_device_context().stream();
memory::Copy(gpu_place, out_d, cpu_place, &ids_value_vec[0],
sizeof(float) * ids_size * vec_dim_1, stream);
#else
PADDLE_ENFORCE(true, platform::errors::PermissionDenied(
"Paddle is not compiled with GPU!"));
......
......@@ -129,7 +129,7 @@ def Singleton(cls):
@Singleton
class CompileTimeStrategy(object):
def __init__(self, main_program, startup_program, strategy, role_maker):
self.min_block_size = 8192
self.min_block_size = 81920
self.origin_main_program = main_program
self.origin_startup_program = startup_program
......@@ -677,16 +677,16 @@ class CompileTimeStrategy(object):
split_count = 1
# if min_block_size == -1:
# split_count = 1
# else:
# split_count = slice_count
# max_pserver_count = int(
# math.floor(var_numel / float(min_block_size)))
# if max_pserver_count == 0:
# max_pserver_count = 1
# if max_pserver_count < slice_count:
# split_count = max_pserver_count
if min_block_size == -1:
split_count = 1
else:
split_count = slice_count
max_pserver_count = int(
math.floor(var_numel / float(min_block_size)))
if max_pserver_count == 0:
max_pserver_count = 1
if max_pserver_count < slice_count:
split_count = max_pserver_count
block_size = int(math.ceil(var_numel / float(split_count)))
if len(var.shape) >= 2:
......@@ -758,8 +758,8 @@ class CompileTimeStrategy(object):
# 3. grad_param_mapping : grad.blockx->param.blockx
# 4. param_grad_ep_mapping : ep->{"params" : [], "grads" : [] }
dps, dgs = self._get_param_grad_blocks(self.merged_dense_pairs, -1,
False)
dps, dgs = self._get_param_grad_blocks(self.merged_dense_pairs,
self.min_block_size, False)
sps, sgs = self._get_param_grad_blocks(self.merged_sparse_pairs,
self.min_block_size, True)
......
......@@ -153,6 +153,32 @@ def gen_fake_line(dnn_data_num=7,
return line
def gen_zero_line(dnn_data_num=7, lr_data_num=5):
# for embedding zero padding test
line = ""
# for deep data
for index in range(dnn_data_num):
data = str(0)
if index < dnn_data_num - 1:
data += " "
line += data
line += "\t"
# for wide data
for index in range(lr_data_num):
data = str(0) + ":" + str(1)
if index < lr_data_num - 1:
data += " "
line += data
line += "\t"
# for label
line += str(random.randint(0, 1))
line += "\n"
return line
def prepare_fake_data(file_nums=6, file_lines=1000):
"""
Create fake data with same type as avazu_ctr_data
......@@ -165,7 +191,8 @@ def prepare_fake_data(file_nums=6, file_lines=1000):
"ctr_train_data_part_{}".format(file_index)),
'w+') as fin:
file_str = ""
for line_index in range(file_lines):
file_str += gen_zero_line()
for line_index in range(file_lines - 1):
file_str += gen_fake_line()
fin.write(file_str)
warnings.warn("Write done ctr_train_data_part_{}".format(
......
......@@ -101,7 +101,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
param_attr=fluid.ParamAttr(
name="deep_embedding",
initializer=fluid.initializer.Constant(value=0.01)),
is_sparse=True)
is_sparse=True,
padding_idx=0)
dnn_pool = fluid.layers.sequence_pool(
input=dnn_embedding, pool_type="sum")
dnn_out = dnn_pool
......@@ -123,7 +124,8 @@ class TestDistCTR2x2(FleetDistRunnerBase):
param_attr=fluid.ParamAttr(
name="wide_embedding",
initializer=fluid.initializer.Constant(value=0.01)),
is_sparse=True)
is_sparse=True,
padding_idx=0)
lr_pool = fluid.layers.sequence_pool(input=lr_embbding, pool_type="sum")
merge_layer = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)
......@@ -160,8 +162,12 @@ class TestDistCTR2x2(FleetDistRunnerBase):
Args:
fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
"""
exe = fluid.Executor(fluid.CPUPlace())
device_env = os.getenv("DEVICE", 'cpu')
if device_env == 'cpu':
device = fluid.CPUPlace()
elif device_env == 'gpu':
device = fluid.CUDAPlace(0)
exe = fluid.Executor(device)
exe.run(fluid.default_startup_program())
fleet.init_worker()
......@@ -201,7 +207,12 @@ class TestDistCTR2x2(FleetDistRunnerBase):
def do_dataset_training(self, fleet):
train_file_list = ctr_dataset_reader.prepare_fake_data()
exe = fluid.Executor(fluid.CPUPlace())
device_env = os.getenv("DEVICE", 'cpu')
if device_env == 'cpu':
device = fluid.CPUPlace()
elif device_env == 'gpu':
device = fluid.CUDAPlace(0)
exe = fluid.Executor(device)
exe.run(fluid.default_startup_program())
fleet.init_worker()
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import unittest
import tempfile
from test_dist_fleet_base import TestFleetBase
class TestPsGPUAsyncDataset2x2(TestFleetBase):
def _setup_config(self):
self._mode = "async"
self._reader = "dataset"
def check_with_place(self,
model_file,
delta=1e-3,
check_error_log=False,
need_envs={}):
required_envs = {
"PATH": os.getenv("PATH", ""),
"PYTHONPATH": os.getenv("PYTHONPATH", ""),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"FLAGS_rpc_deadline": "5000", # 5sec to fail fast
"http_proxy": "",
"SAVE_MODEL": "1",
"Debug": "1",
"DEVICE": "gpu"
}
required_envs.update(need_envs)
if check_error_log:
required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1"
tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
def test_dist_train(self):
self.check_with_place(
"dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册