## Motivation of this work

Many deep learning applications use sparse features as inputs, such as sentiment analysis[1], word2vec[2], and click-through rate estimation[3]. These applications share two characteristics: 1) large amounts of training data are available in the real world, especially in industrial environments; 2) with data-parallel training on such large data, the input sparse features of different data replicas overlap only to a small degree. Together, these characteristics raise an interesting problem: how to speed up data-parallel training of deep learning models with large numbers of sparse features. A well-known algorithm is Hogwild![4], proposed before the rise of deep learning. Its authors show that stochastic gradient descent can be implemented in a lock-free mode that allows processors to access shared model parameters and overwrite each other's work, and that when the associated optimization problem is sparse, Hogwild! achieves a nearly optimal rate of convergence. In this work, we implement an executor that supports Hogwild!-like updates for deep learning training. Several experiments on natural language processing models will be conducted to show the efficiency and convergence properties of the proposed executor.
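
For intuition, below is a minimal, self-contained sketch of Hogwild!-style lock-free SGD on a toy sparse regression problem (illustrative only; none of these names come from the Fluid codebase). Several threads update one shared parameter vector with no locking; because each sample touches only a few coordinates, conflicting writes are rare and training still converges.

``` python
import threading
import numpy as np

dim, sparsity, lr, n_threads = 1000, 10, 0.01, 4
rng = np.random.default_rng(0)
w_true = rng.normal(size=dim)  # ground-truth weights of a toy linear model
w = np.zeros(dim)              # shared parameters, updated with NO lock

def make_sample():
    idx = rng.choice(dim, size=sparsity, replace=False)  # sparse feature ids
    x = rng.normal(size=sparsity)                        # feature values
    return idx, x, w_true[idx].dot(x)                    # (ids, values, label)

shards = [[make_sample() for _ in range(5000)] for _ in range(n_threads)]

def worker(samples):
    for idx, x, y in samples:
        grad = (w[idx].dot(x) - y) * x
        w[idx] -= lr * grad  # racy write; tolerated because inputs are sparse

threads = [threading.Thread(target=worker, args=(s,)) for s in shards]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("distance to optimum:", np.linalg.norm(w - w_true))
```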

## User Interface Design
``` python
def train_loop():
    filelist = ["testfile.data"] # list of data files to be processed
    data = fluid.layers.data(name="doc", shape=[1], dtype="int64", lod_level=1) # input text data
    label = fluid.layers.data(name="title", shape=[1], dtype="int64", lod_level=1) # label data
    dataset = fluid.DatasetDesc(desc='data.prototxt', slots=[data, label])
    dataset.set_batch_size(128)
    avg_cost, acc, prediction = bow_net(data, label)
    sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.002)
    opt_ops, weight_and_grad = sgd_optimizer.minimize(avg_cost)
    async_executor = fluid.AsyncExecutor()
    async_executor.run_startup_program(fluid.default_startup_program())
    epochs = 10
    for i in range(epochs):
        thread_num = len(filelist)
        acc_val = async_executor.run(
            fluid.default_main_program(),   # make sure this can be changed during iteration
            dataset,        # make sure this can be changed during iteration
            filelist,       # this can be changed during iteration
            thread_num,     # make sure this can be changed during iteration
            [acc])          # fetch can be done with python, but the scope should be exposed
        for val in acc_val:
            print("accuracy %f" % val)
```
## Difference between async_executor and other executors
async_executor is mainly designed for CPU training scenarios where data throughput is high and the computation part of training is not intensive compared with GPU-trained models such as ResNet-50. Since data throughput is critical for async_executor, we have to design very fast data IO modules to handle large-scale data reading. Another key difference is that memory is not a bottleneck in CPU training scenarios, given the 128GB or 256GB of RAM in modern clusters.

executor and parallel_executor are designed for general training cases, in particular for GPU training. Executor is a single-threaded implementation for model training, and currently it is mostly used for running the startup_program. Another application scenario of executor is reinforcement learning, where the input data and main_program may change during training. Parallel_executor is mainly designed for synchronous training on high-performance devices such as GPUs: operators are executed concurrently in topological order on different graphs, and model parameter gradients are synchronized every iteration.

## Data Feeding Approach
![Data Feeding Approach](https://github.com/guru4elephant/FluidDoc/blob/develop/doc/fluid/design/async_executor/async_executor_reader_design.png)
Why do we use multiple queues for the data reader? An experimental results page needs to be added; a toy sketch of the intended layout is shown below.
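
The following sketch (an assumption about the intended layout, not the actual C++ ```DataFeed``` implementation) shows the one-queue-per-reader idea from the figure: each reader thread parses its own shard into a private queue, and the paired trainer thread consumes from that queue alone, so no two trainers ever contend on the same queue.

``` python
import threading
import queue

shards = [["1 2 3", "4 5 6"], ["7 8 9", "0 1 2"]]  # stand-ins for data files
queues = [queue.Queue(maxsize=1024) for _ in shards]

def reader(shard, q):
    for line in shard:
        q.put([int(tok) for tok in line.split()])  # parse one instance
    q.put(None)  # sentinel: this shard is exhausted

def trainer(q):
    while True:
        instance = q.get()
        if instance is None:
            break
        # the ops of main_program would run on this instance here

threads = []
for shard, q in zip(shards, queues):
    threads.append(threading.Thread(target=reader, args=(shard, q)))
    threads.append(threading.Thread(target=trainer, args=(q,)))
for t in threads:
    t.start()
for t in threads:
    t.join()
```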

## Main Interface of Async Executor
The main execution interface for users is ```RunFromFile```. On every call, a main_program must be provided, and it runs in the previously defined global scope. A list of file names and the corresponding Dataset description must also be provided. Inside ```RunFromFile```, readers are created from the Dataset configuration, and the files are distributed among the created readers.
``` c++
std::vector<float> AsyncExecutor::RunFromFile(
    const ProgramDesc& main_program,
    const std::string& data_feed_desc_str,
    const std::vector<std::string>& filelist,
    const int thread_num,
    const std::vector<std::string>& fetch_var_names) {
  std::vector<std::thread> threads;

  DataFeedDesc data_feed_desc;
  google::protobuf::TextFormat::ParseFromString(data_feed_desc_str, &data_feed_desc);
  /*
    readerDesc: protobuf description for reader initialization
    argument: class_name, batch_size, use_slot, queue_size, buffer_size, padding_index
    
    reader:
    1) each thread has a reader; the reader reads input data and
    puts it into its input queue
    2) each reader has a Next() interface that can fetch an instance
    from the input queue
   */
  std::vector<std::shared_ptr<DataFeed> > readers;
  PrepareReaders(readers, thread_num, data_feed_desc, filelist);
  
  std::vector<std::shared_ptr<ExecutorThreadWorker> > workers;
  workers.resize(thread_num);
  for (auto& worker : workers) {
    worker.reset(new ExecutorThreadWorker);
  }

  // prepare thread resource here
  for (int thidx = 0; thidx < thread_num; ++thidx) {
    CreateThreads(workers[thidx].get(), main_program,
                  readers[thidx], fetch_var_names, root_scope_, thidx);
  }

  // start executing ops in multiple threads
  for (int thidx = 0; thidx < thread_num; ++thidx) {
    threads.push_back(std::thread(&ExecutorThreadWorker::TrainFiles,
                                  workers[thidx].get()));
  }

  for (auto& th : threads) {
    th.join();
  }

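  // gather the per-thread fetch results and average them across threads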
  std::vector<float> fetch_values;
  fetch_values.resize(fetch_var_names.size(), 0);

  std::vector<std::vector<float>*> fetch_value_vectors;
  fetch_value_vectors.resize(thread_num);
  for (int i = 0; i < thread_num; ++i) {
    fetch_value_vectors[i] = &workers[i]->GetFetchValues();
  }

  for (unsigned int i = 0; i < fetch_var_names.size(); ++i) {
    float value = 0.0;
    for (int j = 0; j < thread_num; ++j) {
      value += fetch_value_vectors[j]->at(i);
    }
    value /= thread_num;
    fetch_values[i] = value;
  }

  return fetch_values;
}

```
Inside the function ```CreateThreads```, 
``` c++
void AsyncExecutor::CreateThreads(
    ExecutorThreadWorker* worker,
    const ProgramDesc& main_program,
    const std::shared_ptr<DataFeed>& reader,
    const std::vector<std::string>& fetch_var_names,
    Scope& root_scope,
    const int thread_index) {
  worker->SetThreadId(thread_index);
  worker->SetRootScope(&root_scope);
  worker->CreateThreadResource(main_program, place_);
  worker->SetDataFeed(reader);
  worker->SetFetchVarNames(fetch_var_names);
  worker->BindingDataFeedMemory();
}

```
Inside the function ```TrainFiles```, 
``` c++
void ExecutorThreadWorker::TrainFiles() {
  // todo: configurable
  SetDevice();

  int fetch_var_num = fetch_var_names_.size();
  fetch_values_.clear();
  fetch_values_.resize(fetch_var_num, 0);

  thread_reader_->Start();

  int cur_batch;
  int batch_cnt = 0;
  while ((cur_batch = thread_reader_->Next()) > 0) {
    // executor run here
    for (auto& op : ops_) {
      op->Run(*thread_scope_, place_);
    }

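    // accumulate the first element of each fetched variable for this batch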
    float avg_inspect = 0.0;
    for (int i = 0; i < fetch_var_num; ++i) {
      avg_inspect = thread_scope_->FindVar(fetch_var_names_[i])
                                 ->GetMutable<LoDTensor>()
                                 ->data<float>()[0];
      fetch_values_[i] += avg_inspect;
    }

    ++batch_cnt;
    thread_scope_->DropKids();
  }

  if (batch_cnt) {
    // batch_cnt can be zero, e.g. when the number of files
    // is less than the number of threads
    for (int i = 0; i < fetch_var_num; ++i) {
      fetch_values_[i] = fetch_values_[i] / batch_cnt;
    }
  }
}
```

## How to print variable information during execution
Inside async_executor, no information is printed. Variables can be fetched through an execution of async_executor, and the fetched values can be printed in Python. Since several files of instances are trained within one call, the fetched values are not exact: each thread accumulates the fetched variables over the batches it processes and averages them by its batch count, and these per-thread averages are then averaged again across threads (see the aggregation code in ```RunFromFile``` above).

## How to save models
Models can be saved between executions of async_executor through the ```fluid.io``` save interfaces (e.g. ```fluid.io.save_persistables```), as sketched below.
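
The sketch below makes several assumptions: it reuses the variables from ```train_loop``` above, uses a plain CPU ```Executor``` purely for saving, and the directory names are illustrative.

``` python
place = fluid.CPUPlace()
save_exe = fluid.Executor(place)  # used only for saving, not for training

for i in range(epochs):
    async_executor.run(fluid.default_main_program(), dataset,
                       filelist, thread_num, [acc])
    # persist all persistable variables of the default main program
    fluid.io.save_persistables(save_exe, "./model_epoch_%d" % i,
                               fluid.default_main_program())
```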

## POC experiments
### Text Classification
* network configuration
* data preparation
* performance and accuracy

## References
1. [Sentiment Analysis](https://arxiv.org/pdf/1801.07883.pdf)
2. [Word2Vec](https://arxiv.org/abs/1301.3781)
3. [Click Through Rate Estimation](https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/45530.pdf)
4. [Hogwild](https://people.eecs.berkeley.edu/~brecht/papers/hogwildTR.pdf)