Unverified · Commit 683d115e authored by guru4elephant, committed by GitHub

Update async_executor.md

Parent 55a709eb
@@ -5,27 +5,43 @@ There are many deep learning applications that use sparse features as inputs, su
## User Interface Design
``` python
import tarfile

import paddle.dataset.common
import paddle.fluid as fluid

# URL, MD5 and bow_net are assumed to be defined elsewhere in the example.

def train_loop():
    # Download data
    with tarfile.open(paddle.dataset.common.download(URL, "imdb", MD5)) as tarf:
        tarf.extractall(path='./')
        tarf.close()

    # Initialize dataset description
    dataset = fluid.DataFeedDesc('train_data/data.prototxt')
    dataset.set_batch_size(128)  # See API doc for how to change other fields

    print(dataset.desc())  # Debug purpose: see what we get

    # define network
    # input text data
    data = fluid.layers.data(
        name="words", shape=[1], dtype="int64", lod_level=1)
    # label data
    label = fluid.layers.data(name="label", shape=[1], dtype="int64")

    avg_cost, acc, prediction = bow_net(data, label)
    sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.002)
    opt_ops, weight_and_grad = sgd_optimizer.minimize(avg_cost)

    # Run startup program
    startup_program = fluid.default_startup_program()
    place = fluid.CPUPlace()
    executor = fluid.Executor(place)
    executor.run(startup_program)

    main_program = fluid.default_main_program()
    epochs = 10
    filelist = ["train_data/part-%d" % i for i in range(12)]
    for i in range(epochs):
        thread_num = 4
        executor.run_from_files(
            main_program,  # This can be changed during iteration
            dataset,       # This can be changed during iteration
            filelist,      # This can be changed during iteration
            thread_num,    # This can be changed during iteration
            [data, acc],   # Multiple fetch targets can be specified
            debug=False)
        fluid.io.save_inference_model('imdb/epoch%d.model' % i,
                                      [data.name, label.name], [acc], executor)
```
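The example above assumes that `train_data/data.prototxt` already contains a text-format `DataFeedDesc` describing the input slots. As a rough illustration only (the exact field layout below is an assumption made for this sketch, not something this document specifies), such a file could be generated and then inspected from Python as follows:

``` python
import os

import paddle.fluid as fluid

# Hypothetical text-format DataFeedDesc content; the field names are an
# assumption made for illustration and should be checked against the
# actual DataFeedDesc protobuf definition.
data_feed_text = """
name: "MultiSlotDataFeed"
batch_size: 128
multi_slot_desc {
  slots { name: "words" type: "uint64" is_dense: false is_used: true }
  slots { name: "label" type: "uint64" is_dense: false is_used: true }
}
"""

if not os.path.isdir('train_data'):
    os.makedirs('train_data')
with open('train_data/data.prototxt', 'w') as f:
    f.write(data_feed_text)

dataset = fluid.DataFeedDesc('train_data/data.prototxt')
dataset.set_batch_size(256)   # fields can still be overridden after loading
print(dataset.desc())         # dump the resolved description for debugging
```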
## Difference between async_executor and other executors
async_executor is mainly designed for CPU training scenarios where data throughput is high and the computation part of training is not intensive compared with GPU-trained models such as ResNet-50. Since data throughput is critical for async_executor, we have to design very fast data IO modules to handle very large scale data reading. Another key difference is that memory is not a problem in CPU training scenarios, given 128 GB or 256 GB of RAM in modern clusters.
@@ -39,70 +55,55 @@ Why do we use multiple queues for the data reader? An experimental result page needs to
## Main Interface of Async Executor
We provide the RunFromFile interface as the execution entry point for users. Every time a user calls RunFromFile, a main_program must be provided, and it runs in the previously defined global scope. A list of file names and the corresponding Dataset description must also be provided. Inside RunFromFile, readers are created from the Dataset configuration and the files are distributed to the created readers.
``` c++
void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
                                const std::string& data_feed_desc_str,
                                const std::vector<std::string>& filelist,
                                const int thread_num,
                                const std::vector<std::string>& fetch_var_names,
                                const bool debug) {
  std::vector<std::thread> threads;

  // validate fetch targets: only variables whose last dimension is 1 are supported
  auto& block = main_program.Block(0);
  for (auto var_name : fetch_var_names) {
    auto var_desc = block.FindVar(var_name);
    auto shapes = var_desc->GetShape();
    PADDLE_ENFORCE(shapes[shapes.size() - 1] == 1,
                   "var %s: Fetched var has wrong shape, "
                   "only variables with the last dimension size 1 supported",
                   var_name);
  }

  // parse the text-format reader description coming from the Python side
  DataFeedDesc data_feed_desc;
  google::protobuf::TextFormat::ParseFromString(data_feed_desc_str,
                                                &data_feed_desc);

  // never launch more threads than there are files to read
  int actual_thread_num = thread_num;
  int file_cnt = filelist.size();
  PADDLE_ENFORCE(file_cnt > 0, "File list cannot be empty");
  if (actual_thread_num > file_cnt) {
    VLOG(1) << "Thread num = " << thread_num << ", file num = " << file_cnt
            << ". Changing thread_num = " << file_cnt;
    actual_thread_num = file_cnt;
  }

  // create one reader per thread and distribute the files among them
  std::vector<std::shared_ptr<DataFeed>> readers;
  PrepareReaders(readers, actual_thread_num, data_feed_desc, filelist);

  std::vector<std::shared_ptr<ExecutorThreadWorker>> workers;
  workers.resize(actual_thread_num);
  for (auto& worker : workers) {
    worker.reset(new ExecutorThreadWorker);
  }

  // prepare thread resource here
  for (int thidx = 0; thidx < actual_thread_num; ++thidx) {
    CreateThreads(workers[thidx].get(), main_program, readers[thidx],
                  fetch_var_names, root_scope_, thidx, debug);
  }

  // start executing ops in multiple threads
  for (int thidx = 0; thidx < actual_thread_num; ++thidx) {
    threads.push_back(
        std::thread(&ExecutorThreadWorker::TrainFiles, workers[thidx].get()));
  }

  for (auto& th : threads) {
    th.join();
  }

  // drop per-run child scopes created during training
  root_scope_->DropKids();
  return;
}
```
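To relate the Python user interface above to this C++ entry point, the Python-side call has to reduce its arguments to the plain types that RunFromFile accepts. The sketch below is only an assumption about how that translation could look (the wrapper and the underlying `run_from_file` binding name are illustrative, not taken from this document): fetch variables are reduced to their names, and the dataset description is serialized to the text-format protobuf string that `TextFormat::ParseFromString` consumes above.

``` python
# Illustrative-only sketch of the Python-to-C++ argument translation.
# `core_executor.run_from_file` is an assumed binding name, not a documented API.
def run_from_files(core_executor, main_program, dataset, filelist,
                   thread_num, fetch, debug=False):
    # Fetch Variables become plain names, matching fetch_var_names in C++.
    fetch_var_names = [var.name for var in fetch]
    # The DataFeedDesc is serialized to the text-format string parsed on the C++ side.
    data_feed_desc_str = dataset.desc()
    core_executor.run_from_file(main_program.desc, data_feed_desc_str,
                                filelist, thread_num, fetch_var_names, debug)
```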