diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index 63331f5708a5f3b8ab72533ece38e9a75584fb2d..e5b1ab351e91434e1e16e99f15f2553631eff5a7 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -15,6 +15,7 @@
 import multiprocessing
 import os
 import six
+import sys
 from .. import compat as cpt
 
 from . import core
@@ -29,27 +30,50 @@ def _place_obj(place):
     return p
 
 
-class _ProgramCompiler(object):
+class CompiledProgram(object):
     def __init__(self, program):
         self._program = program
+        self._scope = None
+        self._place = None
+        self._executor = None
         self._compiled = False
         self._is_data_parallel = False
 
     def _with_data_parallel(self,
                             loss_name=None,
                             build_strategy=None,
-                            exec_strategy=None):
+                            exec_strategy=None,
+                            share_vars_from=None):
         assert not self._is_data_parallel, "Already compiled with parallel."
         self._is_data_parallel = True
         self._build_strategy = build_strategy
         self._exec_strategy = exec_strategy
         self._loss_name = loss_name
+        self._share_vars_from = share_vars_from
         return self
 
+    def _with_distributed(self):
+        raise NotImplementedError()
+
+    def _with_inference_optimize(self):
+        raise NotImplementedError()
+
     def _compile_data_parallel(self):
-        self._places = []
-        self._local_scopes = []
+        if self._share_vars_from:
+            if self._scope:
+                sys.stderr.write("share_vars_from is set, scope is ignored.\n")
+            if not self._share_vars_from._is_data_parallel:
+                raise ValueError("share_vars_from is not data parallel. Cannot "
+                                 "share vars from it.")
+            if self._share_vars_from._executor is None:
+                raise ValueError(
+                    "share_vars_from is not compiled and run, so there is no "
+                    "var to share.")
+            self._local_scopes = self._share_vars_from._executor.local_scopes()
+        else:
+            self._local_scopes = []
 
+        self._places = []
         if self._exec_strategy is None:
             self._exec_strategy = ExecutionStrategy()
         if self._build_strategy is None:
@@ -104,12 +128,14 @@ class _ProgramCompiler(object):
 
     def _compile(self, scope, place):
         if self._compiled:
+            if scope and self._scope != scope:
+                raise ValueError("Cannot compile with different scope")
+            if place and self._place != place:
+                raise ValueError("Cannot compile with different place")
             return self
-        self._compiled = True
 
         self._scope = scope
         self._place = place
-
         if self._is_data_parallel:
             self._executor = self._compile_data_parallel()
         else:
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index ee7df740075dbebab6721d5c0849005c7e315572..7c417cd8285895ac99f6fee732a87985bb396550 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -481,11 +481,13 @@ class Executor(object):
         if scope is None:
             scope = global_scope()
 
-        compiled = isinstance(program, compiler._ProgramCompiler)
+        compiled = isinstance(program, compiler.CompiledProgram)
+        # For backward compatibility, run directly.
         if not compiled:
-            p = core.Place()
-            p.set_place(self.place)
-            self.executor = core.Executor(p)
+            if not self.executor:
+                p = core.Place()
+                p.set_place(self.place)
+                self.executor = core.Executor(p)
             return self._run(
                 program,
                 feed=feed,
diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index 2038b57a6c20433a2d7c02eb21676cb6dce00eab..784fe64c4ec682a4f0ebb24912a933c721c6b555 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -81,12 +81,12 @@ class TestParallelExecutorBase(unittest.TestCase):
         if use_cuda and core.is_compiled_with_cuda():
             build_strategy.remove_unnecessary_lock = True
         if use_parallel_executor:
-            binary = compiler._ProgramCompiler(main)._with_data_parallel(
+            binary = compiler.CompiledProgram(main)._with_data_parallel(
                 loss_name=loss.name,
                 build_strategy=build_strategy,
                 exec_strategy=exec_strategy)
         else:
-            binary = compiler._ProgramCompiler(main)
+            binary = compiler.CompiledProgram(main)
 
         if batch_size is not None:
             batch_size *= fluid.core.get_cuda_device_count(
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 5cc5d9f3d3e98b9ab6117a563e70bff27e150c30..aacf52e01126b0ed1c486d191d7adbe6bbf0e803 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -132,7 +132,7 @@ class TestDistRunnerBase(object):
             build_stra.num_trainers = 1
             build_stra.trainer_id = 0
 
-        binary = compiler._ProgramCompiler(trainer_prog)._with_data_parallel(
+        binary = compiler.CompiledProgram(trainer_prog)._with_data_parallel(
             loss_name=avg_cost.name,
             build_strategy=build_stra,
             exec_strategy=strategy)
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py
index db2826653edf6bf6ddd498cbd56b07da646cebf4..3cc954a77a902ccbfdb15e45d2750eea3cfa7f6e 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py
@@ -15,6 +15,7 @@
 from __future__ import print_function
 
 import paddle.fluid as fluid
+from paddle.fluid import compiler
 import paddle.fluid.core as core
 import numpy as np
 import unittest
@@ -61,22 +62,22 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
             exe.run(startup)
             feed_dict = {'image': image, 'label': label}
 
-            train_exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                loss_name=loss.name,
-                main_program=main,
-                build_strategy=build_strategy)
-
-            test_exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                main_program=test_program,
-                share_vars_from=train_exe,
-                build_strategy=build_strategy)
+            train_cp = compiler.CompiledProgram(main)._with_data_parallel(
+                loss_name=loss.name, build_strategy=build_strategy)
+            test_cp = compiler.CompiledProgram(
+                test_program)._with_data_parallel(
+                    loss_name=loss.name,
+                    build_strategy=build_strategy,
+                    share_vars_from=train_cp)
 
             for i in range(5):
-                test_loss, = test_exe.run([loss.name], feed=feed_dict)
-
-                train_loss, = train_exe.run([loss.name], feed=feed_dict)
+                exe.run(train_cp, feed=feed_dict, fetch_list=[loss.name])
+                test_loss, = exe.run(test_cp,
+                                     feed=feed_dict,
+                                     fetch_list=[loss.name])
+                train_loss, = exe.run(train_cp,
+                                      feed=feed_dict,
+                                      fetch_list=[loss.name])
 
             avg_test_loss_val = np.array(test_loss).mean()
             if math.isnan(float(avg_test_loss_val)):
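
Taken together, these changes route both the legacy path and data-parallel execution through a single `Executor.run()` entry point that now accepts a `CompiledProgram`. The snippet below is a minimal usage sketch modeled on the updated `test_parallel_executor_test_while_train.py`; the network (`simple_net`), layer sizes, and random feed data are hypothetical stand-ins, and only the `CompiledProgram` / `Executor.run` calls are taken from this diff.

```python
import os

import numpy as np
import paddle.fluid as fluid
from paddle.fluid import compiler


def simple_net():
    # Hypothetical two-layer classifier, only here to give the
    # CompiledProgram something to compile.
    img = fluid.layers.data(name='image', shape=[784], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    hidden = fluid.layers.fc(input=img, size=100, act='relu')
    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
    return fluid.layers.mean(
        fluid.layers.cross_entropy(input=prediction, label=label))


os.environ['CPU_NUM'] = '4'  # number of parallel CPU devices, as in the tests

main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
    loss = simple_net()
    test_program = main.clone(for_test=True)
    fluid.optimizer.SGD(learning_rate=0.001).minimize(loss)

use_cuda = fluid.core.is_compiled_with_cuda()
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)

# Compile the train program for data-parallel execution; the test program
# shares its variables through the new share_vars_from argument.
train_cp = compiler.CompiledProgram(main)._with_data_parallel(
    loss_name=loss.name)
test_cp = compiler.CompiledProgram(test_program)._with_data_parallel(
    loss_name=loss.name, share_vars_from=train_cp)

feed = {
    'image': np.random.random(size=(32, 784)).astype('float32'),
    'label': np.random.randint(10, size=(32, 1)).astype('int64'),
}
# Executor.run() accepts a CompiledProgram as well as a Program; train_cp
# must have run at least once before test_cp can share its variables.
train_loss, = exe.run(train_cp, feed=feed, fetch_list=[loss.name])
test_loss, = exe.run(test_cp, feed=feed, fetch_list=[loss.name])
```

Compared with the old `fluid.ParallelExecutor` path, the executor object is created once and reused, and `Executor.run()` picks between the compatibility path and the compiled one with a single `isinstance(program, compiler.CompiledProgram)` check.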