提交 6efc30f5 编写于 作者: X xiexionghang

for shuffler and debug

上级 76e8be34
......@@ -10,7 +10,8 @@ INCPATHS('$OUT/../')
INCPATHS('../../third-party')
INCPATHS('../../third-party/eigen')
INCPATHS('$OUT_ROOT/baidu/third-party/python/output/include/python2.7')
LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lgomp -fopenmp ')
LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lgomp -fopenmp')
#LDFLAGS('-lpthread -lcrypto -lrt -ldl -lssl -lz -lgomp -fopenmp -lasan')
CONFIGS('baidu/third-party/any@15595d8324be9e8a9a80d9ae442fdd12bd66df5d@git_branch')
CONFIGS('baidu/third-party/boost@v1.41.0@git_branch')
CONFIGS('baidu/third-party/c-ares@v1.13.0@git_branch')
......@@ -84,6 +85,7 @@ custom_trainer_src = GLOB('paddle/fluid/train/custom_trainer/feed/*/*.cc', Exclu
CPPFLAGS_STR = '-DHPPL_STUB_FUNC -DLAPACK_FOUND -DPADDLE_DISABLE_PROFILER -DPADDLE_NO_PYTHON -DCUSTOM_TRAINER -DPADDLE_ON_INFERENCE -DPADDLE_USE_DSO -DPADDLE_USE_PTHREAD_BARRIER -DPADDLE_USE_PTHREAD_SPINLOCK -DPADDLE_VERSION=0.0.0 -DPADDLE_WITH_AVX -DPADDLE_WITH_MKLML -DPADDLE_WITH_XBYAK -DXBYAK64 -DXBYAK_NO_OP_NAMES -D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DPYBIND_AVX_MKLML' + r" -DPADDLE_REVISION=\"%s@%s@%s\"" % (REPO_URL(), REPO_BRANCH(), REPO_REVISION())
CFLAGS_STR = '-m64 -fPIC -fno-omit-frame-pointer -Werror -Wall -Wextra -Wnon-virtual-dtor -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-unused-function -Wno-error=literal-suffix -Wno-error=sign-compare -Wno-error=unused-local-typedefs -Wno-error=maybe-uninitialized -Wno-narrowing -Wnarrowing -fopenmp -mavx -O3 -DNDEBUG '
#CFLAGS_STR = '-m64 -fPIC -fsanitize=address -fsanitize-recover=address -fno-omit-frame-pointer -Werror -Wall -Wextra -Wnon-virtual-dtor -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-unused-function -Wno-error=literal-suffix -Wno-error=sign-compare -Wno-error=unused-local-typedefs -Wno-error=maybe-uninitialized -Wno-narrowing -Wnarrowing -fopenmp -mavx -O3 -DNDEBUG '
CXXFLAGS_STR = '-std=c++11 ' + CFLAGS_STR
SharedLibrary("paddle_fluid_avx_mklml", PreBuilt(True))
......
......@@ -42,6 +42,8 @@ int32_t DenseInputAccessor::create(::paddle::framework::Scope* scope) {
GetMutable<paddle::framework::LoDTensor>();
auto* data = tensor->data<float>();
regions.emplace_back(data, variable.dim);
if (FLAGS_feed_trainer_debug_dense_name == variable.name)
VLOG(2) << "[Debug][CreateDense]" << ScopeHelper::to_string(scope, variable.name);
}
auto* ps_client = _trainer_context->pslib->ps_client();
auto push_status = ps_client->push_dense_param(regions.data(), regions.size(), _table_id);
......@@ -50,13 +52,7 @@ int32_t DenseInputAccessor::create(::paddle::framework::Scope* scope) {
// rpc拉取数据,需保证单线程运行
int32_t DenseInputAccessor::pull_dense(size_t table_id) {
float* data_buffer = NULL;
if (_data_buffer == nullptr) {
_data_buffer = new float[_total_dim];
}
// TODO 使用双buffer DataBuffer,避免训练期改写,当前异步SGD下,问题不大
data_buffer = _data_buffer;
float* data_buffer = new float[_total_dim];
size_t data_buffer_idx = 0;
std::vector<paddle::ps::Region> regions;
for (auto& variable : _x_variables) {
......@@ -65,7 +61,10 @@ int32_t DenseInputAccessor::pull_dense(size_t table_id) {
}
auto* ps_client = _trainer_context->pslib->ps_client();
auto push_status = ps_client->pull_dense(regions.data(), regions.size(), table_id);
return push_status.get();
int32_t ret = push_status.get();
// TODO 使用双buffer DataBuffer,避免训练期改写,当前异步SGD下,问题不大
_data_buffer = data_buffer;
return ret;
}
int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num,
......@@ -101,23 +100,11 @@ int32_t DenseInputAccessor::forward(SampleInstance* samples, size_t num,
data_buffer_idx += variable.dim;
}
if (!FLAGS_feed_trainer_debug_dense_name.empty()) {
data_buffer_idx = 0;
std::stringstream ssm;
for (auto& variable : _x_variables) {
if (variable.name != FLAGS_feed_trainer_debug_dense_name) {
data_buffer_idx += variable.dim;
continue;
}
ssm.str("");
auto& tensor = ScopeHelper::var<paddle::framework::LoDTensor>(scope, variable.name);
const auto* var_data = tensor.data<float>();
for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) {
if (data_idx > 0)
ssm << ",";
ssm << _data_buffer[data_buffer_idx + data_idx];
}
data_buffer_idx += variable.dim;
VLOG(2) << "[DEBUG]pull_dense: " << ssm.str();
VLOG(2) << "[Debug][PullDense]" << ScopeHelper::to_string(scope, variable.name);
}
}
if (_need_async_pull) {
......@@ -143,21 +130,11 @@ int32_t DenseInputAccessor::backward(SampleInstance* samples, size_t num,
auto push_status = ps_client->push_dense(regions.data(), regions.size(), _table_id);
//push_status.get();
if (!FLAGS_feed_trainer_debug_dense_name.empty()) {
std::stringstream ssm;
for (auto& variable : _x_variables) {
ssm.str("");
if (variable.name != FLAGS_feed_trainer_debug_dense_name) {
continue;
}
auto& tensor = scope->Var(variable.gradient_name)->
Get<paddle::framework::LoDTensor>();
const auto* var_data = tensor.data<float>();
for (size_t data_idx = 0; data_idx < variable.dim; ++data_idx) {
if (data_idx > 0)
ssm << ",";
ssm << var_data[data_idx];
}
VLOG(2) << "[DEBUG]push_dense: " << ssm.str();
VLOG(2) << "[Debug][PushDense]" << ScopeHelper::to_string(scope, variable.gradient_name);
}
}
return 0;
......
......@@ -155,7 +155,7 @@ namespace feed {
}
switch (save_way) {
case ModelSaveWay::ModelSaveInferenceDelta:
return true;
return delta_id(epoch_id) % 6 == 0;
case ModelSaveWay::ModelSaveInferenceBase:
return is_last_epoch(epoch_id);
case ModelSaveWay::ModelSaveTrainCheckpoint:
......
......@@ -64,6 +64,11 @@ int32_t LabelInputAccessor::backward(SampleInstance* samples, size_t num,
sample_predict_data_idx += label.label_dim;
}
}
/* for debug
for (auto& label : _labels) {
VLOG(2) << "[Debug][Lable]" << ScopeHelper::to_string(scope, label.label_name) << ScopeHelper::to_string(scope, label.output_name);
}
*/
return 0;
}
......
......@@ -44,9 +44,14 @@ int PSlib::init_server() {
}
int PSlib::init_client() {
// 所有节点都启动psclient
_client_ptr.reset(paddle::ps::PSClientFactory::create(_ps_param));
_client_ptr->configure(_ps_param, *(_environment->ps_environment()),
_environment->rank_id(EnvironmentRole::ALL));
_environment->barrier(EnvironmentRole::ALL);
_environment->ps_environment()->gather_ps_clients();
_client_ptr->create_client2client_connection();
return 0;
}
......@@ -64,8 +69,8 @@ paddle::PSParameter* PSlib::get_param() {
void PSlib::init_gflag() {
int cnt = 4;
std::shared_ptr<char*> params(new char*[cnt]);
char** params_ptr = params.get();
char** params_ptr = new char*[cnt];
std::cout << "alloc_ptr" << params_ptr << std::flush;
char p0[] = "exe default";
char p1[] = "-max_body_size=314217728";
char p2[] = "-bthread_concurrency=40";
......@@ -74,7 +79,10 @@ void PSlib::init_gflag() {
params_ptr[1] = p1;
params_ptr[2] = p2;
params_ptr[3] = p3;
::google::ParseCommandLineFlags(&cnt, &params_ptr, true);
// ParseCommandLineFlags would change param_ptr, so copy it
char** params_ptrp = params_ptr;
::google::ParseCommandLineFlags(&cnt, &params_ptrp, true);
delete[] params_ptr;
}
} // namespace feed
......
......@@ -14,6 +14,7 @@
namespace paddle {
namespace custom_trainer {
namespace feed {
class RuntimeEnvironment;
class PSlib {
......
......@@ -100,6 +100,7 @@ protected:
virtual void print_log(EnvironmentRole role, EnvironmentLogType type,
EnvironmentLogLevel level, const std::string& log_str) = 0;
std::string _debug_verion;
std::string _job_id = "default_job_id";
std::string _job_name = "default_job_name";
};
......
......@@ -48,6 +48,31 @@ public:
return tensor->mutable_data<T>(place);
}
static std::string to_string(paddle::framework::Scope* scope, const std::string& name) {
CHECK(scope->FindVar(name) != nullptr) << "Var named:" << name << " is not exists in scope";
auto& tensor = scope->Var(name)->Get<paddle::framework::LoDTensor>();
auto& ddim = tensor.dims();
thread_local std::stringstream ssm;
ssm.str("");
ssm << "[" << name << "][";
for (auto i = 0; i < ddim.size(); ++i) {
if (i > 0) ssm << "X";
ssm << ddim.at(i);
}
ssm << "][";
auto last_dim = ddim.at(ddim.size() - 1);
auto sample_rate = last_dim > 100 ? last_dim / 100 : 1; // 保证最后一层 最多只打100个
auto* data = tensor.data<float>();
for (auto i = 0; i < tensor.numel(); i += last_dim) {
auto* dim_data = data + i;
for (auto j = 0; j < last_dim; j += sample_rate, dim_data += sample_rate) {
ssm << *dim_data << " ";
}
}
ssm << "]";
return ssm.str();
}
};
} // namespace feed
......
......@@ -6,6 +6,7 @@
#include <string>
#include <vector>
#include <memory>
#include <time.h>
#include <yaml-cpp/yaml.h>
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/train/custom_trainer/feed/common/pipeline.h"
......
......@@ -11,6 +11,7 @@
#include "paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h"
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include "paddle/fluid/train/custom_trainer/feed/dataset/dataset_container.h"
#include "paddle/fluid/train/custom_trainer/feed/shuffler/shuffler.h"
namespace paddle {
namespace custom_trainer {
......@@ -30,6 +31,9 @@ int DatasetContainer::initialize(
_data_root_paths = config["root_path"].as<std::vector<std::string>>();
_data_split_interval = config["data_spit_interval"].as<int>();
_data_path_formater = config["data_path_formater"].as<std::string>();
std::string shuffler = config["shuffler"]["name"].as<std::string>();
_shuffler.reset(CREATE_INSTANCE(Shuffler, shuffler));
_shuffler->initialize(config, context);
std::string data_reader_class = config["data_reader"].as<std::string>();
DataReader* data_reader = CREATE_INSTANCE(DataReader, data_reader_class);
_data_reader.reset(data_reader);
......@@ -182,7 +186,9 @@ void DatasetContainer::async_download_data(uint64_t start_timestamp) {
}
VLOG(2) << "End download data num:" << dataset_info->data_channel->Size()
<< ", data_timestap:" << start_timestamp
<< ", for epoch:" << epoch_accessor->text(start_timestamp);
<< ", for epoch:" << epoch_accessor->text(start_timestamp) << ", Start shuffle";
_shuffler->shuffle(dataset_info->data_channel);
VLOG(2) << "Shuffle done";
dataset_info->status = DatasetStatus::Ready;
start_timestamp += epoch_accessor->epoch_time_interval();
}
......
......@@ -16,6 +16,8 @@ namespace paddle {
namespace custom_trainer {
namespace feed {
class Shuffler;
inline int data_num_for_train(uint64_t train_begin_timestamp, uint32_t train_time_interval, uint32_t data_time_interval) {
uint64_t data_begin_time = train_begin_timestamp;
uint64_t data_end_time = data_begin_time + train_time_interval;
......@@ -76,6 +78,7 @@ protected:
std::vector<std::string> _data_root_paths; //支持同时读取多个目录
TrainerContext* _trainer_context;
std::shared_ptr<Shuffler> _shuffler;
std::shared_ptr<DataReader> _data_reader;
std::shared_ptr<std::thread> _downloader_thread;
std::vector<std::shared_ptr<DatasetInfo>> _dataset_list;//预取的数据列表
......
......@@ -24,6 +24,9 @@ int MultiThreadExecutor::initialize(YAML::Node exe_config,
_thread_executors.resize(_train_thread_num);
auto e_class = exe_config["class"].as<std::string>();
_train_exe_name = exe_config["name"].as<std::string>();
if (exe_config["debug_layer_list"]) {
_debug_layer_list = exe_config["debug_layer_list"].as<std::vector<std::string>>();
}
omp_set_num_threads(_train_thread_num);
#pragma omp parallel for
......@@ -163,9 +166,16 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
}
timer.Pause();
scope_ctx->push_gradient_cost_ms = timer.ElapsedMS();
// Monitor && Debug
for (auto& monitor : _monitors) {
monitor->add_data(epoch_id, this, scope_ctx);
}
if (_debug_layer_list.size() > 0) {
for (auto& layer_name : _debug_layer_list) {
VLOG(2) << "[Debug][Layer]" << ScopeHelper::to_string(scope, layer_name);
}
}
delete scope_ctx; // 所有pipe完成后,再回收sample
}
return 0;
......@@ -175,6 +185,7 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
std::vector<int> gradient_status;
while (gradient_pipe->read(gradient_status) > 0) {
}
// 输出相关监控&统计项
for (auto& monitor : _monitors) {
if (monitor->need_compute_result(epoch_id)) {
......
......@@ -74,6 +74,7 @@ protected:
YAML::Node _model_config;
std::string _train_exe_name;
TrainerContext* _trainer_context = nullptr;
std::vector<std::string> _debug_layer_list;
std::vector<std::shared_ptr<Monitor>> _monitors;
std::vector<std::shared_ptr<Executor>> _thread_executors;
std::vector<std::shared_ptr<DataInputAccessor>> _input_accessors;
......
......@@ -69,6 +69,7 @@ int LearnerProcess::load_model(uint64_t epoch_id) {
if (!environment->is_master_node(EnvironmentRole::WORKER)) {
return 0;
}
auto* fs = _context_ptr->file_system.get();
std::set<uint32_t> loaded_table_set;
auto model_dir = _context_ptr->epoch_accessor->checkpoint_path();
for (auto& executor : _executors) {
......@@ -77,9 +78,9 @@ int LearnerProcess::load_model(uint64_t epoch_id) {
if (loaded_table_set.count(itr.first)) {
continue;
}
auto table_model_path = _context_ptr->file_system->path_join(
auto table_model_path = fs->path_join(
model_dir, string::format_string("%03d", itr.first));
if (_context_ptr->file_system->list(table_model_path).size() == 0) {
if ((!fs->exists(table_model_path)) || fs->list(table_model_path).size() == 0) {
VLOG(2) << "miss table_model:" << table_model_path << ", initialize by default";
auto scope = std::move(executor->fetch_scope());
CHECK(itr.second[0]->create(scope.get()) == 0);
......@@ -152,7 +153,7 @@ int LearnerProcess::run() {
}
input_channel = executor->run(input_channel, dataset->data_parser(data_name));
timer.Pause();
VLOG(2) << "End executor:" << executor->train_exe_name() << ", cost" << timer.ElapsedSec();
VLOG(2) << "End executor:" << executor->train_exe_name() << ", cost:" << timer.ElapsedSec();
// 等待异步梯度完成
_context_ptr->ps_client()->flush();
......@@ -183,7 +184,7 @@ int LearnerProcess::run() {
CHECK(itr.second[0]->shrink() == 0);
}
}
VLOG(2) << "End shrink table, cost" << timer.ElapsedSec();
VLOG(2) << "End shrink table, cost:" << timer.ElapsedSec();
}
environment->barrier(EnvironmentRole::WORKER);
......
......@@ -92,6 +92,7 @@ class ModelBuilder:
def build_and_save(self):
"""Build programs and save to _save_path
"""
scope1 = fluid.Scope()
main_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
......@@ -120,6 +121,8 @@ class ModelBuilder:
for name, program in programs.items():
with open(os.path.join(self._save_path, name), 'w') as f:
f.write(program.desc.serialize_to_string())
with open(os.path.join(self._save_path, name + '.pbtxt'), 'w') as fout:
fout.write(str(program))
params = filter(fluid.io.is_parameter, main_program.list_vars())
vars = []
......
......@@ -31,8 +31,8 @@ def inference():
param_attr={"batch_size":1e4, "batch_sum_default":0.0, "batch_square":1e4})
lr_x = 1.0
init_range = 0.2
fc_layers_size = [511, 255, 255, 127, 127, 127, 127]
fc_layers_act = ["relu"] * len(fc_layers_size)
fc_layers_size = [511, 255, 255, 127, 127, 127, 127, 1]
fc_layers_act = ["relu"] * (len(fc_layers_size) - 1) + [None]
scales_tmp = [net.shape[1]] + fc_layers_size
scales = []
for i in range(len(scales_tmp)):
......@@ -41,7 +41,7 @@ def inference():
net = fluid.layers.fc(
input = net,
size = fc_layers_size[i],
name = 'fc_' + str(i + 1),
name = 'fc_' + str(i),
act = fc_layers_act[i],
param_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
......@@ -49,7 +49,7 @@ def inference():
bias_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])))
ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr')
ctr_output = fluid.layers.sigmoid(fluid.layers.clip(net, min=-15.0, max=15.0), name="ctr")
accessors = [
{ "class": "AbacusSparseJoinAccessor", "input": "sparses", "table_id": 0, "need_gradient": False},
......@@ -83,7 +83,7 @@ def loss_function(ctr_output):
# TODO: calc loss here
label = fluid.layers.data(name='label_ctr', shape=ctr_output.shape, dtype='float32')
loss = fluid.layers.square_error_cost(input=ctr_output, label=label)
loss = fluid.layers.log_loss(input=ctr_output, label=label)
loss = fluid.layers.mean(loss, name='loss_ctr')
return loss, [label]
......@@ -42,20 +42,24 @@ input_accessor:
- async_pull: true
class: DenseInputAccessor
input:
- name: fc_1.w_0
- name: fc_0.w_0
shape: [4488, 511]
- name: fc_1.b_0
- name: fc_0.b_0
shape: [511]
- name: fc_2.w_0
- name: fc_1.w_0
shape: [511, 255]
- name: fc_1.b_0
shape: [255]
- name: fc_2.w_0
shape: [255, 255]
- name: fc_2.b_0
shape: [255]
- name: fc_3.w_0
shape: [255, 255]
shape: [255, 127]
- name: fc_3.b_0
shape: [255]
shape: [127]
- name: fc_4.w_0
shape: [255, 127]
shape: [127, 127]
- name: fc_4.b_0
shape: [127]
- name: fc_5.w_0
......@@ -67,12 +71,8 @@ input_accessor:
- name: fc_6.b_0
shape: [127]
- name: fc_7.w_0
shape: [127, 127]
- name: fc_7.b_0
shape: [127]
- name: ctr.w_0
shape: [127, 1]
- name: ctr.b_0
- name: fc_7.b_0
shape: [1]
need_gradient: true
table_id: 1
......@@ -90,7 +90,7 @@ input_accessor:
- class: LabelInputAccessor
input:
- label_name: label_ctr
output_name: ctr.tmp_2
output_name: ctr.tmp_0
shape: [-1, 1]
inputs:
- name: cvm_input
......@@ -100,8 +100,8 @@ labels:
shape: [-1, 1]
loss: loss_ctr
monitor:
- {class: AucMonitor, compute_interval: 600, name: epoch_auc, target: ctr.tmp_2, target_idx: 0}
- {class: AucMonitor, compute_interval: 86400, name: day_auc, target: ctr.tmp_2, target_idx: 0}
- {class: AucMonitor, compute_interval: 600, name: epoch_auc, target: ctr.tmp_0, target_idx: 0}
- {class: AucMonitor, compute_interval: 86400, name: day_auc, target: ctr.tmp_0, target_idx: 0}
outputs:
- name: ctr.tmp_2
- name: ctr.tmp_0
shape: [-1, 1]
......@@ -42,16 +42,20 @@ input_accessor:
- async_pull: true
class: DenseInputAccessor
input:
- name: fc_1.w_0
- name: fc_0.w_0
shape: [3672, 511]
- name: fc_1.b_0
- name: fc_0.b_0
shape: [511]
- name: fc_2.w_0
- name: fc_1.w_0
shape: [511, 255]
- name: fc_2.b_0
- name: fc_1.b_0
shape: [255]
- name: fc_3.w_0
- name: fc_2.w_0
shape: [255, 127]
- name: fc_2.b_0
shape: [127]
- name: fc_3.w_0
shape: [127, 127]
- name: fc_3.b_0
shape: [127]
- name: fc_4.w_0
......@@ -59,19 +63,15 @@ input_accessor:
- name: fc_4.b_0
shape: [127]
- name: fc_5.w_0
shape: [127, 127]
- name: fc_5.b_0
shape: [127]
- name: ctr.w_0
shape: [127, 1]
- name: ctr.b_0
- name: fc_5.b_0
shape: [1]
need_gradient: true
table_id: 3
- class: LabelInputAccessor
input:
- label_name: label_ctr
output_name: ctr.tmp_2
output_name: ctr.tmp_0
shape: [-1, 1]
inputs:
- name: cvm_input
......@@ -81,8 +81,8 @@ labels:
shape: [-1, 1]
loss: loss_ctr
monitor:
- {class: AucMonitor, compute_interval: 600, name: epoch_auc, target: ctr.tmp_2, target_idx: 0}
- {class: AucMonitor, compute_interval: 86400, name: day_auc, target: ctr.tmp_2, target_idx: 0}
- {class: AucMonitor, compute_interval: 600, name: epoch_auc, target: ctr.tmp_0, target_idx: 0}
- {class: AucMonitor, compute_interval: 86400, name: day_auc, target: ctr.tmp_0, target_idx: 0}
outputs:
- name: ctr.tmp_2
- name: ctr.tmp_0
shape: [-1, 1]
......@@ -27,8 +27,8 @@ def inference():
net = cvm_input
lr_x = 1.0
init_range = 0.2
fc_layers_size = [511, 255, 127, 127, 127]
fc_layers_act = ["relu"] * len(fc_layers_size)
fc_layers_size = [511, 255, 127, 127, 127, 1]
fc_layers_act = ["relu"] * (len(fc_layers_size) - 1) + [None]
scales_tmp = [net.shape[1]] + fc_layers_size
scales = []
for i in range(len(scales_tmp)):
......@@ -37,7 +37,7 @@ def inference():
net = fluid.layers.fc(
input = net,
size = fc_layers_size[i],
name = 'fc_' + str(i + 1),
name = 'fc_' + str(i),
act = fc_layers_act[i],
param_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
......@@ -46,7 +46,7 @@ def inference():
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])))
ctr_output = fluid.layers.fc(net, 1, act='sigmoid', name='ctr')
ctr_output = fluid.layers.sigmoid(fluid.layers.clip(net, min=-15.0, max=15.0), name="ctr")
accessors = [
{ "class": "AbacusSparseUpdateAccessor", "input": "sparses", "table_id": 0, "need_gradient": True},
......@@ -79,7 +79,7 @@ def loss_function(ctr_output):
# TODO: calc loss here
label = fluid.layers.data(name='label_ctr', shape=ctr_output.shape, dtype='float32')
loss = fluid.layers.square_error_cost(input=ctr_output, label=label)
loss = fluid.layers.log_loss(input=ctr_output, label=label)
loss = fluid.layers.mean(loss, name='loss_ctr')
return loss, [label]
#pragma once
#include "paddle/fluid/framework/archive.h"
#include "paddle/fluid/train/custom_trainer/feed/trainer_context.h"
#include "paddle/fluid/train/custom_trainer/feed/shuffler/shuffler.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
int Shuffler::initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr) {
_trainer_context = context_ptr.get();
_shuffle_key_func = shuffle_key_factory(config["shuffle_key_func"].as<std::string>("RANDOM"));
return 0;
}
class LocalShuffler : public Shuffler {
public:
LocalShuffler() {}
virtual ~LocalShuffler() {}
virtual int shuffle(::paddle::framework::Channel<DataItem>& data_channel) {
std::vector<DataItem> data_items(data_channel->Size());
data_channel->ReadAll(data_items);
std::shuffle(data_items.begin(), data_items.end(), local_random_engine());
data_channel->Open();
data_channel->Clear();
data_channel->WriteMove(data_items.size(), &data_items[0]);
data_channel->Close();
return 0;
}
};
REGIST_CLASS(DataParser, LocalShuffler);
class GlobalShuffler : public Shuffler {
public:
GlobalShuffler() {}
virtual ~GlobalShuffler() {}
virtual int initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr) {
Shuffler::initialize(config, context_ptr);
_max_concurrent_num = config["max_concurrent_num"].as<int>(4); // 最大并发发送数
_max_package_size = config["max_package_size"].as<int>(1024); // 最大包个数,一次发送package个数据
return 0;
}
virtual int shuffle(::paddle::framework::Channel<DataItem>& data_channel) {
uint32_t send_count = 0;
uint32_t package_size = _max_package_size;
uint32_t concurrent_num = _max_concurrent_num;
uint32_t current_wait_idx = 0;
auto* environment = _trainer_context->environment.get();
auto worker_num = environment->node_num(EnvironmentRole::WORKER);
std::vector<std::vector<std::future<int>>> waits(concurrent_num);
std::vector<DataItem> send_buffer(concurrent_num * package_size);
std::vector<paddle::framework::BinaryArchive> request_data_buffer(worker_num);
while (true) {
auto read_size = data_channel->Read(concurrent_num * package_size, &send_buffer[0]);
if (read_size == 0) {
break;
}
for (size_t idx = 0; idx < read_size; idx += package_size) {
// data shard && seriliaze
for (size_t i = 0; i < worker_num; ++i) {
request_data_buffer[i].Clear();
}
for (size_t i = idx; i < package_size && i < read_size; ++i) {
auto worker_idx = _shuffle_key_func(send_buffer[i].id) % worker_num;
// TODO Serialize To Arcive
//request_data_buffer[worker_idx] << send_buffer[i];
}
std::string data_vec[worker_num];
for (size_t i = 0; i < worker_num; ++i) {
auto& buffer = request_data_buffer[i];
data_vec[i].assign(buffer.Buffer(), buffer.Length());
}
// wait async done
for (auto& wait_s : waits[current_wait_idx]) {
if (!wait_s.valid()) {
break;
}
CHECK(wait_s.get() == 0);
}
// send shuffle data
for (size_t i = 0; i < worker_num; ++i) {
waits[current_wait_idx][i] = _trainer_context->pslib->ps_client()->send_client2client_msg(3, i * 2, data_vec[i]);
}
// update status
// 如果在训练期,则限速shuffle
// 如果在wait状态,全速shuffle
if (_trainer_context->is_status(TrainerStatus::Training)) {
concurrent_num = 1;
package_size = _max_concurrent_num / 2;
} else {
package_size = _max_package_size;
concurrent_num = _max_concurrent_num;
}
++current_wait_idx;
current_wait_idx = current_wait_idx >= concurrent_num ? 0 : current_wait_idx;
}
}
return 0;
}
private:
uint32_t _max_package_size = 0;
uint32_t _max_concurrent_num = 0;
};
REGIST_CLASS(DataParser, GlobalShuffler);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
#pragma once
#include "paddle/fluid/train/custom_trainer/feed/dataset/data_reader.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
class TrainerContext;
inline double current_realtime() {
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
return tp.tv_sec + tp.tv_nsec * 1e-9;
}
inline std::default_random_engine& local_random_engine() {
struct engine_wrapper_t {
std::default_random_engine engine;
engine_wrapper_t() {
static std::atomic<unsigned long> x(0);
std::seed_seq sseq = {x++, x++, x++, (unsigned long)(current_realtime() * 1000)};
engine.seed(sseq);
}
};
thread_local engine_wrapper_t r;
return r.engine;
}
inline uint64_t shuffle_key_random(const std::string& /*key*/) {
return local_random_engine()();
}
inline uint64_t shuffle_key_hash(const std::string& key) {
static std::hash<std::string> hasher;
return hasher(key);
}
inline uint64_t shuffle_key_numeric(const std::string& key) {
return strtoull(key.c_str(), NULL, 10);
}
typedef uint64_t (*ShuffleKeyFunc)(const std::string& key);
inline ShuffleKeyFunc shuffle_key_factory(const std::string& name) {
if (name == "NUMERIC") {
return &shuffle_key_numeric;
} else if (name == "HASH") {
return &shuffle_key_hash;
}
return &shuffle_key_random;
}
class Shuffler {
public:
Shuffler() {}
virtual ~Shuffler() {}
virtual int initialize(YAML::Node config,
std::shared_ptr<TrainerContext> context_ptr);
virtual int shuffle(::paddle::framework::Channel<DataItem>& data_channel) = 0;
protected:
ShuffleKeyFunc _shuffle_key_func;
TrainerContext* _trainer_context;
};
REGIST_REGISTERER(Shuffler);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
......@@ -13,6 +13,7 @@ namespace paddle {
namespace custom_trainer {
namespace feed {
class PSlib;
class Process;
class Dataset;
class FileSystem;
......@@ -28,6 +29,11 @@ enum class ModelSaveWay {
ModelSaveInferenceBase = 2
};
enum class TrainerStatus {
Training = 0, // 训练状态
Saving = 1 // 模型存储状态
};
class SignCacheDict {
public:
int32_t sign2index(uint64_t sign) {
......@@ -44,7 +50,17 @@ public:
inline paddle::ps::PSClient* ps_client() {
return pslib->ps_client();
}
inline bool is_status(TrainerStatus status) {
auto bit_idx = static_cast<uint32_t>(status);
return ((trainer_status >> bit_idx) & 1) > 0;
}
// 非线程安全, 其实TrainerContext所有成员的线程安全性 取决于 成员本身的线程安全性
inline void set_status(TrainerStatus status, bool on) {
auto bit_idx = static_cast<uint32_t>(status);
trainer_status = trainer_status & (1L << bit_idx);
}
uint32_t trainer_status; // trainer当前,由于可同时处于多种状态,这里分bit存储状态
YAML::Node trainer_config;
paddle::platform::CPUPlace cpu_place;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册