diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc
index 3d7190cf5533626e004d9a2a000c9189a4d4dbf2..375ea05fc323bfa80e2f9995cc3d0c0d6c93ae0e 100644
--- a/paddle/fluid/distributed/ps/wrapper/fleet.cc
+++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -529,10 +529,12 @@ void FleetWrapper::PushSparseFromTensorAsync(
     uint64_t padding_id,
     platform::Place place,
     std::vector<const LoDTensor*>* inputs,
+    std::vector<int>& slots,
     const LoDTensor* shows,
     const LoDTensor* clks,
     std::vector<LoDTensor*>* outputs,
     bool use_cvm_op) {
+  CHECK(slots.size() == inputs->size());
   int batch_size = -1;
   bool batch_size_consist = true;
   for (auto* input : *inputs) {
@@ -568,8 +570,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
   // TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
   // const long int* show_tensor = shows->data<int64_t>();
   // const long int* clk_tensor = clks->data<int64_t>();
-  const int64_t* show_tensor = shows->data<int64_t>();
-  const int64_t* clk_tensor = clks->data<int64_t>();
+  const float* show_tensor = shows->data<float>();
+  const float* clk_tensor = clks->data<float>();
 
   for (size_t index = 0; index < inputs->size(); ++index) {
     framework::LoDTensor* g_tensor = outputs->at(index);
@@ -603,15 +605,14 @@
           push_keys.emplace_back(real_id);
           if (use_cvm_op) {
             push_values.emplace_back(fea_dim + 1);
-            push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+            push_values.back()[0] = static_cast<float>(slots[index]);
             float* data = push_values.back().data() + 1;
             memcpy(data, g + output_len, sizeof(float) * fea_dim);
           } else {
             push_values.emplace_back(fea_dim + 3);
             // slot show clk grad... consistent with CtrCommonPushValue defined
-            // in
-            // ctr_accessor.h
-            push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+            // in ctr_accessor.h
+            push_values.back()[0] = static_cast<float>(slots[index]);
             push_values.back()[1] =
                 (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
             push_values.back()[2] =
@@ -631,18 +632,16 @@
         push_keys.emplace_back(real_id);
         if (use_cvm_op) {
           push_values.emplace_back(fea_dim + 1);
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          push_values.back()[0] = static_cast<float>(slots[index]);
          float* data = push_values.back().data() + 1;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         } else {
           push_values.emplace_back(fea_dim + 3);
           // slot show clk grad... consistent with CtrCommonPushValue defined in
           // ctr_accessor.h
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
-          push_values.back()[1] =
-              (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
-          push_values.back()[2] =
-              (i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
+          push_values.back()[0] = static_cast<float>(slots[index]);
+          push_values.back()[1] = (i >= show_size ? 1 : show_tensor[i]);
+          push_values.back()[2] = (i >= clk_size ? 0 : clk_tensor[i]);
           float* data = push_values.back().data() + 3;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         }
diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.h b/paddle/fluid/distributed/ps/wrapper/fleet.h
index 3ff6cfaf8e4d6371e870b7adda3b1cc7a2e9eec4..2d3a371b4e7080cf475f899395511e60e30d70ba 100644
--- a/paddle/fluid/distributed/ps/wrapper/fleet.h
+++ b/paddle/fluid/distributed/ps/wrapper/fleet.h
@@ -190,11 +190,13 @@ class FleetWrapper {
       const std::vector<std::string>& input_names,
       std::vector<const LoDTensor*>* inputs,    // NOLINT
       std::vector<const LoDTensor*>* outputs);  // NOLINT
+
   void PushSparseFromTensorAsync(const uint64_t table_id,
                                  int fea_dim,
                                  uint64_t padding_id,
                                  platform::Place place,
                                  std::vector<const LoDTensor*>* inputs,
+                                 std::vector<int>& slots,  // NOLINT
                                  const LoDTensor* shows,
                                  const LoDTensor* clicks,
                                  std::vector<LoDTensor*>* outputs,
diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index 131416280da111cdfce89103de516caa2a3558a8..7d312414aed66493931d1461557a5a13d1d79f23 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -309,7 +309,7 @@ void PrivateQueueDataFeed<T>::ReadThread() {
   std::string filename;
   while (PickOneFile(&filename)) {
     int err_no = 0;
-    fp_ = fs_open_read(filename, &err_no, pipe_command_);
+    fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
     __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
     T instance;
     while (ParseOneInstanceFromPipe(&instance)) {
@@ -538,7 +538,7 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
   } else {
 #endif
     int err_no = 0;
-    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
 #ifdef PADDLE_WITH_BOX_PS
   }
 #endif
@@ -574,7 +574,7 @@ void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
     (defined PADDLE_WITH_PSLIB)
   VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
   int buf_len = 1024 * 1024 * 10;
-  char* buf = (char*)malloc(buf_len + 10);
+  char* buf = reinterpret_cast<char*>(malloc(buf_len + 10));
   auto ps_gpu_ptr = PSGPUWrapper::GetInstance();
 
   paddle::framework::CustomParser* parser =
@@ -681,7 +681,7 @@ void MultiSlotDataFeed::ReadThread() {
   std::string filename;
   while (PickOneFile(&filename)) {
     int err_no = 0;
-    fp_ = fs_open_read(filename, &err_no, pipe_command_);
+    fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
     CHECK(fp_ != nullptr);
     __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
     std::vector<MultiSlotType> instance;
@@ -2175,7 +2175,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) {
                            lines);
     } else {
       int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
 
@@ -2265,7 +2265,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLine(void) {
 
     do {
      int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
       lines = line_reader.read_file(this->fp_.get(), line_func, lines);
@@ -2314,7 +2314,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByCommand(void) {
 
     do {
       int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
 
diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index 842532f588831909d242c35273f0006c39e300fa..ea8a2c4ff5d01c2b9ff80cd3690ed62efa86d157 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -102,7 +102,7 @@ void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name,
   cmd += " -D fs.default.name=" + fs_name;
   cmd += " -D hadoop.job.ugi=" + fs_ugi;
   cmd += " -Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=500000";
-  paddle::framework::hdfs_set_command(cmd);
+  paddle::framework::dataset_hdfs_set_command(cmd);
 }
 
 template <typename T>
diff --git a/paddle/fluid/framework/io/fs.cc b/paddle/fluid/framework/io/fs.cc
index aa181dcd83a13f78c7dc8421bc895eda345c635c..da87bd33f0d4b8accebeea49935ea16bab4fe9d6 100644
--- a/paddle/fluid/framework/io/fs.cc
+++ b/paddle/fluid/framework/io/fs.cc
@@ -230,6 +230,20 @@ const std::string& hdfs_command() { return hdfs_command_internal(); }
 
 void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; }
 
+// dataset and model may be on different afs cluster
+static std::string& dataset_hdfs_command_internal() {
+  static std::string x = "hadoop fs";
+  return x;
+}
+
+const std::string& dataset_hdfs_command() {
+  return dataset_hdfs_command_internal();
+}
+
+void dataset_hdfs_set_command(const std::string& x) {
+  dataset_hdfs_command_internal() = x;
+}
+
 static std::string& customized_download_cmd_internal() {
   static std::string x = "";
   return x;
@@ -243,17 +257,28 @@ void set_download_command(const std::string& x) {
 
 std::shared_ptr<FILE> hdfs_open_read(std::string path,
                                      int* err_no,
-                                     const std::string& converter) {
+                                     const std::string& converter,
+                                     bool read_data) {
   if (download_cmd() != "") {  // use customized download command
     path = string::format_string(
         "%s \"%s\"", download_cmd().c_str(), path.c_str());
   } else {
     if (fs_end_with_internal(path, ".gz")) {
-      path = string::format_string(
-          "%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
+      if (read_data) {
+        path = string::format_string(
+            "%s -text \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
+      } else {
+        path = string::format_string(
+            "%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
+      }
     } else {
-      path = string::format_string(
-          "%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
+      if (read_data) {
+        path = string::format_string(
+            "%s -cat \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
+      } else {
+        path = string::format_string(
+            "%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
+      }
     }
   }
 
@@ -370,13 +395,14 @@ int fs_select_internal(const std::string& path) {
 
 std::shared_ptr<FILE> fs_open_read(const std::string& path,
                                    int* err_no,
-                                   const std::string& converter) {
+                                   const std::string& converter,
+                                   bool read_data) {
   switch (fs_select_internal(path)) {
     case 0:
      return localfs_open_read(path, converter);
 
     case 1:
-      return hdfs_open_read(path, err_no, converter);
+      return hdfs_open_read(path, err_no, converter, read_data);
 
     default:
       PADDLE_THROW(platform::errors::Unimplemented(
diff --git a/paddle/fluid/framework/io/fs.h b/paddle/fluid/framework/io/fs.h
index bdfafe7220caa2439e07ee72f053357841fcadfa..0ebc7fea089fbe489b28490e54ec4714a606d500 100644
--- a/paddle/fluid/framework/io/fs.h
+++ b/paddle/fluid/framework/io/fs.h
@@ -64,13 +64,18 @@ extern const std::string& hdfs_command();
 
 extern void hdfs_set_command(const std::string& x);
 
+extern const std::string& dataset_hdfs_command();
+
+extern void dataset_hdfs_set_command(const std::string& x);
+
 extern const std::string& download_cmd();
 
 extern void set_download_command(const std::string& x);
 
 extern std::shared_ptr<FILE> hdfs_open_read(std::string path,
                                             int* err_no,
-                                            const std::string& converter);
+                                            const std::string& converter,
+                                            bool read_data);
 
 extern std::shared_ptr<FILE> hdfs_open_write(std::string path,
                                              int* err_no,
@@ -91,7 +96,8 @@ extern void hdfs_mv(const std::string& src, const std::string& dest);
 // aut-detect fs
 extern std::shared_ptr<FILE> fs_open_read(const std::string& path,
                                           int* err_no,
-                                          const std::string& converter);
+                                          const std::string& converter,
+                                          bool read_data = false);
 
 extern std::shared_ptr<FILE> fs_open_write(const std::string& path,
                                            int* err_no,
diff --git a/paddle/fluid/framework/io/test_fs.cc b/paddle/fluid/framework/io/test_fs.cc
index adb6141fd56a1cdea7df00a0fe7206119d3e9a19..0f9f463a996e67ac394f559ead4fc9205d5940bc 100644
--- a/paddle/fluid/framework/io/test_fs.cc
+++ b/paddle/fluid/framework/io/test_fs.cc
@@ -45,5 +45,18 @@ TEST(FS, mv) {
   } catch (...) {
     VLOG(3) << "test hdfs_mv, catch expected errors of unknown prefix";
   }
+
+  try {
+    paddle::framework::dataset_hdfs_set_command(
+        "hadoop -D hadoop.job.ugi=anotherxxx fs -text");
+    int err_no = 0;
+    paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", true);
+    paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", false);
+    paddle::framework::hdfs_open_read("afs:/none", &err_no, "", true);
+    paddle::framework::hdfs_open_read("afs:/none", &err_no, "", false);
+  } catch (...) {
+    VLOG(3) << "test hdfs_open_read, catch expected errors of unknown path";
+  }
+
 #endif
 }
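Usage note for the fs changes above: `DatasetImpl::SetHdfsConfig` now registers the dataset's `hadoop fs` command through `dataset_hdfs_set_command()`, and the data-feed readers open files with `fs_open_read(..., /*read_data=*/true)`, so training data and model checkpoints can live on different AFS/HDFS clusters. A minimal, illustrative Python-side sketch (not part of the patch), assuming the legacy `paddle.fluid` dataset API; the cluster address, ugi, and file path are placeholders:

    import paddle.fluid as fluid

    # SetHdfsConfig builds "hadoop fs -D fs.default.name=... -D hadoop.job.ugi=..."
    # and registers it via dataset_hdfs_set_command(), so it only affects data
    # reads (read_data=true), not model save/load.
    dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
    dataset.set_hdfs_config("afs://data-cluster.example:9902", "data_user,data_passwd")

    dataset.set_filelist(["afs:/user/demo/training_data/part-00000"])  # placeholder
    dataset.set_pipe_command("cat")
    # ... dataset.set_use_var([...]) with the feed variables, then:
    # dataset.load_into_memory()  # files are opened with the dataset hdfs command

The model side keeps whatever command was registered with `hdfs_set_command()`, which is why the two commands can point at different clusters.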
diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc
index 74b406aef648b61a34ed2b41787de24fbd0bcb25..02cd9a205f009d6c2b71ac772b15946863515f53 100644
--- a/paddle/fluid/operators/lookup_table_op.cc
+++ b/paddle/fluid/operators/lookup_table_op.cc
@@ -134,6 +134,7 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
                   "in the order of input variables for mapping")
         .SetDefault({});
     AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
+    AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
     AddAttr<bool>("grad_inplace",
                   "(boolean, default false) "
                   "If the grad op reuse the input's variable.")
diff --git a/paddle/fluid/operators/lookup_table_v2_op.cc b/paddle/fluid/operators/lookup_table_v2_op.cc
index 37da33b0a3d7c93d16494a9e29bc0c50b227b9a2..7baf76a1e10803ce631a6a1875674ffc91769276 100644
--- a/paddle/fluid/operators/lookup_table_v2_op.cc
+++ b/paddle/fluid/operators/lookup_table_v2_op.cc
@@ -105,6 +105,7 @@ class LookupTableV2OpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.")
         .SetDefault(0)
         .AsExtra();
+    AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
     AddAttr<std::vector<int64_t>>("height_sections",
                                   "Height for each output SelectedRows.")
         .SetDefault(std::vector<int64_t>({}))
diff --git a/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc b/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
index a2bf63da10bd2ff85ab2347b6999c13e956789d1..840e33939897fd04d8298e8af08b47028529e080 100644
--- a/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
+++ b/paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
@@ -113,6 +113,11 @@ class DistributedPushSparseOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("use_cvm_op", "(boolean, default false) Use cvm op or not.")
         .SetDefault(false);
+    AddAttr<std::vector<int>>("slots",
+                              "[slot_id1, slot_id2] Slots array of Ids.")
+        .SetDefault({})
+        .AsExtra();
+
     AddComment(R"DOC(
 Lookup Tablel Prefetch Operator.
 
 This operator is used to perform lookup on parameter W,
diff --git a/paddle/fluid/operators/pscore/distributed_push_sparse_op.h b/paddle/fluid/operators/pscore/distributed_push_sparse_op.h
index d7bb3e277285aaf4857f1e7ed104cd13c898a657..c85c31ba3c0dd7e73cc1fe7a4d8f1383f8933260 100644
--- a/paddle/fluid/operators/pscore/distributed_push_sparse_op.h
+++ b/paddle/fluid/operators/pscore/distributed_push_sparse_op.h
@@ -33,6 +33,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
     auto table_id = context.Attr<int>("table_id");
     auto emb_dim = context.Attr<int>("size");
     auto use_cvm_op = context.Attr<bool>("use_cvm_op");
+    auto slots = context.Attr<std::vector<int>>("slots");
 
     auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
     auto shows = context.Input<framework::LoDTensor>("Shows");
@@ -47,6 +48,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
           static_cast<uint64_t>(padding_idx),
           context.GetPlace(),
           &inputs,
+          slots,
           shows,
           clks,
           &outputs,
@@ -103,6 +105,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
           static_cast<uint64_t>(padding_idx),
          context.GetPlace(),
           &tmp_input_vec,
+          slots,
           tmp_shows_tensor,
           tmp_clicks_tensor,
           &tmp_output_vec);
diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py
index 9c37fc025b40d29d6af59c4b7586abbe99614b91..422e7cc88642b145fdc5f63c96b05c7ac4a2a5b0 100755
--- a/python/paddle/distributed/passes/ps_trainer_pass.py
+++ b/python/paddle/distributed/passes/ps_trainer_pass.py
@@ -150,7 +150,7 @@ class DistributedOpsPass(PassBase):
             print('ShowClickEntry not configured, will not use')
             show = _program.global_block().create_var(
                 name="show",
-                dtype=core.VarDesc.VarType.INT64,
+                dtype=core.VarDesc.VarType.FP32,
                 persistable=False,
                 stop_gradient=True)
             _program.global_block()._insert_op(index=0,
@@ -165,7 +165,7 @@
 
             clk = _program.global_block().create_var(
                 name="clk",
-                dtype=core.VarDesc.VarType.INT64,
+                dtype=core.VarDesc.VarType.FP32,
                 persistable=False,
                 stop_gradient=True)
             _program.global_block()._insert_op(index=0,
@@ -190,6 +190,9 @@
         padding_idx = ops[0].attr("padding_idx")
         is_distributed = ops[0].attr("is_distributed")
         op_type = ops[0].type
+
+        slots = [op.attr("slot") for op in ops]
+        print('debug zcb slots: ', slots)
         outputs = [
             _program.global_block().vars[op.input("Out@GRAD")[0]]
             for op in ops
@@ -204,7 +207,7 @@
                 'W': w,
                 "Outputs": outputs,
                 "Shows": show,
-                "Clicks": clk
+                "Clicks": clk,
             },
             outputs={"Outputs": outputs},
             attrs={
@@ -213,7 +216,8 @@
                 "padding_idx": padding_idx,
                 "table_id": table_id,
                 "size": self.emb_size[param],
-                "use_cvm_op": use_cvm_op
+                "use_cvm_op": use_cvm_op,
+                "slots": slots
             })
 
     def _pull_sparse_fuse(self, _program, pull_sparse_ops, attrs, send_ctx):
diff --git a/python/paddle/fluid/contrib/layers/nn.py b/python/paddle/fluid/contrib/layers/nn.py
index e71c73b3914b19fa5e313edf7e9cb5560f2f341b..36bdffdda78d09b81426c2f3c7252f258854fb8f 100644
--- a/python/paddle/fluid/contrib/layers/nn.py
+++ b/python/paddle/fluid/contrib/layers/nn.py
@@ -1073,7 +1073,8 @@ def sparse_embedding(input,
                      entry=None,
                      table_class="MemorySparseTable",
                      param_attr=None,
-                     dtype='float32'):
+                     dtype='float32',
+                     slot=None):
     r"""
     :api_attr: Static Graph
 
@@ -1220,6 +1221,9 @@ def sparse_embedding(input,
     )
     entry_str = entry._to_attr()
 
+    if slot == None:
+        slot = 0
+
     helper.append_op(type='lookup_table',
                      inputs={
                          'Ids': input,
                          'W': w
                      },
                      outputs={'Out': tmp},
                      attrs={
                          'padding_idx': padding_idx,
                          'is_sparse': True,
                          'is_distributed': True,
                          'remote_prefetch': True,
                          'is_test': is_test,
                          'entry': entry_str,
-                         'table_class': table_class
+                         'table_class': table_class,
+                         'slot': slot
                      })
-
     return tmp
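For reference, a rough usage sketch of the new `slot` argument: the `slot` attribute added to `lookup_table`/`lookup_table_v2` is collected per embedding by the ps trainer pass and forwarded as the `slots` attribute of `distributed_push_sparse`, ending up in `push_values[0]` instead of the old hard-coded 2. This is illustrative only and not part of the patch; the names, shapes, sizes, and the `ShowClickEntry` configuration below are placeholders, and show/click signals are now expected as float32:

    import paddle
    import paddle.fluid as fluid

    paddle.enable_static()

    # Sparse ids plus float32 show/click signals; the trainer pass also creates
    # its default "show"/"clk" variables as FP32 now.
    ids = paddle.static.data(name="ids", shape=[-1, 1], dtype="int64")
    show = paddle.static.data(name="show", shape=[-1, 1], dtype="float32")
    click = paddle.static.data(name="click", shape=[-1, 1], dtype="float32")

    entry = paddle.distributed.ShowClickEntry("show", "click")
    emb = fluid.contrib.layers.sparse_embedding(
        input=ids,
        size=[1024, 11],  # placeholder vocab size / embedding dim
        entry=entry,
        param_attr=paddle.ParamAttr(name="embedding"),
        slot=1)  # slot id of this feature; defaults to 0 when not set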