diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 1a940b30c1564c7622f646b4697375179c607f91..0d06d0f2c9529f7f5dea58bc9710a24e559d8e00 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -382,9 +382,11 @@ class Executor(object):
         """
         Close this executor.
 
-        You can no long use this executor after calling this method.
+        You can no longer use this executor after calling this method.
         For the distributed training, this method would free the resource
         on PServers related to the current Trainer.
+        TODO(typhoonzero): Define "no longer use" meaning? Can user create
+        a new Executor for the same program and run?
         TODO(panyx0718): Why ParallelExecutor doesn't have close?
 
         Example:
@@ -397,7 +399,7 @@ class Executor(object):
         self.executor.close()
         self._closed = True
 
-    def _run_parallel(self, scope, feed, fetch_list, fetch_var_name,
+    def _run_parallel(self, program, scope, feed, fetch_list, fetch_var_name,
                       return_numpy):
         if isinstance(feed, dict):
             feed_tensor_dict = dict()
@@ -413,7 +415,7 @@
             self.executor.feed_and_split_tensor_into_local_scopes(
                 feed_tensor_dict)
         elif isinstance(feed, list) or isinstance(feed, tuple):
-            if len(feed) != len(self._places):
+            if len(feed) != len(program._places):
                 raise ValueError(
                     "Feed a list of tensor, the list should be the same size as places"
                 )
@@ -428,7 +430,7 @@
                     tensor = each[feed_name]
                     if not isinstance(tensor, core.LoDTensor):
                         tmp = core.LoDTensor()
-                        tmp.set(tensor, self._places[i])
+                        tmp.set(tensor, program._places[i])
                         tensor = tmp
                     res_dict[feed_name] = tensor
                 res.append(res_dict)
@@ -462,7 +464,7 @@
         Args:
             program(Program|CompiledProgram): the program that need to run,
-                if not provided, then default_main_program will be used.
+                if not provided, then default_main_program (not compiled) will be used.
             feed(dict): feed variable map, e.g. {"image": ImageData, "label": LabelData}
             fetch_list(list): a list of variable or variable names that user want to get,
                 run will return them according to this list.
             feed_var_name(str): the name for the input variable of feed Operator.
@@ -525,6 +527,7 @@ class Executor(object):
             self.executor = program._executor
             if program._is_data_parallel:
                 return self._run_parallel(
+                    program,
                     scope=scope,
                     feed=feed,
                     fetch_list=fetch_list,
diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py
index 81b0b667814e851e8bd47ae1a3b0bf00a9a73ecd..bc3c422f2f68b79b2d938e25625093b2ce8977bb 100644
--- a/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py
+++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py
@@ -22,6 +22,7 @@ import unittest
 import paddle
 import paddle.fluid.core as core
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 
 
 def train(network, use_cuda, use_parallel_executor, batch_size=32, pass_num=2):
@@ -57,19 +58,19 @@
     exe = fluid.Executor(place)
     exe.run(fluid.default_startup_program())
 
+    train_cp = compiler.CompiledProgram(fluid.default_main_program())
     if use_parallel_executor:
-        train_exe = fluid.ParallelExecutor(
-            use_cuda=use_cuda, loss_name=cost.name)
+        train_cp = train_cp.with_data_parallel(loss_name=cost.name)
         fetch_list = [cost.name]
     else:
-        train_exe = exe
         fetch_list = [cost]
 
     for pass_id in six.moves.xrange(pass_num):
         batch_id = 0
         for data in reader():
-            train_exe.run(feed=data,
-                          fetch_list=fetch_list if batch_id % 4 == 0 else [])
+            exe.run(train_cp,
+                    feed=data,
+                    fetch_list=fetch_list if batch_id % 4 == 0 else [])
             batch_id += 1
             if batch_id > 16:
                 break
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py
index 1c6cfce0c2b772fa78fa08fa1bfb383c1e4f7939..ba63213a410b8b2579b6842c5a6ecd720c7957b3 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py
@@ -16,6 +16,7 @@ from __future__ import print_function
 
 import paddle.dataset.conll05 as conll05
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import paddle.fluid.core as core
 import unittest
 import paddle
@@ -157,10 +158,8 @@
             exe = fluid.Executor(place)
             exe.run(startup)
 
-            pe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                loss_name=avg_cost.name,
-                build_strategy=build_strategy)
+            train_cp = compiler.CompiledProgram(main).with_data_parallel(
+                loss_name=avg_cost.name, build_strategy=build_strategy)
 
             feeder = fluid.DataFeeder(
                 feed_list=[
@@ -172,8 +171,9 @@
             data = train_data()
             for i in range(10):
                 cur_batch = next(data)
-                print(pe.run(feed=feeder.feed(cur_batch),
-                             fetch_list=[avg_cost.name])[0])
+                print(exe.run(train_cp,
+                              feed=feeder.feed(cur_batch),
+                              fetch_list=[avg_cost.name])[0])
 
     def _new_build_strategy(self, use_reduce=False):
         build_strategy = fluid.BuildStrategy()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py
index 18d95c94ad36316b7149eb5412260b40a57ac002..17f8f5a0b4f753aabe8af3f97c2018cd2cf54dc1 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_dry_run.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import unittest
 import logging
 import six
@@ -36,21 +37,18 @@ class TestBase(unittest.TestCase):
         with fluid.program_guard(main_prog, startup_prog):
             with fluid.scope_guard(scope):
                 loss = network_func()
-                fluid.Executor(
-                    fluid.CUDAPlace(0)
-                    if use_gpu else fluid.CPUPlace()).run(startup_prog)
+                exe = fluid.Executor(
+                    fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace())
+                exe.run(startup_prog)
 
                 for _ in six.moves.xrange(iter):
                     exe_strategy = fluid.ExecutionStrategy()
                     exe_strategy._dry_run = True
                     exe_strategy.use_experimental_executor = use_experimental_executor
-                    pe = fluid.ParallelExecutor(
-                        use_cuda=use_gpu,
-                        loss_name=loss.name,
-                        main_program=main_prog,
-                        exec_strategy=exe_strategy)
+                    train_cp = compiler.CompiledProgram(main_prog).with_data_parallel(
+                        loss_name=loss.name, exec_strategy=exe_strategy)
                     for _ in six.moves.xrange(iter_per_pe):
-                        pe.run([])
+                        exe.run(train_cp)
 
 
 class TestMNISTDryRun(TestBase):
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
index 06da1632f24cd4376356ff8e53169c7221b20b71..ee0941f19838355180edb5771f1e85292a64de59 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
@@ -16,6 +16,7 @@ from __future__ import print_function
 
 import math
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import paddle.fluid.core as core
 import unittest
 import numpy as np
@@ -58,12 +59,13 @@
         exe = fluid.Executor(place)
         exe.run(startup)
 
-        pe = fluid.ParallelExecutor(
-            use_cuda=use_cuda, loss_name=loss.name, main_program=main_program)
-        run_parallel_exe(main_program, pe, use_cuda, data, label, loss)
+        train_cp = compiler.CompiledProgram(main_program).with_data_parallel(
+            loss_name=loss.name)
 
-    def run_parallel_exe_with_fetch(self, main, pe, use_cuda, data, label,
-                                    loss):
+        run_parallel_exe(train_cp, exe, use_cuda, data, label, loss)
+
+    def run_parallel_exe_with_fetch(self, compiled_program, exe, use_cuda, data,
+                                    label, loss):
         def get_data(batch_size=8):
             np.random.seed(5)
             while True:
@@ -78,7 +80,7 @@
         # conv2d_1.b_0@GRAD. Those variables should not be pruned.
         # fluid.memory_optimize(main)
 
         fetch_list = []
-        all_vars = main.global_block().vars
+        all_vars = compiled_program._program.global_block().vars
         for k, v in all_vars.items():
             if ('tmp' not in k) and (
@@ -89,14 +91,18 @@
         for batch_id, img_label in enumerate(get_data()):
             img, l = img_label
             train_inputs = {data.name: img, label.name: l}
-            ret = pe.run(fetch_list, feed=train_inputs, return_numpy=True)
+            ret = exe.run(compiled_program,
+                          fetch_list=fetch_list,
+                          feed=train_inputs,
+                          return_numpy=True)
             for i in range(len(fetch_list)):
                 assert not math.isnan(np.sum(ret[i])) and \
                        not math.isinf(np.sum(ret[i]))
             if batch_id == 2:
                 break
 
-    def run_parallel_exe_with_feed(self, main, pe, use_cuda, data, label, loss):
+    def run_parallel_exe_with_feed(self, compiled_program, exe, use_cuda, data,
+                                   label, loss):
         def get_data(batch_size=8):
             np.random.seed(5)
             while True:
@@ -114,7 +120,9 @@
         reader = feeder.decorate_reader(get_data, multi_devices=True)
 
         for batch_id, data in enumerate(reader()):
-            loss_np = pe.run(feed=data, fetch_list=[loss.name])[0]
+            loss_np = exe.run(compiled_program,
+                              feed=data,
+                              fetch_list=[loss.name])[0]
             print(batch_id, loss_np)
             if batch_id == 2:
                 break
diff --git a/python/paddle/fluid/tests/unittests/test_pass_builder.py b/python/paddle/fluid/tests/unittests/test_pass_builder.py
index 5a3ec8ff0180281babeaa006133b3ff9dc6d8083..8c9e489e02839e25cfabe14c16bfd91a908bd734 100644
--- a/python/paddle/fluid/tests/unittests/test_pass_builder.py
+++ b/python/paddle/fluid/tests/unittests/test_pass_builder.py
@@ -16,6 +16,7 @@ from __future__ import print_function
 
 import paddle.fluid as fluid
 import paddle.fluid.core as core
+from paddle.fluid import compiler
 import numpy as np
 import unittest
 import os
@@ -61,22 +62,21 @@ class TestPassBuilder(unittest.TestCase):
             exe.run(startup)
             feed_dict = {'image': image, 'label': label}
 
-            train_exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
+            train_cp = compiler.CompiledProgram(main).with_data_parallel(
+                loss_name=loss.name, build_strategy=build_strategy)
+            test_cp = compiler.CompiledProgram(test_program).with_data_parallel(
                 loss_name=loss.name,
-                main_program=main,
-                build_strategy=build_strategy)
-
-            test_exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                main_program=test_program,
-                share_vars_from=train_exe,
-                build_strategy=build_strategy)
+                build_strategy=build_strategy,
+                share_vars_from=train_cp)
 
             for i in range(5):
-                test_loss, = test_exe.run([loss.name], feed=feed_dict)
-
-                train_loss, = train_exe.run([loss.name], feed=feed_dict)
+                _ = exe.run(train_cp, fetch_list=[loss.name], feed=feed_dict)
+                test_loss, = exe.run(test_cp,
+                                     fetch_list=[loss.name],
+                                     feed=feed_dict)
+                train_loss = exe.run(train_cp,
+                                     fetch_list=[loss.name],
+                                     feed=feed_dict)
 
                 avg_test_loss_val = np.array(test_loss).mean()
                 if math.isnan(float(avg_test_loss_val)):
diff --git a/python/paddle/fluid/tests/unittests/test_py_func_op.py b/python/paddle/fluid/tests/unittests/test_py_func_op.py
index 655378f7f8c18f5936643f6c178d0e6b3dd8cac8..18207373acae45678a68d84bdf05776f5cffca43 100644
--- a/python/paddle/fluid/tests/unittests/test_py_func_op.py
+++ b/python/paddle/fluid/tests/unittests/test_py_func_op.py
@@ -14,6 +14,7 @@
 
 import os
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import paddle
 import unittest
 import six
@@ -140,9 +141,10 @@
 
         exe = fluid.Executor(place)
         exe.run(fluid.default_startup_program())
+
+        train_cp = compiler.CompiledProgram(fluid.default_main_program())
         if use_parallel_executor:
-            exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda, loss_name=loss.name)
+            train_cp = train_cp.with_data_parallel(loss_name=loss.name)
             fetch_list = [loss.name]
         else:
             fetch_list = [loss]
@@ -150,9 +152,10 @@
         ret = []
         for epoch_id in six.moves.range(2):
             for d in r():
-                L, = exe.run(feed=feeder.feed(d), fetch_list=fetch_list)
+                L, = exe.run(train_cp,
+                             feed=feeder.feed(d),
+                             fetch_list=fetch_list)
                 ret.append(L)
-
         return np.array(ret)
 
 
diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
index be059263c809e31c16249e619ba69769ee3674f4..a3701f0808b98b80b62866ffe1250d065361025c 100644
--- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
@@ -16,6 +16,7 @@ from __future__ import print_function
 
 import unittest
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import paddle.fluid.core as core
 import numpy as np
 import threading
@@ -188,18 +189,18 @@
 
         place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
 
-        startup_exe = fluid.Executor(place)
-        startup_exe.run(startup_program)
+        exe = fluid.Executor(place)
+        exe.run(startup_program)
 
+        train_cp = compiler.CompiledProgram(main_program)
         if use_parallel_executor:
-            main_exe = fluid.ParallelExecutor(use_cuda, loss_name=loss.name)
+            train_cp = train_cp.with_data_parallel(loss_name=loss.name)
             if use_cuda:
                 self.batch_size_times = core.get_cuda_device_count()
             else:
                 self.batch_size_times = int(
                     os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
         else:
-            main_exe = startup_exe
             self.batch_size_times = 1
 
         reader = self.tensor_reader(use_decorate_paddle_reader)
@@ -214,7 +215,8 @@
         self.outputs = []
 
         for _ in range(self.iterations):
-            fetches = main_exe.run(fetch_list=[in_data.name, label.name])
+            fetches = exe.run(train_cp,
+                              fetch_list=[in_data.name, label.name])
             fetches = [as_numpy(fetch) for fetch in fetches]
             self.outputs.append(fetches)
 
diff --git a/python/paddle/fluid/tests/unittests/test_reader_reset.py b/python/paddle/fluid/tests/unittests/test_reader_reset.py
index c568cedb122639a43d0f195b8aea4a7b15b4e1a9..da89ccb961c0af99aea117218eb429a5599c2bd2 100644
--- a/python/paddle/fluid/tests/unittests/test_reader_reset.py
+++ b/python/paddle/fluid/tests/unittests/test_reader_reset.py
@@ -15,6 +15,7 @@ from __future__ import print_function
 
 import os
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import paddle
 import numpy as np
 import unittest
@@ -74,20 +75,13 @@
         exe = fluid.Executor(place)
         exe.run(startup_prog)
-        build_strategy = fluid.BuildStrategy()
-        exec_strategy = fluid.ExecutionStrategy()
-        parallel_exe = fluid.ParallelExecutor(
-            use_cuda=self.use_cuda,
-            main_program=main_prog,
-            build_strategy=build_strategy,
-            exec_strategy=exec_strategy)
-
-        data_appeared = [False] * self.total_ins_num
+        train_cp = compiler.CompiledProgram(main_prog).with_data_parallel()
 
         pass_count = 0
         while (True):
             try:
-                data_val, label_val = parallel_exe.run(fetch_list,
-                                                       return_numpy=True)
+                data_val, label_val = exe.run(train_cp,
+                                              fetch_list=fetch_list,
+                                              return_numpy=True)
                 ins_num = data_val.shape[0]
                 broadcasted_label = np.ones((ins_num, ) + tuple(
                     self.ins_shape)) * label_val.reshape((ins_num, 1))
diff --git a/python/paddle/fluid/tests/unittests/test_weight_decay.py b/python/paddle/fluid/tests/unittests/test_weight_decay.py
index f37d2bfb2e86b452cf7fd05c3e5871de2e33d629..e5e7e76737177f7f4aaae7d7e28e9e5166b96de5 100644
--- a/python/paddle/fluid/tests/unittests/test_weight_decay.py
+++ b/python/paddle/fluid/tests/unittests/test_weight_decay.py
@@ -22,6 +22,7 @@
 import paddle
 import paddle.fluid.core as core
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 
 
 def get_places():
@@ -111,17 +112,17 @@
             if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
         build_strategy.memory_optimize = use_ir_memory_optimize
 
-        parallel_exe = fluid.ParallelExecutor(
-            use_cuda,
+        train_cp = compiler.CompiledProgram(fluid.default_main_program(
+        )).with_data_parallel(
             loss_name=loss.name,
             exec_strategy=exec_strategy,
             build_strategy=build_strategy)
 
         loss_set = []
         for data in self.train_data:
-            out = parallel_exe.run(feed=feeder.feed(data),
-                                   fetch_list=[loss.name])
-            print("loss %s" % (np.average(out)))
+            out = exe.run(train_cp,
+                          feed=feeder.feed(data),
+                          fetch_list=[loss.name])
             loss_set.append(np.average(out))
 
         return loss_set
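
Every file above follows the same migration: keep a single fluid.Executor, compile the main program once with compiler.CompiledProgram (calling .with_data_parallel(...) where multi-device execution is wanted), and pass the compiled program as the first argument of exe.run(). The sketch below is not part of the patch; it restates that pattern end to end on an invented toy network (the 'img'/'label' layer names and the SGD settings are made up for illustration), assuming a Paddle Fluid build that includes this change.

# Illustrative sketch only -- not part of the patch. The toy network and the
# names 'img'/'label' are invented; the CompiledProgram/Executor calls mirror
# the pattern used by the migrated tests above.
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import compiler

os.environ['CPU_NUM'] = '2'  # device count used by with_data_parallel on CPU

img = fluid.layers.data(name='img', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
prediction = fluid.layers.fc(input=img, size=10, act='softmax')
loss = fluid.layers.mean(
    fluid.layers.cross_entropy(input=prediction, label=label))
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

place = fluid.CPUPlace()  # or fluid.CUDAPlace(0) on a GPU build
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

# One Executor serves both the plain and the data-parallel case; the
# with_data_parallel() call replaces the old fluid.ParallelExecutor object.
train_cp = compiler.CompiledProgram(fluid.default_main_program())
train_cp = train_cp.with_data_parallel(loss_name=loss.name)

feed = {
    'img': np.random.random((32, 784)).astype('float32'),
    'label': np.random.randint(0, 10, (32, 1)).astype('int64'),
}
loss_val, = exe.run(train_cp, feed=feed, fetch_list=[loss.name])
print(np.array(loss_val).mean())

With a data-parallel CompiledProgram, a single feed dict is split across the places recorded on the program, which is why _run_parallel now receives the program and checks len(program._places); a list or tuple feed with one dict per place is also accepted, and the batch size should be divisible by the device count.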