From fff795e5c86940c4e6fea6b619ba2d59ef7f1ce9 Mon Sep 17 00:00:00 2001 From: dongdaxiang Date: Mon, 15 Apr 2019 20:23:58 +0800 Subject: [PATCH] add nccl_wrapper --- paddle/fluid/framework/fleet/nccl_wrapper.h | 143 ++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 paddle/fluid/framework/fleet/nccl_wrapper.h diff --git a/paddle/fluid/framework/fleet/nccl_wrapper.h b/paddle/fluid/framework/fleet/nccl_wrapper.h new file mode 100644 index 0000000000..9eeb4ce625 --- /dev/null +++ b/paddle/fluid/framework/fleet/nccl_wrapper.h @@ -0,0 +1,143 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/variable_helper.h" +#include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN + +namespace paddle { +namespace framework { + +class NCCLWrapper { + public: + virtual ~NCCLWrapper() {} + NCCLWrapper() {} + + // Pull sparse variables from server in Sync mode + // Param: scope, table_id, var_names, fea_keys + // Param: fea_values + void PullSparseVarsSync(const Scope& scope, const uint64_t table_id, + const std::vector& var_names, + std::vector* fea_keys, + std::vector>* fea_values, + int fea_dim); + + void PullDenseVarsSync(const Scope& scope, const uint64_t table_id, + const std::vector& var_names); + + void PullDenseVarsAsync( + const Scope& scope, const uint64_t table_id, + const std::vector& var_names, + std::vector<::std::future>* pull_dense_status); + + void PushDenseParamSync(const Scope& scope, const uint64_t table_id, + const std::vector& var_names); + + // Push dense variables to server in async mode + // Param: scope, table_id, var_names, + // Param: push_sparse_status + void PushDenseVarsAsync( + const Scope& scope, const uint64_t table_id, + const std::vector& var_names, + std::vector<::std::future>* push_sparse_status); + + void PushDenseVarsSync(Scope* scope, const uint64_t table_id, + const std::vector& var_names); + + // Push sparse variables with labels to server in Async mode + // This is specially designed for click/show stats in server + // Param: scope, table_id, var_grad_names, + // fea_keys, fea_labels, sparse_grad_names + // Param: push_values, push_sparse_status + void PushSparseVarsWithLabelAsync( + const Scope& scope, const uint64_t table_id, + const std::vector& fea_keys, + const std::vector& fea_labels, + const std::vector& sparse_key_names, + const std::vector& sparse_grad_names, const int emb_dim, + std::vector>* push_values, + std::vector<::std::future>* push_sparse_status); + + // Push sparse variables to server in Async mode + // Param: scope, table_id, fea_keys, sparse_grad_names + // Param: push_values, push_sparse_status + /* + void PushSparseVarsAsync( + const Scope& scope, + const uint64_t table_id, + const std::vector& fea_keys, + const std::vector& sparse_grad_names, + std::vector>* push_values, + std::vector<::std::future>* push_sparse_status); + */ + + void InitServer(const std::string& dist_desc, int index); + void InitWorker(const std::string& dist_desc, + const std::vector& host_sign_list, int node_num, + int index); + void StopServer(); + uint64_t RunServer(); + void GatherServers(const std::vector& host_sign_list, int node_num); + // gather client ip + void GatherClients(const std::vector& host_sign_list); + // get client info + std::vector GetClientsInfo(); + // create client to client connection + void CreateClient2ClientConnection(); + + // register client to client communication + typedef std::function MsgHandlerFunc; + int RegisterClientToClientMsgHandler(int msg_type, MsgHandlerFunc handler); + // send client to client message + std::future SendClientToClientMsg(int msg_type, int to_client_id, + const std::string& msg); + + template + void Serialize(const std::vector& t, std::string* str); + template + void Deserialize(std::vector* t, const std::string& str); + static std::shared_ptr GetInstance() { + if (NULL == s_instance_) { + s_instance_.reset(new paddle::framework::FleetWrapper()); + } + return s_instance_; + } + +#ifdef PADDLE_WITH_PSLIB + static std::shared_ptr pslib_ptr_; +#endif + + private: + static std::shared_ptr s_instance_; +#ifdef PADDLE_WITH_PSLIB + std::map> _regions; +#endif + + protected: + static bool is_initialized_; + DISABLE_COPY_AND_ASSIGN(FleetWrapper); +}; + +} // end namespace framework +} // end namespace paddle -- GitLab