From 89c9b23fa1956e954f51f0751b9ba8dfbe7f0a21 Mon Sep 17 00:00:00 2001 From: guru4elephant <35550832+guru4elephant@users.noreply.github.com> Date: Mon, 12 Nov 2018 13:10:18 +0800 Subject: [PATCH] Update async_executor.md --- .../design/async_executor/async_executor.md | 92 ++++++++++--------- 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/doc/fluid/design/async_executor/async_executor.md b/doc/fluid/design/async_executor/async_executor.md index 68bc8fec8..cf0450ecd 100644 --- a/doc/fluid/design/async_executor/async_executor.md +++ b/doc/fluid/design/async_executor/async_executor.md @@ -4,43 +4,44 @@ There are many deep learning applications that use sparse features as inputs, su ## User Interface Design ``` python -import paddle.fluid as fluid - - - -startup_program = fluid.default_startup_program() -main_program = fluid.default_main_program() - -filelist = "filelist.txt" -train_dataset = fluid.datasets.MyFeeder(filelist, - transforms.Transform([ - transforms.tokenize()])) - -train_loader = fluid.data.DataLoader( - train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), - num_workers=args.workers, pin_memory=True, sampler=train_sampler) - -cur_block = fluid.default_main_program().current_block() -abs_input_var = cur_block.create_var(name='abs_input', - shape=[-1, 32, 32], - dtype='float32') -abs_output_var = cur_block.create_var(name='abs_output', - shape=[-1, 32, 32], - dtype='float32') - -op_desc = cur_block.desc.append_op() -abs_op = Operator(block=cur_block, desc=op_desc, type='abs', - inputs={'X': [abs_input_var]}, outputs={'Out': [abs_output_var]}) - -for i, (slots, label) in enumerate(train_loader): - paddle.async_executor(feed_list=[slots, label], - startup_program=startup_program, - main_program=main_program, - fetch_list=[abs_output_var], - fetch_iter=10) - # do something on fetch list - - +def train_loop(): + filelist = ["file%d.txt" % i for i in range(10)] + dataset = MultiSlotDataset() + dataset.set_batch_size(128) + 
# input text data + data = fluid.layers.data(name="words", shape=[1], dtype="int64", lod_level=1) + # label data + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + + avg_cost, acc, prediction = bow_net(data, label) + sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.002) + opt_ops, weight_and_grad = sgd_optimizer.minimize(avg_cost) + + for w in weight_and_grad[0]: + reduce(lambda x, y: x * y, w.shape, 1) + + varnames = [var.name for var in weight_and_grad[0]] + dataset.set_field_name([data.name, label.name]) + startup_program = fluid.default_startup_program() + main_program = fluid.default_main_program() + infer_prog = get_infer_prog([data.name, label.name], [acc, prediction]) + + place = fluid.CPUPlace() + executor = fluid.AsyncExecutor() + executor.run_startup_program(startup_program) + epochs = 10 + for i in range(epochs): + acc_val = executor.run( + program=main_program, # make sure this can be changed during iteration + reader=dataset, # make sure this can be changed during iteration + filelist=filelist, # this can be changed during iteration + thread=thread_num, # make sure this can be changed during iteration + fetch=[acc]) # how to define fetch, and what kind of things to return here + print("accuracy %f" % acc_val) + executor.save_model(infer_prog, "epoch%d.model" % i) + + # todo: + # inference to be added, should load up a program and a global scope ``` ## Difference between async_executor and other executors async_executor is mainly designed for cpu training scenarios where data throughputs are high and the computation part of training is not intensive compared with GPU trained models such as resnet-50. Since data throughput capability is very important in async_executor, we have to design very fast data IO modules to handle very large scale data reading. Another key difference is that memory is not a problem in cpu training scenarios given 128G or 256G RAM in modern servers. @@ -52,13 +53,23 @@ to be discussed. 
## Inside Structure of Async Executor ``` c++ -void AsyncExecutor::RunFromFiles(const std::vector & files, - const int thread_num) { +void AsyncExecutor::RunFromFiles( + const ProgramDesc& main_program, + const std::vector & files, + const int thread_num) { + // todo: remove fluid related interface root_scope_->DropKids(); std::vector threads; threads.resize(thread_num); - // prepare readers + /* + reader: + 1) each thread has a reader, reader will read input data and + put it into input queue + 2) each reader has a Next() interface, that can fetch an instance + from the input queue + */ + // todo: should be factory method for creating datafeed std::vector > readers; readers.resize(thread_num); for (auto& reader : readers) { @@ -89,7 +100,6 @@ void AsyncExecutor::RunFromFiles(const std::vector & files, } // fetch variables in scope 0, and return } - ``` ## How to print variable information during execution -- GitLab