diff --git a/doc/fluid/design/async_executor/async_executor.md b/doc/fluid/design/async_executor/async_executor.md
index e5d485194ca7029180f6062258e44c835ea19054..0af993b90717087b321fd28bed5060b474cc8a20 100644
--- a/doc/fluid/design/async_executor/async_executor.md
+++ b/doc/fluid/design/async_executor/async_executor.md
@@ -57,6 +57,7 @@ We have RunFromFiles interface which is an execution interface for users to call
 ``` c++
 void AsyncExecutor::RunFromFiles(
     const ProgramDesc& main_program,
+    const DataFeedDesc& data_feed_desc,
     const std::vector<std::string>& files,
     const int thread_num) {
   // todo: remove fluid related interface
@@ -65,20 +66,32 @@ void AsyncExecutor::RunFromFiles(
   threads.resize(thread_num);

   /*
+    readerDesc: protobuf description for reader initialization
+    argument: class_name, batch_size, use_slot, queue_size, buffer_size, padding_index
+
     reader:
     1) each thread has a reader, reader will read input data and
     put it into input queue
     2) each reader has a Next() iterface, that can fetch an instance
     from the input queue
    */
-  // todo: should be factory method for creating datafeed
   std::vector<std::shared_ptr<DataFeed> > readers;
   readers.resize(thread_num);
-  for (auto& reader : readers) {
-    reader.reset(new DataFeed);
-    reader.add_filelist(files);
+  for (int i = 0; i < readers.size(); ++i) {
+    readers[i] = DataFeedFactory::CreateDataFeed(data_feed_desc.name());
   }

+  // todo(dongdaxiang): add the following code for worker generalization
+  /*
+  std::vector<std::shared_ptr<ExecutorStrategy> > workers;
+  workers.resize(thread_num);
+  std::string str_name = strategy_.name;
+  for (auto& worker : workers) {
+    worker.reset(
+        ExecutorStrategyFactory::CreateExecutorStrategy(str_name));
+  }
+  */
+
   std::vector<std::shared_ptr<ExecutorThreadWorker> > workers;
   workers.resize(thread_num);
   for (auto& worker : workers) {
@@ -90,7 +103,6 @@ void AsyncExecutor::RunFromFiles(
     CreateThreads(workers[thidx].get(), main_program,
                   readers[thidx].get(), root_scope_, thidx);
   }
-
   // start executing ops in multiple threads
   for (int thidx = 0; thidx < thread_num_; ++thidx) {
     threads.push_back(std::thread(&ExecutorThreadWorker::TrainFiles,
@@ -100,7 +112,7 @@ void AsyncExecutor::RunFromFiles(
   for (auto& th : threads) {
     th.join();
   }
-  // fetch variables in scope 0, and return
+  // fetch variables in scope 0, and return, to be added
 }
 ```

@@ -112,28 +124,28 @@ void AsyncExecutor::CreateThreads(const ExecutorThreadWorker* worker,
                                   const Scope& root_scope,
                                   const int thread_index) {
   worker->SetThreadid(thread_index);
-  worker->CreateThreadOperators(main_program);
-  worker->CreateThreadScope(main_program);
+  worker->CreateThreadResource(main_program, place_);
+  worker->SetDataFeed(reader);
   worker->BindingDataFeedMemory(reader);
-  worker->SetMainProgram(main_program);
   worker->SetRootScope(root_scope);
 }
+
 ```
 Inside the function ```Trainfiles```,
 ``` c++
 void ExecutorThreadWorker::TrainFiles() {
   // todo: configurable
-  SetDevice();  // cpu core binding here
-  thread_reader_->Start();  // a reader should start to run a seperate thread first
+  SetDevice();
+  thread_reader_->Start();  // start reading thread within reader
   while (int cur_batch = thread_reader_->Next()) {
-    // all operators run here
+    // executor run here
     for (auto& op : ops_) {
       op->Run(*thread_scope_, place_);
     }
-    // remove intermediate variables created in child scope
     thread_scope_->DropKids();
   }
 }
+
 ```

 ## How to print variable information during execution
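
The patch above wires one reader and one worker per thread, launches `TrainFiles` in a `std::thread` per worker, and joins all threads at the end. The standalone sketch below illustrates that thread-per-reader pattern using only the standard library; `MiniDataFeed`, `MiniWorker`, and `CreateDataFeed` are hypothetical stand-ins for the Paddle classes named in the patch (`DataFeed`, `ExecutorThreadWorker`, `DataFeedFactory::CreateDataFeed`), not the real API.

``` c++
#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Stand-in for a DataFeed: hands out batches until the "file list" is exhausted.
class MiniDataFeed {
 public:
  explicit MiniDataFeed(int total_batches) : remaining_(total_batches) {}
  void Start() { /* a real feed would launch its reading thread here */ }
  // Returns the next batch size, or 0 when no data is left, mirroring the
  // `while (int cur_batch = thread_reader_->Next())` idiom in TrainFiles.
  int Next() { return remaining_.fetch_sub(1) > 0 ? 1 : 0; }

 private:
  std::atomic<int> remaining_;
};

// Stand-in factory in the spirit of DataFeedFactory::CreateDataFeed(name).
std::shared_ptr<MiniDataFeed> CreateDataFeed(const std::string& /*name*/,
                                             int total_batches) {
  return std::make_shared<MiniDataFeed>(total_batches);
}

// Stand-in for ExecutorThreadWorker: owns one reader and consumes its batches.
class MiniWorker {
 public:
  void SetThreadId(int id) { thread_id_ = id; }
  void SetDataFeed(std::shared_ptr<MiniDataFeed> reader) { reader_ = std::move(reader); }
  // Mirrors TrainFiles(): start the reader, then loop until Next() returns 0.
  void TrainFiles() {
    reader_->Start();
    int batches = 0;
    while (int cur_batch = reader_->Next()) {
      batches += cur_batch;  // a real worker would run its ops on the batch here
    }
    std::cout << "thread " << thread_id_ << " consumed " << batches << " batches\n";
  }

 private:
  int thread_id_ = 0;
  std::shared_ptr<MiniDataFeed> reader_;
};

int main() {
  const int thread_num = 4;

  // One reader and one worker per thread, as in RunFromFiles.
  std::vector<std::shared_ptr<MiniDataFeed>> readers(thread_num);
  std::vector<std::shared_ptr<MiniWorker>> workers(thread_num);
  for (int i = 0; i < thread_num; ++i) {
    readers[i] = CreateDataFeed("MultiSlotDataFeed", /*total_batches=*/8);
    workers[i] = std::make_shared<MiniWorker>();
    workers[i]->SetThreadId(i);
    workers[i]->SetDataFeed(readers[i]);
  }

  // Launch TrainFiles in one std::thread per worker, then join them all.
  std::vector<std::thread> threads;
  for (int i = 0; i < thread_num; ++i) {
    threads.emplace_back(&MiniWorker::TrainFiles, workers[i].get());
  }
  for (auto& th : threads) {
    th.join();
  }
  return 0;
}
```

Because each thread owns its own reader, the hot loop never contends on a shared batch queue; coordination happens only at the final join, which matches the structure of RunFromFiles in the patch.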