diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 07097c7e75c6ce638549716cd6523f387cdefd92..bad9a6b2ba1cb2f61a1314c0bb59b9d35858d23f 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -168,7 +168,13 @@ void ThreadedSSAGraphExecutor::InsertFetchOps( for (size_t i = 0; i < fetch_tensors.size(); ++i) { auto &var_name = fetch_tensors[i]; - auto &vars = fetched_vars.at(var_name); + + auto fetched_var_it = fetched_vars.find(var_name); + PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(), + "Cannot find fetched variable.(Perhaps the main_program " + "is not set to ParallelExecutor)"); + + auto &vars = fetched_var_it->second; auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_); fetch_ops->emplace_back(op); diff --git a/python/paddle/fluid/tests/demo/pyreader.py b/python/paddle/fluid/tests/demo/pyreader.py index a7550fd1f13948d2279c4de2ee0f69af56701c84..47b20a0c5c8e0c7e49c67e77a27587acdc080ee0 100644 --- a/python/paddle/fluid/tests/demo/pyreader.py +++ b/python/paddle/fluid/tests/demo/pyreader.py @@ -36,7 +36,7 @@ def network(is_train): prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') loss = fluid.layers.cross_entropy(input=prediction, label=label) - return fluid.layers.mean(loss), queue + return fluid.layers.mean(loss), queue, reader def pipe_reader_to_queue(reader_creator, queue): @@ -70,27 +70,46 @@ def main(): with fluid.program_guard(train_prog, startup_prog): with fluid.unique_name.guard(): - loss, train_queue = network(True) + loss, train_queue, train_reader = network(True) adam = fluid.optimizer.Adam(learning_rate=0.01) adam.minimize(loss) test_prog = fluid.Program() with fluid.program_guard(test_prog, fluid.Program()): with fluid.unique_name.guard(): - test_loss, test_queue = network(False) + test_loss, test_queue, test_reader = network(False) fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog) - trainer = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) - tester = fluid.ParallelExecutor(use_cuda=True, share_vars_from=trainer) + trainer = fluid.ParallelExecutor( + use_cuda=True, loss_name=loss.name, main_program=train_prog) + + tester = fluid.ParallelExecutor( + use_cuda=True, share_vars_from=trainer, main_program=test_prog) for epoch_id in xrange(10): - pipe_reader_to_queue(paddle.batch(mnist.train(), 32), train_queue) - pipe_reader_to_queue(paddle.batch(mnist.test(), 32), test_queue) + train_data_thread = pipe_reader_to_queue( + paddle.batch(mnist.train(), 32), train_queue) try: - print 'train_loss', numpy.array(trainer.run(fetch_list=[loss.name])) + while True: + print 'train_loss', numpy.array( + trainer.run(fetch_list=[loss.name])) except fluid.core.EOFException: print 'End of epoch', epoch_id + train_reader.reset() + train_data_thread.join() + + test_data_thread = pipe_reader_to_queue( + paddle.batch(mnist.train(), 32), test_queue) + try: + while True: + print numpy.array(tester.run(fetch_list=[test_loss.name])) + except fluid.core.EOFException: + print 'End of testing' + test_reader.reset() + + test_data_thread.join() + break if __name__ == '__main__':