From 78f4ef854212b178a1d2c86a21213ded2257960d Mon Sep 17 00:00:00 2001 From: dongdaxiang Date: Mon, 12 Nov 2018 19:12:48 +0800 Subject: [PATCH] add refactored file of async_executor and data_feed --- .../framework/async_executor_refactor.cc | 276 ++++++++++++++++++ .../fluid/framework/async_executor_refactor.h | 89 ++++++ paddle/fluid/framework/data_feed_refactor.h | 133 +++++++++ 3 files changed, 498 insertions(+) create mode 100644 paddle/fluid/framework/async_executor_refactor.cc create mode 100644 paddle/fluid/framework/async_executor_refactor.h create mode 100644 paddle/fluid/framework/data_feed_refactor.h diff --git a/paddle/fluid/framework/async_executor_refactor.cc b/paddle/fluid/framework/async_executor_refactor.cc new file mode 100644 index 00000000000..d59b1355b56 --- /dev/null +++ b/paddle/fluid/framework/async_executor_refactor.cc @@ -0,0 +1,276 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/async_executor.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "google/protobuf/message.h" +#include "google/protobuf/text_format.h" +#include "google/protobuf/io/zero_copy_stream_impl.h" + +#include "gflags/gflags.h" +#include "paddle/fluid/framework/feed_fetch_method.h" +#include "paddle/fluid/framework/feed_fetch_type.h" +#include "paddle/fluid/framework/lod_rank_table.h" +#include "paddle/fluid/framework/lod_tensor_array.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/reader.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/fluid/inference/io.h" +#include "paddle/fluid/pybind/pybind.h" +namespace paddle { +namespace framework { + +void CreateTensor(Variable* var, proto::VarType::Type var_type) { + if (var_type == proto::VarType::LOD_TENSOR) { + var->GetMutable(); + } else if (var_type == proto::VarType::SELECTED_ROWS) { + var->GetMutable(); + } else if (var_type == proto::VarType::FEED_MINIBATCH) { + var->GetMutable(); + } else if (var_type == proto::VarType::FETCH_LIST) { + var->GetMutable(); + } else if (var_type == proto::VarType::STEP_SCOPES) { + var->GetMutable>(); + } else if (var_type == proto::VarType::LOD_RANK_TABLE) { + var->GetMutable(); + } else if (var_type == proto::VarType::LOD_TENSOR_ARRAY) { + var->GetMutable(); + } else if (var_type == proto::VarType::PLACE_LIST) { + var->GetMutable(); + } else if (var_type == proto::VarType::READER) { + var->GetMutable(); + } else if (var_type == proto::VarType::RAW) { + // GetMutable will be called in operator + } else { + PADDLE_THROW( + "Variable type %d is not in " + "[LOD_TENSOR, SELECTED_ROWS, FEED_MINIBATCH, FETCH_LIST, " + "LOD_RANK_TABLE, PLACE_LIST, READER, CHANNEL, RAW]", + var_type); + } +} + +void ExecutorThreadWorker::CreateThreadOperators(const ProgramDesc& program) { + auto& block = program.Block(0); + op_names_.clear(); + for (auto& op_desc : block.AllOps()) { + std::unique_ptr local_op = OpRegistry::CreateOp(*op_desc); + op_names_.push_back(op_desc->Type()); + OperatorBase* local_op_ptr = local_op.release(); + ops_.push_back(local_op_ptr); + continue; + } +} + +void ExecutorThreadWorker::CreateThreadScope(const ProgramDesc& program) { + auto& block = program.Block(0); + thread_scope_ = &root_scope_->NewScope(); + for (auto& var : block.AllVars()) { + if (var->Persistable()) { + auto* ptr = root_scope_->Var(var->Name()); + CreateTensor(ptr, var->GetType()); + } else { + auto* ptr = thread_scope_->Var(var->Name()); + CreateTensor(ptr, var->GetType()); + } + } +} + +void ExecutorThreadWorker::SetDataFeed( + const std::shared_ptr& datafeed) { + local_reader_ = datafeed; +} + +void ExecutorThreadWorker::BindingDataFeedMemory() { + const std::vector& input_feed = + thread_reader_->GetUseSlotAlias(); + for (auto name : input_feed) { + local_reader_->AddFeedVar(thread_scope_->Var(name), name); + } +} + +void ExecutorThreadWorker::SetDevice() { + // at most 48 threads binding currently + static unsigned priority[] = { + 0, 1, 2, 3, 4, 5, + 6, 7, 8, 9, 10, 11, + 12, 13, 14, 15, 16, 17, + 18, 19, 20, 21, 22, 23, + 24, 25, 26, 27, 28, 29, + 30, 31, 32, 33, 34, 35, + 36, 37, 38, 39, 40, 41, + 42, 43, 44, 45, 46, 47 + }; + + unsigned int i = this->thread_id_; + + if (i < sizeof(priority) / sizeof(unsigned)) { + unsigned proc = priority[i]; + + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(proc, &mask); + + if (-1 == sched_setaffinity(0, sizeof(mask), &mask)) { + LOG(ERROR) << "WARNING: Failed to set thread affinity for thread " << i; + } else { + CPU_ZERO(&mask); + if ((0 == sched_getaffinity(0, sizeof(mask), &mask)) + && CPU_ISSET(proc, &mask)) { + LOG(ERROR) << "TRACE: Thread " << i << + " is running on processor " << proc << "..."; + } + } + } +} + +void ExecutorThreadWorker::TrainFiles() { + // todo: configurable + SetDevice(); + thread_reader_->Start(); + while (int cur_batch = thread_reader_->Next()) { + // executor run here + for (auto& op : ops_) { + op->Run(*thread_scope_, place_); + } + thread_scope_->DropKids(); + } +} + +void ExecutorThreadWorker::SetThreadId(int tid) { + thread_id_ = tid; +} + +void ExecutorThreadWorker::SetPlace(const platform::Place& place) { + place_ = place; +} + +void ExecutorThreadWorker::SetMainProgram( + const ProgramDesc& main_program_desc) { + main_program_.reset(new ProgramDesc(main_program_desc)); +} + +void ExecutorThreadWorker::SetRootScope(Scope* g_scope) { + root_scope_ = g_scope; +} + +void ExecutorThreadWorker::SetMaxTrainingEpoch(int max_epoch) { + max_epoch_ = max_epoch; +} + +AsyncExecutor::AsyncExecutor(const platform::Place& place) : place_(place) {} + +void AsyncExecutor::CreateThreads(const ExecutorThreadWorker* worker, + const ProgramDesc& main_program, + const DataFeed& reader, + const Scope& root_scope, + const int thread_index) { + worker->SetThreadid(thread_index); + worker->CreateThreadOperators(main_program); + worker->CreateThreadScope(main_program); + worker->SetDataFeed(reader); + worker->BindingDataFeedMemory(reader); + worker->SetMainProgram(main_program); + worker->SetRootScope(root_scope); +} + +void AsyncExecutor::CheckFiles( + const std::vector& files) { + // function for user to check file formats + // should be exposed to users +} + +/* + in case there are binary files we want to train + and in general this is the fastest way to train + different calls allow thread_num to be different + threads are created on the fly + workers are created on the fly + readers are created on the fly + files are fed into readers on the fly + */ +/* + class_name + batch_size: max batch size + use_slot: + queue_size: + buffer_size: + padding_index: + */ +void AsyncExecutor::RunFromFiles( + const ProgramDesc& main_program, + const DataFeedDesc& data_feed_desc, + const std::vector & files, + const int thread_num) { + // todo: remove fluid related interface + root_scope_->DropKids(); + std::vector threads; + threads.resize(thread_num); + + /* + readerDesc: protobuf description for reader initlization + argument: class_name, batch_size, use_slot, queue_size, buffer_size, padding_index + + reader: + 1) each thread has a reader, reader will read input data and + put it into input queue + 2) each reader has a Next() iterface, that can fetch an instance + from the input queue + */ + // todo: should be factory method for creating datafeed + std::vector > readers; + readers.resize(thread_num); + for (auto& reader : readers) { + // create by factory name + reader.reset(new DataFeed); + reader.SetFileList(files); + } + + std::vector > workers; + workers.resize(thread_num); + for (auto& worker : workers) { + worker.reset(new ExecutorThreadWorker); + } + + // prepare thread resource here + for (int thidx = 0; thidx < thread_num; ++thidx) { + CreateThreads(workers[thidx].get(), main_program, + readers[thidx].get(), root_scope_, thidx); + } + + // start executing ops in multiple threads + for (int thidx = 0; thidx < thread_num_; ++thidx) { + threads.push_back(std::thread(&ExecutorThreadWorker::TrainFiles, + workers[thidx].get())); + } + + for (auto& th : threads) { + th.join(); + } + // fetch variables in scope 0, and return +} + +} // einit_modelnd namespace framework +} // end namespace paddle + +/* vim: set expandtab ts=2 sw=2 sts=2 tw=100: */ diff --git a/paddle/fluid/framework/async_executor_refactor.h b/paddle/fluid/framework/async_executor_refactor.h new file mode 100644 index 00000000000..6245e75517f --- /dev/null +++ b/paddle/fluid/framework/async_executor_refactor.h @@ -0,0 +1,89 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#ifndef PADDLE_FLUID_FRAMEWORK_ASYNC_EXECUTOR_H_ +#define PADDLE_FLUID_FRAMEWORK_ASYNC_EXECUTOR_H_ + +#include +#include // NOLINT +#include +#include +#include +#include // NOLINT +#include +#include "paddle/fluid/framework/data_feed.h" +#include "paddle/fluid/framework/datafeed_creator.h" +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/scope.h" + +namespace paddle { +namespace framework { +void CreateTensor(Variable* var, proto::VarType::Type var_type); + +class ExecutorThreadWorker { + public: + ExecutorThreadWorker() {} + ~ExecutorThreadWorker() {} + void CreateThreadScope(const framework::ProgramDesc& program); + void SetDataFeed(const DataFeed& datafeed); + void SetThreadId(int tid); + void CreateThreadOperators(const framework::ProgramDesc& program); + void SetRootScope(Scope* g_scope); + void SetDevice(); + void SetMainProgram(const ProgramDesc& main_program_desc); + void SetPlace(const paddle::platform::Place& place); + void BindingDataFeedMemory(); + void SetSparseCommData(const std::map& param_names); + void SetDataFeed(const std::shared_ptr& datafeed); + + protected: + // thread index + std::shared_ptr thread_reader_; // shared queue, thread buffer + int thread_id_; + // op name + std::vector op_names_; + // local ops for forward and backward + std::vector ops_; + // main program for training + std::unique_ptr main_program_; + // execution place + platform::Place place_; + // root scope for model parameters + Scope* root_scope_; + // a thread scope, father scope is global score which is shared + Scope* thread_scope_; +}; + +class AsyncExecutor { + public: + explicit AsyncExecutor(const platform::Place& place); + virtual ~AsyncExecutor() {} + void SetRootScope(const Scope* root_scope); + Scope* GetRootScope() { return root_scope_; } + void CheckFiles(const std::vector& files); + void RunFromFiles( + const ProgramDesc& main_program, + const std::vector& files, + const int thread_num); + + public: + Scope* root_scope_; + platform::Place place_; +}; + +} // namespace framework +} // namespace paddle +#endif // PADDLE_FLUID_FRAMEWORK_ASYNC_EXECUTOR_H_ +/* vim: set expandtab ts=2 sw=2 sts=2 tw=100: */ diff --git a/paddle/fluid/framework/data_feed_refactor.h b/paddle/fluid/framework/data_feed_refactor.h new file mode 100644 index 00000000000..7e6d894026b --- /dev/null +++ b/paddle/fluid/framework/data_feed_refactor.h @@ -0,0 +1,133 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#ifndef PADDLE_FLUID_FRAMEWORK_DATA_FEED_H_ +#define PADDLE_FLUID_FRAMEWORK_DATA_FEED_H_ + +#include +#include +#include +#include +#include // NOLINT +#include +#include +#include // NOLINT +#include +#include +#include // NOLINT +#include + +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/scope.h" + +namespace paddle { +namespace framework { +struct Gauc { + int show, click; + uint64_t fea; + std::string lineid; +}; + +struct Instance { + std::vector> feed_vec_buffer; + std::vector> feed_vec_lod; + std::vector other_label; + std::vector gauc_vec; +}; + +class DataFeed { + public: + DataFeed() {} + virtual ~DataFeed() {} + virtual void Init() = 0; + /* + * This function will be used to check file format. + * Considering that this function may be used alone, + * it does not check anything. + * */ + virtual bool CheckFile(const char* filename) = 0; + virtual bool SetFileList(const std::vector& files) = 0; + virtual bool Next() = 0; + virtual const std::vector& GetAllSlotIds() { + return all_slot_ids_; + } + + virtual const std::vector& GetUseSlotIds() { + return use_slot_ids_; + } + + virtual const std::vector& GetUseSlotAlias() { + return use_slot_alias_; + } + + virtual void AddFeedVar(Variable* var, + const std::string& name) = 0; + virtual void SetBatchSize(int batch) { default_batch_size_ = batch; } + virtual int GetBatchSize() { return batch_size_; } + virtual void SetQueueSize(int queue_size) {} + virtual void SetBufferSize(int buffer_size) {} + + std::vector& GetFeedVec() { + return feed_vec_; + } + + virtual std::vector& GetFeedVec(const Instance& ins) { + LOG(ERROR) << "use defalut get_feed_vec"; + return feed_vec_; + } + + protected: + std::vector all_slot_ids_; + std::vector use_slot_ids_; + std::vector use_slot_alias_; + std::vector feed_vec_; + int default_batch_size_; + int batch_size_; +}; + +class TextClassDataFeed : public DataFeed { + public: + virtual ~TextClassDataFeed() {} + virtual void Init(); + virtual bool ReadBatch(); + virtual void AddFeedVar(Variable* feed, const std::string& name); + virtual void BindScope(Scope* scope) {} + virtual bool SetFile(const char* filename); + + virtual bool CheckFile(const char* filename) { + // TODO(xxx) + return false; + } + + void SetBatchSize(int batch) {batch_size_ = batch;} + + private: + int ReadWholeFile(const std::string& filename, char* buffer); + char* file_content_buffer_; + char* file_content_buffer_ptr_; + int* batch_id_buffer_; + int* label_ptr_; + int file_size_; + std::vector names_; + std::shared_ptr file_content_buffer_host_; + std::shared_ptr batch_id_host_; + std::shared_ptr label_host_; +}; + +} // namespace framework +} // namespace paddle + +#endif // PADDLE_FLUID_FRAMEWORK_DATA_FEED_H_ +/* vim: set expandtab ts=2 sw=2 sts=2 tw=100: */ -- GitLab