From 85c6937bb7fbfced6475e46739510b2f737f34be Mon Sep 17 00:00:00 2001
From: zhaocaibei123 <48509226+zhaocaibei123@users.noreply.github.com>
Date: Thu, 21 Jul 2022 14:51:23 +0800
Subject: [PATCH] add slot attr for push sparse op (#44422)

* add slot attr for push sparse op

* add pybind

* remove fleet

* add unittest

* fix
---
 paddle/fluid/distributed/ps/wrapper/fleet.cc | 23 +++++------
 paddle/fluid/distributed/ps/wrapper/fleet.h  |  2 +
 paddle/fluid/framework/data_feed.cc          | 14 +++----
 paddle/fluid/framework/data_set.cc           |  2 +-
 paddle/fluid/framework/io/fs.cc              | 40 +++++++++++++++----
 paddle/fluid/framework/io/fs.h               | 10 ++++-
 paddle/fluid/framework/io/test_fs.cc         | 13 ++++++
 paddle/fluid/operators/lookup_table_op.cc    |  1 +
 paddle/fluid/operators/lookup_table_v2_op.cc |  1 +
 .../pscore/distributed_push_sparse_op.cc     |  5 +++
 .../pscore/distributed_push_sparse_op.h      |  3 ++
 .../distributed/passes/ps_trainer_pass.py    | 12 ++++--
 python/paddle/fluid/contrib/layers/nn.py     | 10 +++--
 13 files changed, 100 insertions(+), 36 deletions(-)

diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc
index 3d7190cf553..375ea05fc32 100644
--- a/paddle/fluid/distributed/ps/wrapper/fleet.cc
+++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -529,10 +529,12 @@ void FleetWrapper::PushSparseFromTensorAsync(
     uint64_t padding_id,
     platform::Place place,
     std::vector<const LoDTensor*>* inputs,
+    std::vector<int>& slots,
     const LoDTensor* shows,
     const LoDTensor* clks,
     std::vector<LoDTensor*>* outputs,
     bool use_cvm_op) {
+  CHECK(slots.size() == inputs->size());
   int batch_size = -1;
   bool batch_size_consist = true;
   for (auto* input : *inputs) {
@@ -568,8 +570,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
   // TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
   //  const long int* show_tensor = shows->data<int64_t>();
   //  const long int* clk_tensor = clks->data<int64_t>();
-  const int64_t* show_tensor = shows->data<int64_t>();
-  const int64_t* clk_tensor = clks->data<int64_t>();
+  const float* show_tensor = shows->data<float>();
+  const float* clk_tensor = clks->data<float>();
 
   for (size_t index = 0; index < inputs->size(); ++index) {
     framework::LoDTensor* g_tensor = outputs->at(index);
@@ -603,15 +605,14 @@ void FleetWrapper::PushSparseFromTensorAsync(
         push_keys.emplace_back(real_id);
         if (use_cvm_op) {
           push_values.emplace_back(fea_dim + 1);
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          push_values.back()[0] = static_cast<float>(slots[index]);
           float* data = push_values.back().data() + 1;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         } else {
           push_values.emplace_back(fea_dim + 3);
           // slot show clk grad... consistent with CtrCommonPushValue defined
-          // in
-          // ctr_accessor.h
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          // in ctr_accessor.h
+          push_values.back()[0] = static_cast<float>(slots[index]);
           push_values.back()[1] =
               (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
           push_values.back()[2] =
@@ -631,18 +632,16 @@ void FleetWrapper::PushSparseFromTensorAsync(
         push_keys.emplace_back(real_id);
         if (use_cvm_op) {
           push_values.emplace_back(fea_dim + 1);
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          push_values.back()[0] = static_cast<float>(slots[index]);
           float* data = push_values.back().data() + 1;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         } else {
           push_values.emplace_back(fea_dim + 3);
           // slot show clk grad... consistent with CtrCommonPushValue defined in
           // ctr_accessor.h
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
-          push_values.back()[1] =
-              (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
-          push_values.back()[2] =
-              (i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
+          push_values.back()[0] = static_cast<float>(slots[index]);
+          push_values.back()[1] = (i >= show_size ? 1 : show_tensor[i]);
+          push_values.back()[2] = (i >= clk_size ? 0 : clk_tensor[i]);
           float* data = push_values.back().data() + 3;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         }
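The fleet.cc change above fills element 0 of each per-feature push value with the real slot id taken from the new slots argument (indexed by input), instead of the hard-coded 2 that the old TODO flagged. The layout stays "slot show clk grad...", consistent with CtrCommonPushValue in ctr_accessor.h. The standalone sketch below only illustrates that layout for the non-CVM branch; MakePushValue is a hypothetical helper, not part of the patch:

// Illustrative sketch only: layout of one push value when use_cvm_op is false.
// The slot id now comes from the op's slot attribute rather than the constant 2.
#include <cstring>
#include <vector>

std::vector<float> MakePushValue(int slot,
                                 float show,
                                 float clk,
                                 const float* grad,
                                 int fea_dim) {
  std::vector<float> value(fea_dim + 3);
  value[0] = static_cast<float>(slot);  // slot id of this sparse input
  value[1] = show;                      // show count for this example
  value[2] = clk;                       // click count for this example
  std::memcpy(value.data() + 3, grad, sizeof(float) * fea_dim);  // gradient
  return value;
}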
diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.h b/paddle/fluid/distributed/ps/wrapper/fleet.h
index 3ff6cfaf8e4..2d3a371b4e7 100644
--- a/paddle/fluid/distributed/ps/wrapper/fleet.h
+++ b/paddle/fluid/distributed/ps/wrapper/fleet.h
@@ -190,11 +190,13 @@ class FleetWrapper {
       const std::vector<std::string>& input_names,
       std::vector<const LoDTensor*>* inputs,  // NOLINT
       std::vector<LoDTensor*>* outputs);      // NOLINT
+
   void PushSparseFromTensorAsync(const uint64_t table_id,
                                  int fea_dim,
                                  uint64_t padding_id,
                                  platform::Place place,
                                  std::vector<const LoDTensor*>* inputs,
+                                 std::vector<int>& slots,  // NOLINT
                                  const LoDTensor* shows,
                                  const LoDTensor* clicks,
                                  std::vector<LoDTensor*>* outputs,
diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index 131416280da..7d312414aed 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -309,7 +309,7 @@ void PrivateQueueDataFeed<T>::ReadThread() {
   std::string filename;
   while (PickOneFile(&filename)) {
     int err_no = 0;
-    fp_ = fs_open_read(filename, &err_no, pipe_command_);
+    fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
     __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
     T instance;
     while (ParseOneInstanceFromPipe(&instance)) {
@@ -538,7 +538,7 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
   } else {
 #endif
     int err_no = 0;
-    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
 #ifdef PADDLE_WITH_BOX_PS
   }
 #endif
@@ -574,7 +574,7 @@ void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
     (defined PADDLE_WITH_PSLIB)
   VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
   int buf_len = 1024 * 1024 * 10;
-  char* buf = (char*)malloc(buf_len + 10);
+  char* buf = reinterpret_cast<char*>(malloc(buf_len + 10));
   auto ps_gpu_ptr = PSGPUWrapper::GetInstance();
 
   paddle::framework::CustomParser* parser =
@@ -681,7 +681,7 @@ void MultiSlotDataFeed::ReadThread() {
   std::string filename;
   while (PickOneFile(&filename)) {
     int err_no = 0;
-    fp_ = fs_open_read(filename, &err_no, pipe_command_);
+    fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
     CHECK(fp_ != nullptr);
     __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
     std::vector<MultiSlotType> instance;
@@ -2175,7 +2175,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) {
                                  lines);
       } else {
         int err_no = 0;
-        this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+        this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
         CHECK(this->fp_ != nullptr);
         __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
 
@@ -2265,7 +2265,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLine(void) {
 
     do {
       int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
       lines = line_reader.read_file(this->fp_.get(), line_func, lines);
@@ -2314,7 +2314,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByCommand(void) {
 
     do {
      int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
      CHECK(this->fp_ != nullptr);
      __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index 842532f5888..ea8a2c4ff5d 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -102,7 +102,7 @@ void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name,
   cmd += " -D fs.default.name=" + fs_name;
   cmd += " -D hadoop.job.ugi=" + fs_ugi;
   cmd += " -Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=500000";
-  paddle::framework::hdfs_set_command(cmd);
+  paddle::framework::dataset_hdfs_set_command(cmd);
 }
 
 template <typename T>
diff --git a/paddle/fluid/framework/io/fs.cc b/paddle/fluid/framework/io/fs.cc
index aa181dcd83a..da87bd33f0d 100644
--- a/paddle/fluid/framework/io/fs.cc
+++ b/paddle/fluid/framework/io/fs.cc
@@ -230,6 +230,20 @@ const std::string& hdfs_command() { return hdfs_command_internal(); }
 
 void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; }
 
+// dataset and model may be on different afs cluster
+static std::string& dataset_hdfs_command_internal() {
+  static std::string x = "hadoop fs";
+  return x;
+}
+
+const std::string& dataset_hdfs_command() {
+  return dataset_hdfs_command_internal();
+}
+
+void dataset_hdfs_set_command(const std::string& x) {
+  dataset_hdfs_command_internal() = x;
+}
+
 static std::string& customized_download_cmd_internal() {
   static std::string x = "";
   return x;
@@ -243,17 +257,28 @@ void set_download_command(const std::string& x) {
 }
 
 std::shared_ptr<FILE> hdfs_open_read(std::string path,
                                      int* err_no,
-                                     const std::string& converter) {
+                                     const std::string& converter,
+                                     bool read_data) {
   if (download_cmd() != "") {  // use customized download command
     path = string::format_string(
         "%s \"%s\"", download_cmd().c_str(), path.c_str());
   } else {
     if (fs_end_with_internal(path, ".gz")) {
-      path = string::format_string(
-          "%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
+      if (read_data) {
+        path = string::format_string(
+            "%s -text \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
+      } else {
+        path = string::format_string(
+            "%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
+      }
     } else {
-      path = string::format_string(
-          "%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
+      if (read_data) {
+        path = string::format_string(
+            "%s -cat \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
+      } else {
+        path = string::format_string(
+            "%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
+      }
     }
   }
@@ -370,13 +395,14 @@ int fs_select_internal(const std::string& path) {
 
 std::shared_ptr<FILE> fs_open_read(const std::string& path,
                                    int* err_no,
-                                   const std::string& converter) {
+                                   const std::string& converter,
+                                   bool read_data) {
   switch (fs_select_internal(path)) {
     case 0:
      return localfs_open_read(path, converter);
 
    case 1:
-      return hdfs_open_read(path, err_no, converter);
+      return hdfs_open_read(path, err_no, converter, read_data);
 
    default:
      PADDLE_THROW(platform::errors::Unimplemented(
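As the new comment in fs.cc notes, the dataset and the model may sit on different AFS clusters: readers opened with read_data = true go through dataset_hdfs_command(), while model load/save paths keep using hdfs_command(). The sketch below strings the new API together; the command strings and paths are placeholders, not values taken from the patch:

// Usage sketch with placeholder cluster settings: training data is read via the
// dataset command, model files keep the default command.
#include "paddle/fluid/framework/io/fs.h"

void ConfigureSeparateClusters() {
  // Model checkpoints: default command used when read_data is false.
  paddle::framework::hdfs_set_command(
      "hadoop fs -D fs.default.name=afs://model-cluster "
      "-D hadoop.job.ugi=user,passwd");
  // Training data: command used whenever read_data is true.
  paddle::framework::dataset_hdfs_set_command(
      "hadoop fs -D fs.default.name=afs://data-cluster "
      "-D hadoop.job.ugi=user,passwd");

  int err_no = 0;
  // ".gz" paths are piped through "-text", everything else through "-cat".
  auto data_fp = paddle::framework::fs_open_read(
      "afs:/user/demo/train/part-00000.gz", &err_no, "", /*read_data=*/true);
  // read_data defaults to false, so this still uses hdfs_command().
  auto model_fp = paddle::framework::fs_open_read(
      "afs:/user/demo/model/checkpoint_0", &err_no, "");
}

In practice the dataset-side command is set through SetHdfsConfig (see the data_set.cc hunk above) and the data-feed readers pass read_data = true, so user code rarely needs to call these functions directly.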
diff --git a/paddle/fluid/framework/io/fs.h b/paddle/fluid/framework/io/fs.h
index bdfafe7220c..0ebc7fea089 100644
--- a/paddle/fluid/framework/io/fs.h
+++ b/paddle/fluid/framework/io/fs.h
@@ -64,13 +64,18 @@ extern const std::string& hdfs_command();
 
 extern void hdfs_set_command(const std::string& x);
 
+extern const std::string& dataset_hdfs_command();
+
+extern void dataset_hdfs_set_command(const std::string& x);
+
 extern const std::string& download_cmd();
 
 extern void set_download_command(const std::string& x);
 
 extern std::shared_ptr<FILE> hdfs_open_read(std::string path,
                                             int* err_no,
-                                            const std::string& converter);
+                                            const std::string& converter,
+                                            bool read_data);
 
 extern std::shared_ptr<FILE> hdfs_open_write(std::string path,
                                              int* err_no,
@@ -91,7 +96,8 @@ extern void hdfs_mv(const std::string& src, const std::string& dest);
 // aut-detect fs
 extern std::shared_ptr<FILE> fs_open_read(const std::string& path,
                                           int* err_no,
-                                          const std::string& converter);
+                                          const std::string& converter,
+                                          bool read_data = false);
 
 extern std::shared_ptr<FILE> fs_open_write(const std::string& path,
                                            int* err_no,
diff --git a/paddle/fluid/framework/io/test_fs.cc b/paddle/fluid/framework/io/test_fs.cc
index adb6141fd56..0f9f463a996 100644
--- a/paddle/fluid/framework/io/test_fs.cc
+++ b/paddle/fluid/framework/io/test_fs.cc
@@ -45,5 +45,18 @@ TEST(FS, mv) {
   } catch (...) {
     VLOG(3) << "test hdfs_mv, catch expected errors of unknown prefix";
   }
+
+  try {
+    paddle::framework::dataset_hdfs_set_command(
+        "hadoop -D hadoop.job.ugi=anotherxxx fs -text");
+    int err_no = 0;
+    paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", true);
+    paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", false);
+    paddle::framework::hdfs_open_read("afs:/none", &err_no, "", true);
+    paddle::framework::hdfs_open_read("afs:/none", &err_no, "", false);
+  } catch (...) {
+    VLOG(3) << "test hdfs_open_read, catch expected errors of unknown path";
+  }
+
 #endif
 }
diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc
index 74b406aef64..02cd9a205f0 100644
--- a/paddle/fluid/operators/lookup_table_op.cc
+++ b/paddle/fluid/operators/lookup_table_op.cc
@@ -134,6 +134,7 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
                   "in the order of input variables for mapping")
         .SetDefault({});
     AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
+    AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
     AddAttr<bool>("grad_inplace",
                   "(boolean, default false) "
                   "If the grad op reuse the input's variable.")
diff --git a/paddle/fluid/operators/lookup_table_v2_op.cc b/paddle/fluid/operators/lookup_table_v2_op.cc
index 37da33b0a3d..7baf76a1e10 100644
--- a/paddle/fluid/operators/lookup_table_v2_op.cc
+++ b/paddle/fluid/operators/lookup_table_v2_op.cc
@@ -105,6 +105,7 @@ class LookupTableV2OpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.")
         .SetDefault(0)
         .AsExtra();
+    AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
     AddAttr<std::vector<int64_t>>("height_sections",
                                   "Height for each output SelectedRows.")
         .SetDefault(std::vector<int64_t>({}))
diff --git a/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc b/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
index a2bf63da10b..840e3393989 100644
--- a/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
+++ b/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
@@ -113,6 +113,11 @@ class DistributedPushSparseOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("use_cvm_op", "(boolean, default false) Use cvm op or not.")
         .SetDefault(false);
 
+    AddAttr<std::vector<int>>("slots",
+                              "[slot_id1, slot_id2] Slots array of Ids.")
+        .SetDefault({})
+        .AsExtra();
+
     AddComment(R"DOC(
 Lookup Tablel Prefetch Operator.
 
 This operator is used to perform lookup on parameter W,
diff --git a/paddle/fluid/operators/pscore/distributed_push_sparse_op.h b/paddle/fluid/operators/pscore/distributed_push_sparse_op.h
index d7bb3e27728..c85c31ba3c0 100644
--- a/paddle/fluid/operators/pscore/distributed_push_sparse_op.h
+++ b/paddle/fluid/operators/pscore/distributed_push_sparse_op.h
@@ -33,6 +33,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
     auto table_id = context.Attr<int>("table_id");
     auto emb_dim = context.Attr<int>("size");
     auto use_cvm_op = context.Attr<bool>("use_cvm_op");
+    auto slots = context.Attr<std::vector<int>>("slots");
 
     auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
     auto shows = context.Input<framework::LoDTensor>("Shows");
@@ -47,6 +48,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
         static_cast<uint64_t>(padding_idx),
         context.GetPlace(),
         &inputs,
+        slots,
         shows,
         clks,
         &outputs,
@@ -103,6 +105,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
         static_cast<uint64_t>(padding_idx),
         context.GetPlace(),
         &tmp_input_vec,
+        slots,
         tmp_shows_tensor,
         tmp_clicks_tensor,
         &tmp_output_vec);
diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py
index 9c37fc025b4..422e7cc8864 100755
--- a/python/paddle/distributed/passes/ps_trainer_pass.py
+++ b/python/paddle/distributed/passes/ps_trainer_pass.py
@@ -150,7 +150,7 @@ class DistributedOpsPass(PassBase):
             print('ShowClickEntry not configured, will not use')
             show = _program.global_block().create_var(
                 name="show",
-                dtype=core.VarDesc.VarType.INT64,
+                dtype=core.VarDesc.VarType.FP32,
                 persistable=False,
                 stop_gradient=True)
             _program.global_block()._insert_op(index=0,
@@ -165,7 +165,7 @@ class DistributedOpsPass(PassBase):
 
             clk = _program.global_block().create_var(
                 name="clk",
-                dtype=core.VarDesc.VarType.INT64,
+                dtype=core.VarDesc.VarType.FP32,
                 persistable=False,
                 stop_gradient=True)
             _program.global_block()._insert_op(index=0,
@@ -190,6 +190,9 @@ class DistributedOpsPass(PassBase):
         padding_idx = ops[0].attr("padding_idx")
         is_distributed = ops[0].attr("is_distributed")
         op_type = ops[0].type
+
+        slots = [op.attr("slot") for op in ops]
+        print('debug zcb slots: ', slots)
         outputs = [
             _program.global_block().vars[op.input("Out@GRAD")[0]]
             for op in ops
@@ -204,7 +207,7 @@ class DistributedOpsPass(PassBase):
                 'W': w,
                 "Outputs": outputs,
                 "Shows": show,
-                "Clicks": clk
+                "Clicks": clk,
             },
             outputs={"Outputs": outputs},
             attrs={
@@ -213,7 +216,8 @@ class DistributedOpsPass(PassBase):
                 "is_distributed": is_distributed,
                 "padding_idx": padding_idx,
                 "table_id": table_id,
                 "size": self.emb_size[param],
-                "use_cvm_op": use_cvm_op
+                "use_cvm_op": use_cvm_op,
+                "slots": slots
             })
 
     def _pull_sparse_fuse(self, _program, pull_sparse_ops, attrs, send_ctx):
diff --git a/python/paddle/fluid/contrib/layers/nn.py b/python/paddle/fluid/contrib/layers/nn.py
index e71c73b3914..36bdffdda78 100644
--- a/python/paddle/fluid/contrib/layers/nn.py
+++ b/python/paddle/fluid/contrib/layers/nn.py
@@ -1073,7 +1073,8 @@ def sparse_embedding(input,
                      entry=None,
                      table_class="MemorySparseTable",
                      param_attr=None,
-                     dtype='float32'):
+                     dtype='float32',
+                     slot=None):
     r"""
     :api_attr: Static Graph
 
@@ -1220,6 +1221,9 @@ def sparse_embedding(input,
         )
     entry_str = entry._to_attr()
 
+    if slot == None:
+        slot = 0
+
     helper.append_op(type='lookup_table',
                      inputs={
                          'Ids': input,
@@ -1233,9 +1237,9 @@ def sparse_embedding(input,
                          'remote_prefetch': True,
                          'is_test': is_test,
                          'entry': entry_str,
-                         'table_class': table_class
+                         'table_class': table_class,
+                         'slot': slot
                      })
-
     return tmp
--
GitLab