diff --git a/paddle/fluid/framework/details/ssa_graph_checker.h b/paddle/fluid/framework/details/ssa_graph_checker.h index 542c4a172891ba9d3621918986089b2e400b6ae8..304b221e7e4c414a0ab562a1b99836d3b7c02efb 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.h +++ b/paddle/fluid/framework/details/ssa_graph_checker.h @@ -19,7 +19,7 @@ namespace paddle { namespace framework { namespace details { -class SSAGraph; +struct SSAGraph; class SSAGraghBuilderWithChecker : public SSAGraphBuilder { public: diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 18651544a1c0207127be335c37fe85c6e24dc16e..2d34f85838c34f1dfe43d2130e127d0258072fa7 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -162,16 +162,18 @@ class RequestPrefetch final : public RequestBase { void Process() override { // prefetch process... - std::string varname = request_->OutVarname(); - VLOG(3) << "RequestPrefetch " << varname; + std::string in_var_name = request_->Varname(); + std::string out_var_name = request_->OutVarname(); + VLOG(3) << "RequestPrefetch, in_var_name: " << in_var_name + << " out_var_name: " << out_var_name; auto scope = request_->GetMutableLocalScope(); - auto invar = scope->FindVar(varname); - framework::Variable* outvar = nullptr; + auto invar = scope->FindVar(in_var_name); + framework::Variable* outvar = scope->FindVar(out_var_name); - request_handler_->Handle(varname, scope, invar, &outvar); + request_handler_->Handle(in_var_name, scope, invar, &outvar, out_var_name); - SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(), + SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(), &reply_); Finish(reply_, &responder_); } @@ -287,7 +289,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name, } else if (rpc_name == kRequestPrefetch) { b = new RequestPrefetch(&service_, cq.get(), handler, req_id); } else { - PADDLE_ENFORCE(false, "not surpported rpc"); + PADDLE_ENFORCE(false, "not supported rpc"); } reqs[req_id] = b; diff --git a/paddle/fluid/operators/detail/request_handler.h b/paddle/fluid/operators/detail/request_handler.h index 595bfe3787a7f0200f3f3ffc3a67fe2ccaba9008..a2d08747d59220d30a5b8fd56074fd2739ae3bab 100644 --- a/paddle/fluid/operators/detail/request_handler.h +++ b/paddle/fluid/operators/detail/request_handler.h @@ -61,9 +61,12 @@ class RequestHandler { void SetDevCtx(const platform::DeviceContext* dev_ctx) { dev_ctx_ = dev_ctx; } void SetProgram(framework::ProgramDesc* program) { program_ = program; } void SetExecutor(framework::Executor* executor) { executor_ = executor; } + + // Used for dist lookup table prefetch void SetPrefetchPreparedCtx( - std::unique_ptr prepared) { - prefetch_ctx_.reset(prepared.release()); + std::unordered_map< + std::string, std::shared_ptr>* g) { + prefetch_var_name_to_prepared_ctx_ = g; } // Used for async. @@ -79,9 +82,6 @@ class RequestHandler { bool sync_mode() { return sync_mode_; } framework::Scope* scope() { return scope_; } const platform::DeviceContext* dev_ctx() { return dev_ctx_; } - framework::ExecutorPrepareContext* prefetch_ctx() { - return prefetch_ctx_.get(); - } framework::ProgramDesc* program() { return program_; } framework::Executor* executor() { return executor_; } @@ -100,8 +100,8 @@ class RequestHandler { // *request_handler_->dev_ctx(), &reply_); // } virtual bool Handle(const std::string& varname, framework::Scope* scope, - framework::Variable* var, - framework::Variable** outvar) = 0; + framework::Variable* var, framework::Variable** outvar, + const std::string& out_var_name = "") = 0; protected: const bool sync_mode_; @@ -110,12 +110,17 @@ class RequestHandler { framework::Executor* executor_; framework::Scope* scope_; framework::ProgramDesc* program_; - std::unique_ptr prefetch_ctx_; + + // used for distribute lookup table prefetch + std::unordered_map>* + prefetch_var_name_to_prepared_ctx_; // Used for async. std::unordered_map>* grad_to_prepared_ctx_; + RPCServer* rpc_server_; }; diff --git a/paddle/fluid/operators/detail/request_handler_impl.cc b/paddle/fluid/operators/detail/request_handler_impl.cc index bf277db5fb6a617dc80a765bc0d78665c397eac7..7425bee798cd9ba0af8cd777a6db63862c8a4031 100644 --- a/paddle/fluid/operators/detail/request_handler_impl.cc +++ b/paddle/fluid/operators/detail/request_handler_impl.cc @@ -30,7 +30,8 @@ namespace detail { bool RequestSendHandler::Handle(const std::string& varname, framework::Scope* scope, framework::Variable* invar, - framework::Variable** outvar) { + framework::Variable** outvar, + const std::string& out_var_name) { VLOG(4) << "RequestSendHandler:" << varname; // Async @@ -82,7 +83,8 @@ void RequestSendHandler::ResetSparseVarRecorder() { bool RequestGetHandler::Handle(const std::string& varname, framework::Scope* scope, framework::Variable* invar, - framework::Variable** outvar) { + framework::Variable** outvar, + const std::string& out_var_name) { VLOG(4) << "RequestGetHandler:" << varname; if (varname != FETCH_BARRIER_MESSAGE) { @@ -105,13 +107,14 @@ bool RequestGetHandler::Handle(const std::string& varname, bool RequestPrefetchHandler::Handle(const std::string& varname, framework::Scope* scope, framework::Variable* invar, - framework::Variable** outvar) { + framework::Variable** outvar, + const std::string& out_var_name) { VLOG(4) << "RequestPrefetchHandler " << varname; - auto var_desc = program_->Block(0).FindVar(varname); - *outvar = scope->FindVar(varname); + auto var_desc = program_->Block(0).FindVar(out_var_name); InitializeVariable(*outvar, var_desc->GetType()); - executor_->RunPreparedContext(prefetch_ctx_.get(), scope); + executor_->RunPreparedContext( + (*prefetch_var_name_to_prepared_ctx_)[varname].get(), scope); return true; } diff --git a/paddle/fluid/operators/detail/request_handler_impl.h b/paddle/fluid/operators/detail/request_handler_impl.h index c392267cfaeb7e94e6a23d6445c09b756e7e58b1..3f77c09a9598b431d747f1b824615e49d939098e 100644 --- a/paddle/fluid/operators/detail/request_handler_impl.h +++ b/paddle/fluid/operators/detail/request_handler_impl.h @@ -39,7 +39,8 @@ class RequestSendHandler final : public RequestHandler { explicit RequestSendHandler(bool sync_mode) : RequestHandler(sync_mode) {} virtual ~RequestSendHandler() {} bool Handle(const std::string& varname, framework::Scope* scope, - framework::Variable* var, framework::Variable** outvar) override; + framework::Variable* var, framework::Variable** outvar, + const std::string& out_var_name = "") override; void ResetSparseVarRecorder(); private: @@ -52,7 +53,8 @@ class RequestGetHandler final : public RequestHandler { explicit RequestGetHandler(bool sync_mode) : RequestHandler(sync_mode) {} virtual ~RequestGetHandler() {} bool Handle(const std::string& varname, framework::Scope* scope, - framework::Variable* var, framework::Variable** outvar) override; + framework::Variable* var, framework::Variable** outvar, + const std::string& out_var_name = "") override; }; class RequestPrefetchHandler final : public RequestHandler { @@ -60,7 +62,8 @@ class RequestPrefetchHandler final : public RequestHandler { explicit RequestPrefetchHandler(bool sync_mode) : RequestHandler(sync_mode) {} virtual ~RequestPrefetchHandler() {} bool Handle(const std::string& varname, framework::Scope* scope, - framework::Variable* var, framework::Variable** outvar) override; + framework::Variable* var, framework::Variable** outvar, + const std::string& out_var_name = "") override; }; } // namespace detail diff --git a/paddle/fluid/operators/detail/rpc_server_test.cc b/paddle/fluid/operators/detail/rpc_server_test.cc index f49274a7b53d04f82718887aabaffd5d33053dfe..463a7b80cfac280de5afe91ee85caaaf074cef32 100644 --- a/paddle/fluid/operators/detail/rpc_server_test.cc +++ b/paddle/fluid/operators/detail/rpc_server_test.cc @@ -98,11 +98,17 @@ void StartServer() { framework::Executor exe(place); platform::CPUDeviceContext ctx(place); auto* block = AppendPrefetchBlcok(&program); - auto prepared = exe.Prepare(program, block->ID()); + std::string in_var_name("ids"); + std::vector prefetch_block_ids{block->ID()}; + auto prepared = exe.Prepare(program, prefetch_block_ids); InitTensorsOnServer(&scope, &place, 10); + std::unordered_map> + prefetch_var_name_to_prepared; + prefetch_var_name_to_prepared[in_var_name] = prepared[0]; g_req_handler->SetProgram(&program); - g_req_handler->SetPrefetchPreparedCtx(std::move(prepared)); + g_req_handler->SetPrefetchPreparedCtx(&prefetch_var_name_to_prepared); g_req_handler->SetDevCtx(&ctx); g_req_handler->SetScope(&scope); g_req_handler->SetExecutor(&exe); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 76106004404bb0bb108cca003869f07c43bbf62f..4d12278799f66f2fb92b7580ba0c43e845aa4d3a 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -96,19 +96,22 @@ static int64_t GetTimestamp() { return tp.tv_sec * 1000 + tp.tv_usec / 1000; } -void ListenAndServOp::RunSyncLoop(framework::Executor *executor, - framework::ProgramDesc *program, - framework::Scope *recv_scope, - framework::BlockDesc *prefetch_block) const { +void ListenAndServOp::RunSyncLoop( + framework::Executor *executor, framework::ProgramDesc *program, + framework::Scope *recv_scope, + const std::vector &prefetch_block_id_list) const { size_t num_blocks = program->Size(); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - std::vector block_list; - for (size_t blkid = 1; blkid < num_blocks; ++blkid) { - block_list.push_back(blkid); + std::vector optimize_block_id_list; + for (int blkid = 1; blkid < num_blocks; ++blkid) { + if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(), + blkid) == prefetch_block_id_list.end()) { + optimize_block_id_list.push_back(blkid); + } } - auto optimize_prepared = executor->Prepare(*program, block_list); + auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list); // Insert placeholder for block0 which holds current op itself. optimize_prepared.insert( optimize_prepared.begin(), @@ -135,16 +138,17 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor, std::vector parallel_blkids; parallel_blkids.push_back(1); double ts = GetTimestamp(); - for (size_t blkid = 2; blkid < num_blocks; ++blkid) { - if (blkid != static_cast(prefetch_block->ID())) { - if (program->Block(blkid).Parent() != last_parent_blkid) { - ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, - program, recv_scope); - parallel_blkids.clear(); - last_parent_blkid = program->Block(blkid).Parent(); - } - parallel_blkids.push_back(blkid); + for (size_t i = 1; i < optimize_block_id_list.size(); ++i) { + // skip the first optimize block because it is already in the + // parallel_blkids. + int blkid = optimize_block_id_list[i]; + if (program->Block(blkid).Parent() != last_parent_blkid) { + ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, + program, recv_scope); + parallel_blkids.clear(); + last_parent_blkid = program->Block(blkid).Parent(); } + parallel_blkids.push_back(blkid); } ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program, recv_scope); @@ -210,18 +214,19 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, } // while(true) } -static void FillRequestCtx(detail::RequestHandler *h, framework::Scope *scope, - platform::DeviceContext *dev_ctx, - framework::Executor *executor, - framework::ProgramDesc *program, - framework::ExecutorPrepareContext *prefetch_ctx, - detail::RPCServer *rpc_server) { +static void FillRequestCtx( + detail::RequestHandler *h, framework::Scope *scope, + platform::DeviceContext *dev_ctx, framework::Executor *executor, + framework::ProgramDesc *program, + std::unordered_map> + *prefetch_ctx, + detail::RPCServer *rpc_server) { h->SetScope(scope); h->SetDevCtx(dev_ctx); h->SetExecutor(executor); h->SetProgram(program); - h->SetPrefetchPreparedCtx( - std::unique_ptr(prefetch_ctx)); + h->SetPrefetchPreparedCtx(prefetch_ctx); h->SetRPCServer(rpc_server); } @@ -255,17 +260,42 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, request_prefetch_handler_.get()); auto *optimize_block = Attr(kOptimizeBlock); - auto *prefetch_block = Attr(kPrefetchBlock); auto *program = optimize_block->Program(); framework::Executor executor(dev_place); // prepare for prefetch - VLOG(3) << "prefetch block id is " << prefetch_block->ID(); - auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID()); + std::vector prefetch_block_id_list; + std::unordered_map block_id_to_prefetch_var_name; + + auto prefetch_var_name_to_block_id_str = + Attr>(kPrefetchVarNameToBlockId); + for (const auto &prefetch_var_name_and_id : + prefetch_var_name_to_block_id_str) { + std::vector pieces; + split(prefetch_var_name_and_id, ':', &pieces); + VLOG(3) << "after split, prefetch_var = " << pieces[0] + << ", id=" << pieces[1]; + PADDLE_ENFORCE_EQ(pieces.size(), 2); + + int block_id = std::stoi(pieces[1]); + prefetch_block_id_list.push_back(block_id); + block_id_to_prefetch_var_name[block_id] = pieces[0]; + } + + auto prefetch_prepared = executor.Prepare(*program, prefetch_block_id_list); + + std::unordered_map> + prefetch_var_name_to_prepared_ctx; + for (size_t i = 0; i < prefetch_block_id_list.size(); ++i) { + auto block_id = prefetch_block_id_list[i]; + auto prefetch_var_name = block_id_to_prefetch_var_name[block_id]; + prefetch_var_name_to_prepared_ctx[prefetch_var_name] = prefetch_prepared[i]; + } auto f = std::bind(FillRequestCtx, std::placeholders::_1, &recv_scope, - &dev_ctx, &executor, program, prefetch_prepared.release(), - rpc_service_.get()); + &dev_ctx, &executor, program, + &prefetch_var_name_to_prepared_ctx, rpc_service_.get()); f(request_send_handler_.get()); f(request_get_handler_.get()); @@ -283,7 +313,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, // Write to a file of server selected port for python use. SavePort(); if (sync_mode) { - RunSyncLoop(&executor, program, &recv_scope, prefetch_block); + RunSyncLoop(&executor, program, &recv_scope, prefetch_block_id_list); } else { RunAsyncLoop(&executor, program); } @@ -309,8 +339,9 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { AddAttr("sync_mode", "if works at sync_mode or not").SetDefault(true); AddAttr(kOptimizeBlock, "BlockID to run on server side."); - AddAttr(kPrefetchBlock, - "prefetch block to run on server side."); + AddAttr>(kPrefetchVarNameToBlockId, + "prefetch blocks to run on server side.") + .SetDefault({}); AddAttr("Fanin", "How many clients send to this server.") .SetDefault(1); } diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 87952cb0e683596b2b0395890b6e25b15f74d7e2..46c3a19e20b3f2dd970a672bb99f98e83d3e25bf 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -18,6 +18,7 @@ limitations under the License. */ #include #include #include +#include #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -30,7 +31,7 @@ namespace paddle { namespace operators { constexpr char kOptimizeBlock[] = "OptimizeBlock"; -constexpr char kPrefetchBlock[] = "PrefetchBlock"; +constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id"; void RunServer(std::shared_ptr service); @@ -46,7 +47,7 @@ class ListenAndServOp : public framework::OperatorBase { void RunSyncLoop(framework::Executor* executor, framework::ProgramDesc* program, framework::Scope* recv_scope, - framework::BlockDesc* prefetch_block) const; + const std::vector& prefetch_block_id_list) const; void RunAsyncLoop(framework::Executor* executor, framework::ProgramDesc* program) const; diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 5b299a0f29d2f17f7ecf3509fee341712f3c9d70..2480d4e76a1b5fd76b7dc8299c2f8fcae967145e 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -515,35 +515,38 @@ class DistributeTranspiler: grad_to_block_id, None) # process distributed lookup_table - prefetch_block = None + prefetch_var_name_to_block_id = [] if self.has_distributed_lookup_table: pserver_index = self.pserver_endpoints.index(endpoint) table_opt_block = self._create_table_optimize_block( pserver_index, pserver_program, pre_block_idx, grad_to_block_id) - prefetch_block = self._create_prefetch_block( + prefetch_var_name_to_block_id = self._create_prefetch_block( pserver_index, pserver_program, table_opt_block) # NOTE: if has_distributed_lookup_table is False, then prefetch_block will # not be executed, so it's safe to use optimize_block to hold the place if self.has_distributed_lookup_table: - assert prefetch_block is not None + assert len(prefetch_var_name_to_block_id) > 0 else: - assert prefetch_block is None - prefetch_block = pserver_program.global_block() + assert len(prefetch_var_name_to_block_id) == 0 + + attrs = { + "OptimizeBlock": pserver_program.block(1), + "endpoint": endpoint, + "Fanin": self.trainer_num, + "sync_mode": self.sync_mode, + "grad_to_block_id": grad_to_block_id + } + if len(prefetch_var_name_to_block_id) > 0: + attrs['prefetch_var_name_to_block_id'] \ + = prefetch_var_name_to_block_id # step5 append the listen_and_serv op pserver_program.global_block().append_op( type="listen_and_serv", inputs={'X': recv_inputs}, outputs={}, - attrs={ - "OptimizeBlock": pserver_program.block(1), - "endpoint": endpoint, - "Fanin": self.trainer_num, - "PrefetchBlock": prefetch_block, - "sync_mode": self.sync_mode, - "grad_to_block_id": grad_to_block_id - }) + attrs=attrs) pserver_program.sync_with_cpp() return pserver_program @@ -608,8 +611,15 @@ class DistributeTranspiler: def _replace_lookup_table_op_with_prefetch(self, program, pserver_endpoints): # 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op - self.prefetch_input_vars = None - self.prefetch_output_vars = None + # self.all_prefetch_input_vars = + # [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1] + # [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]] + self.all_prefetch_input_vars = [] + + # self.all_prefetch_input_vars = + # [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1] + # [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]] + self.all_prefetch_output_vars = [] continue_search_lookup_table_op = True while continue_search_lookup_table_op: @@ -623,18 +633,19 @@ class DistributeTranspiler: ids_name = op.input("Ids") out_name = op.output("Out") - if self.prefetch_input_vars is None: - ids_var = program.global_block().vars[ids_name[0]] - self.prefetch_input_vars = self.create_splited_vars( - source_var=ids_var, - block=program.global_block(), - tag="_prefetch_in_") - if self.prefetch_output_vars is None: - out_var = program.global_block().vars[out_name[0]] - self.prefetch_output_vars = self.create_splited_vars( - source_var=out_var, - block=program.global_block(), - tag="_prefetch_out_") + ids_var = program.global_block().vars[ids_name[0]] + prefetch_input_vars = self.create_splited_vars( + source_var=ids_var, + block=program.global_block(), + tag="_prefetch_in_") + self.all_prefetch_input_vars.append(prefetch_input_vars) + + out_var = program.global_block().vars[out_name[0]] + prefetch_output_vars = self.create_splited_vars( + source_var=out_var, + block=program.global_block(), + tag="_prefetch_out_") + self.all_prefetch_output_vars.append(prefetch_output_vars) # insert split_ids_op program.global_block().insert_op( @@ -646,14 +657,14 @@ class DistributeTranspiler: for varname in ids_name ] }, - outputs={"Out": self.prefetch_input_vars}) + outputs={"Out": prefetch_input_vars}) # insert prefetch_op program.global_block().insert_op( index=op_index + 1, type="prefetch", - inputs={'X': self.prefetch_input_vars}, - outputs={"Out": self.prefetch_output_vars}, + inputs={'X': prefetch_input_vars}, + outputs={"Out": prefetch_output_vars}, attrs={ "epmap": pserver_endpoints, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE @@ -663,7 +674,7 @@ class DistributeTranspiler: program.global_block().insert_op( index=op_index + 2, type="concat", - inputs={'X': self.prefetch_output_vars}, + inputs={'X': prefetch_output_vars}, outputs={ "Out": [ program.global_block().vars[varname] @@ -709,30 +720,34 @@ class DistributeTranspiler: optimize_block): # STEP: create prefetch block table_var = pserver_program.global_block().vars[self.table_name] - prefetch_block = pserver_program.create_block(optimize_block.idx) - trainer_ids = self.prefetch_input_vars[pserver_index] - pserver_ids = pserver_program.global_block().create_var( - name=trainer_ids.name, - type=trainer_ids.type, - shape=trainer_ids.shape, - dtype=trainer_ids.dtype) - trainer_out = self.prefetch_output_vars[pserver_index] - pserver_out = pserver_program.global_block().create_var( - name=trainer_out.name, - type=trainer_out.type, - shape=trainer_out.shape, - dtype=trainer_out.dtype) - prefetch_block.append_op( - type="lookup_sparse_table", - inputs={'Ids': pserver_ids, - "W": table_var}, - outputs={"Out": pserver_out}, - attrs={ - "is_sparse": True, # has no effect on lookup_table op - "is_distributed": True, - "padding_idx": -1 - }) - return prefetch_block + prefetch_var_name_to_block_id = [] + for index in range(len(self.all_prefetch_input_vars)): + prefetch_block = pserver_program.create_block(optimize_block.idx) + trainer_ids = self.all_prefetch_input_vars[index][pserver_index] + pserver_ids = pserver_program.global_block().create_var( + name=trainer_ids.name, + type=trainer_ids.type, + shape=trainer_ids.shape, + dtype=trainer_ids.dtype) + trainer_out = self.all_prefetch_output_vars[index][pserver_index] + pserver_out = pserver_program.global_block().create_var( + name=trainer_out.name, + type=trainer_out.type, + shape=trainer_out.shape, + dtype=trainer_out.dtype) + prefetch_block.append_op( + type="lookup_sparse_table", + inputs={'Ids': pserver_ids, + "W": table_var}, + outputs={"Out": pserver_out}, + attrs={ + "is_sparse": True, # has no effect on lookup_table op + "is_distributed": True, + "padding_idx": -1 + }) + prefetch_var_name_to_block_id.append(trainer_ids.name + ":" + str( + prefetch_block.idx)) + return prefetch_var_name_to_block_id def _create_table_optimize_block(self, pserver_index, pserver_program, pre_block_idx, grad_to_block_id):