From 969e6378b93d579700c6fef6c830002f99583b3f Mon Sep 17 00:00:00 2001 From: hutuxian Date: Tue, 11 Jun 2019 10:28:10 +0800 Subject: [PATCH] Pipeline Concurrency (#17402) Add Pipeline Concurrency Train Mode: - Cpp: pipeline_trainer & section_worker - Python: PipelineOptimizer - Add a new data_feed type: PrivateInstantDataFeed - Add a test demo of pipeline trainer and the test model is gnn - Do not support win32 now --- paddle/fluid/API.spec | 9 + paddle/fluid/framework/CMakeLists.txt | 14 +- paddle/fluid/framework/async_executor.cc | 5 +- paddle/fluid/framework/data_feed.cc | 210 ++++++++ paddle/fluid/framework/data_feed.h | 97 +++- paddle/fluid/framework/data_feed_factory.cc | 3 + paddle/fluid/framework/device_worker.h | 105 ++++ .../fluid/framework/device_worker_factory.cc | 3 + paddle/fluid/framework/executor.cc | 5 +- paddle/fluid/framework/pipeline_trainer.cc | 268 +++++++++ paddle/fluid/framework/section_worker.cc | 411 ++++++++++++++ paddle/fluid/framework/trainer.h | 53 ++ paddle/fluid/framework/trainer_desc.proto | 27 + paddle/fluid/framework/trainer_factory.cc | 3 + paddle/fluid/platform/timer.h | 2 +- python/paddle/fluid/dataset.py | 37 +- python/paddle/fluid/device_worker.py | 54 +- python/paddle/fluid/executor.py | 37 +- python/paddle/fluid/framework.py | 3 + python/paddle/fluid/optimizer.py | 231 +++++++- .../paddle/fluid/tests/demo/pipeline_train.py | 508 ++++++++++++++++++ python/paddle/fluid/trainer_desc.py | 23 +- python/paddle/fluid/trainer_factory.py | 11 +- 23 files changed, 2085 insertions(+), 34 deletions(-) create mode 100644 paddle/fluid/framework/pipeline_trainer.cc create mode 100644 paddle/fluid/framework/section_worker.cc create mode 100644 python/paddle/fluid/tests/demo/pipeline_train.py diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 91624a83413..b8eef7c4a68 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -542,6 +542,15 @@ paddle.fluid.optimizer.ExponentialMovingAverage.__init__ (ArgSpec(args=['self', paddle.fluid.optimizer.ExponentialMovingAverage.apply (ArgSpec(args=['self', 'executor', 'need_restore'], varargs=None, keywords=None, defaults=(True,)), ('document', '30f494752ac8921dc5835a63637f453a')) paddle.fluid.optimizer.ExponentialMovingAverage.restore (ArgSpec(args=['self', 'executor'], varargs=None, keywords=None, defaults=None), ('document', '8c8a1791608b02a1ede53d6dd3a4fcec')) paddle.fluid.optimizer.ExponentialMovingAverage.update (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', 'ea10f08af6d7aac3b7974aa976e4085f')) +paddle.fluid.optimizer.PipelineOptimizer.__init__ (ArgSpec(args=['self', 'optimizer', 'cut_list', 'place_list', 'concurrency_list', 'queue_size', 'sync_steps', 'start_cpu_core_id'], varargs=None, keywords=None, defaults=(None, None, None, 30, 1, 0)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) +paddle.fluid.optimizer.PipelineOptimizer.create_vars (ArgSpec(args=['self', 'block', 'main_program'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) +paddle.fluid.optimizer.PipelineOptimizer.extract_section_ops (ArgSpec(args=['self', 'ops', 'cut_point_name'], varargs=None, keywords=None, defaults=None), ('document', '4a29be77da04b5c30dd7202f44c79b70')) +paddle.fluid.optimizer.PipelineOptimizer.extract_section_opt_ops (ArgSpec(args=['self', 'ops', 'cut_point_name'], varargs=None, keywords=None, defaults=None), ('document', '99e0f641222c1ce4dd0d7194c3b2c653')) +paddle.fluid.optimizer.PipelineOptimizer.find_input_output (ArgSpec(args=['self', 'ops', 'name', 'is_forward'], varargs=None, keywords=None, defaults=(True,)), ('document', '92d77fb262766b352746f09cca81db93')) +paddle.fluid.optimizer.PipelineOptimizer.find_persistable_vars (ArgSpec(args=['self', 'ops', 'whole_parameters'], varargs=None, keywords=None, defaults=None), ('document', '877b7cc290f0647455e5e4409e825923')) +paddle.fluid.optimizer.PipelineOptimizer.find_section_opt (ArgSpec(args=['self', 'ops', 'params'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) +paddle.fluid.optimizer.PipelineOptimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) +paddle.fluid.optimizer.PipelineOptimizer.split_program (ArgSpec(args=['self', 'main_program', 'cut_list'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.backward.append_backward (ArgSpec(args=['loss', 'parameter_list', 'no_grad_set', 'callbacks'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '08a5dd9f6f376ff3d55e0b1d92115cbd')) paddle.fluid.regularizer.L1DecayRegularizer.__init__ (ArgSpec(args=['self', 'regularization_coeff'], varargs=None, keywords=None, defaults=(0.0,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.regularizer.L2DecayRegularizer.__init__ (ArgSpec(args=['self', 'regularization_coeff'], varargs=None, keywords=None, defaults=(0.0,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index c41efc5e016..0a2b3cf2b80 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -173,20 +173,20 @@ endif() cc_library(executor_gc_helper SRCS executor_gc_helper.cc DEPS scope proto_desc operator garbage_collector) if(WITH_DISTRIBUTE) - cc_library(executor SRCS executor.cc multi_trainer.cc dataset_factory.cc + cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc - pull_dense_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry + pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry device_context scope framework_proto trainer_desc_proto glog fs shell fleet_wrapper lodtensor_printer lod_rank_table feed_fetch_method sendrecvop_rpc ${GLOB_DISTRIBUTE_DEPS} graph_to_program_pass variable_helper data_feed_proto ${NGRAPH_EXE_DEPS} timer) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) else() - cc_library(executor SRCS executor.cc multi_trainer.cc dataset_factory.cc + cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc - pull_dense_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry + pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry device_context scope framework_proto data_feed_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper lodtensor_printer feed_fetch_method graph_to_program_pass variable_helper ${NGRAPH_EXE_DEPS} timer data_feed_proto) @@ -201,10 +201,10 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS fast_threaded_ssa_graph_executor variable_helper) cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc - executor_thread_worker.cc multi_trainer.cc dist_multi_trainer.cc + executor_thread_worker.cc multi_trainer.cc dist_multi_trainer.cc pipeline_trainer.cc trainer_factory.cc trainer.cc device_worker.cc hogwild_worker.cc - downpour_worker.cc pull_dense_worker.cc device_worker_factory.cc - data_set.cc dataset_factory.cc + downpour_worker.cc pull_dense_worker.cc section_worker.cc + device_worker_factory.cc data_set.cc dataset_factory.cc DEPS op_registry device_context scope framework_proto trainer_desc_proto glog lod_rank_table fleet_wrapper lodtensor_printer feed_fetch_method graph_to_program_pass data_feed_proto diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc index 89153d82d07..7eb80a4617a 100644 --- a/paddle/fluid/framework/async_executor.cc +++ b/paddle/fluid/framework/async_executor.cc @@ -85,8 +85,9 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program, } DataFeedDesc data_feed_desc; - google::protobuf::TextFormat::ParseFromString(data_feed_desc_str, - &data_feed_desc); + bool success = data_feed_desc.ParseFromString(data_feed_desc_str); + PADDLE_ENFORCE(success, "Fail to parse DataFeedDesc from string:\n%s", + data_feed_desc_str.c_str()); actual_thread_num_ = thread_num; int file_cnt = filelist.size(); diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 4f40786a959..e89f3f1a4e0 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -20,6 +20,9 @@ limitations under the License. */ #include "paddle/fluid/framework/data_feed.h" #ifdef _LINUX #include +#include +#include +#include #endif #include #include "gflags/gflags.h" @@ -87,6 +90,13 @@ void DataFeed::CheckStart() { PADDLE_ENFORCE(finish_start_, "Datafeed has not started running yet."); } +void DataFeed::AssignFeedVar(const Scope& scope) { + CheckInit(); + for (size_t i = 0; i < use_slots_.size(); ++i) { + feed_vec_[i] = scope.FindVar(use_slots_[i])->GetMutable(); + } +} + template void PrivateQueueDataFeed::SetQueueSize(int queue_size) { PADDLE_ENFORCE(queue_size > 0, "Illegal queue size: %d.", queue_size); @@ -1009,5 +1019,205 @@ void MultiSlotInMemoryDataFeed::DeserializeIns( fleet_ptr->Deserialize(ins, str); } +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +template +void PrivateInstantDataFeed::PutToFeedVec() { + for (size_t i = 0; i < use_slots_.size(); ++i) { + const auto& type = ins_vec_[i].GetType(); + const auto& offset = ins_vec_[i].GetOffset(); + int total_instance = static_cast(offset.back()); + + if (type[0] == 'f') { // float + const auto& feasign = ins_vec_[i].GetFloatData(); + float* tensor_ptr = feed_vec_[i]->mutable_data( + {total_instance, 1}, platform::CPUPlace()); + memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float)); + } else if (type[0] == 'u') { // uint64 + // no uint64_t type in paddlepaddle + const auto& feasign = ins_vec_[i].GetUint64Data(); + int64_t* tensor_ptr = feed_vec_[i]->mutable_data( + {total_instance, 1}, platform::CPUPlace()); + memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t)); + } + + LoD data_lod{offset}; + feed_vec_[i]->set_lod(data_lod); + if (use_slots_is_dense_[i]) { + int64_t total_dims = 1; + for (const auto e : use_slots_shape_[i]) { + total_dims *= e; + } + PADDLE_ENFORCE( + total_dims == total_instance, + "The actual data size of slot[%s] doesn't match its declaration", + use_slots_[i].c_str()); + feed_vec_[i]->Resize(framework::make_ddim(use_slots_shape_[i])); + } + } +} + +template +int PrivateInstantDataFeed::Next() { + if (ParseOneMiniBatch()) { + PutToFeedVec(); + return ins_vec_[0].GetBatchSize(); + } + Postprocess(); + + std::string filename; + if (!PickOneFile(&filename)) { + return -1; + } + if (!Preprocess(filename)) { + return -1; + } + + PADDLE_ENFORCE(true == ParseOneMiniBatch(), "Fail to parse mini-batch data"); + PutToFeedVec(); + return ins_vec_[0].GetBatchSize(); +} + +template +void PrivateInstantDataFeed::Init(const DataFeedDesc& data_feed_desc) { + finish_init_ = false; + finish_set_filelist_ = false; + finish_start_ = false; + + PADDLE_ENFORCE(data_feed_desc.has_multi_slot_desc(), + "Multi_slot_desc has not been set."); + paddle::framework::MultiSlotDesc multi_slot_desc = + data_feed_desc.multi_slot_desc(); + SetBatchSize(data_feed_desc.batch_size()); + size_t all_slot_num = multi_slot_desc.slots_size(); + all_slots_.resize(all_slot_num); + all_slots_type_.resize(all_slot_num); + use_slots_index_.resize(all_slot_num); + multi_inductive_shape_index_.resize(all_slot_num); + use_slots_.clear(); + use_slots_is_dense_.clear(); + for (size_t i = 0; i < all_slot_num; ++i) { + const auto& slot = multi_slot_desc.slots(i); + all_slots_[i] = slot.name(); + all_slots_type_[i] = slot.type(); + use_slots_index_[i] = slot.is_used() ? use_slots_.size() : -1; + if (slot.is_used()) { + use_slots_.push_back(all_slots_[i]); + use_slots_is_dense_.push_back(slot.is_dense()); + std::vector local_shape; + if (slot.is_dense()) { + for (size_t j = 0; j < slot.shape_size(); ++j) { + if (slot.shape(j) == -1) { + multi_inductive_shape_index_[i].push_back(j); + } + } + } + for (size_t j = 0; j < slot.shape_size(); ++j) { + local_shape.push_back(slot.shape(j)); + } + use_slots_shape_.push_back(local_shape); + } + } + feed_vec_.resize(use_slots_.size()); + ins_vec_.resize(use_slots_.size()); + + finish_init_ = true; +} + +template class PrivateInstantDataFeed>; + +bool MultiSlotFileInstantDataFeed::Preprocess(const std::string& filename) { + fd_ = open(filename.c_str(), O_RDONLY); + PADDLE_ENFORCE(fd_ != -1, "Fail to open file: %s", filename.c_str()); + + struct stat sb; + fstat(fd_, &sb); + end_ = static_cast(sb.st_size); + + buffer_ = + reinterpret_cast(mmap(NULL, end_, PROT_READ, MAP_PRIVATE, fd_, 0)); + PADDLE_ENFORCE(buffer_ != MAP_FAILED, strerror(errno)); + + offset_ = 0; + return true; +} + +bool MultiSlotFileInstantDataFeed::Postprocess() { + if (buffer_ != nullptr) { + munmap(buffer_, end_); + buffer_ = nullptr; + } + if (fd_ != -1) { + close(fd_); + fd_ = -1; + end_ = 0; + offset_ = 0; + } + return true; +} + +bool MultiSlotFileInstantDataFeed::ParseOneMiniBatch() { + if (offset_ == end_) { + return false; + } + + batch_size_ = 0; + while (batch_size_ < default_batch_size_ && offset_ < end_) { + for (size_t i = 0; i < use_slots_index_.size(); ++i) { + int idx = use_slots_index_[i]; + char type = all_slots_type_[i][0]; + + uint16_t num = *reinterpret_cast(buffer_ + offset_); + PADDLE_ENFORCE( + num, + "The number of ids can not be zero, you need padding " + "it in data generator; or if there is something wrong with " + "the data, please check if the data contains unresolvable " + "characters."); + offset_ += sizeof(uint16_t); + + if (idx != -1) { + int inductive_size = multi_inductive_shape_index_[i].size(); + if (UNLIKELY(batch_size_ == 0)) { + ins_vec_[idx].Init(all_slots_type_[i], default_batch_size_ * num); + ins_vec_[idx].InitOffset(default_batch_size_); + uint64_t* inductive_shape = + reinterpret_cast(buffer_ + offset_); + for (int inductive_id = 0; inductive_id < inductive_size; + ++inductive_id) { + use_slots_shape_[i][multi_inductive_shape_index_[i][inductive_id]] = + static_cast(*(inductive_shape + inductive_id)); + } + } + num -= inductive_size; + offset_ += sizeof(uint64_t) * inductive_size; + + if (type == 'f') { + ins_vec_[idx].AppendValues( + reinterpret_cast(buffer_ + offset_), num); + offset_ += num * sizeof(float); + } else if (type == 'u') { + ins_vec_[idx].AppendValues( + reinterpret_cast(buffer_ + offset_), num); + offset_ += num * sizeof(uint64_t); + } + } else { + if (type == 'f') { + offset_ += num * sizeof(float); + } else if (type == 'u') { + offset_ += num * sizeof(uint64_t); + } + } + } + ++batch_size_; + // OPTIMIZE: It is better to insert check codes between instances for format + // checking + } + + PADDLE_ENFORCE(batch_size_ == default_batch_size_ || offset_ == end_, + "offset_ != end_"); + return true; +} +#endif + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 594ba3cbcc4..7fea85601c4 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -59,7 +59,7 @@ class DataFeed { file_idx_ = nullptr; } virtual ~DataFeed() {} - virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc) = 0; + virtual void Init(const DataFeedDesc& data_feed_desc) = 0; virtual bool CheckFile(const char* filename) { PADDLE_THROW("This function(CheckFile) is not implemented."); } @@ -84,6 +84,9 @@ class DataFeed { // This function is used for binding feed_vec memory virtual void AddFeedVar(Variable* var, const std::string& name); + // This function is used for binding feed_vec memory in a given scope + virtual void AssignFeedVar(const Scope& scope); + // This function will do nothing at default virtual void SetMemoryData(void* memory_data) {} // This function will do nothing at default @@ -148,6 +151,8 @@ class DataFeed { std::vector> use_slots_shape_; std::vector inductive_shape_index_; std::vector total_dims_without_inductive_; + // For the inductive shape passed within data + std::vector> multi_inductive_shape_index_; std::vector use_slots_index_; // -1: not used; >=0: the index of use_slots_ @@ -173,7 +178,6 @@ class PrivateQueueDataFeed : public DataFeed { public: PrivateQueueDataFeed() {} virtual ~PrivateQueueDataFeed() {} - virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc) = 0; virtual bool Start(); virtual int Next(); @@ -212,7 +216,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed { public: InMemoryDataFeed(); virtual ~InMemoryDataFeed() {} - virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc) = 0; + virtual void Init(const DataFeedDesc& data_feed_desc) = 0; virtual bool Start(); virtual int Next(); virtual void SetMemoryData(void* memory_data); @@ -263,16 +267,25 @@ class MultiSlotType { public: MultiSlotType() {} ~MultiSlotType() {} - void Init(const std::string& type) { + void Init(const std::string& type, size_t reserved_size = 0) { CheckType(type); if (type_[0] == 'f') { float_feasign_.clear(); + if (reserved_size) { + float_feasign_.reserve(reserved_size); + } } else if (type_[0] == 'u') { uint64_feasign_.clear(); + if (reserved_size) { + uint64_feasign_.reserve(reserved_size); + } } type_ = type; } - void InitOffset() { + void InitOffset(size_t max_batch_size = 0) { + if (max_batch_size > 0) { + offset_.reserve(max_batch_size + 1); + } offset_.resize(1); // LoDTensor' lod is counted from 0, the size of lod // is one size larger than the size of data. @@ -288,6 +301,16 @@ class MultiSlotType { CheckUint64(); uint64_feasign_.push_back(v); } + void CopyValues(const float* input, size_t size) { + CheckFloat(); + float_feasign_.resize(size); + memcpy(float_feasign_.data(), input, size * sizeof(float)); + } + void CopyValues(const uint64_t* input, size_t size) { + CheckUint64(); + uint64_feasign_.resize(size); + memcpy(uint64_feasign_.data(), input, size * sizeof(uint64_t)); + } void AddIns(const MultiSlotType& ins) { if (ins.GetType()[0] == 'f') { // float CheckFloat(); @@ -301,11 +324,22 @@ class MultiSlotType { uint64_feasign_.insert(uint64_feasign_.end(), vec.begin(), vec.end()); } } + void AppendValues(const uint64_t* input, size_t size) { + CheckUint64(); + offset_.push_back(offset_.back() + size); + uint64_feasign_.insert(uint64_feasign_.end(), input, input + size); + } + void AppendValues(const float* input, size_t size) { + CheckFloat(); + offset_.push_back(offset_.back() + size); + float_feasign_.insert(float_feasign_.end(), input, input + size); + } const std::vector& GetFloatData() const { return float_feasign_; } std::vector& MutableFloatData() { return float_feasign_; } const std::vector& GetUint64Data() const { return uint64_feasign_; } std::vector& MutableUint64Data() { return uint64_feasign_; } const std::string& GetType() const { return type_; } + size_t GetBatchSize() { return offset_.size() - 1; } std::string& MutableType() { return type_; } std::string DebugString() { @@ -355,7 +389,7 @@ class MultiSlotDataFeed public: MultiSlotDataFeed() {} virtual ~MultiSlotDataFeed() {} - virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc); + virtual void Init(const DataFeedDesc& data_feed_desc); virtual bool CheckFile(const char* filename); // virtual void ReadThread(); @@ -374,7 +408,7 @@ class MultiSlotInMemoryDataFeed public: MultiSlotInMemoryDataFeed() {} virtual ~MultiSlotInMemoryDataFeed() {} - virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc); + virtual void Init(const DataFeedDesc& data_feed_desc); protected: virtual void AddInstanceToInsVec(std::vector* vec_ins, @@ -389,5 +423,54 @@ class MultiSlotInMemoryDataFeed const std::string& str); }; +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +template +class PrivateInstantDataFeed : public DataFeed { + public: + PrivateInstantDataFeed() {} + virtual ~PrivateInstantDataFeed() {} + void Init(const DataFeedDesc& data_feed_desc) override; + bool Start() override { return true; } + int Next() override; + + protected: + // The batched data buffer + std::vector ins_vec_; + + // This function is used to preprocess with a given filename, e.g. open it or + // mmap + virtual bool Preprocess(const std::string& filename) = 0; + + // This function is used to postprocess system resource such as closing file + // NOTICE: Ensure that it is safe to call before Preprocess + virtual bool Postprocess() = 0; + + // The reading and parsing method. + virtual bool ParseOneMiniBatch() = 0; + + // This function is used to put ins_vec to feed_vec + virtual void PutToFeedVec(); +}; + +class MultiSlotFileInstantDataFeed + : public PrivateInstantDataFeed> { + public: + MultiSlotFileInstantDataFeed() {} + virtual ~MultiSlotFileInstantDataFeed() {} + + protected: + int fd_{-1}; + char* buffer_{nullptr}; + size_t end_{0}; + size_t offset_{0}; + + bool Preprocess(const std::string& filename) override; + + bool Postprocess() override; + + bool ParseOneMiniBatch() override; +}; +#endif + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/data_feed_factory.cc b/paddle/fluid/framework/data_feed_factory.cc index 201d6c0d0b9..ec1acad99bc 100644 --- a/paddle/fluid/framework/data_feed_factory.cc +++ b/paddle/fluid/framework/data_feed_factory.cc @@ -64,5 +64,8 @@ std::shared_ptr DataFeedFactory::CreateDataFeed( REGISTER_DATAFEED_CLASS(MultiSlotDataFeed); REGISTER_DATAFEED_CLASS(MultiSlotInMemoryDataFeed); +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +REGISTER_DATAFEED_CLASS(MultiSlotFileInstantDataFeed); +#endif } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index 7f89d05fe07..be5f663e1c9 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once +#include #include #include #include @@ -35,9 +36,17 @@ limitations under the License. */ #include "paddle/fluid/platform/port.h" #include "paddle/fluid/platform/timer.h" +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +#include "paddle/fluid/platform/nccl_helper.h" +#endif + namespace paddle { namespace framework { +#define SEC_LOG \ + VLOG(3) << "[s" << section_id_ << "p" << pipeline_id_ << "t" << thread_id_ \ + << "]: " + class PullDenseWorker { public: virtual ~PullDenseWorker() {} @@ -196,5 +205,101 @@ class DownpourWorker : public HogwildWorker { std::vector<::std::future> push_dense_status_; }; +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +using ScopeQueue = operators::reader::BlockingQueue; + +class SyncFunctor { + public: + SyncFunctor(int rank_id, int rank_num, int sync_steps); + virtual ~SyncFunctor() {} + + void SetSyncParam(const std::vector& sync_param) { + sync_param_ = &sync_param; + } + void SetNcclCtxMap(platform::NCCLContextMap* nccl_ctx_map) { + nccl_ctx_map_ = nccl_ctx_map; + } + + int operator()(Scope* scope); + static std::vector pipeline_scopes_; + static uint64_t sync_flag_; + + protected: + const int rank_id_; + const int rank_num_; + const std::vector* sync_param_ = nullptr; + platform::NCCLContextMap* nccl_ctx_map_ = nullptr; + + uint64_t sync_signal_; + const int sync_steps_; + int counter_; + + void Synchronize(); +}; + +class SectionWorker : public DeviceWorker { + public: + SectionWorker() {} + ~SectionWorker() override {} + + void Initialize(const TrainerDesc& desc) override; + + void BindingDataFeedMemory() override {} + void CreateDeviceResource(const ProgramDesc& main_prog) override{}; + + void TrainFiles() override; + void TrainFilesWithProfiler() override; + + void PrintFetchVars() override {} + + const platform::Place& place() const { return place_; } + + void SetSectionIndex(int section_id) { section_id_ = section_id; } + void SetDeviceIndex(int tid) override { pipeline_id_ = tid; } + void SetThreadIndex(int thread_id) { thread_id_ = thread_id; } + void SetVarNames(const std::vector& in_var_names, + const std::vector& out_var_names) { + in_var_names_ = &in_var_names; + out_var_names_ = &out_var_names; + } + void SetScopeQueue(ScopeQueue* in_scope_queue, ScopeQueue* out_scope_queue) { + in_scope_queue_ = in_scope_queue; + out_scope_queue_ = out_scope_queue; + } + void SetCountMutex(std::mutex* mutex) { worker_count_mutex_ = mutex; } + void SetWorkerCount(int* worker_count) { worker_count_ = worker_count; } + void SetSectionNum(int section_num) { section_num_ = section_num; } + void SetPipelineNum(int pipeline_num) { pipeline_num_ = pipeline_num; } + void SetNextSectionPlace(const paddle::platform::Place& place) { + next_section_place_ = place; + } + SyncFunctor* sync_func_ = nullptr; + void SetSyncFunctor(SyncFunctor* sync_func) { sync_func_ = sync_func; } + + static std::atomic cpu_id_; + + protected: + void AutoSetCPUAffinity(bool reuse); + int section_id_; + int pipeline_id_; + int section_num_; + int pipeline_num_; + int thread_id_; + + // This worker will consume scope from in_scope_queue_ + // and produce scope to out_scope_queue_ + ScopeQueue* in_scope_queue_ = nullptr; + ScopeQueue* out_scope_queue_ = nullptr; + const std::vector* in_var_names_ = nullptr; + const std::vector* out_var_names_ = nullptr; + std::mutex* worker_count_mutex_ = nullptr; + int* worker_count_ = nullptr; + paddle::platform::Place next_section_place_; + + std::vector> ops_; + + platform::DeviceContext* dev_ctx_ = nullptr; +}; +#endif } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/device_worker_factory.cc b/paddle/fluid/framework/device_worker_factory.cc index 2a7b368145c..dc85941f57d 100644 --- a/paddle/fluid/framework/device_worker_factory.cc +++ b/paddle/fluid/framework/device_worker_factory.cc @@ -61,5 +61,8 @@ std::shared_ptr DeviceWorkerFactory::CreateDeviceWorker( REGISTER_DEVICE_WORKER_CLASS(HogwildWorker); REGISTER_DEVICE_WORKER_CLASS(DownpourWorker); +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +REGISTER_DEVICE_WORKER_CLASS(SectionWorker); +#endif } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 40ab2ad7009..e36871e8d82 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -122,8 +122,9 @@ void Executor::RunFromDataset(const ProgramDesc& main_program, Scope* scope, const std::string& trainer_desc_str) { VLOG(3) << "Start to RunFromDataset in executor"; TrainerDesc trainer_desc; - google::protobuf::TextFormat::ParseFromString(trainer_desc_str, - &trainer_desc); + bool success = trainer_desc.ParseFromString(trainer_desc_str); + PADDLE_ENFORCE(success, "Fail to parse TrainerDesc from string:\n%s", + trainer_desc_str.c_str()); VLOG(3) << "Going to create trainer, trainer class is " << trainer_desc.class_name(); std::shared_ptr trainer; diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc new file mode 100644 index 00000000000..3edffd434a1 --- /dev/null +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +#include "paddle/fluid/framework/data_feed_factory.h" +#include "paddle/fluid/framework/device_worker_factory.h" +#include "paddle/fluid/framework/trainer.h" +#include "paddle/fluid/framework/trainer_desc.pb.h" + +namespace paddle { +namespace framework { + +void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, + Dataset* dataset) { + pipeline_num_ = trainer_desc.thread_num(); + VLOG(3) << "pipeline num: " << pipeline_num_; + + SetDataset(dataset); + // get filelist from trainer_desc here + dataset->CreateReaders(); + VLOG(3) << "readers created"; + const std::vector> readers = + dataset->GetReaders(); + VLOG(3) << "readers num: " << readers.size(); + + pipeline_config_ = trainer_desc.section_param(); + scope_queue_size_ = pipeline_config_.queue_size(); + sync_steps_ = pipeline_config_.sync_steps(); + section_num_ = pipeline_config_.section_config_size(); + + VLOG(3) << "scope_queue_size: " << scope_queue_size_; + VLOG(3) << "section num: " << section_num_; + VLOG(3) << "sync_steps: " << sync_steps_; + + workers_.resize(section_num_); + in_var_names_.resize(section_num_); + out_var_names_.resize(section_num_); + worker_count_.resize(section_num_); + worker_count_mutex_.resize(section_num_); + param_need_sync_.reset(new std::vector); + + int reader_index = 0; + for (int i = 0; i < section_num_; ++i) { + const auto& section_config = pipeline_config_.section_config(i); + int concurrency = section_config.concurrency(); + VLOG(3) << "the thread num of each pipeline in section " << i + << " is: " << concurrency; + in_var_names_[i].reset(new std::vector( + section_config.section_in_var_names().begin(), + section_config.section_in_var_names().end())); + out_var_names_[i].reset(new std::vector( + section_config.section_out_var_names().begin(), + section_config.section_out_var_names().end())); + worker_count_[i].resize(pipeline_num_); + worker_count_mutex_[i].resize(pipeline_num_); + for (int j = 0; j < pipeline_num_; ++j) { + worker_count_[i][j] = new int(concurrency); + worker_count_mutex_[i][j].reset(new std::mutex); + } + + platform::Place place; + workers_[i].resize(pipeline_num_); + for (int j = 0; j < pipeline_num_; ++j) { + workers_[i][j].resize(concurrency); + + switch (section_config.place()) { + case SectionConfig::CPUPlace: + place = platform::CPUPlace(); + break; + case SectionConfig::CUDAPlace: + // Note that one section has at most one GPU place in one pipeline + place = platform::CUDAPlace(j); + break; + case SectionConfig::CUDAPinnedPlace: + place = platform::CUDAPinnedPlace(); + break; + default: + PADDLE_ENFORCE(false, "Unkown place type in SectionConfig: %d", + section_config.place()); + } + + for (int k = 0; k < concurrency; ++k) { + workers_[i][j][k] = DeviceWorkerFactory::CreateDeviceWorker( + trainer_desc.device_worker_name()); + auto this_worker = + std::dynamic_pointer_cast( + workers_[i][j][k]); + this_worker->SetSectionIndex(i); + this_worker->SetDeviceIndex(j); + this_worker->SetThreadIndex(k); + this_worker->SetSectionNum(section_num_); + this_worker->SetPipelineNum(pipeline_num_); + if (i == 0) { + this_worker->SetDataFeed(readers[reader_index++]); + } + this_worker->SetPlace(place); + this_worker->Initialize(trainer_desc); + } + } + } + param_need_sync_.reset( + new std::vector(pipeline_config_.param_need_sync().begin(), + pipeline_config_.param_need_sync().end())); + VLOG(3) << "param_need_sync_ have: "; + for (const std::string& name : *param_need_sync_) { + VLOG(3) << name; + } + // set debug here + SetDebug(trainer_desc.debug()); +} + +void PipelineTrainer::InitFirstScopeQueue(ScopeQueue* scope_queue, + int pipeline_id, + const ProgramDesc& main_program) { + for (int i = 0; i < scope_queue_size_; ++i) { + Scope* scope = &pipeline_scopes_[pipeline_id]->NewScope(); + for (auto& var : main_program.Block(0).AllVars()) { + if (!var->Persistable()) { + auto* ptr = scope->Var(var->Name()); + InitializeVariable(ptr, var->GetType()); + } + } + scope_queue->Send(scope); + } +} + +void PipelineTrainer::CopyParameters(const Scope& root_scope, int pipeline_id) { + for (const std::string& name : *param_need_sync_) { + const LoDTensor& root_tensor = root_scope.FindVar(name)->Get(); + + // TODO(hutxian): check a new var of the same name is created in + // pipeline_scope + LoDTensor* gpu_tensor = + pipeline_scopes_[pipeline_id]->Var(name)->GetMutable(); + platform::Place place = platform::CUDAPlace(pipeline_id); + TensorCopy(*static_cast(&root_tensor), place, + static_cast(gpu_tensor)); + } +} + +void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, + const platform::Place& place) { + PADDLE_ENFORCE(root_scope_, "Null root_scope pointer"); + SectionWorker::cpu_id_.store(pipeline_config_.start_cpu_core_id()); + scope_queues_.resize(section_num_); + pipeline_scopes_.resize(pipeline_num_); + + VLOG(3) << "Init ScopeQueues and create all scopes"; + for (int i = 0; i < section_num_; ++i) { + for (int j = 0; j < pipeline_num_; ++j) { + scope_queues_[i].emplace_back(new ScopeQueue(scope_queue_size_)); + if (i == 0) { + pipeline_scopes_[j] = &root_scope_->NewScope(); + CopyParameters(*root_scope_, j); + InitFirstScopeQueue(scope_queues_[0].back().get(), j, main_program); + } + } + } + + for (int i = 0; i < section_num_; ++i) { + for (int j = 0; j < pipeline_num_; ++j) { + for (size_t k = 0; k < workers_[i][j].size(); ++k) { + auto this_worker = + std::dynamic_pointer_cast( + workers_[i][j][k]); + this_worker->SetRootScope(root_scope_); + this_worker->SetCountMutex(worker_count_mutex_[i][j].get()); + this_worker->SetWorkerCount(worker_count_[i][j]); + this_worker->SetScopeQueue(scope_queues_[i][j].get(), + (i == section_num_ - 1) + ? scope_queues_[0][j].get() + : scope_queues_[i + 1][j].get()); + this_worker->SetVarNames(*in_var_names_[i], *out_var_names_[i]); + if (i != section_num_ - 1) { + // For data copy in adjacent different place + this_worker->SetNextSectionPlace( + std::dynamic_pointer_cast( + workers_[i + 1][j][0]) + ->place()); + } + } + } + } + + if (pipeline_num_ > 1) { + construct_sync_functor(); + } +} + +void PipelineTrainer::construct_sync_functor() { + std::vector cuda_places; + for (int i = 0; i < pipeline_num_; ++i) { + cuda_places.emplace_back(platform::CUDAPlace(i)); + } + nccl_ctx_map_.reset(new platform::NCCLContextMap(cuda_places)); + sync_functors_.resize(pipeline_num_); + SyncFunctor::sync_flag_ = 0; + SyncFunctor::pipeline_scopes_.resize(0); + + for (int j = 0; j < pipeline_num_; ++j) { + SyncFunctor* sync_function = new SyncFunctor(j, pipeline_num_, sync_steps_); + sync_function->SetSyncParam(*param_need_sync_); + sync_function->SetNcclCtxMap(nccl_ctx_map_.get()); + SyncFunctor::pipeline_scopes_.push_back(this->pipeline_scopes_[j]); + sync_functors_[j].reset(sync_function); + } + for (int i = section_num_ - 1; i >= 0; --i) { + if (SectionConfig::CUDAPlace == + pipeline_config_.section_config(i).place()) { + for (int j = 0; j < pipeline_num_; ++j) { + for (size_t k = 0; k < workers_[i][j].size(); ++k) { + auto this_worker = + std::dynamic_pointer_cast( + workers_[i][j][k]); + this_worker->SetSyncFunctor(sync_functors_[j].get()); + } + } + break; + } + } +} + +void PipelineTrainer::Run() { + VLOG(3) << "Going to run"; + for (int i = 0; i < section_num_; ++i) { + for (int j = 0; j < pipeline_num_; ++j) { + for (size_t k = 0; k < workers_[i][j].size(); ++k) { + if (!debug_) { + section_threads_.push_back( + std::thread(&DeviceWorker::TrainFiles, workers_[i][j][k].get())); + } else { + section_threads_.push_back(std::thread( + &DeviceWorker::TrainFilesWithProfiler, workers_[i][j][k].get())); + } + } + } + } +} + +void PipelineTrainer::Finalize() { + for (auto& th : section_threads_) { + th.join(); + } + for (const auto& var : *param_need_sync_) { + auto* root_tensor = root_scope_->Var(var)->GetMutable(); + // TODO(hutuxian): Add a final all-reduce? + const auto& thread_tensor = + pipeline_scopes_[0]->FindVar(var)->Get(); + TensorCopySync(thread_tensor, platform::CPUPlace(), root_tensor); + } + dataset_ptr_->DestroyReaders(); + root_scope_->DropKids(); +} + +} // end namespace framework +} // end namespace paddle +#endif diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc new file mode 100644 index 00000000000..c1a404c1cb2 --- /dev/null +++ b/paddle/fluid/framework/section_worker.cc @@ -0,0 +1,411 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +#include "google/protobuf/io/zero_copy_stream_impl.h" +#include "google/protobuf/message.h" +#include "google/protobuf/text_format.h" + +#include "paddle/fluid/framework/device_worker.h" +#include "paddle/fluid/framework/tensor_util.h" +#include "paddle/fluid/framework/trainer_desc.pb.h" +#include "paddle/fluid/platform/cpu_helper.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/lodtensor_printer.h" + +namespace paddle { +namespace framework { + +uint64_t SyncFunctor::sync_flag_ = 0; +std::vector SyncFunctor::pipeline_scopes_; + +SyncFunctor::SyncFunctor(int rank_id, int rank_num, int sync_steps) + : rank_id_(rank_id), rank_num_(rank_num), sync_steps_(sync_steps) { + PADDLE_ENFORCE(rank_num > 1, "rank_num should larger than 1"); + counter_ = 0; + sync_signal_ = 0; + uint8_t* ptr = reinterpret_cast(&sync_signal_); + for (int i = 0; i < rank_num_; ++i) { + ptr[i] = 0xFF; + } +} + +int SyncFunctor::operator()(Scope* scope) { + ++counter_; + if (counter_ < sync_steps_) { + return 0; + } + if (counter_ == sync_steps_) { + reinterpret_cast(&sync_flag_)[rank_id_] = 0xFF; + } + + if (sync_flag_ == sync_signal_) { + static std::mutex mutex; + if (mutex.try_lock()) { + if (sync_flag_ == sync_signal_) { + Synchronize(); + sync_flag_ = 0; + } + mutex.unlock(); + } + } + + if (sync_flag_ == 0) { + counter_ = 0; + } + return 0; +} + +void SyncFunctor::Synchronize() { + for (const std::string& name : *sync_param_) { + platform::NCCLGroupGuard guard; + for (int i = 0; i < rank_num_; ++i) { + const platform::NCCLContext& nccl_ctx = nccl_ctx_map_->at(i); + LoDTensor* tensor = + pipeline_scopes_[i]->Var(name)->GetMutable(); + // TODO(hutuxian): do not depend on data type explicitly + float* data = + tensor->mutable_data(nccl_ctx_map_->DevCtx(i)->GetPlace()); + const int numel = tensor->numel(); + + paddle::framework::AttributeMap attrs; + attrs.insert({"scale", static_cast(1. / rank_num_)}); + auto scale_op = framework::OpRegistry::CreateOp("scale", {{"X", {name}}}, + {{"Out", {name}}}, attrs); + scale_op->Run(*(pipeline_scopes_[i]), + nccl_ctx_map_->DevCtx(i)->GetPlace()); + PADDLE_ENFORCE(platform::dynload::ncclAllReduce( + data, data, numel, ncclFloat, ncclSum, nccl_ctx.comm(), + dynamic_cast( + platform::DeviceContextPool::Instance().Get( + platform::CUDAPlace(i))) + ->stream())); + } + } + nccl_ctx_map_->WaitAll(); +} + +std::atomic SectionWorker::cpu_id_(0); +void SectionWorker::Initialize(const TrainerDesc& trainer_desc) { + dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_); + std::shared_ptr program; + program.reset(new ProgramDesc( + trainer_desc.section_param().section_config(section_id_).program_desc())); + for (auto& op_desc : program->Block(0).AllOps()) { + ops_.push_back(OpRegistry::CreateOp(*op_desc)); + } +} + +void SectionWorker::AutoSetCPUAffinity(bool reuse) { + int thread_cpu_id = cpu_id_.fetch_add(1); + + unsigned concurrency_cap = std::thread::hardware_concurrency(); + unsigned proc = thread_cpu_id; + + if (proc >= concurrency_cap) { + if (reuse) { + proc %= concurrency_cap; + } else { + LOG(INFO) << "All " << concurrency_cap + << " CPUs have been set affinities. Fail to set " + << thread_cpu_id << "th thread"; + return; + } + } + + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(proc, &mask); + + if (-1 == sched_setaffinity(0, sizeof(mask), &mask)) { + LOG(WARNING) << "Fail to set thread affinity to CPU " << proc; + return; + } + + CPU_ZERO(&mask); + if ((0 != sched_getaffinity(0, sizeof(mask), &mask)) || + (0 == CPU_ISSET(proc, &mask))) { + LOG(WARNING) << "Fail to set thread affinity to CPU " << proc; + } + SEC_LOG << "Set " << thread_cpu_id << "th thread affinity to CPU " << proc; +} + +void SectionWorker::TrainFiles() { + SEC_LOG << "begin section_worker TrainFiles"; + AutoSetCPUAffinity(true); + + int64_t step_cnt = 0; + int64_t accum_num = 0; + int batch_size = 0; + Scope* scope = nullptr; + while (in_scope_queue_->Receive(&scope)) { + if (device_reader_ != nullptr) { + device_reader_->AssignFeedVar(*scope); + batch_size = device_reader_->Next(); + if (batch_size <= 0) { + break; + } + SEC_LOG << "read batch size: " << batch_size; + } else { + // TODO(hutuxian): Keep batch_size in scope? Or is there a better way to + // fetch batch_size? Some variables may not have batch_size. + PADDLE_ENFORCE( + in_var_names_->size(), + "Section without a reader or in variable is not supported by now"); + const LoDTensor& tensor = + scope->FindVar(in_var_names_->at(0))->Get(); + batch_size = + tensor.lod().size() ? tensor.lod()[0].size() - 1 : tensor.dims()[0]; + SEC_LOG << "input batch size: " << batch_size; + } + + Scope* exe_scope = scope; + if (section_id_ > 0 && platform::is_gpu_place(place_)) { + SEC_LOG << "CPU2GPU memory copy"; + + if (scope->kids().empty()) { + exe_scope = &scope->NewScope(); + } else { + exe_scope = scope->kids().front(); + PADDLE_ENFORCE(scope->kids().size() == 1, "scope->kids().size(): %zu", + scope->kids().size()); + } + + for (const std::string& name : *in_var_names_) { + const LoDTensor& src_tensor = scope->FindVar(name)->Get(); + if (platform::is_gpu_place(src_tensor.place())) { + continue; + } + LoDTensor* gpu_tensor = exe_scope->Var(name)->GetMutable(); + gpu_tensor->set_lod(src_tensor.lod()); + TensorCopy(*static_cast(&src_tensor), place_, *dev_ctx_, + static_cast(gpu_tensor)); + } + } + + SEC_LOG << "begin running ops"; + + for (auto& op : ops_) { + op->Run(*exe_scope, place_); + } + exe_scope->DropKids(); + // Wait for GPU calc finising, as the cudaMemcpy and GPU calc may be in + // different streams + // No effect when it is a CPUDeviceContext + dev_ctx_->Wait(); + + if (section_id_ != section_num_ - 1 && platform::is_gpu_place(place_)) { + // FIXME: Temporarily we assume two adjacent sections are in different + // places, + // and we do data transformation only in sections in GPU place, so the + // data is + // transform from GPU to CPU + // A better way to handle such a data transformation is to record each + // place of + // joint-out variables, and do transform as required + + SEC_LOG << "GPU2CPU memory copy"; + + for (const std::string& name : *out_var_names_) { + const LoDTensor& src_tensor = + exe_scope->FindVar(name)->Get(); + LoDTensor* dst_tensor = scope->Var(name)->GetMutable(); + dst_tensor->set_lod(src_tensor.lod()); + TensorCopy(*static_cast(&src_tensor), + next_section_place_, *dev_ctx_, + static_cast(dst_tensor)); + } + } + + out_scope_queue_->Send(scope); + + if (sync_func_) { + (*sync_func_)(scope); + } + + ++step_cnt; + accum_num += batch_size; + } + + worker_count_mutex_->lock(); + --(*worker_count_); + worker_count_mutex_->unlock(); + + if (*worker_count_ <= 0) { + while (section_id_ < section_num_ - 1 && out_scope_queue_->Size()) { + sleep(1); + } + out_scope_queue_->Close(); + } +} + +void SectionWorker::TrainFilesWithProfiler() { + SEC_LOG << "begin section_worker TrainFiles with profiler"; + AutoSetCPUAffinity(true); + + int64_t step_cnt = 0; + int64_t accum_num = 0; + int batch_size = 0; + Scope* scope = nullptr; + + platform::Timer reader_timer; + platform::Timer cal_timer; + platform::Timer trans_timer; + platform::Timer sync_timer; + platform::Timer main_timer; + platform::Timer outer_timer; + + std::vector op_total_time; + std::vector op_name; + for (auto& op : ops_) { + op_name.push_back(op->Type()); + } + op_total_time.resize(ops_.size()); + for (size_t i = 0; i < op_total_time.size(); ++i) { + op_total_time[i] = 0.0; + } + platform::Timer timeline; + + bool started = false; + while (in_scope_queue_->Receive(&scope)) { + if (UNLIKELY(!started)) { + outer_timer.Start(); + started = true; + } + main_timer.Resume(); + + if (device_reader_ != nullptr) { + reader_timer.Resume(); + device_reader_->AssignFeedVar(*scope); + batch_size = device_reader_->Next(); + reader_timer.Pause(); + if (batch_size <= 0) { + break; + } + SEC_LOG << "read batch size: " << batch_size; + } else { + PADDLE_ENFORCE( + in_var_names_->size(), + "Section without a reader or in variable is not supported by now"); + const LoDTensor& tensor = + scope->FindVar(in_var_names_->at(0))->Get(); + batch_size = + tensor.lod().size() ? tensor.lod()[0].size() - 1 : tensor.dims()[0]; + SEC_LOG << "input batch size: " << batch_size; + } + + Scope* exe_scope = scope; + if (section_id_ > 0 && platform::is_gpu_place(place_)) { + SEC_LOG << "CPU2GPU memory copy"; + trans_timer.Resume(); + if (scope->kids().empty()) { + exe_scope = &scope->NewScope(); + } else { + exe_scope = scope->kids().front(); + PADDLE_ENFORCE(scope->kids().size() == 1, "scope->kids().size(): %zu", + scope->kids().size()); + } + + for (const std::string& name : *in_var_names_) { + const LoDTensor& src_tensor = scope->FindVar(name)->Get(); + if (platform::is_gpu_place(src_tensor.place())) { + continue; + } + LoDTensor* gpu_tensor = exe_scope->Var(name)->GetMutable(); + gpu_tensor->set_lod(src_tensor.lod()); + TensorCopy(*static_cast(&src_tensor), place_, *dev_ctx_, + static_cast(gpu_tensor)); + } + trans_timer.Pause(); + } + + SEC_LOG << "begin running ops"; + cal_timer.Resume(); + int op_id = 0; + for (auto& op : ops_) { + timeline.Start(); + op->Run(*exe_scope, place_); + timeline.Pause(); + op_total_time[op_id++] += timeline.ElapsedUS(); + } + exe_scope->DropKids(); + // Wait for GPU calc finising, as the cudaMemcpy and GPU calc may be in + // different streams + // No effect when it is a CPUDeviceContext + dev_ctx_->Wait(); + cal_timer.Pause(); + + if (section_id_ != section_num_ - 1 && platform::is_gpu_place(place_)) { + // FIXME: Temporarily we assume two adjacent sections are in different + // places, + // and we do data transformation only in sections in GPU place, so the + // data is + // transform from GPU to CPU + // A better way to handle such a data transformation is to record each + // place of + // joint-out variables, and do transform as required + + SEC_LOG << "GPU2CPU memory copy"; + trans_timer.Resume(); + for (const std::string& name : *out_var_names_) { + const LoDTensor& src_tensor = + exe_scope->FindVar(name)->Get(); + LoDTensor* dst_tensor = scope->Var(name)->GetMutable(); + dst_tensor->set_lod(src_tensor.lod()); + TensorCopy(*static_cast(&src_tensor), + next_section_place_, *dev_ctx_, + static_cast(dst_tensor)); + } + trans_timer.Pause(); + } + + out_scope_queue_->Send(scope); + + if (sync_func_) { + sync_timer.Resume(); + (*sync_func_)(scope); + sync_timer.Pause(); + } + + ++step_cnt; + accum_num += batch_size; + main_timer.Pause(); + } + outer_timer.Pause(); + + worker_count_mutex_->lock(); + --(*worker_count_); + worker_count_mutex_->unlock(); + + if (*worker_count_ <= 0) { + while (section_id_ < section_num_ - 1 && out_scope_queue_->Size()) { + sleep(1); + } + out_scope_queue_->Close(); + } + LOG(ERROR) << "log_for_profile" + << " card:" << pipeline_id_ << " thread:" << thread_id_ + << " section:" << section_id_ << " step_count:" << step_cnt + << " batch_count:" << accum_num + << " read_time:" << reader_timer.ElapsedUS() + << " trans_time:" << trans_timer.ElapsedUS() + << " cal_time:" << cal_timer.ElapsedUS() + << " sync_time:" << sync_timer.ElapsedUS() + << " main_time:" << main_timer.ElapsedUS() + << " outer_time:" << outer_timer.ElapsedUS(); + for (size_t i = 0; i < ops_.size(); ++i) { + LOG(ERROR) << "op: " << op_name[i] + << ", mean time: " << op_total_time[i] / accum_num; + } +} +} // namespace framework +} // namespace paddle +#endif diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index b29736cfbbe..b491725974c 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -91,5 +91,58 @@ class DistMultiTrainer : public MultiTrainer { std::shared_ptr pull_dense_worker_; }; +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +class PipelineTrainer : public TrainerBase { + public: + PipelineTrainer() {} + ~PipelineTrainer() override {} + void Initialize(const TrainerDesc& trainer_desc, Dataset* data_set) override; + void InitTrainerEnv(const ProgramDesc& main_program, + const platform::Place& place) override; + void InitOtherEnv(const ProgramDesc& main_program) override {} + void Run() override; + void Finalize() override; + + protected: + int section_num_; + int pipeline_num_; + int scope_queue_size_; + int sync_steps_; + + SectionWorkerParameter pipeline_config_; + + // The in/output var names for each section + std::vector>> in_var_names_; + std::vector>> out_var_names_; + + // Counter for the running thread + std::vector> worker_count_; + std::vector>> worker_count_mutex_; + + // worker: [section_id][pipeline_id][thread_id] + std::vector>>> + workers_; + std::vector section_threads_; + + // We use scope to maintain context info, and scopes + // will be deliverd between different sections. + std::vector>> scope_queues_; + std::vector pipeline_scopes_; + + // The parameters that should be syncronized between different cards using + // nccl all-reduce + std::shared_ptr> param_need_sync_; + std::vector> sync_functors_; + std::shared_ptr nccl_ctx_map_; + + std::vector> readers_; + + void InitFirstScopeQueue(ScopeQueue* scope_queue, int pipeline_id, + const ProgramDesc& main_program); + void CopyParameters(const Scope& root_scope, int pipeline_id); + void construct_sync_functor(); +}; +#endif } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index ae8798b6076..4910fb740c5 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -13,7 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. */ syntax = "proto2"; +option optimize_for = LITE_RUNTIME; import "data_feed.proto"; +import "framework.proto"; package paddle.framework; message TrainerDesc { @@ -36,6 +38,7 @@ message TrainerDesc { optional HogwildWorkerParameter hogwild_param = 101; optional DownpourWorkerParameter downpour_param = 103; optional PullDenseWorkerParameter pull_dense_param = 102; + optional SectionWorkerParameter section_param = 104; // datafeed desc optional DataFeedDesc data_desc = 201; } @@ -51,6 +54,30 @@ message DownpourWorkerParameter { optional bool push_dense = 6 [ default = true ]; } +message SectionWorkerParameter { + repeated SectionConfig section_config = 1; + optional int32 queue_size = 2 [ default = 1 ]; + optional int64 sync_steps = 3 [ default = 1 ]; + optional int32 start_cpu_core_id = 4 [ default = 1 ]; + repeated string param_need_sync = 5; +} + +message SectionConfig { + enum Place { + CPUPlace = 0; + CUDAPlace = 1; + CUDAPinnedPlace = 2; + } + + // FIXME: How to use proto::ProgramDesc + // required string program_desc_str = 1; + optional proto.ProgramDesc program_desc = 1; + optional Place place = 2; + optional int32 concurrency = 3 [ default = 1 ]; + repeated string section_in_var_names = 4; + repeated string section_out_var_names = 5; +} + message FetchConfig { enum Method { PRINT = 0; } repeated string fetch_var_names = 1; diff --git a/paddle/fluid/framework/trainer_factory.cc b/paddle/fluid/framework/trainer_factory.cc index 6b4461c0c42..ce0eb5ec30c 100644 --- a/paddle/fluid/framework/trainer_factory.cc +++ b/paddle/fluid/framework/trainer_factory.cc @@ -63,5 +63,8 @@ std::shared_ptr TrainerFactory::CreateTrainer( REGISTER_TRAINER_CLASS(MultiTrainer); REGISTER_TRAINER_CLASS(DistMultiTrainer); +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +REGISTER_TRAINER_CLASS(PipelineTrainer); +#endif } // namespace framework } // namespace paddle diff --git a/paddle/fluid/platform/timer.h b/paddle/fluid/platform/timer.h index 56019ae7cf2..ff0e1d95c29 100644 --- a/paddle/fluid/platform/timer.h +++ b/paddle/fluid/platform/timer.h @@ -50,7 +50,7 @@ class Timer { struct timeval _start; struct timeval _now; int _count; - int _elapsed; + int64_t _elapsed; bool _paused; // get us difference between start and now diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index abc8e9c26d9..b3d58a589bd 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -21,7 +21,7 @@ __all__ = ['DatasetFactory', 'InMemoryDataset', 'QueueDataset'] class DatasetFactory(object): """ DatasetFactory is a factory which create dataset by its name, - you can create "QueueDataset" or "InMemoryDataset", + you can create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset", the default is "QueueDataset". Example: @@ -38,7 +38,7 @@ class DatasetFactory(object): def create_dataset(self, datafeed_class="QueueDataset"): """ - Create "QueueDataset" or "InMemoryDataset", + Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset", the default is "QueueDataset". Args: @@ -450,3 +450,36 @@ class QueueDataset(DatasetBase): raise NotImplementedError( "QueueDataset does not support global shuffle, " "please use InMemoryDataset for global_shuffle") + + +class FileInstantDataset(DatasetBase): + """ + FileInstantDataset, it will process data streamly. + Example: + import paddle.fluid as fluid + dataset = fluid.DatasetFactory.create_dataset("FileInstantDataset") + """ + + def __init__(self): + """ + Init + """ + super(FileInstantDataset, self).__init__() + self.proto_desc.name = "MultiSlotFileInstantDataFeed" + + def local_shuffle(self): + """ + Local shuffle + FileInstantDataset does not support local shuffle + """ + raise NotImplementedError( + "FileInstantDataset does not support local shuffle, " + "please use InMemoryDataset for local_shuffle") + + def global_shuffle(self, fleet=None): + """ + Global shuffle + """ + raise NotImplementedError( + "FileInstantDataset does not support global shuffle, " + "please use InMemoryDataset for global_shuffle") diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py index 2f656692738..80989d5804d 100644 --- a/python/paddle/fluid/device_worker.py +++ b/python/paddle/fluid/device_worker.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -__all__ = ['DeviceWorker', 'Hogwild', 'DownpourSGD'] +__all__ = ['DeviceWorker', 'Hogwild', 'DownpourSGD', 'Section'] class DeviceWorker(object): @@ -181,6 +181,58 @@ class DownpourSGD(DeviceWorker): downpour.push_sparse = False +class Section(DeviceWorker): + """ + SectionWorker + """ + + def __init__(self): + """ + Init. + """ + super(Section, self).__init__() + + def _gen_worker_desc(self, trainer_desc): + """ + Generator worker desc, which device worker is SectionWorker. + Args: + trainer_desc(TrainerDesc): a TrainerDesc object + """ + from google.protobuf import text_format + from . import core + trainer_desc.device_worker_name = "SectionWorker" + pipeline_opt = self._program._pipeline_opt + section_param = trainer_desc.section_param + section_param.queue_size = pipeline_opt["queue_size"] + section_param.sync_steps = pipeline_opt["sync_steps"] + section_param.start_cpu_core_id = pipeline_opt["start_cpu_core_id"] + for e in pipeline_opt["param_need_sync"]: + section_param.param_need_sync.append(e) + for i, program in enumerate(pipeline_opt["section_program_list"]): + cfg = section_param.section_config.add() + cfg.program_desc.ParseFromString(program["program"]._get_desc() + .serialize_to_string()) + # TODO: why does not work + #cfg.program_desc.CopyFrom(program.program._get_desc()) + place = pipeline_opt["place_list"][i] + if isinstance(place, core.CPUPlace): + cfg.place = cfg.CPUPlace + elif isinstance(place, core.CUDAPlace): + cfg.place = cfg.CUDAPlace + elif isinstance(place, core.CUDAPinnedPlace): + cfg.place = cfg.CUDAPinnedPlace + else: + raise NotImplementedError( + "SectionWorker only supports CPUPlace, CUDAPlace and CUDAPinnedPlace now." + ) + + cfg.concurrency = pipeline_opt["concurrency_list"][i] + for var in program["input_set"]: + cfg.section_in_var_names.append(var) + for var in program["output_set"]: + cfg.section_out_var_names.append(var) + + class DeviceWorkerFactory(object): def _create_device_worker(self, worker_type): classname = worker_type.capitalize() diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index dfa9a0f4d37..bf9754ce2bf 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -781,12 +781,23 @@ class Executor(object): assert len(fetch_list) == len(fetch_info) compiled = isinstance(program, compiler.CompiledProgram) if not compiled: - trainer = TrainerFactory()._create_trainer(program._fleet_opt) + # TODO: Need a better way to distinguish and specify different execution mode + if program._pipeline_opt: + trainer = TrainerFactory()._create_trainer( + program._pipeline_opt) + else: + trainer = TrainerFactory()._create_trainer(program._fleet_opt) trainer._set_program(program) else: - trainer = TrainerFactory()._create_trainer( - program.program._fleet_opt) + if program._pipeline_opt: + trainer = TrainerFactory()._create_trainer( + program.program._pipeline_opt) + else: + trainer = TrainerFactory()._create_trainer( + program.program._fleet_opt) trainer._set_program(program.program) + + # The following thread_num-determined logic will be deprecated if thread <= 0: if dataset.thread_num <= 0: raise RuntimeError( @@ -796,6 +807,26 @@ class Executor(object): trainer._set_thread(dataset.thread_num) else: trainer._set_thread(thread) + + # Adjust the reader size for small file num + if program._pipeline_opt: + dataset.set_thread(thread * + program._pipeline_opt["concurrency_list"][0]) + file_size = len(dataset.dataset.get_filelist()) + if file_size < thread: + thread = file_size + print( + "Pipeline: setting the pipeline num to %d is enough because there are only %d files" + % (file_size, file_size)) + if file_size < thread * program._pipeline_opt["concurrency_list"][ + 0]: + print( + "Pipeline: setting the 1st element in concurrency_list to %d is enough because there are only %d files" + % (file_size / thread, file_size)) + program._pipeline_opt["concurrency_list"][ + 0] = file_size / thread + dataset.set_thread( + program._pipeline_opt["concurrency_list"][0] * thread) trainer._set_debug(debug) trainer._set_fetch_var_and_info(fetch_list, fetch_info, print_period) return scope, trainer diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 4c0dd79500b..30d4af15e0b 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -2785,6 +2785,9 @@ class Program(object): self._fleet_opt = None self._program_config = None + # assigned if this program has been parsed by a pipeline optimizer + self._pipeline_opt = None + @property def _is_mem_optimized(self): # if the program is optimized, operator input/outputs diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 17ac54e90a9..006cd291439 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -23,7 +23,7 @@ from paddle.fluid.framework import Program, Variable, name_scope, default_main_p from . import framework from . import layers from . import unique_name -from .backward import append_backward +from .backward import append_backward, _some_in_set_, _append_grad_suffix_ from .clip import append_gradient_clip_ops, error_clip_callback from .framework import program_guard from .initializer import Constant @@ -43,7 +43,7 @@ __all__ = [ 'AdamaxOptimizer', 'DecayedAdagradOptimizer', 'RMSPropOptimizer', 'FtrlOptimizer', 'Adadelta', 'ModelAverage', 'LarsMomentum', 'LarsMomentumOptimizer', 'DGCMomentumOptimizer', 'LambOptimizer', - 'ExponentialMovingAverage' + 'ExponentialMovingAverage', 'PipelineOptimizer' ] @@ -2607,3 +2607,230 @@ class ExponentialMovingAverage(object): executor (Executor): The Executor to execute restoring. """ executor.run(self.restore_program) + + +class PipelineOptimizer(object): + def __init__(self, + optimizer, + cut_list=None, + place_list=None, + concurrency_list=None, + queue_size=30, + sync_steps=1, + start_cpu_core_id=0): + # TODO: check properties + self._optimizer = optimizer + self._cut_list = cut_list + self._place_list = place_list + self._concurrency_list = concurrency_list + self._queue_size = queue_size + self._sync_steps = sync_steps + self._start_cpu_core_id = start_cpu_core_id + + def create_vars(self, block, main_program): + used_var_set = set() + for op_idx in range(block.desc.op_size()): + op_desc = block.desc.op(op_idx) + vars = op_desc.input_arg_names() + op_desc.output_arg_names() + for var in vars: + if var in used_var_set: + continue + used_var_set.add(var) + source_var = main_program.block(0).var(str(var)) + block._clone_variable(source_var, False) + + def extract_section_opt_ops(self, ops, cut_point_name): + """ + Extract opt ops in the given section + """ + output_names = set(cut_point_name) + relevant_op_flags = [True] * len(ops) + for i, op in reversed(list(enumerate(ops))): + if _some_in_set_(op.desc.output_arg_names(), output_names): + for name in op.desc.input_arg_names(): + output_names.add(name) + else: + relevant_op_flags[i] = False + + op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]] + return op_path + + def find_input_output(self, ops, name, is_forward=True): + """ + Find the inputs or outputs of a section + """ + all_set = set() + part_set = set() + for op in ops: + if is_forward: + part_set.update(op.desc.output_arg_names()) + else: + part_set.update(op.desc.input_arg_names()) + all_set.update(op.desc.output_arg_names()) + all_set.update(op.desc.input_arg_names()) + return all_set - part_set + + def find_persistable_vars(self, ops, whole_parameters): + """ + find the persistable input vars in current section + """ + res = set() + for op in ops: + vars = op.desc.input_arg_names() + for var in vars: + if var in whole_parameters: + res.add(var) + return res + + def _is_opt_role_op(self, op): + op_maker = core.op_proto_and_checker_maker + optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize + if op_maker.kOpRoleAttrName() in op.attr_names and \ + int(op.all_attrs()[op_maker.kOpRoleAttrName()]) & int(optimize_role) != 0: + return True + return False + + def _is_lr_role_op(self, op): + op_maker = core.op_proto_and_checker_maker + optimize_role = core.op_proto_and_checker_maker.OpRole.LRSched + if op_maker.kOpRoleAttrName() in op.attr_names and \ + int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role): + return True + return False + + def extract_section_ops(self, ops, cut_point_name): + """ + Extract ops in the given section + """ + output_names = set(cut_point_name) + relevant_op_flags = [True] * len(ops) + for i, op in reversed(list(enumerate(ops))): + if not self._is_opt_role_op(op) and _some_in_set_( + op.desc.output_arg_names(), output_names): + for name in op.desc.input_arg_names(): + output_names.add(name) + elif op.desc.type() == "print" and op.desc.input_arg_names()[ + 0] in output_names: + continue + else: + relevant_op_flags[i] = False + + op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]] + return op_path + + def find_section_opt(self, ops, params): + res = self.extract_section_opt_ops(ops, params) + return res + + def split_program(self, main_program, cut_list): + programs = [] + block = main_program.block(0) + whole_parameters = [e.name for e in block.all_parameters()] + cut_var_names = [] + cut_len = len(cut_list) + sec_params = [] + for i, cut_vars in enumerate(cut_list[:-1]): + cut_var_names.append([cut_var.name for cut_var in cut_vars]) + for i, cut_vars in reversed(list(enumerate(cut_list[:-1]))): + cut_var_names.append( + [_append_grad_suffix_(cut_var.name) for cut_var in cut_vars]) + if i == 0: + cut_var_names[-1] += [var.name for var in cut_list[-1]] + ops = block.ops[:] + for i, cut_vars in enumerate(cut_var_names): + program = { + "program": Program(), + "input_set": set(), + "output_set": set() + } + cur_ops = self.extract_section_ops(ops, cut_vars) + if i == 0: + for op in ops: + if self._is_lr_role_op(op): + cur_ops.append(op) + #prevent inplace in/out + program["input_set"].update( + self.find_input_output( + cur_ops, [], is_forward=True)) + for e in cur_ops: + ops.remove(e) + + if i < cut_len: + sec_params.append( + self.find_persistable_vars(cur_ops, whole_parameters)) + if i >= cut_len - 1: + opt_ops = self.find_section_opt(ops, + sec_params[2 * cut_len - 2 - i]) + + for e in opt_ops: + ops.remove(e) + cur_ops += opt_ops + + op_descs = [op.desc for op in cur_ops] + for op_desc in op_descs: + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + program["input_set"].update( + self.find_input_output( + cur_ops, cut_vars, is_forward=True)) + program["input_set"].update(sec_params[min(i, 2 * cut_len - 2 - i)]) + program["output_set"].update( + self.find_input_output( + cur_ops, cut_vars, is_forward=False)) + programs.append(program) + program = { + "program": Program(), + "input_set": set(), + "output_set": set() + } + op_descs = [op.desc for op in ops] + for op_desc in op_descs: + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + program["input_set"].update( + [cut_var.name + "@GRAD" for cut_var in cut_list[0]]) + program["input_set"].update( + self.find_input_output( + ops, [], is_forward=True)) + program["input_set"].update(sec_params[0]) + programs.append(program) + inputs = set() + for program in reversed(list(programs)): + output_list = list(program["output_set"]) + for output in output_list: + if output not in inputs: + program["output_set"].remove(output) + inputs.update(program["input_set"]) + return programs + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + self._optimizer.minimize(loss, startup_program, parameter_list, + no_grad_set) + program = loss.block.program + program_list = self.split_program(program, self._cut_list) + for p in program_list: + self.create_vars(p["program"].block(0), program) + whole_parameters = [e.name for e in program.block(0).all_parameters()] + param_need_sync = [] + for i, section_p in enumerate(program_list): + if not isinstance(self._place_list[i], core.CUDAPlace): + continue + section_var = [e for e in section_p["program"].block(0).vars] + for p in section_var: + if p in whole_parameters: + param_need_sync.append(p) + program._pipeline_opt = { + "trainer": "PipelineTrainer", + "device_worker": "Section", + "section_program_list": program_list, + "place_list": self._place_list, + "concurrency_list": self._concurrency_list, + "queue_size": self._queue_size, + "start_cpu_core_id": self._start_cpu_core_id, + "sync_steps": self._sync_steps, + "param_need_sync": param_need_sync + } diff --git a/python/paddle/fluid/tests/demo/pipeline_train.py b/python/paddle/fluid/tests/demo/pipeline_train.py new file mode 100644 index 00000000000..54fa719e29d --- /dev/null +++ b/python/paddle/fluid/tests/demo/pipeline_train.py @@ -0,0 +1,508 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserve. +# +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +import numpy as np +import copy +import pickle +import os +from functools import partial +import logging +import time +import paddle +import paddle.fluid as fluid +import paddle.fluid.layers as layers +import argparse +import random +import sys +import math + +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger("fluid") +logger.setLevel(logging.INFO) + +batch_size = 100 +ncards = 4 +nreaders = 4 +nscopes = 30 +learning_rate = 0.1 +is_profile = False +sync_steps = 1 + + +def parse_args(): + parser = argparse.ArgumentParser("gnn") + parser.add_argument( + '--train_path', + type=str, + default='./data/diginetica/train.txt', + help='dir of training data') + parser.add_argument( + '--config_path', + type=str, + default='./data/diginetica/config.txt', + help='dir of config') + parser.add_argument( + '--model_path', + type=str, + default='./saved_model', + help="path of model parameters") + parser.add_argument( + '--epoch_num', + type=int, + default=30, + help='number of epochs to train for') + parser.add_argument( + '--batch_size', type=int, default=100, help='input batch size') + parser.add_argument( + '--hidden_size', type=int, default=100, help='hidden state size') + parser.add_argument('--l2', type=float, default=1e-5, help='l2 penalty') + parser.add_argument('--lr', type=float, default=0.001, help='learning rate') + parser.add_argument( + '--emb_lr_rate', type=float, default=0.5, help='learning rate') + parser.add_argument( + '--step', type=int, default=1, help='gnn propogation steps') + parser.add_argument( + '--lr_dc', type=float, default=0.1, help='learning rate decay rate') + parser.add_argument( + '--lr_dc_step', + type=int, + default=3, + help='the number of steps after which the learning rate decay') + parser.add_argument( + '--use_cuda', type=int, default=0, help='whether to use gpu') + parser.add_argument( + '--use_parallel', + type=int, + default=1, + help='whether to use parallel executor') + return parser.parse_args() + + +def network(batch_size, items_num, hidden_size, step, rate): + stdv = 1.0 / math.sqrt(hidden_size) + + items = layers.data( + name="items", + shape=[batch_size, -1, 1], + dtype="int64", + append_batch_size=False) #[bs, uniq_max, 1] + seq_index = layers.data( + name="seq_index", + shape=[batch_size, -1], + dtype="int64", + append_batch_size=False) #[-1(seq_max)*batch_size, 1] + last_index = layers.data( + name="last_index", + shape=[batch_size], + dtype="int64", + append_batch_size=False) #[batch_size, 1] + adj_in = layers.data( + name="adj_in", + shape=[batch_size, -1, -1], + dtype="float32", + append_batch_size=False) + adj_out = layers.data( + name="adj_out", + shape=[batch_size, -1, -1], + dtype="float32", + append_batch_size=False) + mask = layers.data( + name="mask", + shape=[batch_size, -1, 1], + dtype="float32", + append_batch_size=False) + label = layers.data( + name="label", + shape=[batch_size, 1], + dtype="int64", + append_batch_size=False) + + items_emb = layers.embedding( + input=items, + is_sparse=True, + param_attr=fluid.ParamAttr( + name="emb", + learning_rate=rate, + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + size=[items_num, hidden_size]) #[batch_size, uniq_max, h] + data_feed = [items, seq_index, last_index, adj_in, adj_out, mask, label] + + pre_state = items_emb + for i in range(step): + pre_state = layers.reshape( + x=pre_state, shape=[batch_size, -1, hidden_size]) + state_in = layers.fc( + input=pre_state, + name="state_in", + size=hidden_size, + act=None, + num_flatten_dims=2, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, uniq_max, h] + state_out = layers.fc( + input=pre_state, + name="state_out", + size=hidden_size, + act=None, + num_flatten_dims=2, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, uniq_max, h] + + state_adj_in = layers.matmul(adj_in, + state_in) #[batch_size, uniq_max, h] + state_adj_out = layers.matmul(adj_out, + state_out) #[batch_size, uniq_max, h] + + gru_input = layers.concat([state_adj_in, state_adj_out], axis=2) + + gru_input = layers.reshape(x=gru_input, shape=[-1, hidden_size * 2]) + gru_fc = layers.fc(input=gru_input, + name="gru_fc", + size=3 * hidden_size, + bias_attr=False) + pre_state, _, _ = fluid.layers.gru_unit( + input=gru_fc, + hidden=layers.reshape( + x=pre_state, shape=[-1, hidden_size]), + size=3 * hidden_size) + + final_state = pre_state + seq_index = layers.reshape(seq_index, shape=[-1]) + seq = layers.gather(final_state, seq_index) #[batch_size*-1(seq_max), h] + last = layers.gather(final_state, last_index) #[batch_size, h] + + seq = layers.reshape( + seq, shape=[batch_size, -1, hidden_size]) #[batch_size, -1(seq_max), h] + last = layers.reshape( + last, shape=[batch_size, hidden_size]) #[batch_size, h] + + seq_fc = layers.fc( + input=seq, + name="seq_fc", + size=hidden_size, + bias_attr=False, + act=None, + num_flatten_dims=2, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, -1(seq_max), h] + last_fc = layers.fc(input=last, + name="last_fc", + size=hidden_size, + bias_attr=False, + act=None, + num_flatten_dims=1, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[bathc_size, h] + + seq_fc_t = layers.transpose( + seq_fc, perm=[1, 0, 2]) #[-1(seq_max), batch_size, h] + add = layers.elementwise_add(seq_fc_t, + last_fc) #[-1(seq_max), batch_size, h] + b = layers.create_parameter( + shape=[hidden_size], + dtype='float32', + default_initializer=fluid.initializer.Constant(value=0.0)) #[h] + add = layers.elementwise_add(add, b) #[-1(seq_max), batch_size, h] + + add_sigmoid = layers.sigmoid(add) #[-1(seq_max), batch_size, h] + add_sigmoid = layers.transpose( + add_sigmoid, perm=[1, 0, 2]) #[batch_size, -1(seq_max), h] + + weight = layers.fc(input=add_sigmoid, + name="weight_fc", + size=1, + act=None, + num_flatten_dims=2, + bias_attr=False, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, -1, 1] + weight *= mask + weight_mask = layers.elementwise_mul(seq, weight, axis=0) + global_attention = layers.reduce_sum(weight_mask, dim=1) + + final_attention = layers.concat( + [global_attention, last], axis=1) #[batch_size, 2*h] + final_attention_fc = layers.fc( + input=final_attention, + name="fina_attention_fc", + size=hidden_size, + bias_attr=False, + act=None, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, h] + + all_vocab = layers.create_global_var( + shape=[items_num - 1, 1], + value=0, + dtype="int64", + persistable=True, + name="all_vocab") + + all_emb = layers.embedding( + input=all_vocab, + is_sparse=True, + param_attr=fluid.ParamAttr( + name="emb", + learning_rate=rate, + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + size=[items_num, hidden_size]) #[all_vocab, h] + + logits = layers.matmul( + x=final_attention_fc, y=all_emb, + transpose_y=True) #[batch_size, all_vocab] + softmax = layers.softmax_with_cross_entropy( + logits=logits, label=label) #[batch_size, 1] + loss = layers.reduce_mean(softmax) # [1] + #fluid.layers.Print(loss) + acc = layers.accuracy(input=logits, label=label, k=20) + return loss, acc, data_feed, [items_emb, all_emb] + + +def train(): + args = parse_args() + lr = args.lr + rate = args.emb_lr_rate + train_data_dir = "./gnn_data_new_8" + filelist = [ + os.path.join(train_data_dir, f) for f in os.listdir(train_data_dir) + if os.path.isfile(os.path.join(train_data_dir, f)) + ][:] + + items_num = read_config(args.config_path) + loss, acc, data_vars, cut_list = network(batch_size, items_num, + args.hidden_size, args.step, rate) + + print("card: %d, thread: %d, lr: %f, lr_rate: %f, scope: %d, sync_step: %d" + % (ncards, nreaders, lr, rate, nscopes, sync_steps)) + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + + step_per_epoch = 750000 // batch_size + """ + opt = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=args.lr, + decay_steps=step_per_epoch * 10, + decay_rate=args.lr_dc), + regularization=fluid.regularizer.L2DecayRegularizer(regularization_coeff=args.l2)) + """ + opt = fluid.optimizer.SGD(lr) + opt = fluid.optimizer.PipelineOptimizer( + opt, + cut_list=[cut_list, [loss, acc]], + place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()], + concurrency_list=[1, 1, nreaders], + queue_size=nscopes, + sync_steps=sync_steps) + opt.minimize(loss) + + exe.run(fluid.default_startup_program()) + + all_vocab = fluid.global_scope().var("all_vocab").get_tensor() + all_vocab.set( + np.arange(1, items_num).astype("int64").reshape((-1, 1)), place) + + logger.info("begin train") + + dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") + dataset.set_use_var(data_vars) + dataset.set_batch_size(batch_size) + dataset.set_filelist(filelist) + + total_time = [] + start_time = time.time() + loss_sum = 0.0 + acc_sum = 0.0 + global_step = 0 + + for i in range(25): + logger.info("begin epoch %d" % (i)) + epoch_sum = [] + random.shuffle(filelist) + dataset.set_filelist(filelist) + exe.train_from_dataset( + fluid.default_main_program(), + dataset, + thread=ncards, + debug=is_profile, + fetch_list=[loss, acc], + fetch_info=["loss", "acc"], + print_period=1) + model_path = args.model_path + model_path += "_" + str(lr) + "_" + str(rate) + save_dir = model_path + "/epoch_" + str(i) + fetch_vars = [loss, acc] + feed_list = [ + "items", "seq_index", "last_index", "adj_in", "adj_out", "mask", + "label" + ] + fluid.io.save_inference_model(save_dir, feed_list, fetch_vars, exe) + + +class Data(): + def __init__(self, path, shuffle=False): + data = pickle.load(open(path, 'rb')) + self.shuffle = shuffle + self.length = len(data[0]) + self.input = list(zip(data[0], data[1])) + + def make_data(self, cur_batch, batch_size): + cur_batch = [list(e) for e in cur_batch] + max_seq_len = 0 + for e in cur_batch: + max_seq_len = max(max_seq_len, len(e[0])) + last_id = [] + for e in cur_batch: + last_id.append(len(e[0]) - 1) + e[0] += [0] * (max_seq_len - len(e[0])) + + max_uniq_len = 0 + for e in cur_batch: + max_uniq_len = max(max_uniq_len, len(np.unique(e[0]))) + + items, adj_in, adj_out, seq_index, last_index = [], [], [], [], [] + mask, label = [], [] + + id = 0 + for e in cur_batch: + node = np.unique(e[0]) + items.append(node.tolist() + (max_uniq_len - len(node)) * [0]) + adj = np.zeros((max_uniq_len, max_uniq_len)) + + for i in np.arange(len(e[0]) - 1): + if e[0][i + 1] == 0: + break + u = np.where(node == e[0][i])[0][0] + v = np.where(node == e[0][i + 1])[0][0] + adj[u][v] = 1 + + u_deg_in = np.sum(adj, 0) + u_deg_in[np.where(u_deg_in == 0)] = 1 + adj_in.append(np.divide(adj, u_deg_in).transpose()) + + u_deg_out = np.sum(adj, 1) + u_deg_out[np.where(u_deg_out == 0)] = 1 + adj_out.append(np.divide(adj.transpose(), u_deg_out).transpose()) + + seq_index.append( + [np.where(node == i)[0][0] + id * max_uniq_len for i in e[0]]) + last_index.append( + np.where(node == e[0][last_id[id]])[0][0] + id * max_uniq_len) + label.append(e[1] - 1) + mask.append([[1] * (last_id[id] + 1) + [0] * + (max_seq_len - last_id[id] - 1)]) + id += 1 + + items = np.array(items).astype("uint64").reshape((batch_size, -1, 1)) + seq_index = np.array(seq_index).astype("uint64").reshape( + (batch_size, -1)) + last_index = np.array(last_index).astype("uint64").reshape( + (batch_size, 1)) + adj_in = np.array(adj_in).astype("float32").reshape( + (batch_size, max_uniq_len, max_uniq_len)) + adj_out = np.array(adj_out).astype("float32").reshape( + (batch_size, max_uniq_len, max_uniq_len)) + mask = np.array(mask).astype("float32").reshape((batch_size, -1, 1)) + label = np.array(label).astype("uint64").reshape((batch_size, 1)) + return list( + zip(items, seq_index, last_index, adj_in, adj_out, mask, label)) + + def reader(self, batch_size, batch_group_size, train=True): + if self.shuffle: + random.shuffle(self.input) + group_remain = self.length % batch_group_size + for bg_id in range(0, self.length - group_remain, batch_group_size): + cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size]) + if train: + cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True) + for i in range(0, batch_group_size, batch_size): + cur_batch = cur_bg[i:i + batch_size] + yield self.make_data(cur_batch, batch_size) + + #deal with the remaining, discard at most batch_size data + if group_remain < batch_size: + return + remain_data = copy.deepcopy(self.input[-group_remain:]) + if train: + remain_data = sorted( + remain_data, key=lambda x: len(x[0]), reverse=True) + for i in range(0, batch_group_size, batch_size): + if i + batch_size <= len(remain_data): + cur_batch = remain_data[i:i + batch_size] + yield self.make_data(cur_batch, batch_size) + + +def read_config(path): + with open(path, "r") as fin: + item_num = int(fin.readline()) + return item_num + + +induce_map = {0: [0], 1: [0], 2: [], 3: [0, 1], 4: [0, 1], 5: [0], 6: []} + + +def binary_print(slot, fout, index): + shape_array = slot.shape + num = 1 + for e in shape_array: + num *= e + num += len(induce_map[index]) + num = np.uint16(num) + num.tofile(fout) + for e in induce_map[index]: + tmp_shape = np.uint64(shape_array[e]) + tmp_shape.tofile(fout) + slot.tofile(fout) + + +def make_binary_data(): + data_reader = Data('./data/diginetica/train.txt', True) + index = 0 + id = -1 + filename = None + fout = None + binary = True + for data in data_reader.reader(batch_size, 20 * batch_size, True): + if index % (batch_size * 900) == 0: + id += 1 + if not binary: + filename = "./gnn_data_text/" + str(id) + else: + filename = "./gnn_data_new_8/" + str(id) + print("filename: " + filename) + if fout: + fout.close() + fout = open(filename, "wb" if binary else "w") + + for ins in data: + for i, slot in enumerate(ins): + if binary: + binary_print(slot, fout, i) + else: + text_print(slot, fout, i) + index += batch_size + + +if __name__ == "__main__": + make_binary_data() + train() diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py index d61b4c9d97e..806d09895ad 100644 --- a/python/paddle/fluid/trainer_desc.py +++ b/python/paddle/fluid/trainer_desc.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -__all__ = ['TrainerDesc', 'MultiTrainer', 'DistMultiTrainer'] +__all__ = ['TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer'] # can be initialized from train_desc, @@ -66,7 +66,7 @@ class TrainerDesc(object): def _desc(self): from google.protobuf import text_format - return text_format.MessageToString(self.proto_desc) + return self.proto_desc.SerializeToString() class MultiTrainer(TrainerDesc): @@ -102,3 +102,22 @@ class DistMultiTrainer(TrainerDesc): self._device_worker._set_infer(self._infer) self._device_worker._set_program(self._program) self._device_worker._gen_worker_desc(self.proto_desc) + + +class PipelineTrainer(TrainerDesc): + def __init__(self): + super(PipelineTrainer, self).__init__() + pass + + def _set_program(self, program): + super(PipelineTrainer, self)._set_program(program) + self._program = program + + def _gen_trainer_desc(self): + super(PipelineTrainer, self)._gen_trainer_desc() + self.proto_desc.class_name = "PipelineTrainer" + if self._program == None: + raise RuntimeError("None Program") + self._device_worker._set_infer(self._infer) + self._device_worker._set_program(self._program) + self._device_worker._gen_worker_desc(self.proto_desc) diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index 1c9630a8851..67d240cccd6 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .trainer_desc import MultiTrainer, DistMultiTrainer -from .device_worker import Hogwild, DownpourSGD +from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer +from .device_worker import Hogwild, DownpourSGD, Section __all__ = ["TrainerFactory"] @@ -35,8 +35,9 @@ class TrainerFactory(object): device_worker_class = opt_info["device_worker"] trainer = globals()[trainer_class]() device_worker = globals()[device_worker_class]() - device_worker._set_fleet_desc(opt_info["fleet_desc"]) + if "fleet_desc" in opt_info: + device_worker._set_fleet_desc(opt_info["fleet_desc"]) + trainer._set_fleet_desc(opt_info["fleet_desc"]) + trainer._set_use_cvm(opt_info["use_cvm"]) trainer._set_device_worker(device_worker) - trainer._set_fleet_desc(opt_info["fleet_desc"]) - trainer._set_use_cvm(opt_info["use_cvm"]) return trainer -- GitLab