From 683d115e52ec2d9a74b977df3a16b3e8df8687e4 Mon Sep 17 00:00:00 2001 From: guru4elephant <35550832+guru4elephant@users.noreply.github.com> Date: Thu, 29 Nov 2018 23:30:31 +0800 Subject: [PATCH] Update async_executor.md --- .../design/async_executor/async_executor.md | 147 +++++++++--------- 1 file changed, 74 insertions(+), 73 deletions(-) diff --git a/doc/fluid/design/async_executor/async_executor.md b/doc/fluid/design/async_executor/async_executor.md index 02439a8e1..a96d1ebd7 100644 --- a/doc/fluid/design/async_executor/async_executor.md +++ b/doc/fluid/design/async_executor/async_executor.md @@ -5,27 +5,43 @@ There are many deep learning applications that use sparse features as inputs, su ## User Interface Design ``` python def train_loop(): - filelist = ["testfile.data"] # filelist file to be handled - data = fluid.layers.data(name="doc", shape=[1], dtype="int64", lod_level=1) # input text data - label = fluid.layers.data(name="title", shape=[1], dtype="int64", lod_level=1) # label data - dataset = fluid.DatasetDesc(desc='data.prototxt', slots=[data, label]) - dataset.set_batch_size(128) - avg_cost, acc, prediction = bow_net(data, label) + # Download data + with tarfile.open(paddle.dataset.common.download(URL, "imdb", MD5)) as tarf: + tarf.extractall(path='./') + tarf.close() + # Initialize dataset description + dataset = fluid.DataFeedDesc('train_data/data.prototxt') + dataset.set_batch_size(128) # See API doc for how to change other fields + print dataset.desc() # Debug purpose: see what we get + # define network + # input text data + data = fluid.layers.data( + name="words", shape=[1], dtype="int64", lod_level=1) + # label data + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + avg_cost, acc, prediction = bow_net(data, label) sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.002) opt_ops, weight_and_grad = sgd_optimizer.minimize(avg_cost) - async_executor = fluid.AsyncExecutor() - 
async_executor.run_startup_program(fluid.default_startup_program()) + # Run startup program + startup_program = fluid.default_startup_program() + place = fluid.CPUPlace() + executor = fluid.Executor(place) + executor.run(startup_program) + main_program = fluid.default_main_program() epochs = 10 + filelist = ["train_data/part-%d" % i for i in range(12)] for i in range(epochs): - thread_num = len(filelist) - acc_val = async_executor.run( - fluid.default_main_program(), # make sure this can be changed during iteration - dataset, # make sure this can be changed during iteration - filelist, # this can be changed during iteration - thread_num, # make sure this can be changed during iteration - [acc]) # fetch can be done with python, but the scope should be exposed - for val in acc_val: - print("accuracy %f" % val) + thread_num = 4 + executor.run_from_files( + main_program, # This can be changed during iteration + dataset, # This can be changed during iteration + filelist, # This can be changed during iteration + thread_num, # This can be changed during iteration + [data, acc], # Multiple fetch targets can be specified + debug=False) + fluid.io.save_inference_model('imdb/epoch%d.model' % i, + [data.name, label.name], [acc], executor) + ``` ## Difference between async_executor and other executors async_executor is mainly designed for CPU training scenarios where data throughput is high and the computation part of training is not intensive compared with GPU-trained models such as ResNet-50. Since data throughput is very important in async_executor, we have to design very fast data IO modules to handle very large-scale data reading. Another key difference is that memory is not a problem in CPU training scenarios, given the 128G or 256G of RAM in modern clusters. @@ -39,70 +55,55 @@ Why we use multiple queues for data reader? 
an experimental result page needs to ## Main Interface of Async Executor We have a RunFromFiles interface, which is the execution interface that users call. Every time a user calls RunFromFiles, a main_program should be provided, and it runs in the previously defined global scope. A list of file names and the corresponding Dataset should also be provided. Inside the RunFromFiles interface, readers are created according to the Dataset configuration, and the files are fed into the created readers. ``` c++ -std::vector AsyncExecutor::RunFromFile( - const ProgramDesc& main_program, - const std::string& data_feed_desc_str, - const std::vector& filelist, - const int thread_num, - const std::vector& fetch_var_names) { +void AsyncExecutor::RunFromFile(const ProgramDesc& main_program, + const std::string& data_feed_desc_str, + const std::vector& filelist, + const int thread_num, + const std::vector& fetch_var_names, + const bool debug) { std::vector threads; - - DataFeedDesc data_feed_desc; - google::protobuf::TextFormat::ParseFromString(data_feed_desc_str, &data_feed_desc); - /* - readerDesc: protobuf description for reader initlization - argument: class_name, batch_size, use_slot, queue_size, buffer_size, padding_index - - reader: - 1) each thread has a reader, reader will read input data and - put it into input queue - 2) each reader has a Next() iterface, that can fetch an instance - from the input queue - */ - std::vector > readers; - PrepareReaders(readers, thread_num, data_feed_desc, filelist); - - std::vector > workers; - workers.resize(thread_num); + auto& block = main_program.Block(0); + for (auto var_name : fetch_var_names) { + auto var_desc = block.FindVar(var_name); + auto shapes = var_desc->GetShape(); + PADDLE_ENFORCE(shapes[shapes.size() - 1] == 1, + "var %s: Fetched var has wrong shape, " + "only variables with the last dimension size 1 supported", + var_name); + } + DataFeedDesc data_feed_desc; + google::protobuf::TextFormat::ParseFromString(data_feed_desc_str, + 
&data_feed_desc); + int actual_thread_num = thread_num; + int file_cnt = filelist.size(); + PADDLE_ENFORCE(file_cnt > 0, "File list cannot be empty"); + if (actual_thread_num > file_cnt) { + VLOG(1) << "Thread num = " << thread_num << ", file num = " << file_cnt + << ". Changing thread_num = " << file_cnt; + actual_thread_num = file_cnt; + } + std::vector> readers; + PrepareReaders(readers, actual_thread_num, data_feed_desc, filelist); + std::vector> workers; + workers.resize(actual_thread_num); for (auto& worker : workers) { worker.reset(new ExecutorThreadWorker); } - - // prepare thread resource here - for (int thidx = 0; thidx < thread_num; ++thidx) { - CreateThreads(workers[thidx].get(), main_program, - readers[thidx], fetch_var_names, root_scope_, thidx); + // prepare thread resource here + for (int thidx = 0; thidx < actual_thread_num; ++thidx) { + CreateThreads(workers[thidx].get(), main_program, readers[thidx], + fetch_var_names, root_scope_, thidx, debug); } - - // start executing ops in multiple threads - for (int thidx = 0; thidx < thread_num; ++thidx) { - threads.push_back(std::thread(&ExecutorThreadWorker::TrainFiles, - workers[thidx].get())); + // start executing ops in multiple threads + for (int thidx = 0; thidx < actual_thread_num; ++thidx) { + threads.push_back( + std::thread(&ExecutorThreadWorker::TrainFiles, workers[thidx].get())); } - - for (auto& th : threads) { + for (auto& th : threads) { th.join(); } - - std::vector fetch_values; - fetch_values.resize(fetch_var_names.size(), 0); - - std::vector*> fetch_value_vectors; - fetch_value_vectors.resize(thread_num); - for (int i = 0; i < thread_num; ++i) { - fetch_value_vectors[i] = &workers[i]->GetFetchValues(); - } - - for (unsigned int i = 0; i < fetch_var_names.size(); ++i) { - float value = 0.0; - for (int j = 0; j < thread_num; ++j) { - value += fetch_value_vectors[j]->at(i); - } - value /= thread_num; - fetch_values[i] = value; - } - - return fetch_values; + root_scope_->DropKids(); + 
return; } ``` -- GitLab