未验证 提交 6b0f4181 编写于 作者: G guru4elephant 提交者: GitHub

Update async_executor.md

上级 1bc552b6
...@@ -57,6 +57,7 @@ We have RunFromFiles interface which is an execution interface for users to call ...@@ -57,6 +57,7 @@ We have RunFromFiles interface which is an execution interface for users to call
``` c++ ``` c++
void AsyncExecutor::RunFromFiles( void AsyncExecutor::RunFromFiles(
const ProgramDesc& main_program, const ProgramDesc& main_program,
const DataFeedDesc& data_feed_desc,
const std::vector<std::string> & files, const std::vector<std::string> & files,
const int thread_num) { const int thread_num) {
// todo: remove fluid related interface // todo: remove fluid related interface
...@@ -65,20 +66,32 @@ void AsyncExecutor::RunFromFiles( ...@@ -65,20 +66,32 @@ void AsyncExecutor::RunFromFiles(
threads.resize(thread_num); threads.resize(thread_num);
/* /*
readerDesc: protobuf description for reader initlization
argument: class_name, batch_size, use_slot, queue_size, buffer_size, padding_index
reader: reader:
1) each thread has a reader, reader will read input data and 1) each thread has a reader, reader will read input data and
put it into input queue put it into input queue
2) each reader has a Next() iterface, that can fetch an instance 2) each reader has a Next() iterface, that can fetch an instance
from the input queue from the input queue
*/ */
// todo: should be factory method for creating datafeed
std::vector<std::shared_ptr<DataFeed> > readers; std::vector<std::shared_ptr<DataFeed> > readers;
readers.resize(thread_num); readers.resize(thread_num);
for (auto& reader : readers) { for (int i = 0; i < readers.size(); ++i) {
reader.reset(new DataFeed); readers[i] = DataFeedFactory::CreateDataFeed(data_feed_desc.name());
reader.add_filelist(files);
} }
// todo(dongdaxiang): add the following code for worker generalization
/*
std::vector<std::shared_ptr<ExecutorStrategy> > workers;
workers.resize(thread_num);
std::string str_name = strategy_.name;
for (auto& worker : workers) {
worker.reset(
ExecutorStrategyFactory::CreateExecutorStrategy(str_name));
}
*/
std::vector<std::shared_ptr<ExecutorThreadWorker> > workers; std::vector<std::shared_ptr<ExecutorThreadWorker> > workers;
workers.resize(thread_num); workers.resize(thread_num);
for (auto& worker : workers) { for (auto& worker : workers) {
...@@ -90,7 +103,6 @@ void AsyncExecutor::RunFromFiles( ...@@ -90,7 +103,6 @@ void AsyncExecutor::RunFromFiles(
CreateThreads(workers[thidx].get(), main_program, CreateThreads(workers[thidx].get(), main_program,
readers[thidx].get(), root_scope_, thidx); readers[thidx].get(), root_scope_, thidx);
} }
// start executing ops in multiple threads // start executing ops in multiple threads
for (int thidx = 0; thidx < thread_num_; ++thidx) { for (int thidx = 0; thidx < thread_num_; ++thidx) {
threads.push_back(std::thread(&ExecutorThreadWorker::TrainFiles, threads.push_back(std::thread(&ExecutorThreadWorker::TrainFiles,
...@@ -100,7 +112,7 @@ void AsyncExecutor::RunFromFiles( ...@@ -100,7 +112,7 @@ void AsyncExecutor::RunFromFiles(
for (auto& th : threads) { for (auto& th : threads) {
th.join(); th.join();
} }
// fetch variables in scope 0, and return // fetch variables in scope 0, and return, to be added
} }
``` ```
...@@ -112,28 +124,28 @@ void AsyncExecutor::CreateThreads(const ExecutorThreadWorker* worker, ...@@ -112,28 +124,28 @@ void AsyncExecutor::CreateThreads(const ExecutorThreadWorker* worker,
const Scope& root_scope, const Scope& root_scope,
const int thread_index) { const int thread_index) {
worker->SetThreadid(thread_index); worker->SetThreadid(thread_index);
worker->CreateThreadOperators(main_program); worker->CreateThreadResource(main_program, place_);
worker->CreateThreadScope(main_program); worker->SetDataFeed(reader);
worker->BindingDataFeedMemory(reader); worker->BindingDataFeedMemory(reader);
worker->SetMainProgram(main_program);
worker->SetRootScope(root_scope); worker->SetRootScope(root_scope);
} }
``` ```
Inside the function ```Trainfiles```, Inside the function ```Trainfiles```,
``` c++ ``` c++
void ExecutorThreadWorker::TrainFiles() { void ExecutorThreadWorker::TrainFiles() {
// todo: configurable // todo: configurable
SetDevice(); // cpu core binding here SetDevice();
thread_reader_->Start(); // a reader should start to run a seperate thread first thread_reader_->Start(); // start reading thread within reader
while (int cur_batch = thread_reader_->Next()) { while (int cur_batch = thread_reader_->Next()) {
// all operators run here // executor run here
for (auto& op : ops_) { for (auto& op : ops_) {
op->Run(*thread_scope_, place_); op->Run(*thread_scope_, place_);
} }
// remove intermediate variables created in child scope
thread_scope_->DropKids(); thread_scope_->DropKids();
} }
} }
``` ```
## How to print variable information during execution ## How to print variable information during execution
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册